Installing ZooKeeper and Kafka, and a simple Kafka + Spark Streaming integration
一. Installing ZooKeeper
Download ZooKeeper 3.4.6 from http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.6/
Installation and configuration (Hadoop and Spark were installed earlier; install in the same place)
Extract zookeeper-3.4.6 to /home/wh/ and rename it to zookeeper
tar -zxvf zookeeper-3.4.6.tar.gz -C /home/wh/
mv zookeeper-3.4.6 zookeeper
Edit the configuration (from inside the zookeeper directory):
cp conf/zoo_sample.cfg conf/zoo.cfg
vim conf/zoo.cfg (set the following):
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/wh/zookeeper/data
clientPort=2181
server.1=192.168.16.131:7000:7001
server.2=192.168.16.132:7000:7001
server.3=192.168.16.133:7000:7001
Copy zookeeper to the other two machines
tar -zcf ~/zookeeper.master.tar.gz ./zookeeper
scp ./zookeeper.master.tar.gz node1:/home/wh
tar -zxvf ~/zookeeper.master.tar.gz -C /home/wh/
Under the directory that dataDir points to in each ZooKeeper server's config file, you must create a file named myid whose content equals the x of that server's server.x line in zoo.cfg (this is very important).
master node: vim zookeeper/data/myid and write 1
node1 node: vim zookeeper/data/myid and write 2
node2 node: vim zookeeper/data/myid and write 3
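The id-matching rule above can be illustrated with a small self-contained Java sketch (the class name `MyidCheck` and its helper are hypothetical, for illustration only): it parses the server.x lines of zoo.cfg and reports which id each host's myid file must contain.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: derive, for each host in zoo.cfg, the id that
// must be written into that host's dataDir/myid file.
public class MyidCheck {
    // Parses lines like "server.1=192.168.16.131:7000:7001" into host -> id.
    public static Map<String, Integer> serverIds(String[] cfgLines) {
        Map<String, Integer> ids = new HashMap<String, Integer>();
        for (String line : cfgLines) {
            if (!line.startsWith("server.")) continue;
            int eq = line.indexOf('=');
            int id = Integer.parseInt(line.substring("server.".length(), eq));
            String host = line.substring(eq + 1).split(":")[0];
            ids.put(host, id);
        }
        return ids;
    }

    public static void main(String[] args) {
        String[] cfg = {
            "tickTime=2000",
            "server.1=192.168.16.131:7000:7001",
            "server.2=192.168.16.132:7000:7001",
            "server.3=192.168.16.133:7000:7001",
        };
        // master (.131) writes 1 into myid, node1 (.132) writes 2, node2 (.133) writes 3
        System.out.println(MyidCheck.serverIds(cfg));
    }
}
```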
Start ZooKeeper (from the zookeeper directory, on every node)
bin/zkServer.sh start
You can check the started process with jps.
Start a client to test the connection:
bin/zkCli.sh -server 192.168.16.131:2181
On success you get a prompt like:
[zk: 192.168.16.131:2181(CONNECTED) 0]
二. Installing Kafka
Download Kafka
Installation and configuration
Extract kafka_2.10-0.10.0.0 to /home/wh/ and rename it to kafka
tar -zxvf kafka_2.10-0.10.0.0.tgz -C /home/wh/
mv kafka_2.10-0.10.0.0 kafka
Edit the configuration:
vim kafka/config/server.properties
broker.id=131 (node1: 132, node2: 133)
host.name=192.168.16.131 (use the node's own IP on node1 and node2)
zookeeper.connect=192.168.16.131:2181,192.168.16.132:2181,192.168.16.133:2181
Following the steps above, copy Kafka to node1 and node2 (remember to adjust the IP-specific settings).
Verify the installation
- Start ZooKeeper (skip if already started above; on every node)
bin/zkServer.sh start
Start Kafka (on every node)
bin/kafka-server-start.sh config/server.properties
Create a topic
bin/kafka-topics.sh --create --zookeeper 192.168.16.131:2181 --replication-factor 3 --partitions 3 --topic test
View topic details
bin/kafka-topics.sh --describe --zookeeper 192.168.16.131:2181 --topic test
Result:
Topic:test	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: test	Partition: 0	Leader: 132	Replicas: 132,133,131	Isr: 132,133,131
	Topic: test	Partition: 1	Leader: 133	Replicas: 133,131,132	Isr: 132,133,131
	Topic: test	Partition: 2	Leader: 131	Replicas: 131,132,133	Isr: 132,133,131
Send messages
bin/kafka-console-producer.sh --broker-list 192.168.16.132:9092 --topic test
After pressing Enter, type anything.
Receive messages
bin/kafka-console-consumer.sh --zookeeper 192.168.16.133:2181 --topic test --from-beginning
The consumer prints what you just typed.
三. Implementing Kafka producers and consumers in Java
Note: the three machines are VMware VMs; I ran the Java code on Windows 7 (you can also run it inside a VM; I packaged it as a jar for that).
- Producer (this version does not pick up content later appended to a file; it just sends a fixed message):
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
public class kafkaProducer extends Thread {
    private String topic;

    public kafkaProducer(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        Producer<Integer, String> producer = createProducer();
        String i = "hello world";
        while (true) {
            producer.send(new KeyedMessage<Integer, String>(topic, "message: " + i));
            System.out.println("正在输出" + i);
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private Producer<Integer, String> createProducer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "192.168.16.131:2181,192.168.16.132:2181,192.168.16.133:2181"); // ZooKeeper ensemble
        properties.put("serializer.class", StringEncoder.class.getName()); // message value serializer
        properties.put("metadata.broker.list", "192.168.16.131:9092,192.168.16.132:9093,192.168.16.133:9094"); // Kafka brokers
        return new Producer<Integer, String>(new ProducerConfig(properties));
    }

    public static void main(String[] args) {
        new kafkaProducer("test").start(); // "test" is the topic created on the cluster above
    }
}
- Producer (this version reads the file continuously; content appended to the end of the file while reading is also picked up):
/**
* Created by wh on 2016/6/16.
*/
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.io.File;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
public class kafkaProducer extends Thread {
    private String topic;

    public kafkaProducer(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        Producer<Integer, String> producer = createProducer();
        int LineNum = 0; // number of lines already sent across all passes
        int i = 0;       // number of lines read in the current pass
        while (true) {
            try {
                // (re)open the log file on each pass
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(new FileInputStream(new File("/var/log/apt/history.log"))));
                // read each line exactly once into a variable; the original code
                // called readLine() three times per iteration and so skipped lines
                // and printed different content than it sent
                String line;
                while ((line = reader.readLine()) != null) {
                    // skip lines already sent in an earlier pass, so that
                    // content appended to the file is picked up on re-reads
                    if (i < LineNum) {
                        i++;
                        continue;
                    }
                    producer.send(new KeyedMessage<Integer, String>(topic, "message:" + line));
                    System.out.println("正在输出" + line);
                    i++;
                    LineNum++;
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                reader.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
            i = 0;
        }
    }

    private Producer<Integer, String> createProducer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "192.168.16.131:2181,192.168.16.132:2181,192.168.16.133:2181"); // ZooKeeper ensemble
        properties.put("serializer.class", StringEncoder.class.getName()); // message value serializer
        properties.put("metadata.broker.list", "192.168.16.131:9092,192.168.16.132:9093,192.168.16.133:9094"); // Kafka brokers
        return new Producer<Integer, String>(new ProducerConfig(properties));
    }

    public static void main(String[] args) {
        new kafkaProducer("test").start(); // "test" is the topic created on the cluster above
    }
}
- Consumer:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class kafkaConsumer extends Thread {
    private String topic;

    public kafkaConsumer(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        ConsumerConnector consumer = createConsumer();
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1); // use one stream (thread) for this topic
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0); // the single stream for the topic
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        while (iterator.hasNext()) {
            String message = new String(iterator.next().message());
            System.out.println("接收到: " + message);
        }
    }

    private ConsumerConnector createConsumer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "192.168.16.131:2181,192.168.16.132:2181,192.168.16.133:2181"); // ZooKeeper ensemble
        properties.put("group.id", "group1"); // consumer group id; consumers in the same group split the topic's partitions between them
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    public static void main(String[] args) {
        new kafkaConsumer("test").start(); // "test" is the topic created on the cluster above
    }
}
Jar files needed (depends on your versions; http://pan.baidu.com/s/1i45qlMX):
scala-library-2.10.6.jar
kafka_2.10-0.10.0.0.jar
kafka_2.10-0.10.0.0-javadoc.jar
kafka_2.10-0.10.0.0-scaladoc.jar
log4j-1.2.17.jar
kafka-clients-0.10.0.0.jar
slf4j-log4j12-1.7.21.jar
slf4j-api-1.7.21.jar
metrics-core-2.2.0.jar
zkclient-0.8.jar
zookeeper-3.4.6.jar
Run
- Run kafkaConsumer.java first, then kafkaProducer.java.
- The kafkaProducer console prints:
正在输出hello world
正在输出hello world
正在输出hello world
.....
- The kafkaConsumer console prints:
接收到:hello world
接收到:hello world
接收到:hello world
.....
四. A simple Kafka + Spark Streaming integration (basic word count)
This uses the second of the two Kafka/Spark Streaming integration approaches (the direct approach); for some reason the first (receiver-based) approach showed no results for me.
The message producer is the same code as above.
Adjust a few Kafka configs
Producer config:
vim kafka/config/producer.properties
broker.list=192.168.16.131:9092,192.168.16.131:9093,192.168.16.131:9094
producer.type=async
Consumer config:
vim kafka/config/consumer.properties
zookeeper.connect=192.168.16.131:2181,192.168.16.132:2181,192.168.16.133:2181
group.id=group1
Run the example that ships with Spark
bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port topic1,topic2
brokers is a list of one or more Kafka brokers
topics is a list of one or more Kafka topics to consume from
Here the actual invocation (note that the direct approach takes broker addresses, not ZooKeeper addresses):
bin/run-example streaming.JavaDirectKafkaWordCount 192.168.16.131:9092,192.168.16.132:9092,192.168.16.133:9092 test
The output looks like this:
-------------------------------------------
Time: 1465810482000 ms
-------------------------------------------
(hello,2)
(world,2)
16/06/13 17:34:42 INFO scheduler.JobScheduler: Finished job streaming job 1465810482000 ms.0 from job set of time 1465810482000 ms
16/06/13 17:34:42 INFO scheduler.JobScheduler: Total delay: 0.406 s for time 1465810482000 ms (execution: 0.358 s)
16/06/13 17:34:42 INFO rdd.ShuffledRDD: Removing RDD 679 from persistence list
16/06/13 17:34:42 INFO rdd.MapPartitionsRDD: Removing RDD 678 from persistence list
16/06/13 17:34:42 INFO storage.BlockManagerInfo: Removed broadcast_242_piece0 on localhost:43157 in memory (size: 2.6 KB, free: 517.3 MB)
16/06/13 17:34:42 INFO spark.ContextCleaner: Cleaned accumulator 244
16/06/13 17:34:42 INFO rdd.MapPartitionsRDD: Removing RDD 677 from persistence list
16/06/13 17:34:42 INFO rdd.MapPartitionsRDD: Removing RDD 676 from persistence list
16/06/13 17:34:42 INFO kafka.KafkaRDD: Removing RDD 675 from persistence list
16/06/13 17:34:42 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer()
16/06/13 17:34:42 INFO scheduler.InputInfoTracker: remove old batch metadata: 1465810478000 ms
16/06/13 17:34:42 INFO storage.BlockManager: Removing RDD 679
16/06/13 17:34:42 INFO storage.BlockManager: Removing RDD 678
16/06/13 17:34:42 INFO storage.BlockManager: Removing RDD 677
16/06/13 17:34:42 INFO storage.BlockManager: Removing RDD 676
16/06/13 17:34:42 INFO storage.BlockManager: Removing RDD 675
16/06/13 17:34:42 INFO storage.BlockManagerInfo: Removed broadcast_243_piece0 on localhost:43157 in memory (size: 1810.0 B, free: 517.3 MB)
16/06/13 17:34:42 INFO spark.ContextCleaner: Cleaned shuffle 122
16/06/13 17:34:42 INFO spark.ContextCleaner: Cleaned accumulator 245
16/06/13 17:34:42 INFO storage.BlockManagerInfo: Removed broadcast_244_piece0 on localhost:43157 in memory (size: 2.6 KB, free: 517.3 MB)
16/06/13 17:34:42 INFO spark.ContextCleaner: Cleaned accumulator 246
16/06/13 17:34:42 INFO storage.BlockManagerInfo: Removed broadcast_245_piece0 on localhost:43157 in memory (size: 1810.0 B, free: 517.3 MB)
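The counting that JavaDirectKafkaWordCount performs on each batch can be sketched in plain Java (`WordCountSketch` is a hypothetical name; this mirrors only the split-and-count logic, not the Spark API): each received line is split on spaces and every word's occurrences are tallied, which is how two "hello world" messages become (hello,2) and (world,2) above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the per-batch word count: split each message
// on spaces and tally occurrences, like the streaming example does.
public class WordCountSketch {
    public static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                if (word.isEmpty()) continue;
                Integer old = counts.get(word);
                counts.put(word, old == null ? 1 : old + 1);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // two batch messages, matching the batch output shown above
        System.out.println(WordCountSketch.count(new String[]{"hello world", "hello world"}));
    }
}
```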