Flume 1.6 sink is incompatible with ES5

We use Flume to consume messages from Kafka and write them to Elasticsearch through ElasticsearchSink. After upgrading ES from 1.7.1 to 5.4.1, starting Flume with the unchanged code made the sink throw errors en masse and every write to ES failed. Upgrading the JDK to 1.8, bumping the elasticsearch dependency to 5.4.1, and adding the Log4j 2 dependencies below did not fix it.

    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>2.8.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.8.2</version>
    </dependency>

A quick search shows that even the latest Flume release, 1.7, does not support ES 2.x (https://stackoverflow.com/questions/36614488/flume-1-6-compatibility-with-elasticsearch-2-3-1).
There is a GitHub project that does support ES 2.x (https://github.com/lucidfrontier45/ElasticsearchSink2).
Our sink is actually very simple: it pulls data from Kafka, does some light parsing, and writes it to ES. To work around the incompatibility, we considered two options:

  • Drop the flume-ng-elasticsearch-sink package and write our own ESSink
  • Modify the flume-ng-elasticsearch-sink source code and upgrade 1.6.1 ourselves

Option 1: Rewrite the ESSink

Flume's architecture is fairly simple. For the ESSink implementation: modify the configure(Context context) initialization method and drop the parameters we no longer need; build the esClient connection in start(), submitting once per completed batch; finally, submit directly through esClient in process().
Old code:

    esClient.addEvent(event, indexNameBuilder, logType, ttlMs);
    ...
    client.execute();

New code:

    bulkRequestBuilder.add(esClient.prepareIndex(indexNameBuilder.getIndexName(event), logType)
            .setSource(leidaSerializer.getContentBuilder(event)));
    ...
    BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
    bulkRequestBuilder.request().requests().clear();

Changes to the ESSink start() method:

            // Set the cluster name
            Settings settings = Settings.builder().put("cluster.name", clusterName).build();
            // Build the client, adding each configured transport address
            for (String esIpTcpport : serverAddresses) {
                String[] hostPort = esIpTcpport.trim().split(":");
                try {
                    if (null == esClient) {
                        esClient = new PreBuiltTransportClient(settings)
                            .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
                    } else {
                        esClient = esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
                    }
                } catch (UnknownHostException e) {
                    e.printStackTrace(); // at minimum, surface unresolvable hosts instead of swallowing them
                }
            }
            // Bulk request builder, reused across batches
            bulkRequestBuilder = esClient.prepareBulk();
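The `host:port` splitting in start() above is easy to get wrong (stray whitespace, a missing port). Factored out as a small standalone helper, it can be tested without an ES cluster. A minimal sketch; the class name, method name, and default-port fallback are my own additions, not part of the original sink:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class EsAddressParser {
    // Default ES transport port, used when the address omits ":port" (an assumption, not in the original code)
    public static final int DEFAULT_TRANSPORT_PORT = 9300;

    // Parse "host:port" (or a bare "host") into a (host, port) pair, trimming whitespace first
    public static Map.Entry<String, Integer> parse(String address) {
        String[] hostPort = address.trim().split(":");
        String host = hostPort[0];
        int port = hostPort.length > 1 ? Integer.parseInt(hostPort[1].trim())
                                       : DEFAULT_TRANSPORT_PORT;
        return new SimpleEntry<>(host, port);
    }

    public static void main(String[] args) {
        System.out.println(parse(" 10.0.0.1:9300 "));
    }
}
```

The result pair then feeds InetAddress.getByName(host) and the port into the transport-address constructor, exactly as the loop in start() does.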
Key changes to the ESSink process() method:
public Status process() {
        Status status = Status.READY;
        Channel channel = getChannel();
        Transaction txn = channel.getTransaction();
        Event event = null;
        try {
            txn.begin();
            int count;
            for (count = 0; count < batchSize; ++count) {
                event = channel.take();
                if (event == null) {
                    break;
                }
                if (event.getBody().length == 0) {
                    continue;
                }
                //esClient.addEvent(event, indexNameBuilder, logType, ttlMs);
                bulkRequestBuilder.add(esClient.prepareIndex(indexNameBuilder.getIndexName(event),
                        logType).setSource(leidaSerializer.getContentBuilder(event)));
            }

            if (count <= 0) {
                sinkCounter.incrementBatchEmptyCount();
                counterGroup.incrementAndGet("channel.underflow");
                status = Status.BACKOFF;
            } else {
                if (count < batchSize) {
                    sinkCounter.incrementBatchUnderflowCount();
                } else {
                    sinkCounter.incrementBatchCompleteCount();
                }

                sinkCounter.addToEventDrainAttemptCount(count);
                //esClient.execute();
                BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
                bulkRequestBuilder.request().requests().clear();
                if (bulkResponse.hasFailures()) {
                    // Surface partial failures; buildFailureMessage() summarizes the failed items
                    System.err.println(bulkResponse.buildFailureMessage());
                }
            }

            sinkCounter.addToEventDrainSuccessCount(count);
            counterGroup.incrementAndGet("transaction.success");
        } catch (Throwable ex) {
            ....
        } finally {
            ...
        }
        return status;
}
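The counting logic in process() boils down to three cases: an empty batch (back off), an underflow, and a complete batch. Pulled out as pure functions it is easy to verify in isolation. A sketch only; the enums and names below mirror Flume's Sink.Status and the counters above but are my own framing:

```java
public class BatchStatus {
    public enum Status { READY, BACKOFF }
    public enum BatchOutcome { EMPTY, UNDERFLOW, COMPLETE }

    // Classify a drained batch the same way process() does:
    // nothing taken -> EMPTY, fewer than batchSize -> UNDERFLOW, otherwise COMPLETE
    public static BatchOutcome classify(int count, int batchSize) {
        if (count <= 0) {
            return BatchOutcome.EMPTY;
        }
        return count < batchSize ? BatchOutcome.UNDERFLOW : BatchOutcome.COMPLETE;
    }

    // An empty batch tells the sink runner to back off before polling the channel again
    public static Status statusFor(BatchOutcome outcome) {
        return outcome == BatchOutcome.EMPTY ? Status.BACKOFF : Status.READY;
    }
}
```

Returning BACKOFF on an empty channel matters: without it the sink runner would spin on channel.take() at full speed.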
Repackaged and ran it: everything works, and data is written to ES normally.

Option 2: Modify the flume-ng-elasticsearch-sink source code

Download the source and import it as a Maven project, switch the two elasticsearch dependencies to 5.4.1, then fix the compilation and compatibility errors:

    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch</artifactId>
      <version>5.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.elasticsearch.client</groupId>
      <artifactId>transport</artifactId>
      <version>5.4.1</version>
    </dependency>

Modify the configureHostnames() method of the ElasticSearchTransportClient class; the constructor signature of InetSocketTransportAddress has changed.

      serverAddresses[i] = new InetSocketTransportAddress(host, port);
Change it to:
      try {
        serverAddresses[i] = new InetSocketTransportAddress(InetAddress.getByName(host), port);
      } catch (UnknownHostException e) {
        e.printStackTrace();
      }
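The only real change here is that the ES 5.x constructor takes a resolved java.net.InetAddress instead of a hostname string. The resolution step itself is plain JDK and worth seeing in isolation; the class and method names below are my own, used only for illustration:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class ResolveHost {
    // Resolve a host string to an InetAddress; returns null on failure
    // (the original code only printed the stack trace and left the slot unset)
    public static InetAddress resolve(String host) {
        try {
            return InetAddress.getByName(host);
        } catch (UnknownHostException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        // Numeric addresses resolve without any DNS lookup
        System.out.println(resolve("127.0.0.1").getHostAddress());
    }
}
```

Note that a swallowed UnknownHostException leaves a null slot in serverAddresses, so it pays to at least log it, as in the snippet above.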
Modify the addEvent() method of the ElasticSearchTransportClient class as below, and also remove indexRequestBuilder.setTTL(ttlMs): ES 5 no longer supports TTL.
       indexRequestBuilder = client
           .prepareIndex(indexNameBuilder.getIndexName(event), indexType)
           .setSource(serializer.getContentBuilder(event).bytes());
Change it to:
       indexRequestBuilder = client
           .prepareIndex(indexNameBuilder.getIndexName(event), indexType)
           .setSource(serializer.getContentBuilder(event));// .bytes()
The openClient() method of the ElasticSearchTransportClient class also has compilation errors; change it to:
    Settings settings = Settings.builder()
        .put("cluster.name", clusterName).build();

//    TransportClient transportClient = new TransportClient(settings);
//    for (InetSocketTransportAddress host : serverAddresses) {
//      transportClient.addTransportAddress(host);
//    }

    TransportClient transportClient = null;
    for (InetSocketTransportAddress host : serverAddresses) {
            if (null == transportClient) {
                transportClient = new PreBuiltTransportClient(settings).addTransportAddress(host);
            } else {
                transportClient = transportClient.addTransportAddress(host);
            }
    }
    if (client != null) {
      client.close();
    }
    client = transportClient;
The openLocalDiscoveryClient() method in ElasticSearchTransportClient is generally only used for testing; just comment out its body and leave an empty method.

Change one line in the addEvent() method of the ElasticSearchRestClient class:

bulkBuilder.append(content.toBytesArray().toUtf8());
After:
bulkBuilder.append(content.utf8ToString());//.toBytesArray().toUtf8();
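utf8ToString() is nothing more than a UTF-8 decode of the underlying source bytes; the old toBytesArray().toUtf8() chain did the same thing through ES 1.x helper types. In plain JDK terms the conversion is the following sketch (the class and method names are mine, for illustration only):

```java
import java.nio.charset.StandardCharsets;

public class Utf8Decode {
    // Decode raw UTF-8 bytes into a String, which is what
    // BytesReference.utf8ToString() does over the source buffer
    public static String utf8ToString(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] json = "{\"msg\":\"hello\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(utf8ToString(json));
    }
}
```

Always pin the charset explicitly; new String(bytes) without one uses the platform default and can corrupt non-ASCII log content.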

The appendHeaders() method of the ElasticSearchLogStashEventSerializer class has a compilation error; change it to:

    //Map<String, String> headers = Maps.newHashMap(event.getHeaders());
    Map<String, String> headers = new HashMap<>(event.getHeaders());
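This replacement is safe because Guava's Maps.newHashMap(map) and the JDK HashMap copy constructor do the same thing: a shallow copy that detaches the headers from the event, so the serializer can mutate them freely. A quick standalone check (class and method names are my own):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class HeaderCopy {
    // Shallow copy of the event headers, equivalent to Guava's Maps.newHashMap(map)
    public static Map<String, String> copyHeaders(Map<String, String> original) {
        return new HashMap<>(original);
    }

    public static void main(String[] args) {
        Map<String, String> eventHeaders = Collections.singletonMap("timestamp", "1497000000000");
        Map<String, String> headers = copyHeaders(eventHeaders);
        // The copy is independent: serializer-side mutations don't touch the event
        headers.remove("timestamp");
        System.out.println(eventHeaders.containsKey("timestamp")); // true
    }
}
```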

Change the return type of getContentBuilder() in the ElasticSearchEventSerializer class to XContentBuilder.
We use this class directly, and the new ES 5.4 client's setSource() takes an XContentBuilder; without this change you get the following runtime error:

java.lang.IllegalArgumentException: The number of object passed must be even but was [1]
        at org.elasticsearch.action.index.IndexRequest.source(IndexRequest.java:431)
        at org.elasticsearch.action.index.IndexRequest.source(IndexRequest.java:418)
        at org.elasticsearch.action.index.IndexRequestBuilder.setSource(IndexRequestBuilder.java:210)

The addComplexField() method of the ContentBuilderUtil class needs one line changed:

      XContentFactory.xContent(contentType).createParser(data);
Change it to:
      XContentFactory.xContent(contentType).createParser(NamedXContentRegistry.EMPTY, data);//.createParser(data);
With all compilation errors gone, repackage and deploy. It runs fine and writes data to the ES 5.4 cluster. There may well be other spots that need fixing; our flume2es path doesn't exercise them, so no errors have surfaced yet.

Personally I still prefer the first approach: it is simple, fully under our control, and carries no dependency on the flume-ng-elasticsearch-sink package.
