Elasticsearch 5.4 Cluster (Part 3): Flume 1.6 Sink Compatibility
The Flume 1.6 Sink Is Incompatible with ES 5
We use Flume to consume messages from Kafka and then write the data to ES with ElasticsearchSink. After ES was upgraded from 1.7.1 to 5.4.1, starting Flume without any code changes caused the sink to throw errors en masse, and every write failed. Upgrading the JDK to 1.8, bumping the elasticsearch dependency to 5.4.1, and adding the Log4j 2 dependencies below still did not help.
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8.2</version>
</dependency>
A quick search shows that even the latest Flume release, 1.7, does not support ES 2.x (https://stackoverflow.com/questions/36614488/flume-1-6-compatibility-with-elasticsearch-2-3-1).
There is a project on GitHub that supports ES 2.x (https://github.com/lucidfrontier45/ElasticsearchSink2).
Our sink is actually very simple: it just takes the data pulled from Kafka, parses it lightly, and writes it to ES. To solve the compatibility problem, I considered the following two options:
- Drop the flume-ng-elasticsearch-sink package and rewrite the ESSink from scratch
- Modify the flume-ng-elasticsearch-sink source code and upgrade 1.6.1 ourselves
Option 1: Rewrite the ESSink
Flume's architecture is fairly simple. To implement the ESSink: modify the configure(Context context) initialization method and drop the parameters we no longer need; build the esClient connection in the start() method and submit once per completed batch; finally, submit directly with esClient in the process() method.
Old code:
esClient.addEvent(event, indexNameBuilder, logType, ttlMs);
...
client.execute();
New code:
bulkRequestBuilder.add(esClient.prepareIndex(indexNameBuilder.getIndexName(event), logType)
        .setSource(leidaSerializer.getContentBuilder(event)));
...
BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
bulkRequestBuilder.request().requests().clear();
Modified code in the ESSink start() method:
// Set the cluster name
Settings settings = Settings.builder().put("cluster.name", clusterName).build();
// Create the client
for (String esIpTcpport : serverAddresses) {
    String[] hostPort = esIpTcpport.trim().split(":");
    try {
        if (null == esClient) {
            esClient = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
        } else {
            esClient = esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
        }
    } catch (UnknownHostException e) {
        // log and skip hosts that cannot be resolved
    }
}
// Prepare the bulk request builder
bulkRequestBuilder = esClient.prepareBulk();
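The serverAddresses parsing in the loop above is easy to get wrong (stray whitespace, a missing port). Below is a standalone sketch of just that parsing step, with no Flume or ES dependencies; HostPortParser is a hypothetical helper name and the default transport port of 9300 is an assumption:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the parsing in start(): split each
// "host:port" entry and collect the pairs instead of building a client,
// so the parsing logic can be checked in isolation.
class HostPortParser {

    static final int DEFAULT_PORT = 9300; // assumed ES transport port

    /** Parses entries like "10.0.0.1:9300"; falls back to DEFAULT_PORT. */
    static List<String[]> parse(String[] serverAddresses) {
        List<String[]> result = new ArrayList<>();
        for (String entry : serverAddresses) {
            String[] hostPort = entry.trim().split(":");
            String host = hostPort[0];
            String port = hostPort.length > 1 ? hostPort[1] : String.valueOf(DEFAULT_PORT);
            result.add(new String[] { host, port });
        }
        return result;
    }
}
```

In the real sink, each resulting pair would be fed into new InetSocketTransportAddress(InetAddress.getByName(host), Integer.parseInt(port)) as shown in the start() code above.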
Main modified code in the ESSink process() method:
public Status process() {
    Status status = Status.READY;
    Channel channel = getChannel();
    Transaction txn = channel.getTransaction();
    Event event = null;
    try {
        txn.begin();
        int count;
        for (count = 0; count < batchSize; ++count) {
            event = channel.take();
            if (event == null) {
                break;
            }
            if (event.getBody().length == 0) {
                continue;
            }
            // esClient.addEvent(event, indexNameBuilder, logType, ttlMs);
            bulkRequestBuilder.add(esClient.prepareIndex(indexNameBuilder.getIndexName(event),
                    logType).setSource(leidaSerializer.getContentBuilder(event)));
        }
        if (count <= 0) {
            sinkCounter.incrementBatchEmptyCount();
            counterGroup.incrementAndGet("channel.underflow");
            status = Status.BACKOFF;
        } else {
            if (count < batchSize) {
                sinkCounter.incrementBatchUnderflowCount();
            } else {
                sinkCounter.incrementBatchCompleteCount();
            }
            sinkCounter.addToEventDrainAttemptCount(count);
            // esClient.execute();
            BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
            bulkRequestBuilder.request().requests().clear();
            if (bulkResponse.hasFailures()) {
                // System.out.println("failure");
            }
        }
        sinkCounter.addToEventDrainSuccessCount(count);
        counterGroup.incrementAndGet("transaction.success");
    } catch (Throwable ex) {
        ....
    } finally {
        ...
    }
    return status;
}
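The drain loop in process() has subtle status semantics: a batch where nothing was taken returns BACKOFF, a partial batch counts as underflow, and a full batch as complete. Below is a minimal, Flume-free sketch of the take/skip/break logic, using a plain queue in place of the channel; BatchDrain and all its names are hypothetical:

```java
import java.util.List;
import java.util.Queue;

// Standalone sketch of the drain loop in process(): take up to batchSize
// events, stop when the channel is empty, skip zero-length bodies, and
// signal BACKOFF only when nothing at all was taken.
class BatchDrain {

    enum Status { READY, BACKOFF }

    static Status drain(Queue<byte[]> channel, int batchSize, List<byte[]> batch) {
        int count;
        for (count = 0; count < batchSize; ++count) {
            byte[] event = channel.poll();    // stands in for channel.take()
            if (event == null) {
                break;                        // channel drained before the batch filled
            }
            if (event.length == 0) {
                continue;                     // skip empty bodies; they still count toward the batch
            }
            batch.add(event);                 // stands in for bulkRequestBuilder.add(...)
        }
        return count <= 0 ? Status.BACKOFF : Status.READY;
    }
}
```

Note that, as in the original, skipped empty events still increment count, so a batch of only-empty events does not trigger BACKOFF.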
After repackaging, it runs fine and data is written to ES normally.
Option 2: Modify the flume-ng-elasticsearch-sink Source Code
Download the source, import it as a Maven project, change the two elasticsearch dependencies to 5.4.1, then fix the compilation and compatibility errors:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>5.4.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>5.4.1</version>
</dependency>
Modify the configureHostnames() method of the ElasticSearchTransportClient class; the constructor signature of InetSocketTransportAddress has changed. Change:
serverAddresses[i] = new InetSocketTransportAddress(host, port);
to:
try {
    serverAddresses[i] = new InetSocketTransportAddress(InetAddress.getByName(host), port);
} catch (UnknownHostException e) {
    e.printStackTrace();
}
Modify the addEvent() method of the ElasticSearchTransportClient class as shown below; also remove the indexRequestBuilder.setTTL(ttlMs) call, since ES 5 no longer supports TTL. Change:
indexRequestBuilder = client
.prepareIndex(indexNameBuilder.getIndexName(event), indexType)
.setSource(serializer.getContentBuilder(event).bytes());
to:
indexRequestBuilder = client
.prepareIndex(indexNameBuilder.getIndexName(event), indexType)
.setSource(serializer.getContentBuilder(event));// .bytes()
The openClient() method of the ElasticSearchTransportClient class also has compilation errors; change it to:
Settings settings = Settings.builder()
.put("cluster.name", clusterName).build();
// TransportClient transportClient = new TransportClient(settings);
// for (InetSocketTransportAddress host : serverAddresses) {
// transportClient.addTransportAddress(host);
// }
TransportClient transportClient = null;
for (InetSocketTransportAddress host : serverAddresses) {
    if (null == transportClient) {
        transportClient = new PreBuiltTransportClient(settings).addTransportAddress(host);
    } else {
        transportClient = transportClient.addTransportAddress(host);
    }
}
if (client != null) {
    client.close();
}
client = transportClient;
The openLocalDiscoveryClient() method in the ElasticSearchTransportClient class is normally only used for testing; simply comment out its body and leave an empty method.
Change one line in the addEvent() method of the ElasticSearchRestClient class:
bulkBuilder.append(content.toBytesArray().toUtf8());
to:
bulkBuilder.append(content.utf8ToString());//.toBytesArray().toUtf8();
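Functionally, both the old toBytesArray().toUtf8() chain and the new utf8ToString() call just decode the serialized event bytes as UTF-8 before appending them to the bulk request body. In plain Java that amounts to nothing more than the following (Utf8Demo is a hypothetical name, not part of the ES API):

```java
import java.nio.charset.StandardCharsets;

// What both content.toBytesArray().toUtf8() (old API) and
// content.utf8ToString() (new API) boil down to: decoding the
// underlying bytes as a UTF-8 string.
class Utf8Demo {
    static String utf8ToString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```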
The appendHeaders() method of the ElasticSearchLogStashEventSerializer class has a compilation error; change it to:
// Map<String, String> headers = Maps.newHashMap(event.getHeaders());
Map<String, String> headers = new HashMap<>(event.getHeaders());
Change the return type of the getContentBuilder() method in the ElasticSearchEventSerializer class to XContentBuilder.
We use ElasticSearchEventSerializer, and the setSource() method of the new ES 5.4 client takes an XContentBuilder; without this change, the following error is thrown at runtime:
java.lang.IllegalArgumentException: The number of object passed must be even but was [1]
at org.elasticsearch.action.index.IndexRequest.source(IndexRequest.java:431)
at org.elasticsearch.action.index.IndexRequest.source(IndexRequest.java:418)
at org.elasticsearch.action.index.IndexRequestBuilder.setSource(IndexRequestBuilder.java:210)
One line in the addComplexField() method of the ContentBuilderUtil class needs to be changed:
XContentFactory.xContent(contentType).createParser(data);
to:
XContentFactory.xContent(contentType).createParser(NamedXContentRegistry.EMPTY, data);//.createParser(data);
With all compilation errors gone, repackage and deploy. It runs fine and writes data into the ES 5.4 cluster. There may be other things that need changing; my flume-to-ES pipeline does not exercise them, so no errors have surfaced yet.
Personally, I think the first approach is better: it is simple, under our own control, and does not depend on the flume-ng-elasticsearch-sink package.