Kafka集群安装
一、linux下环境搭建1 解压kafka压缩包安装目录/app/data/kafka解压tar -xzf kafka_2.11-0.9.0.0.tgz2 配置kafka环境变量root用户:#vi /etc/profile或cdh5用户:$vi~/.bashrc以上两个任选一种,增加以下环境变量:export KAFKA_HOME=/ap
一、linux下环境搭建
1 解压kafka压缩包
安装目录
/app/data/kafka
解压
tar -xzf kafka_2.11-0.9.0.0.tgz
2 配置kafka环境变量
root用户:#vi /etc/profile 或
cdh5用户:$vi~/.bashrc
以上两个任选一种,增加以下环境变量:
export KAFKA_HOME=/app/data/kafka/kafka_2.11-0.9.0.0
export PATH=$PATH:$KAFKA_HOME/bin
生效修改:#source /etc/profile 或 $source~/.bashrc
3 修改配置文件
修改文件 config/server.properties
broker.id= 0 (其他机器依次递增,不可重复)
port=9092 (设置broker端口,默认9092)
Log.dirs = /home/hadoop/kafka-logs (可改可不改,改的话chmod755)
zookeeper.connect=hadoop0:42182,hadoop1:42182,hadoop2:42182,hadoop3:42182 (使用自己的zookeeper时,需要此项配置)
4 拷贝kafka到所有机器
scp -r kafka_2.11-0.9.0.0 hadoop1:/app/data/kafka
scp -r kafka_2.11-0.9.0.0 hadoop2:/app/data/kafka
scp -r kafka_2.11-0.9.0.0 hadoop3:/app/data/kafka
5 启动kafka
首先在所有机器上启动zookeeper
zkServer.sh start
在所有机器上启动kafka(到kafka目录下执行)
kafka-server-start.sh -daemon config/server.properties &
6 停止kafka
kafka-server-stop.sh -daemon config/server.properties &
zkServer.sh stop
7 测试
创建 一个叫“test”的topic 设定1个副本1个分区
bin/kafka-topics.sh --create --zookeeper localhost:42182 --replication-factor 1 --partitions 1 --topic test
查询 topic 列表
bin/kafka-topics.sh --list --zookeeper localhost:42182 Test
查看名叫“test”的topic 描述信息
bin/kafka-topics.sh --describe --zookeeper localhost:42182 --topic test
向名叫“test”的topic 发送测试消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
读取名叫“test”的topic 里的消息
bin/kafka-console-consumer.sh --zookeeper localhost:42182 --topic test --from-beginning
二、搭建开发环境
1 添加jar包
去linux上拿下经过update的kafka文件夹libs目录下的jar包,不要从直接从官网上下载的压缩包里找。
然后导入eclipse。
2 配置程序
首先是一个充当配置文件作用的接口,配置了Kafka的各种连接参数:
package com.teamsun.kafka.m001;
public interface KafkaProperties {
final static StringzkConnect = "hadoop0:42182,hadoop1:42182,hadoop2:42182,hadoop3:42182";
final static StringgroupId = "group1";
final static Stringtopic = "test";
final static StringkafkaServerURL = "hadoop0,hadoop1,hadoop2,hadoop3";
final static int kafkaServerPort = 9092;
final static int kafkaProducerBufferSize = 64 * 1024;
final static int connectionTimeOut = 20000;
final static int reconnectInterval = 10000;
// final static String topic2 = "topic2";
// final static String topic3 = "topic3";
final static StringclientId = "SimpleConsumerDemoClient";
}
Producer
package com.teamsun.kafka.m001;
import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class KafkaProducerextends Thread {
private final kafka.javaapi.producer.Producer<Integer, String>producer;
private final Stringtopic;
private final Propertiesprops = new Properties();
public KafkaProducer(String topic) {
props.put("serializer.class","kafka.serializer.StringEncoder");
props.put("metadata.broker.list",
"hadoop0:9092,hadoop1:9092,hadoop2:9092,hadoop3:9092");
producer = new kafka.javaapi.producer.Producer<Integer, String>(
new ProducerConfig(props));
this.topic = topic;
}
@Override
public void run() {
int messageNo = 1;
while (true) {
String messageStr = new String("Message_" + messageNo);
System.out.println("Send:" + messageStr);
producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
messageNo++;
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Consumer
package com.teamsun.kafka.m001;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.teamsun.kafka.m001.KafkaProperties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class KafkaConsumerextends Thread {
private final ConsumerConnectorconsumer;
private final Stringtopic;
public KafkaConsumer(String topic) {
consumer = kafka.consumer.Consumer
.createJavaConsumerConnector(createConsumerConfig());
this.topic = topic;
}
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
props.put("zookeeper.connect", KafkaProperties.zkConnect);
props.put("group.id", KafkaProperties.groupId);
props.put("zookeeper.session.timeout.ms","40000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
@Override
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
Map<String, List<KafkaStream<byte[],byte[]>>> consumerMap =consumer
.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
ConsumerIterator<byte[],byte[]> it = stream.iterator();
while (it.hasNext()) {
System.out.println("receive:" +new String(it.next().message()));
try {
sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
简单的发送接收
package com.teamsun.kafka.m001;
public class KafkaConsumerProducerTest {
public static void main(String[] args) {
KafkaProducer producerThread = new KafkaProducer(KafkaProperties.topic);
producerThread.start();
KafkaConsumer consumerThread = new KafkaConsumer(KafkaProperties.topic);
consumerThread.start();
}
}
注:需要在windows环境下配置hosts文件C:\Windows\System32\drivers\etc
192.168.5.128 hadoop0
192.168.5.129 hadoop1
192.168.5.130 hadoop2
192.168.5.131 hadoop3
---
更多文章关注公众号
更多推荐
所有评论(0)