I've been looking into Kafka recently, starting with the basics. Below are the official resources:

  Official site: http://kafka.apache.org/

  Official documentation: http://kafka.apache.org/documentation.html#quickstart

Step 1: Download kafka_2.11-0.9.0.1.tgz from the official site

              Extract it:

# tar xzf kafka_2.11-0.9.0.1.tgz
# cd kafka_2.11-0.9.0.1

Step 2: Configure environment variables

  # vi /etc/profile
  export KAFKA_HOME=/home/grid/kafka_2.11-0.9.0.1
  export PATH=$PATH:$KAFKA_HOME/bin

After editing, run source /etc/profile so the variables take effect in the current shell.


Step 3: Key parameters in config/server.properties

 broker.id=69
 host.name=192.168.170.69
 advertised.host.name=192.168.170.69
 num.partitions=2
 log.dirs=/home/temp/zookeeper/kafka-logs
 zookeeper.connect=192.168.170.69:2181  


Here zookeeper.connect points to a standalone ZooKeeper instance. You can of course run a ZooKeeper ensemble instead; in that case list all the nodes separated by commas, e.g. zookeeper.connect=192.168.170.69:2181,192.168.170.70:2181,192.168.170.71:2181

Step 4: Start the broker (make sure the ZooKeeper service is already running first)

                 ./bin/kafka-server-start.sh config/server.properties &

Startup log:

[grid@h1 kafka_2.11-0.9.0.1]$ [2016-05-17 16:44:54,067] INFO KafkaConfig values: 
request.timeout.ms = 30000
log.roll.hours = 168
inter.broker.protocol.version = 0.9.0.X
log.preallocate = false
security.inter.broker.protocol = PLAINTEXT
controller.socket.timeout.ms = 30000
broker.id.generation.enable = true
ssl.keymanager.algorithm = SunX509
ssl.key.password = null
log.cleaner.enable = true
ssl.provider = null
num.recovery.threads.per.data.dir = 1
background.threads = 10
unclean.leader.election.enable = true
sasl.kerberos.kinit.cmd = /usr/bin/kinit
replica.lag.time.max.ms = 10000
ssl.endpoint.identification.algorithm = null
auto.create.topics.enable = true
zookeeper.sync.time.ms = 2000
ssl.client.auth = none
ssl.keystore.password = null
log.cleaner.io.buffer.load.factor = 0.9
offsets.topic.compression.codec = 0
log.retention.hours = 168
log.dirs = /home/temp/zookeeper/kafka-logs
ssl.protocol = TLS
log.index.size.max.bytes = 10485760
sasl.kerberos.min.time.before.relogin = 60000
log.retention.minutes = null
connections.max.idle.ms = 600000
ssl.trustmanager.algorithm = PKIX
offsets.retention.minutes = 1440
max.connections.per.ip = 2147483647
replica.fetch.wait.max.ms = 500
metrics.num.samples = 2
port = 9092
offsets.retention.check.interval.ms = 600000
log.cleaner.dedupe.buffer.size = 134217728
log.segment.bytes = 1073741824
group.min.session.timeout.ms = 6000
producer.purgatory.purge.interval.requests = 1000
min.insync.replicas = 1
ssl.truststore.password = null
log.flush.scheduler.interval.ms = 9223372036854775807
socket.receive.buffer.bytes = 102400
leader.imbalance.per.broker.percentage = 10
num.io.threads = 8
zookeeper.connect = 192.168.170.69:2181
queued.max.requests = 500
offsets.topic.replication.factor = 3
replica.socket.timeout.ms = 30000
offsets.topic.segment.bytes = 104857600
replica.high.watermark.checkpoint.interval.ms = 5000
broker.id = 69
ssl.keystore.location = null
listeners = PLAINTEXT://:9092
log.flush.interval.messages = 9223372036854775807
principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
log.retention.ms = null
offsets.commit.required.acks = -1
sasl.kerberos.principal.to.local.rules = [DEFAULT]
group.max.session.timeout.ms = 30000
num.replica.fetchers = 1
advertised.listeners = null
replica.socket.receive.buffer.bytes = 65536
delete.topic.enable = false
log.index.interval.bytes = 4096
metric.reporters = []
compression.type = producer
log.cleanup.policy = delete
controlled.shutdown.max.retries = 3
log.cleaner.threads = 1
quota.window.size.seconds = 1
zookeeper.connection.timeout.ms = 6000
offsets.load.buffer.size = 5242880
zookeeper.session.timeout.ms = 6000
ssl.cipher.suites = null
authorizer.class.name = 
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.service.name = null
controlled.shutdown.enable = true
offsets.topic.num.partitions = 50
quota.window.num = 11
message.max.bytes = 1000012
log.cleaner.backoff.ms = 15000
log.roll.jitter.hours = 0
log.retention.check.interval.ms = 300000
replica.fetch.max.bytes = 1048576
log.cleaner.delete.retention.ms = 86400000
fetch.purgatory.purge.interval.requests = 1000
log.cleaner.min.cleanable.ratio = 0.5
offsets.commit.timeout.ms = 5000
zookeeper.set.acl = false
log.retention.bytes = -1
offset.metadata.max.bytes = 4096
leader.imbalance.check.interval.seconds = 300
quota.consumer.default = 9223372036854775807
log.roll.jitter.ms = null
reserved.broker.max.id = 1000
replica.fetch.backoff.ms = 1000
advertised.host.name = 192.168.170.69
quota.producer.default = 9223372036854775807
log.cleaner.io.buffer.size = 524288
controlled.shutdown.retry.backoff.ms = 5000
log.dir = /tmp/kafka-logs
log.flush.offset.checkpoint.interval.ms = 60000
log.segment.delete.delay.ms = 60000
num.partitions = 2
num.network.threads = 2
socket.request.max.bytes = 104857600
sasl.kerberos.ticket.renew.window.factor = 0.8
log.roll.ms = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
socket.send.buffer.bytes = 102400
log.flush.interval.ms = null
ssl.truststore.location = null
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
default.replication.factor = 1
metrics.sample.window.ms = 30000
auto.leader.rebalance.enable = true
host.name = 192.168.170.69
ssl.truststore.type = JKS
advertised.port = null
max.connections.per.ip.overrides = 
replica.fetch.min.bytes = 1
ssl.keystore.type = JKS
 (kafka.server.KafkaConfig)
[2016-05-17 16:44:54,193] INFO starting (kafka.server.KafkaServer)
[2016-05-17 16:44:54,202] INFO Connecting to zookeeper on 192.168.170.69:2181 (kafka.server.KafkaServer)
[2016-05-17 16:44:54,227] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2016-05-17 16:44:54,238] INFO Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT (org.apache.zookeeper.ZooKeeper)
[2016-05-17 16:44:54,238] INFO Client environment:host.name=h1 (org.apache.zookeeper.ZooKeeper)
[2016-05-17 16:44:54,238] INFO Client environment:java.version=1.7.0_51 (org.apache.zookeeper.ZooKeeper)
[2016-05-17 16:44:54,238] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2016-05-17 16:44:54,238] INFO Client environment:java.home=/usr/jdk1.7.0_51/jre (org.apache.zookeeper.ZooKeeper)
[2016-05-17 16:44:54,238] INFO Client environment:java.class.path=.:/usr/jdk1.7.0_51/jre/lib/rt.jar:/usr/jdk1.7.0_51/lib/dt.jar:/usr/jdk1.7.0_51/lib/tools.jar:/home/grid/apache-hive-1.0.1-bin/lib:/home/grid/apache-mahout-distribution-0.11.0/lib:/home/grid/hbase-1.1.2/lib:/usr/jdk1.7.0_51/jre/lib/tools.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/kafka-clients-0.9.0.1.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/connect-runtime-0.9.0.1.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/jetty-http-9.2.12.v20150709.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/lz4-1.2.0.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/validation-api-1.1.0.Final.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/javax.ws.rs-api-2.0.1.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/hk2-utils-2.4.0-b31.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/snappy-java-1.1.1.7.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/connect-json-0.9.0.1.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/jackson-module-jaxb-annotations-2.5.4.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/metrics-core-2.2.0.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/jetty-security-9.2.12.v20150709.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/jersey-guava-2.22.1.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/jetty-servlet-9.2.12.v20150709.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/kafka_2.11-0.9.0.1-scaladoc.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/zkclient-0.7.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/scala-xml_2.11-1.0.4.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/jersey-common-2.22.1.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/jersey-server-2.22.1.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/kafka-tools-0.9.0.1.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/jetty-io-9.2.12.v20150709.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/jersey-client-2.22.1.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/javax.servlet-api-3.1.0.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/jersey-container-servlet-2.22.1.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/jopt-simple-3.2.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/jetty-util-9.2.12.v20150709.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/aopalliance-repackaged-2.4.0-b31.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/osgi-resource-locator-1.0.1.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/kafka_2.11-0.9.0.1-test.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/kafka_2.11-0.9.0.1.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/javax.inject-2.4.0-b31.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/jackson-annotations-2.5.0.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/jackson-core-2.5.4.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/argparse4j-0.5.0.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/jackson-jaxrs-base-2.5.4.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/connect-file-0.9.0.1.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/scala-parser-combinators_2.11-1.0.4.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/hk2-locator-2.4.0-b31.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/slf4j-api-1.7.6.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/jetty-server-9.2.12.v20150709.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/kafka_2.11-0.9.0.1-javadoc.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/log4j-1.2.17.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/slf4j-log4j12-1.7.6.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/jackson-jaxrs-json-provider-2.5.4.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/zookeeper-3.4.6.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/javax.inject-1.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/jersey-container-serv
let-core-2.22.1.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/jackson-databind-2.5.4.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/javassist-3.18.1-GA.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/kafka_2.11-0.9.0.1-sources.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/kafka-log4j-appender-0.9.0.1.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/javax.annotation-api-1.2.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/hk2-api-2.4.0-b31.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/scala-library-2.11.7.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/jersey-media-jaxb-2.22.1.jar:/home/grid/kafka_2.11-0.9.0.1/bin/../libs/connect-api-0.9.0.1.jar (org.apache.zookeeper.ZooKeeper)
[2016-05-17 16:44:54,239] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2016-05-17 16:44:54,239] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2016-05-17 16:44:54,240] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2016-05-17 16:44:54,240] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2016-05-17 16:44:54,240] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2016-05-17 16:44:54,240] INFO Client environment:os.version=2.6.18-406.el5 (org.apache.zookeeper.ZooKeeper)
[2016-05-17 16:44:54,240] INFO Client environment:user.name=grid (org.apache.zookeeper.ZooKeeper)
[2016-05-17 16:44:54,240] INFO Client environment:user.home=/home/grid (org.apache.zookeeper.ZooKeeper)
[2016-05-17 16:44:54,240] INFO Client environment:user.dir=/home/grid/kafka_2.11-0.9.0.1 (org.apache.zookeeper.ZooKeeper)
[2016-05-17 16:44:54,242] INFO Initiating client connection, connectString=192.168.170.69:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@30387103 (org.apache.zookeeper.ZooKeeper)
[2016-05-17 16:44:54,270] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2016-05-17 16:44:54,273] INFO Opening socket connection to server 192.168.170.69/192.168.170.69:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2016-05-17 16:44:54,279] INFO Socket connection established to 192.168.170.69/192.168.170.69:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2016-05-17 16:44:54,290] INFO Session establishment complete on server 192.168.170.69/192.168.170.69:2181, sessionid = 0x154bd8ad2b8000d, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2016-05-17 16:44:54,293] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2016-05-17 16:44:54,648] INFO Loading logs. (kafka.log.LogManager)
[2016-05-17 16:44:54,698] INFO Completed load of log idoall_testTopic-0 with log end offset 0 (kafka.log.Log)
[2016-05-17 16:44:54,709] INFO Logs loading complete. (kafka.log.LogManager)
[2016-05-17 16:44:54,819] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2016-05-17 16:44:54,822] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2016-05-17 16:44:54,895] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2016-05-17 16:44:54,899] INFO [Socket Server on Broker 69], Started 1 acceptor threads (kafka.network.SocketServer)
[2016-05-17 16:44:54,934] INFO [ExpirationReaper-69], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-05-17 16:44:54,935] INFO [ExpirationReaper-69], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-05-17 16:44:54,998] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-05-17 16:44:55,013] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-05-17 16:44:55,014] INFO 69 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2016-05-17 16:44:55,690] INFO New leader is 69 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-05-17 16:44:55,701] INFO [GroupCoordinator 69]: Starting up. (kafka.coordinator.GroupCoordinator)
[2016-05-17 16:44:55,705] INFO [ExpirationReaper-69], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-05-17 16:44:55,706] INFO [GroupCoordinator 69]: Startup complete. (kafka.coordinator.GroupCoordinator)
[2016-05-17 16:44:55,715] INFO [Group Metadata Manager on Broker 69]: Removed 0 expired offsets in 14 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-05-17 16:44:55,715] INFO [ExpirationReaper-69], Starting  (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2016-05-17 16:44:55,737] INFO [ThrottledRequestReaper-Produce], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-05-17 16:44:55,738] INFO [ThrottledRequestReaper-Fetch], Starting  (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2016-05-17 16:44:55,746] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2016-05-17 16:44:55,764] INFO Creating /brokers/ids/69 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-05-17 16:44:55,769] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-05-17 16:44:55,771] INFO Registered broker 69 at path /brokers/ids/69 with addresses: PLAINTEXT -> EndPoint(192.168.170.69,9092,PLAINTEXT) (kafka.utils.ZkUtils)
[2016-05-17 16:44:55,785] INFO Kafka version : 0.9.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2016-05-17 16:44:55,785] INFO Kafka commitId : 23c69d62a0cabf06 (org.apache.kafka.common.utils.AppInfoParser)
[2016-05-17 16:44:55,787] INFO [Kafka Server 69], started (kafka.server.KafkaServer)
[2016-05-17 16:44:56,183] INFO [ReplicaFetcherManager on broker 69] Removed fetcher for partitions [idoall_testTopic,0] (kafka.server.ReplicaFetcherManager)
[2016-05-17 16:44:56,307] INFO [ReplicaFetcherManager on broker 69] Removed fetcher for partitions [idoall_testTopic,0] (kafka.server.ReplicaFetcherManager)
[2016-05-17 16:54:55,702] INFO [Group Metadata Manager on Broker 69]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-05-17 17:04:55,702] INFO [Group Metadata Manager on Broker 69]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-05-17 17:14:55,701] INFO [Group Metadata Manager on Broker 69]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-05-17 17:17:41,732] INFO Topic creation {"version":1,"partitions":{"1":[69],"0":[69]}} (kafka.admin.AdminUtils$)
[2016-05-17 17:17:41,739] INFO [KafkaApi-69] Auto creation of topic test_topic with 2 partitions and replication factor 1 is successful! (kafka.server.KafkaApis)
[2016-05-17 17:17:42,115] INFO [ReplicaFetcherManager on broker 69] Removed fetcher for partitions [test_topic,0],[test_topic,1] (kafka.server.ReplicaFetcherManager)
[2016-05-17 17:17:42,124] INFO Completed load of log test_topic-0 with log end offset 0 (kafka.log.Log)
[2016-05-17 17:17:42,132] INFO Created log for partition [test_topic,0] in /home/temp/zookeeper/kafka-logs with properties {flush.messages -> 9223372036854775807, segment.bytes -> 1073741824, preallocate -> false, cleanup.policy -> delete, delete.retention.ms -> 86400000, segment.ms -> 604800000, min.insync.replicas -> 1, file.delete.delay.ms -> 60000, retention.ms -> 604800000, max.message.bytes -> 1000012, index.interval.bytes -> 4096, segment.index.bytes -> 10485760, retention.bytes -> -1, segment.jitter.ms -> 0, min.cleanable.dirty.ratio -> 0.5, compression.type -> producer, unclean.leader.election.enable -> true, flush.ms -> 9223372036854775807}. (kafka.log.LogManager)
[2016-05-17 17:17:42,134] INFO Partition [test_topic,0] on broker 69: No checkpointed highwatermark is found for partition [test_topic,0] (kafka.cluster.Partition)
[2016-05-17 17:17:42,147] INFO Completed load of log test_topic-1 with log end offset 0 (kafka.log.Log)
[2016-05-17 17:17:42,153] INFO Created log for partition [test_topic,1] in /home/temp/zookeeper/kafka-logs with properties {flush.messages -> 9223372036854775807, segment.bytes -> 1073741824, preallocate -> false, cleanup.policy -> delete, delete.retention.ms -> 86400000, segment.ms -> 604800000, min.insync.replicas -> 1, file.delete.delay.ms -> 60000, retention.ms -> 604800000, max.message.bytes -> 1000012, index.interval.bytes -> 4096, segment.index.bytes -> 10485760, retention.bytes -> -1, segment.jitter.ms -> 0, min.cleanable.dirty.ratio -> 0.5, compression.type -> producer, unclean.leader.election.enable -> true, flush.ms -> 9223372036854775807}. (kafka.log.LogManager)
[2016-05-17 17:17:42,154] INFO Partition [test_topic,1] on broker 69: No checkpointed highwatermark is found for partition [test_topic,1] (kafka.cluster.Partition)
[2016-05-17 17:24:55,701] INFO [Group Metadata Manager on Broker 69]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-05-17 17:34:55,702] INFO [Group Metadata Manager on Broker 69]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)


The output above indicates the broker started successfully.

Step 5: Create a topic and test it

Create a topic (kafka-topics.sh exits on its own, so there is no need to background it with &):

 ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test



List topics:
./bin/kafka-topics.sh --list --zookeeper localhost:2181

test
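
The CLI scripts are thin wrappers over the broker's admin classes, so topics can also be created from Java. Below is a minimal sketch against the kafka.admin.AdminUtils and kafka.utils.ZkUtils classes bundled with this 0.9 release; the two 30000 ms timeouts are illustrative assumptions:

import java.util.Properties;

import kafka.admin.AdminUtils;
import kafka.utils.ZkUtils;

public class TopicCreateTest {
    public static void main(String[] args) {
        // Connect to the same ZooKeeper the broker registered with
        ZkUtils zkUtils = ZkUtils.apply("192.168.170.69:2181", 30000, 30000, false);
        try {
            if (!AdminUtils.topicExists(zkUtils, "test")) {
                // topic name, partition count, replication factor, extra per-topic config
                AdminUtils.createTopic(zkUtils, "test", 1, 1, new Properties());
            }
        } finally {
            zkUtils.close();
        }
    }
}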

Producer
# Send some messages
Type a message (This is a message: The you smile until forever), then press Ctrl+C to exit the shell:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message: The you smile until forever



Consumer
# Start a consumer
After running the command it prints the topic's messages from the beginning, ending with the message just sent: This is a message: The you smile until forever
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message: The you smile until forever


Java producer example

package xiefg.testkafka;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * <p>Title: ProducerTest</p>
 * <p>Description: producer</p>
 * @date 2016-05-17 17:09:02
 */
public class ProducerTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        // serializer.class is the message serializer class
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // metadata.broker.list: for high availability, it is best to list at least two brokers
        props.put("metadata.broker.list", "192.168.170.69:9092");
        // Optional partitioner class to control how messages map to partitions
        //props.put("partitioner.class", "idoall.testkafka.Partitionertest");
        // ACK mechanism: the Kafka server must acknowledge each message
        props.put("request.required.acks", "1");

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        for (int i = 0; i < 10; i++) {
            // KeyedMessage<K, V>
            //   K is the type of the partition key
            //   V is the type of the message itself
            // topic: "test_topic", key: "key", message: "message"
            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS");
            Date curDate = new Date(System.currentTimeMillis()); // current time
            String curTime = formatter.format(curDate);

            String msg = "xiefg.org" + i + "=" + curTime;
            String key = i + "";
            producer.send(new KeyedMessage<String, String>("test_topic", key, msg));
        }
        producer.close();
    }
}
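
Kafka 0.9 also ships the newer producer client in org.apache.kafka.clients.producer, which connects to the brokers directly and needs no ZooKeeper address. A minimal equivalent sketch, reusing the broker address and topic from the setup above:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class NewProducerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The new producer discovers the cluster from the bootstrap brokers
        props.put("bootstrap.servers", "192.168.170.69:9092");
        // Wait for the leader to acknowledge each message
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        try {
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<String, String>("test_topic", Integer.toString(i), "xiefg.org" + i));
            }
        } finally {
            producer.close();
        }
    }
}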
Java consumer example


package xiefg.testkafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * <p>Title: ConsumerTest</p>
 * <p>Description: consumer side</p>
 * @author xiefg
 * @date 2016-05-17 17:06:36
 */
public class ConsumerTest extends Thread {

    private final ConsumerConnector consumer; // connection
    private final String topic;

    public static void main(String[] args) {
        ConsumerTest consumerThread = new ConsumerTest("test_topic");
        consumerThread.start();
    }

    public ConsumerTest(String topic) {
        consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        // ZooKeeper connection address
        props.put("zookeeper.connect", "192.168.170.69:2181");
        // Consumer group id
        props.put("group.id", "69");
        // Group offsets are stored in ZooKeeper, but they are not updated in
        // real time; this sets the commit interval
        props.put("auto.commit.interval.ms", "1000");
        props.put("zookeeper.session.timeout.ms", "10000");
        return new ConsumerConfig(props);
    }

    public void run() {
        // Build the message streams: one stream for our topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer
                .createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        System.out.println("============ [Results] ============");
        while (it.hasNext()) {
            System.err.println("get data:" + new String(it.next().message()));
            try {
                Thread.sleep(1000); // pause 1 second between messages
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
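
As with the producer, 0.9 also includes the newer consumer client in org.apache.kafka.clients.consumer, which talks to the brokers directly instead of going through ZooKeeper. A minimal sketch under the same assumptions (broker address, topic, and group id as above):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewConsumerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.170.69:9092");
        props.put("group.id", "69");
        // Start from the earliest offset when the group has no committed position
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Collections.singletonList("test_topic"));
        try {
            while (true) {
                // Poll with a 1-second timeout and print whatever arrived
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("get data:" + record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}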


Run output: (the original post showed a screenshot here, omitted)


