转自:http://ee.riaos.com/?p=20006353 

     kafka作为分布式日志收集或系统监控服务,我们有必要在合适的场合使用它。kafka的部署包括zookeeper环境/kafka环境,同时还需要进行一些配置操作.接下来介绍如何使用kafka.

    我们使用3个zookeeper实例构建zk集群,使用2个kafka broker构建kafka集群.

    其中kafka为0.8V,zookeeper为3.4.5V

 

一.Zookeeper集群构建

    我们有3个zk实例,分别为zk-0,zk-1,zk-2;如果你仅仅是测试使用,可以使用1个zk实例.

    1) zk-0

    调整配置文件:

clientPort=2181
server.0=127.0.0.1:2888:3888
server.1=127.0.0.1:2889:3889
server.2=127.0.0.1:2890:3890
##只需要修改上述配置,其他配置保留默认值

    启动zookeeper

./zkServer.sh start

    2) zk-1

    调整配置文件(其他配置和zk-0一只):

clientPort=2182
##只需要修改上述配置,其他配置保留默认值

    启动zookeeper

 

./zkServer.sh start

    3) zk-2

    调整配置文件(其他配置和zk-0一只):

clientPort=2183
##只需要修改上述配置,其他配置保留默认值

    启动zookeeper

 

./zkServer.sh start

  

二. Kafka集群构建

    因为Broker配置文件涉及到zookeeper的相关约定,因此我们先展示broker配置文件.我们使用2个kafka broker来构建这个集群环境,分别为kafka-0,kafka-1.

    1) kafka-0

    在config目录下修改配置文件为:

broker.id=0
port=9092
num.network.threads=2
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dir=./logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=536870912

log.cleanup.interval.mins=10
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
zookeeper.connection.timeout.ms=1000000
kafka.metrics.polling.interval.secs=5
kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter
kafka.csv.metrics.dir=/tmp/kafka_metrics
kafka.csv.metrics.reporter.enabled=false

    因为kafka用scala语言编写,因此运行kafka需要首先准备scala相关环境。

> cd kafka-0
> ./sbt update
> ./sbt package
> ./sbt assembly-package-dependency 

    其中最后一条指令执行有可能出现异常,暂且不管。 启动kafka broker:

> JMS_PORT=9997 bin/kafka-server-start.sh config/server.properties &

    因为zookeeper环境已经正常运行了,我们无需通过kafka来挂载启动zookeeper.如果你的一台机器上部署了多个kafka broker,你需要声明JMS_PORT.

    2) kafka-1

broker.id=1
port=9093
##其他配置和kafka-0保持一致

    然后和kafka-0一样执行打包命令,然后启动此broker.

> JMS_PORT=9998 bin/kafka-server-start.sh config/server.properties &

    到目前为止环境已经OK了,那我们就开始展示编程实例吧。

 

三.项目准备

    项目基于maven构建,不得不说kafka java客户端实在是太糟糕了;构建环境会遇到很多麻烦。建议参考如下pom.xml;其中各个依赖包必须版本协调一致。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.test</groupId>
    <artifactId>test-kafka</artifactId>
    <packaging>jar</packaging>

    <name>test-kafka</name>
    <url>http://maven.apache.org</url>
	<version>1.0.0</version>
    <dependencies>
    	<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.14</version>
		</dependency>
    	<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.8.0</artifactId>
			<version>0.8.0-beta1</version>
			<exclusions>
				<exclusion>
					<groupId>log4j</groupId>
					<artifactId>log4j</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.scala-lang</groupId>
			<artifactId>scala-library</artifactId>
			<version>2.8.1</version>
		</dependency>
		<dependency>
			<groupId>com.yammer.metrics</groupId>
			<artifactId>metrics-core</artifactId>
			<version>2.2.0</version>
		</dependency>
		<dependency>
			<groupId>com.101tec</groupId>
			<artifactId>zkclient</artifactId>
			<version>0.3</version>
		</dependency>
    </dependencies>
    <build>
    	<finalName>test-kafka-1.0</finalName>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.5</source>
                    <target>1.5</target>
                    <encoding>gb2312</encoding>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-resources-plugin</artifactId>
                <version>2.2</version>
                <configuration>
                    <encoding>gbk</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

 

四.Producer端代码

    1) producer.properties文件:此文件放在/resources目录下

#partitioner.class=
metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093
##,127.0.0.1:9093
producer.type=sync
compression.codec=0
serializer.class=kafka.serializer.StringEncoder
##在producer.type=async时有效
#batch.num.messages=100

    2) LogProducer.java代码样例

package com.test.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class LogProducer {

	private Producer<String,String> inner;
	public LogProducer() throws Exception{
		Properties properties = new Properties();
		properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));
		ProducerConfig config = new ProducerConfig(properties);
		inner = new Producer<String, String>(config);
	}

	
	public void send(String topicName,String message) {
		if(topicName == null || message == null){
			return;
		}
		KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);
		inner.send(km);
	}
	
	public void send(String topicName,Collection<String> messages) {
		if(topicName == null || messages == null){
			return;
		}
		if(messages.isEmpty()){
			return;
		}
		List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
		for(String entry : messages){
			KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);
			kms.add(km);
		}
		inner.send(kms);
	}
	
	public void close(){
		inner.close();
	}
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		LogProducer producer = null;
		try{
			producer = new LogProducer();
			int i=0;
			while(true){
				producer.send("test-topic", "this is a sample" + i);
				i++;
				Thread.sleep(2000);
			}
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			if(producer != null){
				producer.close();
			}
		}

	}

}

 

五.Consumer端

     1) consumer.properties:文件位于/resources目录下

zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
##,127.0.0.1:2182,127.0.0.1:2183
# timeout in ms for connecting to zookeeper
zookeeper.connectiontimeout.ms=1000000
#consumer group id
group.id=test-group
#consumer timeout
#consumer.timeout.ms=5000

    2) LogConsumer.java代码样例

package com.test.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
public class LogConsumer {

	private ConsumerConfig config;
	private String topic;
	private int partitionsNum;
	private MessageExecutor executor;
	private ConsumerConnector connector;
	private ExecutorService threadPool;
	public LogConsumer(String topic,int partitionsNum,MessageExecutor executor) throws Exception{
		Properties properties = new Properties();
		properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties"));
		config = new ConsumerConfig(properties);
		this.topic = topic;
		this.partitionsNum = partitionsNum;
		this.executor = executor;
	}
	
	public void start() throws Exception{
		connector = Consumer.createJavaConsumerConnector(config);
		Map<String,Integer> topics = new HashMap<String,Integer>();
		topics.put(topic, partitionsNum);
		Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
		List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
		threadPool = Executors.newFixedThreadPool(partitionsNum);
		for(KafkaStream<byte[], byte[]> partition : partitions){
			threadPool.execute(new MessageRunner(partition));
		} 
	}

    	
	public void close(){
		try{
			threadPool.shutdownNow();
		}catch(Exception e){
			//
		}finally{
			connector.shutdown();
		}
		
	}
	
	class MessageRunner implements Runnable{
		private KafkaStream<byte[], byte[]> partition;
		
		MessageRunner(KafkaStream<byte[], byte[]> partition) {
			this.partition = partition;
		}
		
		public void run(){
			ConsumerIterator<byte[], byte[]> it = partition.iterator();
			while(it.hasNext()){
				MessageAndMetadata<byte[],byte[]> item = it.next();
				System.out.println("partiton:" + item.partition());
				System.out.println("offset:" + item.offset());
				executor.execute(new String(item.message()));//UTF-8
			}
		}
	}
	
	interface MessageExecutor {
		
		public void execute(String message);
	}
	
	/**
	 * @param args
	 */
	public static void main(String[] args) {
		LogConsumer consumer = null;
		try{
			MessageExecutor executor = new MessageExecutor() {
				
				public void execute(String message) {
					System.out.println(message);
					
				}
			};
			consumer = new LogConsumer("test-topic", 2, executor);
			consumer.start();
		}catch(Exception e){
			e.printStackTrace();
		}finally{
//			if(consumer != null){
//				consumer.close();
//			}
		}

	}

}

    在测试时,建议优先启动consumer,然后再启动producer,这样可以实时的观测到最新的消息。

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐