1. Installing ZooKeeper

  • Download ZooKeeper

  • Install and configure (Hadoop and Spark were installed earlier; ZooKeeper goes in the same location)

    • Extract zookeeper-3.4.6 to /home/wh/ and rename it to zookeeper

      tar -zxvf zookeeper-3.4.6.tar.gz -C /home/wh/
      cd /home/wh/
      mv zookeeper-3.4.6 zookeeper

    • Edit the configuration (run inside /home/wh/zookeeper)

      cp conf/zoo_sample.cfg conf/zoo.cfg
      vim conf/zoo.cfg (set the following):
      tickTime=2000
      initLimit=10
      syncLimit=5
      dataDir=/home/wh/zookeeper/data
      clientPort=2181
      server.1=192.168.16.131:7000:7001
      server.2=192.168.16.132:7000:7001
      server.3=192.168.16.133:7000:7001
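
    • Create the dataDir referenced above before starting ZooKeeper (a minimal step, assuming the paths used in this guide):

      mkdir -p /home/wh/zookeeper/data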

  • Copy zookeeper to the other two machines

    tar -zcf ~/zookeeper.master.tar.gz ./zookeeper
    scp ~/zookeeper.master.tar.gz node1:/home/wh
    tar -zxvf ~/zookeeper.master.tar.gz -C /home/wh/    (run on node1)
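
    Repeat the same two steps for node2 (hostname as used below), e.g.:

    scp ~/zookeeper.master.tar.gz node2:/home/wh
    tar -zxvf ~/zookeeper.master.tar.gz -C /home/wh/    (run on node2)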

  • In the directory pointed to by dataDir in each zk server's configuration, you must create a file named myid whose content matches the x of the corresponding server.x line in zoo.cfg (this is very important).

    master node: vim zookeeper/data/myid    (content: 1)
    node1 node:  vim zookeeper/data/myid    (content: 2)
    node2 node:  vim zookeeper/data/myid    (content: 3)
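
    Equivalently, the myid files can be written without an editor (paths assume the layout above):

    echo 1 > /home/wh/zookeeper/data/myid    # on master
    echo 2 > /home/wh/zookeeper/data/myid    # on node1
    echo 3 > /home/wh/zookeeper/data/myid    # on node2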

  • Start ZooKeeper (cd into the zookeeper directory; this must be done on every node)

    bin/zkServer.sh start
    The running process can be checked with jps.
    You can also start a client to test the server:
    bin/zkCli.sh -server 192.168.16.131:2181
    On success you get a prompt like:
    [zk: 192.168.16.131:2181(CONNECTED) 0]
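
    To confirm the ensemble formed correctly, check each node's role; one node should report "Mode: leader" and the others "Mode: follower":

    bin/zkServer.sh status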


2. Installing Kafka

  • Download Kafka

  • Install and configure

    • Extract kafka_2.10-0.10.0.0 to /home/wh/ and rename it to kafka

      tar -zxvf kafka_2.10-0.10.0.0.tgz -C /home/wh/
      cd /home/wh/
      mv kafka_2.10-0.10.0.0 kafka

    • Edit the configuration

      vim /home/wh/kafka/config/server.properties
      broker.id=131        (node1: 132, node2: 133)
      host.name=192.168.16.131    (use the node's own IP on node1 and node2)
      zookeeper.connect=192.168.16.131:2181,192.168.16.132:2181,192.168.16.133:2181

    • Following the steps above, copy kafka to node1 and node2 (remember to change host.name to each node's IP and set broker.id accordingly)

  • Verify the installation

    • Start ZooKeeper (skip if it is already running from the steps above; every node must be started)
      bin/zkServer.sh start
    • Start Kafka (every node must be started)

      bin/kafka-server-start.sh config/server.properties
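
      To keep the terminal free, the broker can also be started in the background (kafka-server-start.sh accepts a -daemon flag):

      bin/kafka-server-start.sh -daemon config/server.properties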

    • Create a topic

      bin/kafka-topics.sh --create --zookeeper 192.168.16.131:2181 --replication-factor 3 --partitions 1 --topic test
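
      A quick check that the topic exists (the --list option of the same script):

      bin/kafka-topics.sh --list --zookeeper 192.168.16.131:2181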

    • View topic details

      bin/kafka-topics.sh --describe --zookeeper 192.168.16.131:2181
      Result: Topic:test PartitionCount:3 ReplicationFactor:3 Configs:
      Topic: test Partition: 0 Leader: 132 Replicas: 132,133,131 Isr: 132,133,131
      Topic: test Partition: 1 Leader: 133 Replicas: 133,131,132 Isr: 132,133,131
      Topic: test Partition: 2 Leader: 131 Replicas: 131,132,133 Isr: 132,133,131

    • Send messages

      bin/kafka-console-producer.sh --broker-list 192.168.16.132:9092 --topic test
      After pressing Enter you can type any messages.

    • Receive messages

      bin/kafka-console-consumer.sh --zookeeper 192.168.16.133:2181 --topic test --from-beginning
      The messages you just typed will be displayed.


3. Kafka Java code
Note: the three virtual machines were set up in VMware; the Kafka Java code below was run from Windows 7 (it can also be run inside a VM; I did that by packaging it as a jar).

  • Producer (this version cannot pick up content appended to a file after it is modified):
import java.util.Properties;  
import java.util.concurrent.TimeUnit;  

import kafka.javaapi.producer.Producer;  
import kafka.producer.KeyedMessage;  
import kafka.producer.ProducerConfig;  
import kafka.serializer.StringEncoder;  

public class kafkaProducer extends Thread{  

    private String topic;  

    public kafkaProducer(String topic){  
        super();  
        this.topic = topic;  
    }  


    @Override
    public void run() {
        Producer<Integer, String> producer = createProducer();
        // send a fixed message once per second
        String i = "hello world";
        while(true){
            producer.send(new KeyedMessage<Integer, String>(topic, "message: " + i));
            System.out.println("Sending " + i);
            try {  
                TimeUnit.SECONDS.sleep(1);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  

    private Producer<Integer, String> createProducer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "192.168.16.131:2181,192.168.16.132:2181,192.168.16.133:2181"); // ZooKeeper ensemble
        properties.put("serializer.class", StringEncoder.class.getName()); // serializer for the message value
        properties.put("metadata.broker.list", "192.168.16.131:9092,192.168.16.132:9093,192.168.16.133:9094"); // Kafka brokers
        return new Producer<Integer, String>(new ProducerConfig(properties));
    }


    public static void main(String[] args) {
        new kafkaProducer("test").start(); // use the topic "test" created on the kafka cluster

    }  

}  
  • Producer (this version reads a file continuously; content appended to the end of the file while the program runs is also picked up)
/**
 * Created by wh on 2016/6/16.
 */
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.io.File;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class kafkaProducer extends Thread{

    private String topic;

    public kafkaProducer(String topic){
        super();
        this.topic = topic;
    }


    @Override
    public void run() {
        Producer<Integer, String> producer = createProducer();
        int LineNum = 0;   // number of lines already sent to kafka
        int i = 0;         // index of the current line within this pass over the file
        String line;
        while(true) {
            try {
                // re-open the log file on each pass
                InputStreamReader file = new InputStreamReader(new FileInputStream(new File("/var/log/apt/history.log")));
                BufferedReader reader = new BufferedReader(file);
                // read the file line by line
                while ((line = reader.readLine()) != null) {
                    // skip lines that were already sent in an earlier pass,
                    // so that content appended to the end of the file still gets picked up
                    if(i != LineNum){
                        i++;
                        continue;
                    }
                    // send the new line to the kafka topic
                    producer.send(new KeyedMessage<Integer, String>(topic, "message:" + line));
                    System.out.println("Sending " + line);
                    i++;
                    LineNum++;
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                reader.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
            i = 0;
        }
    }

    private Producer<Integer, String> createProducer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "192.168.16.131:2181,192.168.16.132:2181,192.168.16.133:2181"); // ZooKeeper ensemble
        properties.put("serializer.class", StringEncoder.class.getName()); // serializer for the message value
        properties.put("metadata.broker.list", "192.168.16.131:9092,192.168.16.132:9093,192.168.16.133:9094"); // Kafka brokers
        return new Producer<Integer, String>(new ProducerConfig(properties));
    }


    public static void main(String[] args) {
        new kafkaProducer("test").start(); // use the topic "test" created on the kafka cluster
    }
}

  • Consumer:
import java.util.HashMap;  
import java.util.List;  
import java.util.Map;  
import java.util.Properties;  

import kafka.consumer.Consumer;  
import kafka.consumer.ConsumerConfig;  
import kafka.consumer.ConsumerIterator;  
import kafka.consumer.KafkaStream;  
import kafka.javaapi.consumer.ConsumerConnector;  

public class kafkaConsumer extends Thread{  

    private String topic;  

    public kafkaConsumer(String topic){  
        super();  
        this.topic = topic;  
    }  


    @Override
    public void run() {
        ConsumerConnector consumer = createConsumer();
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1); // use a single stream (thread) for this topic
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0); // the single stream created above
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        while(iterator.hasNext()){
            String message = new String(iterator.next().message());
            System.out.println("Received: " + message);
        }
    }

    private ConsumerConnector createConsumer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "192.168.16.131:2181,192.168.16.132:2181,192.168.16.133:2181"); // ZooKeeper ensemble
        properties.put("group.id", "group1"); // consumer group id; consumers sharing a group.id split the topic's partitions among themselves
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }


    public static void main(String[] args) {
        new kafkaConsumer("test").start(); // use the topic "test" created on the kafka cluster

    }  

}  
  • Required jar files (adjust to your own versions; download: http://pan.baidu.com/s/1i45qlMX)

    scala-library-2.10.6.jar 
    kafka_2.10-0.10.0.0.jar
    kafka_2.10-0.10.0.0-javadoc.jar
    kafka_2.10-0.10.0.0-scaladoc.jar
    log4j-1.2.17.jar
    kafka-clients-0.10.0.0.jar
    slf4j-log4j12-1.7.21.jar
    slf4j-api-1.7.21.jar
    metrics-core-2.2.0.jar
    zkclient-0.8.jar
    zookeeper-3.4.6.jar
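
    If you run the programs from a jar on one of the VMs (as mentioned in the note above), a hypothetical launch could look like the following, where kafka-demo.jar and the lib/ directory holding the jars listed above are placeholder names:

    java -cp "kafka-demo.jar:lib/*" kafkaConsumer
    java -cp "kafka-demo.jar:lib/*" kafkaProducer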

  • Run

    • Run kafkaConsumer.java first, then kafkaProducer.java
    • The kafkaProducer console prints output like:
      Sending hello world
      Sending hello world
      Sending hello world
      .....
    • The kafkaConsumer console prints output like:
      Received: hello world
      Received: hello world
      Received: hello world
      .....

4. Simple Kafka + Spark Streaming integration (a basic word count)
      This uses the second (direct) approach for integrating Kafka with Spark Streaming; for some reason the first (receiver-based) approach showed no results for me.

  • The message producer is the same code as above

  • Adjust some Kafka configuration

    • Producer configuration:

      vim /home/wh/kafka/config/producer.properties
      broker.list=192.168.16.131:9092,192.168.16.131:9093,192.168.16.131:9094
      producer.type=async

    • Consumer configuration:

      vim /home/wh/kafka/config/consumer.properties
      zookeeper.connect=192.168.16.131:2181,192.168.16.132:2181,192.168.16.133:2181
      group.id=group1

  • Run the example bundled with Spark

    bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port topic1,topic2
    brokers is a list of one or more Kafka brokers
    topics is a list of one or more kafka topics to consume from
    The command used here:
    bin/run-example streaming.JavaDirectKafkaWordCount 192.168.16.131:2181,192.168.16.132:2181,192.168.16.133:2181 test
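
    Note that the usage text above asks for Kafka broker addresses (host:port of the brokers) rather than ZooKeeper addresses; if the command gives no word counts, a variant pointing at the brokers configured earlier (ports as in metadata.broker.list) may be worth trying:

    bin/run-example streaming.JavaDirectKafkaWordCount 192.168.16.131:9092,192.168.16.132:9093,192.168.16.133:9094 test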

  • Sample output:

-------------------------------------------
Time: 1465810482000 ms
-------------------------------------------
(hello,2)
(world,2)
16/06/13 17:34:42 INFO scheduler.JobScheduler: Finished job streaming job 1465810482000 ms.0 from job set of time 1465810482000 ms
16/06/13 17:34:42 INFO scheduler.JobScheduler: Total delay: 0.406 s for time 1465810482000 ms (execution: 0.358 s)
16/06/13 17:34:42 INFO rdd.ShuffledRDD: Removing RDD 679 from persistence list
16/06/13 17:34:42 INFO rdd.MapPartitionsRDD: Removing RDD 678 from persistence list
16/06/13 17:34:42 INFO storage.BlockManagerInfo: Removed broadcast_242_piece0 on localhost:43157 in memory (size: 2.6 KB, free: 517.3 MB)
16/06/13 17:34:42 INFO spark.ContextCleaner: Cleaned accumulator 244
16/06/13 17:34:42 INFO rdd.MapPartitionsRDD: Removing RDD 677 from persistence list
16/06/13 17:34:42 INFO rdd.MapPartitionsRDD: Removing RDD 676 from persistence list
16/06/13 17:34:42 INFO kafka.KafkaRDD: Removing RDD 675 from persistence list
16/06/13 17:34:42 INFO scheduler.ReceivedBlockTracker: Deleting batches ArrayBuffer()
16/06/13 17:34:42 INFO scheduler.InputInfoTracker: remove old batch metadata: 1465810478000 ms
16/06/13 17:34:42 INFO storage.BlockManager: Removing RDD 679
16/06/13 17:34:42 INFO storage.BlockManager: Removing RDD 678
16/06/13 17:34:42 INFO storage.BlockManager: Removing RDD 677
16/06/13 17:34:42 INFO storage.BlockManager: Removing RDD 676
16/06/13 17:34:42 INFO storage.BlockManager: Removing RDD 675
16/06/13 17:34:42 INFO storage.BlockManagerInfo: Removed broadcast_243_piece0 on localhost:43157 in memory (size: 1810.0 B, free: 517.3 MB)
16/06/13 17:34:42 INFO spark.ContextCleaner: Cleaned shuffle 122
16/06/13 17:34:42 INFO spark.ContextCleaner: Cleaned accumulator 245
16/06/13 17:34:42 INFO storage.BlockManagerInfo: Removed broadcast_244_piece0 on localhost:43157 in memory (size: 2.6 KB, free: 517.3 MB)
16/06/13 17:34:42 INFO spark.ContextCleaner: Cleaned accumulator 246
16/06/13 17:34:42 INFO storage.BlockManagerInfo: Removed broadcast_245_piece0 on localhost:43157 in memory (size: 1810.0 B, free: 517.3 MB)