(I) Installation and Configuration

Installation packages

  1. kafka_2.10-0.8.1.1.tar
  2. jdk1.7.0_51.tar

1.1 Install the JDK

If the JDK is already installed, skip this step. Otherwise, see: http://blog.csdn.net/ouyang111222/article/details/50344135

1.2 Install Kafka

1.2.1 Extract Kafka

My cluster has 3 machines with the IPs ip1, ip2 and ip3. Copy kafka_2.10-0.8.1.1.tar to /apps/svr/ on each machine and extract it there:

tar -xf kafka_2.10-0.8.1.1.tar


1.2.2 Configure config/server.properties

(1) broker.id

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=

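broker.id must be an integer that is unique within the cluster; the simplest scheme is to number the brokers sequentially (0, 1, 2, 3, 4, ...). The three machines in this article are mapped as follows: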

ip1---->0
ip2---->1
ip3---->2

(2) port

# The port the socket server listens on
port=9092

The default, 9092, is kept.

(3) host.name

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=ip1

Set host.name to the IP of the machine the broker runs on (ip1 on the first machine, ip2 on the second, and so on).

(4) zookeeper.connect

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=ip1:2181,ip2:2181,ip3:2181

This is the list of ZooKeeper servers. Because this cluster uses the ZooKeeper bundled with Kafka, running on all three machines, set it to ip1:2181,ip2:2181,ip3:2181.
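The connection-string format described in the config comment (comma-separated host:port pairs, with an optional chroot path appended once at the end) can be illustrated with a small parser. This is a minimal sketch; ZkConnectString is a hypothetical helper written for this example, not part of the Kafka or ZooKeeper API.

```java
import java.util.ArrayList;
import java.util.List;

public class ZkConnectString {

    final List<String> hosts; // "host:port" entries, in the order given
    final String chroot;      // e.g. "/kafka", or "" when no chroot is appended

    ZkConnectString(List<String> hosts, String chroot) {
        this.hosts = hosts;
        this.chroot = chroot;
    }

    // Splits "h1:2181,h2:2181,h3:2181/kafka" into its host:port pairs and the optional chroot.
    // The chroot starts at the first '/', which can only appear after the last host:port pair.
    static ZkConnectString parse(String connect) {
        int slash = connect.indexOf('/');
        String hostPart = slash >= 0 ? connect.substring(0, slash) : connect;
        String chroot = slash >= 0 ? connect.substring(slash) : "";
        List<String> hosts = new ArrayList<>();
        for (String h : hostPart.split(",")) {
            String trimmed = h.trim();
            if (!trimmed.isEmpty()) {
                hosts.add(trimmed);
            }
        }
        return new ZkConnectString(hosts, chroot);
    }

    public static void main(String[] args) {
        ZkConnectString zk = parse("ip1:2181,ip2:2181,ip3:2181");
        System.out.println("hosts=" + zk.hosts + " chroot='" + zk.chroot + "'");
    }
}
```

With a chroot such as ip1:2181,ip2:2181,ip3:2181/kafka, all Kafka znodes would live under /kafka instead of the ZooKeeper root.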

(5) num.partitions

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=10

Here the default number of partitions per topic is set to 10.

There are many other parameters, which I won't go through one by one. The remaining settings used in this setup are:

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
# A comma separated list of directories under which to store log files
log.dirs=/platform1/kafka-logs

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
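Across the three machines, server.properties differs only in broker.id and host.name; zookeeper.connect and the rest are shared. The sketch below generates those per-machine values. It is a hypothetical helper, assuming the placeholder IPs ip1/ip2/ip3, broker port 9092 and ZooKeeper client port 2181 used in this article; broker.id is simply the machine's position in the list.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

public class BrokerConfigSketch {

    // Builds the server.properties values that differ per machine (broker.id, host.name)
    // plus the shared zookeeper.connect string, keyed by machine IP.
    static Map<String, Properties> buildOverrides(String[] ips, int zkPort) {
        StringBuilder zk = new StringBuilder();
        for (int i = 0; i < ips.length; i++) {
            if (i > 0) {
                zk.append(',');
            }
            zk.append(ips[i]).append(':').append(zkPort);
        }
        Map<String, Properties> result = new LinkedHashMap<>();
        for (int i = 0; i < ips.length; i++) {
            Properties p = new Properties();
            p.setProperty("broker.id", String.valueOf(i)); // ip1 -> 0, ip2 -> 1, ip3 -> 2
            p.setProperty("port", "9092");
            p.setProperty("host.name", ips[i]);             // the machine's own IP
            p.setProperty("zookeeper.connect", zk.toString());
            result.put(ips[i], p);
        }
        return result;
    }

    public static void main(String[] args) {
        String[] ips = {"ip1", "ip2", "ip3"}; // placeholders, replace with real IPs
        for (Map.Entry<String, Properties> e : buildOverrides(ips, 2181).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```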

1.2.3 Configure config/zookeeper.properties

Add the three ZooKeeper servers to zookeeper.properties as server.1, server.2 and server.3 (one entry per machine).


At first I forgot to configure the rest and started ZooKeeper right away:

bin/zookeeper-server-start.sh config/zookeeper.properties

This produced an error.


Then I remembered that zookeeper.properties contains the following setting, so the /home/zookeeper directory has to exist:

dataDir=/home/zookeeper

So create it:

mkdir -p /home/zookeeper

Running it again produced another error.


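The cause: no myid file with the matching server id had been created. In zookeeper.properties I had just defined server.1, server.2 and server.3, so each machine needs a file named myid in its dataDir (here /home/zookeeper/myid) containing only its own number:
ip1 is server.1, so its myid file contains 1; ip2 is server.2, so its myid file contains 2; ip3 is server.3, so its myid file contains 3.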

Now it works!
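Creating dataDir and the myid file on each machine can also be scripted. A minimal sketch, assuming dataDir=/home/zookeeper as configured above and that the server number is passed in by hand; MyIdWriter is a hypothetical helper. On ip1 it does the same job as mkdir -p /home/zookeeper followed by echo 1 > /home/zookeeper/myid. The 1 to 255 range check follows the ZooKeeper administrator guide's guidance for myid values.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class MyIdWriter {

    // Creates dataDir if needed (like mkdir -p) and writes <dataDir>/myid containing only
    // the server number N taken from this machine's "server.N=..." line in zookeeper.properties.
    static Path writeMyId(Path dataDir, int serverId) throws IOException {
        if (serverId < 1 || serverId > 255) {
            throw new IllegalArgumentException("myid must be between 1 and 255: " + serverId);
        }
        Files.createDirectories(dataDir);
        Path myid = dataDir.resolve("myid");
        Files.write(myid, String.valueOf(serverId).getBytes(StandardCharsets.US_ASCII));
        return myid;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java MyIdWriter /home/zookeeper 1   (1 on ip1, 2 on ip2, 3 on ip3)
        Path dataDir = Paths.get(args.length > 0 ? args[0] : "/home/zookeeper");
        int id = args.length > 1 ? Integer.parseInt(args[1]) : 1;
        System.out.println("wrote " + writeMyId(dataDir, id));
    }
}
```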

1.2.4 Start ZooKeeper

bin/zookeeper-server-start.sh config/zookeeper.properties &

Note: start ZooKeeper on every machine!
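To confirm that every ZooKeeper instance is up, you can send it the four-letter command ruok; a running server replies imok and closes the connection (the shell equivalent is echo ruok | nc ip1 2181). A minimal sketch, assuming the hosts ip1 to ip3 and client port 2181 from this article; ZkRuok is a hypothetical helper. imok only means the process is serving, not that the ensemble has a quorum. ZooKeeper 3.5.3 and later answer only four-letter commands listed in 4lw.commands.whitelist, while older releases answer them by default.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class ZkRuok {

    // Sends "ruok" to host:port and returns true only if the reply is exactly "imok".
    static boolean isOk(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            socket.setSoTimeout(timeoutMs); // don't block forever on read
            OutputStream out = socket.getOutputStream();
            out.write("ruok".getBytes(StandardCharsets.US_ASCII));
            out.flush();
            InputStream in = socket.getInputStream();
            byte[] reply = new byte[4];
            int n = 0;
            while (n < reply.length) {
                int r = in.read(reply, n, reply.length - n);
                if (r < 0) {
                    break; // connection closed early
                }
                n += r;
            }
            return n == 4 && "imok".equals(new String(reply, StandardCharsets.US_ASCII));
        } catch (IOException e) {
            return false; // unknown host, refused, timed out or reset
        }
    }

    public static void main(String[] args) {
        String[] hosts = {"ip1", "ip2", "ip3"}; // placeholders
        for (String h : hosts) {
            System.out.println(h + ":2181 " + (isOk(h, 2181, 3000) ? "imok" : "not responding"));
        }
    }
}
```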


1.2.5 Start Kafka

./kafka-server-start.sh ../config/server.properties &


(II) Testing Kafka

2.1 Create a topic, list topics and view topic details

Create a topic

bin/kafka-topics.sh --zookeeper ip1:2181,ip2:2181,ip3:2181 --topic mytopic --replication-factor 1 --partitions 1 --create

List topics

bin/kafka-topics.sh --zookeeper ip1:2181,ip2:2181,ip3:2181 --list

View topic details

bin/kafka-topics.sh --describe --zookeeper ip1:2181,ip2:2181,ip3:2181

2.2 Single-machine connectivity test

Produce data

./kafka-console-producer.sh  --broker-list ip1:9092  --topic mytopic

Running it prints the following log:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

Cause: no SLF4J binding is on Kafka's classpath; the binding added here is slf4j-nop-1.7.6.jar.

Fix:

Download slf4j-1.7.6.zip:

wget http://www.slf4j.org/dist/slf4j-1.7.6.zip

Extract it:

unzip slf4j-1.7.6.zip

Copy slf4j-nop-1.7.6.jar into Kafka's libs directory.
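The warning comes from SLF4J 1.7.x failing to load the class org.slf4j.impl.StaticLoggerBinder, which every 1.7.x binding jar provides. A quick way to confirm that a binding is now on the classpath is to try loading that class. A minimal sketch; Slf4jBindingCheck is a hypothetical helper, run with the same classpath as Kafka, e.g. java -cp "<kafka-dir>/libs/*:." Slf4jBindingCheck, where <kafka-dir> stands for your extracted Kafka directory.

```java
public class Slf4jBindingCheck {

    // The class SLF4J 1.7.x looks up at startup; the "Failed to load class" warning
    // means it could not be found, so SLF4J fell back to the no-operation logger.
    static final String STATIC_BINDER = "org.slf4j.impl.StaticLoggerBinder";

    // True if the named class can be loaded (without initializing it) by the given loader.
    static boolean isOnClasspath(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = Slf4jBindingCheck.class.getClassLoader();
        if (isOnClasspath(STATIC_BINDER, loader)) {
            System.out.println("SLF4J binding found: " + STATIC_BINDER);
        } else {
            System.out.println("No SLF4J binding; add one (e.g. slf4j-nop-1.7.6.jar) to the libs directory");
        }
    }
}
```

Note that the NOP binding silences the warning by discarding SLF4J log output altogether.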

Then run the producer again.


Consume messages:

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning

2.3 Distributed connectivity test

The producer runs on server1 (IP ip1).
The consumer runs on server2 (IP ip2).

On server1, produce data:

./kafka-console-producer.sh  --broker-list ip1:9092  --topic mytopic

On server2, consume messages:

./kafka-console-consumer.sh --zookeeper ip1:2181 --topic mytopic --from-beginning
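If the consumer on server2 receives nothing, a first thing to rule out is basic network reachability (firewall, or a broker bound to the wrong interface). The sketch below only checks that a TCP connection to each port can be opened; it does not speak Kafka's protocol, so it cannot detect a wrong advertised host. PortCheck is a hypothetical helper, and the targets are the placeholder ip1 with the ports used in this article.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    // Returns true if a TCP connection to host:port can be opened within timeoutMs.
    static boolean isReachable(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false; // unknown host, connection refused or timed out
        }
    }

    public static void main(String[] args) {
        // Run on server2 (ip2): the broker and ZooKeeper on ip1 must both be reachable.
        String[][] targets = {{"ip1", "9092"}, {"ip1", "2181"}};
        for (String[] t : targets) {
            int port = Integer.parseInt(t[1]);
            boolean ok = isReachable(t[0], port, 3000);
            System.out.println(t[0] + ":" + port + (ok ? " reachable" : " NOT reachable"));
        }
    }
}
```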

2.4 Producing and consuming with Java

Producer code (uses the 0.8.x producer API, kafka.javaapi.producer.Producer):

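import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class kafkaProducer extends Thread {

    private final String topic;

    public kafkaProducer(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        Producer<Integer, String> producer = createProducer();
        int i = 0;
        try {
            // Send one message per second until the thread is interrupted
            while (!Thread.currentThread().isInterrupted()) {
                producer.send(new KeyedMessage<Integer, String>(topic, "message: " + i++));
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } finally {
            producer.close();
        }
    }

    private Producer<Integer, String> createProducer() {
        Properties properties = new Properties();
        // Encode message values as strings
        properties.put("serializer.class", StringEncoder.class.getName());
        // Brokers used to fetch the initial topic metadata
        properties.put("metadata.broker.list", "ip1:9092,ip2:9092,ip3:9092");
        return new Producer<Integer, String>(new ProducerConfig(properties));
    }

    public static void main(String[] args) {
        new kafkaProducer("mytopic").start();
    }
}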

Consumer code:

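import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class kafkaConsumer extends Thread {

    private final String topic;

    public kafkaConsumer(String topic) {
        super();
        this.topic = topic;
    }

    @Override
    public void run() {
        ConsumerConnector consumer = createConsumer();
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1); // number of streams (consumer threads) to open for this topic
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = messageStreams.get(topic).get(0); // the single stream requested above
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        // hasNext() blocks until the next message arrives
        while (iterator.hasNext()) {
            String message = new String(iterator.next().message(), StandardCharsets.UTF_8);
            System.out.println("Received: " + message);
        }
    }

    private ConsumerConnector createConsumer() {
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "ip1:2181,ip2:2181,ip3:2181"); // the ZooKeeper ensemble
        // Consumers sharing a group.id split the topic's partitions among themselves;
        // a consumer with a different group.id receives its own full copy of the messages
        properties.put("group.id", "group1");
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
    }

    public static void main(String[] args) {
        new kafkaConsumer("mytopic").start(); // the topic mytopic created earlier in the cluster
    }
}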

(III) Common errors

Error 1

Sending data fails with the following error:

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.

Fix:

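server.properties contains advertised.host.name (advertised.listeners in later versions). This is the address the broker registers in ZooKeeper and hands out to producers and consumers. If it is not set, the value of host.name is used; if host.name is also unset, the broker advertises the machine's own hostname, which often resolves to localhost, so remote clients cannot connect. Set it to the machine's IP.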
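The fallback just described can be checked against a broker's server.properties before starting it. A minimal sketch of that 0.8.x rule; AdvertisedHostCheck is a hypothetical helper, and treating localhost and 127.x addresses as unusable is a heuristic, not something Kafka itself enforces.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

public class AdvertisedHostCheck {

    // advertised.host.name if set, otherwise host.name; null if neither is set
    // (the broker then advertises the machine's own hostname).
    static String effectiveAdvertisedHost(Properties p) {
        String advertised = trimToNull(p.getProperty("advertised.host.name"));
        return advertised != null ? advertised : trimToNull(p.getProperty("host.name"));
    }

    // Heuristic: null (hostname fallback, worth checking) or a loopback name/address.
    static boolean looksUnreachable(String host) {
        return host == null || host.equalsIgnoreCase("localhost") || host.startsWith("127.");
    }

    private static String trimToNull(String s) {
        if (s == null) {
            return null;
        }
        String t = s.trim();
        return t.isEmpty() ? null : t;
    }

    public static void main(String[] args) throws IOException {
        String path = args.length > 0 ? args[0] : "config/server.properties";
        Properties p = new Properties();
        try (Reader r = Files.newBufferedReader(Paths.get(path), StandardCharsets.ISO_8859_1)) {
            p.load(r);
        }
        String host = effectiveAdvertisedHost(p);
        if (looksUnreachable(host)) {
            System.out.println("Remote clients may not reach this broker (advertised host: " + host
                    + "); set advertised.host.name or host.name to the machine's IP");
        } else {
            System.out.println("Advertised host: " + host);
        }
    }
}
```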

