1. Install Kafka
Before integrating, first make sure Kafka itself works: deploy Kafka, write a small Kafka producer in code, and start a Kafka console consumer on the command line. If the data produced by the code shows up in the console consumer, Kafka is working. (A minimal producer sketch follows the commands below.)

Version: kafka_2.10-0.8.1.1
Start the broker:
bin/kafka-server-start.sh -daemon config/server.properties
List all topics:
bin/kafka-topics.sh --zookeeper localhost:2181 --list
Create a topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytopic
Start a console consumer:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
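
The producer sketch below uses the old Scala client API (kafka.javaapi.producer.Producer), matching the 0.8.x dependency used later in this article. The broker address 192.168.2.20:9092 and topic mytopic are taken from the rest of the setup; the class name KafkaCheckProducer is made up for illustration and the package declaration is omitted. If Kafka is healthy, the messages it sends should appear in the console consumer started above.

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Minimal producer used only to verify the Kafka installation.
 */
public class KafkaCheckProducer {

     public static void main(String[] args) {
          Properties props = new Properties();
          props.put("metadata.broker.list", "192.168.2.20:9092"); // Kafka broker list (host:port)
          props.put("serializer.class", "kafka.serializer.StringEncoder"); // value serializer class

          Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
          try {
               for (int i = 0; i < 10; i++) {
                    // each message should show up in the console consumer started above
                    producer.send(new KeyedMessage<String, String>("mytopic", "kafka test message " + i));
               }
          } finally {
               producer.close();
          }
     }
}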

2. Integrate Flume and Kafka
Note: this article uses Flume 1.5.2, which does not ship a Kafka sink. To sink the logs into Kafka, we have to write a custom sink, which means extending the AbstractSink class and implementing Configurable.
2.1 Add the Flume and Kafka dependencies
<!-- Flume/Kafka integration -->
<dependency>
     <groupId>org.apache.flume</groupId>
     <artifactId>flume-ng-core</artifactId>
     <version>1.5.2</version>
</dependency>
<dependency>
     <groupId>org.apache.kafka</groupId>
     <artifactId>kafka_2.10</artifactId>
     <version>0.8.2.0</version>
     <exclusions>
               <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
               </exclusion>
     </exclusions>
</dependency>

2.2 Develop the custom sink
Following the official developer guide (apache-flume-1.5.2-bin/docs/FlumeDeveloperGuide.html#sink), the custom Kafka sink is implemented as shown below.
The code of the custom MySink:
package com.yun.flumeNG;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

/**
* Custom Kafka sink: takes events from the Flume channel and writes them to Kafka.
*
* @author shenfl
*
*/
public class MySink extends AbstractSink implements Configurable {

     private String myProp;
     private Producer<String, String> producer;

     /**
      * Called once when the component is initialized.
      */
     public void configure(Context context) {
          // init producer
          Properties props = new Properties();
          props.put("metadata.broker.list", "192.168.2.20:9092"); // Kafka broker list (host:port)
          props.put("serializer.class", "kafka.serializer.StringEncoder"); // value serializer class
          ProducerConfig config = new ProducerConfig(props);
          this.producer = new Producer<String, String>(config);
          // Store myProp for later retrieval by the process() method
          String myProp = context.getString("myProp", "mytopic"); // target topic, defaults to mytopic
          this.setMyProp(myProp);
     }

     public Status process() throws EventDeliveryException {
          Status status = null;
          // Start transaction
          Channel ch = getChannel();
          Transaction txn = ch.getTransaction();
          txn.begin();
          try {
               // Take one event from the channel
               Event event = ch.take();
               if (event == null) {
                    // Channel is empty; commit the empty transaction and back off
                    txn.commit();
                    return Status.BACKOFF;
               }
               String eventBody = new String(event.getBody());
               KeyedMessage<String, String> message = new KeyedMessage<String, String>(this.getMyProp(), eventBody);
               // Send the event to the external repository (Kafka)
               producer.send(message);
               txn.commit();
               status = Status.READY;
          } catch (Throwable t) {
               txn.rollback();
               // Log exception, handle individual exceptions as needed
               status = Status.BACKOFF;
               // re-throw all Errors
               if (t instanceof Error) {
                    throw (Error) t;
               }
          } finally {
               txn.close();
          }
          return status;
     }

     public String getMyProp() {
          return myProp;
     }

     public void setMyProp(String myProp) {
          this.myProp = myProp;
     }
}
2.3 Modify the configuration so that the sink in flume-kafka-conf.properties is the custom sink
$ vi flume-kafka-conf.properties
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1

# define the channel
agent1.channels.ch1.type = memory

# define the source
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# define the sink
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = com.yun.flumeNG.MySink
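
Since MySink's configure() reads the topic from the myProp key (defaulting to mytopic), the topic can also be set in this file if desired, e.g. agent1.sinks.log-sink1.myProp = mytopic.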

2.4 Copy the four required jars into flume/lib/
(1) Package the custom MySink into a jar and copy it to flume/lib
Once the code is written it has to be packaged. Packaging uses the Maven assembly plugin, and the resulting jar-with-dependencies is then copied into Flume's lib directory. Add the following to the pom file:
<!-- Maven assembly plugin: package the jar together with its dependencies -->
<build>
	<plugins>
		<plugin>
			<artifactId>maven-assembly-plugin</artifactId>
			<configuration>
				<descriptorRefs>
					<descriptorRef>jar-with-dependencies</descriptorRef>
				</descriptorRefs>
				<archive>
					<manifest>
						<mainClass></mainClass>
					</manifest>
				</archive>
			</configuration>
			<executions>
				<execution>
					<id>make-assembly</id>
					<phase>package</phase>
					<goals>
						<goal>single</goal>
					</goals>
				</execution>
			</executions>
		</plugin>
		<!-- compiler plugin: set the JDK version -->
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-compiler-plugin</artifactId>
			<version>2.3.2</version>
			<configuration>
				<encoding>UTF-8</encoding>
				<source>1.7</source>
				<target>1.7</target>
				<showWarnings>true</showWarnings>
			</configuration>
		</plugin>
	</plugins>
</build>
Package the project with the Maven command mvn package -Dmaven.test.skip; the (trimmed) console output looks like this:

D:\bigdata\jfyun>mvn package -Dmaven.test.skip

[INFO] net/ already added, skipping
[INFO] META-INF/MANIFEST.MF already added, skipping
[INFO] META-INF/ already added, skipping
[INFO] META-INF/MANIFEST.MF already added, skipping
[INFO] META-INF/maven/ already added, skipping
[INFO] META-INF/ already added, skipping
[INFO] META-INF/MANIFEST.MF already added, skipping
[INFO] org/ already added, skipping
[INFO] -----------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] -----------------------------------------------------------------------
[INFO] Total time: 02:45 min
[INFO] Finished at: 2015-09-29T20:16:32+08:00
[INFO] Final Memory: 66M/756M

After a successful build, the jar-with-dependencies artifact can be found in the project's target directory.

(2) Copy these three jars from the Kafka distribution
$ cp kafka_2.10-0.8.2.0/libs/kafka_2.10-0.8.2.0.jar flume-1.5.2/lib/
$ cp kafka_2.10-0.8.2.0/libs/scala-library-2.10.4.jar flume-1.5.2/lib/
$ cp kafka_2.10-0.8.2.0/libs/metrics-core-2.2.0.jar flume-1.5.2/lib/
2.5 Verify the integration
(1) Start Kafka
Start the broker:
bin/kafka-server-start.sh -daemon config/server.properties
Start a console consumer to watch the topic:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
(2) Start Flume
bin/flume-ng agent --conf conf --conf-file conf/flume-kafka-conf.properties --name agent1 -Dflume.root.logger=INFO,console
(3) Run the mock data program (its logs are assumed to reach the avro source through the Flume log4j appender configured in the earlier log4j/Flume step; an alternative that pushes events into the source directly is sketched after the code)
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class LogProducer {

     private static final Log logger = LogFactory.getLog(LogProducer.class);

     public static void main(String[] args) {
          while (true) {
               try {
                    long s1 = System.currentTimeMillis();
                    Thread.sleep(1000);
                    long s2 = System.currentTimeMillis() - s1;
                    // log line: provincial auth interface URL, response time, current timestamp
                    logger.info("省公司鉴权接口:" + "http://bj.auth.com" + ",响应时间:" + s2 + ",当前时间:" + System.currentTimeMillis());
               } catch (InterruptedException e) {
                    e.printStackTrace();
               }
          }
     }
}
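
If the log4j appender route is not in place, a small sketch like the one below can push test events straight into the avro source using Flume's RPC client SDK (available through the flume-ng-core dependency). The host 192.168.2.20 and the class name AvroSourceSmokeTest are assumptions for illustration; replace the host with the machine the Flume agent actually runs on.

import java.nio.charset.Charset;

import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

/**
 * Hypothetical smoke test that sends a few events straight to the avro source,
 * bypassing log4j. Each event should end up on the Kafka topic via MySink.
 */
public class AvroSourceSmokeTest {

     public static void main(String[] args) throws EventDeliveryException {
          // avro source address from flume-kafka-conf.properties (port 41414)
          RpcClient client = RpcClientFactory.getDefaultInstance("192.168.2.20", 41414);
          try {
               for (int i = 0; i < 5; i++) {
                    Event event = EventBuilder.withBody("rpc test event " + i, Charset.forName("UTF-8"));
                    client.append(event);
               }
          } finally {
               client.close();
          }
     }
}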
(4) Check the messages with the console consumer: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
The console output shows that log4j, Flume NG and Kafka have been integrated successfully.

Note: if you copy the consumer output into a text file, you will notice that every log line is followed by a blank line.

So what causes the blank lines, and how do we fix them?

Cause: with the custom sink, the default serializer appends a newline at the end of every line it writes, and the log records already carry a newline themselves, so each record is followed by an extra blank line. The idea is therefore to configure the sink not to append the newline automatically.
Proposed solution: agent1.sinks.log-sink1.serializer.appendNewline = false
After the change, flume-kafka-conf.properties looks like this:
$ vi flume-kafka-conf.properties
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1

# define the channel
agent1.channels.ch1.type = memory

# define the source
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414

# define the sink; appendNewline defaults to true, which was suspected to make the custom MySink add a blank line after every output line
agent1.sinks.log-sink1.channel = ch1
agent1.sinks.log-sink1.type = com.yun.flumeNG.MySink
agent1.sinks.log-sink1.serializer.appendNewline = false
This setting turned out not to work, so another approach was needed. Since the root cause is that MySink reads the channel data and the event body still carries a line separator when it is serialized to Kafka, the simplest fix is to strip the separator directly in MySink (see the commented lines in the code below). After rerunning with this change the output is clean. The modified process() method:
public Status process() throws EventDeliveryException {
     Status status = null;

     // Start transaction
     Channel ch = getChannel();
     Transaction txn = ch.getTransaction();
     txn.begin();
     try {
          Event event = ch.take();
          if (event == null) {
               // Channel is empty; commit the empty transaction and back off
               txn.commit();
               return Status.BACKOFF;
          }
          byte[] body = event.getBody();
          // The serialized body carries a line separator; strip it so the Kafka
          // console consumer does not print a blank line after every record.
          String separator = System.getProperty("line.separator"); // e.g. \n or \r\n
          KeyedMessage<String, String> message = new KeyedMessage<String, String>(this.getMyProp(),
                    new String(body, "UTF-8").replaceAll(separator, ""));
          producer.send(message);
          txn.commit();
          status = Status.READY;
     } catch (Throwable t) {
          txn.rollback();
          status = Status.BACKOFF;
          if (t instanceof Error) {
               throw (Error) t;
          }
     } finally {
          txn.close();
     }
     return status;
}
Checking the consumer output again: the blank lines are gone and the issue is finally solved.
