读取文件写入kafka
测试过程是将报文(json格式)写入kafka。为了提高测试效率,写个java程序,直接读取本地的json文件,再运行一下代码。1.json文件用sublime text3打开,格式化快捷键ctrl+command+j。修改哪个字段非常方便。2.java 函数,发送数据到kafka//从本地文件中读取数据发往kafkapublic static void pro...
·
测试过程是将报文(json格式)写入kafka。为了提高测试效率,写个java程序,直接读取本地的json文件,再运行一下代码。
1.json文件用sublime text3打开,格式化快捷键ctrl+command+j。修改哪个字段非常方便。
2.java 函数,发送数据到kafka
//从本地文件中读取数据发往kafka
public static void produceData() {
String topic = "client_stable_topic_local";
List list = null;
try {
//读取本地json文件
list = FileUtils.readLines(new File("/Users/yiqin/Desktop/client.json"));
} catch (IOException e) {
e.printStackTrace();
}
StringBuffer buffer = new StringBuffer();
for (Object line : list) {
buffer.append(line.toString().trim());//删除字符串的头尾空白符
}
JSONObject jsonObject = JSON.parseObject(buffer.toString());
//替换rid
String uuidStr = getUUID();
jsonObject.getJSONObject("data").put("rid",uuidStr);
//增加1个随机的字段
String str = getRandomString(5);
jsonObject.getJSONObject("data").getJSONObject("properties").put(str,str);
ProducerRecord<String, String> msg = new ProducerRecord<String,String>(topic,jsonObject.toJSONString());
procuder.send(msg);
procuder.flush();
}
/**
* 生成一个随机字符串
* @param stringLength
* @return
*/
public static String getRandomString(int stringLength){
String string = "abcdefghijklmnopqrstuvwxyz";
StringBuffer sb = new StringBuffer();
for (int i = 0; i < stringLength; i++) {
int index = (int) Math.floor(Math.random() * string.length());//向下取整0-25
sb.append(string.charAt(index));
}
return sb.toString();
}
public static String getUUID(){
String uuid = UUID.randomUUID().toString();
return uuid;
}
这里用的是com.alibaba.fastjson
更多推荐
已为社区贡献1条内容
所有评论(0)