1. Create a topic

localhost:bin jack$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
localhost:bin jack$ kafka-topics.sh --list --zookeeper localhost:2181
test


2. Send some messages

localhost:bin jack$ kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2016-06-22 14:35:14,700] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
This is a message
This is another message


3. Start a consumer

localhost:bin jack$ kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message

To consume only new messages, simply omit the --from-beginning flag.

4. Setting up a multi-broker cluster (multiple brokers on a single node)

Copy the kafka_2.10-0.8.2.2 directory twice, naming the copies kafka_2 and kafka_3:

cp -r kafka_2.10-0.8.2.2 kafka_2
cp -r kafka_2.10-0.8.2.2 kafka_3

Edit the broker.id and port properties in kafka_2/config/server.properties and kafka_3/config/server.properties so that each broker has unique values:

kafka_2.10-0.8.2.2/config/server.properties
broker.id=1
port=9092
kafka_2/config/server.properties
broker.id=2
port=9093
kafka_3/config/server.properties
broker.id=3
port=9094

Multiple brokers cannot share the same log directory, so also give each copy its own log.dirs; otherwise the second broker fails to start with the lock error shown below:

log.dirs=/tmp/kafka2-logs
log.dirs=/tmp/kafka3-logs
kafka.common.KafkaException: Failed to acquire lock on file .lock in /tmp/kafka-logs. A Kafka instance in another process or thread is using this directory.
	at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:98)
	at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:95)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at kafka.log.LogManager.lockLogDirs(LogManager.scala:95)
	at kafka.log.LogManager.<init>(LogManager.scala:57)
	at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:335)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:85)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:29)
	at kafka.Kafka$.main(Kafka.scala:46)
	at kafka.Kafka.main(Kafka.scala)
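The three per-broker edits (broker.id, port, log.dirs) can also be scripted rather than made by hand. A minimal sketch in Python, assuming the keys appear exactly as in the listing above; patch_config is a hypothetical helper written for this tutorial, not a Kafka tool:

```python
# Rewrite the per-broker settings in a server.properties text.
# patch_config is an illustrative helper for this tutorial, not part of Kafka.
import re

def patch_config(text, broker_id, port, log_dirs):
    """Return the config text with broker.id, port and log.dirs replaced."""
    text = re.sub(r"^broker\.id=.*$", f"broker.id={broker_id}", text, flags=re.M)
    text = re.sub(r"^port=.*$", f"port={port}", text, flags=re.M)
    text = re.sub(r"^log\.dirs=.*$", f"log.dirs={log_dirs}", text, flags=re.M)
    return text

original = "broker.id=1\nport=9092\nlog.dirs=/tmp/kafka-logs\n"
print(patch_config(original, 2, 9093, "/tmp/kafka2-logs"))
# broker.id=2
# port=9093
# log.dirs=/tmp/kafka2-logs
```

Apply it to each copy's server.properties (broker 2 gets port 9093 and /tmp/kafka2-logs, broker 3 gets 9094 and /tmp/kafka3-logs) to get the same result as the manual edits.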

Start the three brokers, running the command below once from each installation's bin directory (each picks up its own server.properties):

kafka-server-start.sh ../config/server.properties &


Create a topic with a replication factor of 3, and inspect its status:

localhost:bin jack$ kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Created topic "my-replicated-topic".
localhost:bin jack$ kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3

As the output shows, the topic has one partition, a replication factor of 3, and node 1 is the leader.

The fields are explained as follows:

  • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader. 
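To make the three fields concrete, here is a small sketch that parses one partition line of the --describe output above into leader, replicas and isr, and flags any replica that has fallen out of sync. The parsing helper is an illustration for this tutorial, not a Kafka API:

```python
# Parse one partition line of `kafka-topics.sh --describe` output and
# report which replicas are in sync. Illustrative helper, not a Kafka API.
def parse_partition(line):
    fields = {}
    for part in line.split("\t"):
        part = part.strip()
        if ": " in part:
            key, value = part.split(": ", 1)
            fields[key.lower()] = value
    replicas = [int(r) for r in fields["replicas"].split(",")]
    isr = [int(r) for r in fields["isr"].split(",")]
    return {
        "leader": int(fields["leader"]),
        "replicas": replicas,
        "isr": isr,
        # replicas assigned to the partition but not currently in sync
        "out_of_sync": [r for r in replicas if r not in isr],
    }

line = "Topic: my-replicated-topic\tPartition: 0\tLeader: 1\tReplicas: 1,2,3\tIsr: 1,2,3"
print(parse_partition(line))
# {'leader': 1, 'replicas': [1, 2, 3], 'isr': [1, 2, 3], 'out_of_sync': []}
```

When all brokers are healthy, isr equals replicas and out_of_sync is empty; if you kill one of the three brokers and re-run --describe, that broker's id disappears from Isr while remaining in Replicas.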

Let's publish a few messages to our new topic:

kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic

my test message 1

my test message 2

^C


Now let's consume these messages:

kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

...

my test message 1

my test message 2

^C


References:

http://kafka.apache.org/documentation.html#quickstart_send

http://www.centoscn.com/CentosServer/cluster/2015/0312/4863.html
