I. Setting Up the Environment on Linux

1. Unpack the Kafka archive

Installation directory:

/app/data/kafka

Unpack (from the installation directory):

tar -xzf kafka_2.11-0.9.0.0.tgz

2. Configure the Kafka environment variables

As root: # vi /etc/profile, or

as the cdh5 user: $ vi ~/.bashrc

Pick either one and add the following environment variables:

export KAFKA_HOME=/app/data/kafka/kafka_2.11-0.9.0.0

export PATH=$PATH:$KAFKA_HOME/bin

Apply the change: # source /etc/profile or $ source ~/.bashrc
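
To confirm the variables took effect in the current shell:

echo $KAFKA_HOME
which kafka-topics.sh    (should resolve to $KAFKA_HOME/bin/kafka-topics.sh)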

3. Edit the configuration file

Edit config/server.properties:

 

broker.id=0    (increment on each additional machine; IDs must be unique)

port=9092      (the broker port; 9092 is the default)

log.dirs=/home/hadoop/kafka-logs   (optional to change; if you do, chmod 755 the directory)

zookeeper.connect=hadoop0:42182,hadoop1:42182,hadoop2:42182,hadoop3:42182    (required when using your own ZooKeeper ensemble)
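
Putting the four settings together, a minimal server.properties for the first broker (hadoop0) would look like this; the hostnames and the 42182 ZooKeeper client port are the ones used throughout this guide:

broker.id=0
port=9092
log.dirs=/home/hadoop/kafka-logs
zookeeper.connect=hadoop0:42182,hadoop1:42182,hadoop2:42182,hadoop3:42182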

4. Copy Kafka to all machines

scp -r kafka_2.11-0.9.0.0 hadoop1:/app/data/kafka

scp -r kafka_2.11-0.9.0.0 hadoop2:/app/data/kafka

scp -r kafka_2.11-0.9.0.0 hadoop3:/app/data/kafka
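
After copying, remember that broker.id must differ on every machine. One way to patch the copies remotely (a sketch; adjust the path if your layout differs):

ssh hadoop1 "sed -i 's/^broker.id=0/broker.id=1/' /app/data/kafka/kafka_2.11-0.9.0.0/config/server.properties"
ssh hadoop2 "sed -i 's/^broker.id=0/broker.id=2/' /app/data/kafka/kafka_2.11-0.9.0.0/config/server.properties"
ssh hadoop3 "sed -i 's/^broker.id=0/broker.id=3/' /app/data/kafka/kafka_2.11-0.9.0.0/config/server.properties"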

 

5. Start Kafka

First, start ZooKeeper on all machines:

   zkServer.sh start

Then start Kafka on all machines (run from the Kafka directory):

   kafka-server-start.sh -daemon config/server.properties
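
To verify each broker came up, check for the Kafka process or watch the server log (from the Kafka directory):

jps    (should list a process named Kafka)
tail -f logs/server.log    (look for a "started" message from kafka.server.KafkaServer)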

6. Stop Kafka

   kafka-server-stop.sh    (the script takes no arguments; it stops the local broker)

   zkServer.sh stop

7. Test

Create a topic named "test" with 1 replica and 1 partition:

bin/kafka-topics.sh --create --zookeeper localhost:42182 --replication-factor 1 --partitions 1 --topic test

 

List the topics:

bin/kafka-topics.sh --list --zookeeper localhost:42182

 

Describe the topic named "test":

bin/kafka-topics.sh --describe --zookeeper localhost:42182 --topic test
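
For a single-broker, single-partition topic, the output looks roughly like this (broker 0 is both the leader and the sole replica):

Topic:test    PartitionCount:1    ReplicationFactor:1    Configs:
    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0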

 

Send test messages to the topic named "test":

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

 

Read the messages in the topic named "test":

bin/kafka-console-consumer.sh --zookeeper localhost:42182 --topic test --from-beginning
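
Run the producer and consumer in two terminals; every line typed into the producer should appear in the consumer shortly afterwards. For example:

Producer terminal (typed input):
This is a message
This is another message

Consumer terminal (printed output):
This is a message
This is another message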

II. Setting Up the Development Environment

1. Add the jar files

Take the jars from the libs directory of the updated Kafka folder on the Linux machine; do not use the ones inside the archive downloaded straight from the official site. Then import them into Eclipse.
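
The libs directory of a 0.9.0.0 build typically contains kafka_2.11-0.9.0.0.jar, kafka-clients-0.9.0.0.jar, the scala-library jar, plus the zkclient, zookeeper, slf4j, log4j, and metrics-core jars; the exact version suffixes depend on the build.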

2. Write the programs

First, an interface that acts as a configuration file, holding the various Kafka connection parameters:

package com.teamsun.kafka.m001;

public interface KafkaProperties {
    // ZooKeeper ensemble (used by the consumer)
    final static String zkConnect = "hadoop0:42182,hadoop1:42182,hadoop2:42182,hadoop3:42182";
    // consumer group
    final static String groupId = "group1";
    final static String topic = "test";
    // broker hosts and port
    final static String kafkaServerURL = "hadoop0,hadoop1,hadoop2,hadoop3";
    final static int kafkaServerPort = 9092;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 20000;
    final static int reconnectInterval = 10000;
    // final static String topic2 = "topic2";
    // final static String topic3 = "topic3";
    final static String clientId = "SimpleConsumerDemoClient";
}
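
(Fields declared in a Java interface are implicitly public static final, so the final static modifiers above are redundant but harmless.)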

The producer:

package com.teamsun.kafka.m001;

import java.util.Properties;

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KafkaProducer extends Thread {
    private final kafka.javaapi.producer.Producer<Integer, String> producer;
    private final String topic;
    private final Properties props = new Properties();

    public KafkaProducer(String topic) {
        // string-encoded messages, full broker list for metadata bootstrap
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list",
                "hadoop0:9092,hadoop1:9092,hadoop2:9092,hadoop3:9092");
        producer = new kafka.javaapi.producer.Producer<Integer, String>(
                new ProducerConfig(props));
        this.topic = topic;
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true) {
            String messageStr = "Message_" + messageNo;
            System.out.println("Send:" + messageStr);
            producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
            messageNo++;
            try {
                sleep(3000); // pause 3 seconds between messages
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
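
This class uses the old Scala producer API (kafka.javaapi.producer.Producer), which still ships with 0.9.0.0. For comparison, here is a minimal sketch using the newer Java producer that 0.9 also includes (org.apache.kafka.clients.producer); the class name NewApiProducerSketch is just an illustration, and the topic and broker list are the ones from this guide:

package com.teamsun.kafka.m001;

import java.util.Properties;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewApiProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "hadoop0:9092,hadoop1:9092,hadoop2:9092,hadoop3:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // fully qualified to avoid clashing with the old-API KafkaProducer class above
        Producer<String, String> producer =
                new org.apache.kafka.clients.producer.KafkaProducer<String, String>(props);
        producer.send(new ProducerRecord<String, String>("test", "Message_1"));
        producer.close(); // flushes any buffered records before exiting
    }
}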

The consumer:

package com.teamsun.kafka.m001;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String topic) {
        consumer = kafka.consumer.Consumer
                .createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        // ask for a single stream (one consumer thread) for the topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
                .createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("receive:" + new String(it.next().message()));
            try {
                sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
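
This is the high-level (ZooKeeper-based) consumer API: createMessageStreams returns one KafkaStream per requested thread (one here), group membership is coordinated through ZooKeeper, and offsets are committed automatically at the auto.commit.interval.ms cadence, so no manual offset handling is needed.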

A simple send-and-receive test:

package com.teamsun.kafka.m001;

public class KafkaConsumerProducerTest {
    public static void main(String[] args) {
        // start the producer and consumer threads against the same topic
        KafkaProducer producerThread = new KafkaProducer(KafkaProperties.topic);
        producerThread.start();
        KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
        consumerThread.start();
    }
}
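
If everything is wired up correctly, the console shows interleaved output: a Send:Message_N line from the producer thread followed shortly by the matching receive:Message_N line from the consumer thread, a new pair roughly every 3 seconds.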

Note: on Windows you need to add the host mappings to C:\Windows\System32\drivers\etc\hosts:

192.168.5.128 hadoop0

192.168.5.129 hadoop1

192.168.5.130 hadoop2

192.168.5.131 hadoop3
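
You can confirm the mappings work from a Windows command prompt, e.g. ping hadoop0 should reach 192.168.5.128.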

 

 

 
