Kafka - Installation and Usage
I. Download the installation package
1. Choose and download the desired version
[root@ywxtdb opt]# wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
2. Extract the archive
[root@ywxtdb opt]# tar -xvf kafka_2.12-2.1.0.tgz
II. Edit the configuration
[root@ywxtdb config]# vi /opt/kafka_2.12-2.1.0/config/server.properties
The main settings to change:
broker.id=0
listeners=PLAINTEXT://192.168.1.128:9092
log.dirs=/var/kafka-logs
zookeeper.connect=localhost:2181
For a cluster:
zookeeper.connect=192.168.91.128:2181,192.168.91.129:2181,192.168.91.130:2181/kafka
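In a cluster, every broker shares the same zookeeper.connect value (the /kafka suffix is a chroot, so all of Kafka's znodes live under that path instead of ZooKeeper's root), while broker.id and listeners must be unique per node. A minimal sketch of server.properties for the first node, assuming the three 192.168.91.x hosts above; repeat on the other nodes with broker.id=1 / broker.id=2 and their own IPs:
# server.properties on node 1; broker.id and listeners differ per node
broker.id=0
listeners=PLAINTEXT://192.168.91.128:9092
log.dirs=/var/kafka-logs
zookeeper.connect=192.168.91.128:2181,192.168.91.129:2181,192.168.91.130:2181/kafka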
III. Configure environment variables
[root@ywxtdb config]# vi /etc/profile
export KAFKA_HOME=/opt/kafka_2.12-2.1.0
export PATH=$PATH:$KAFKA_HOME/bin
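The exported variables only apply to new login shells; to pick them up in the current session, re-source the profile and verify:
[root@ywxtdb config]# source /etc/profile
[root@ywxtdb config]# echo $KAFKA_HOME
/opt/kafka_2.12-2.1.0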
IV. Start Kafka
Before starting Kafka, remember to start ZooKeeper first.
[root@ywxtdb config]# kafka-server-start.sh ./server.properties
Once the broker starts normally, you can see a kafka node created under ZooKeeper's root:
[zk: localhost:2181(CONNECTED) 16] ls /
[kafka, zookeeper]
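Started this way, the broker occupies the current terminal. Both start scripts also accept a -daemon flag to run in the background; a sketch, assuming you use the ZooKeeper bundled with Kafka:
[root@ywxtdb config]# zookeeper-server-start.sh -daemon /opt/kafka_2.12-2.1.0/config/zookeeper.properties
[root@ywxtdb config]# kafka-server-start.sh -daemon /opt/kafka_2.12-2.1.0/config/server.properties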
V. Create topics
1. View the parameters of the topic script
Parameters marked REQUIRED are mandatory.
[root@ywxtdb ~]# kafka-topics.sh
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase
from one, then you should configure the number of parallel GC threads appropriately using
-XX:ParallelGCThreads=N
Create, delete, describe, or change a topic.
Option                                     Description
------                                     -----------
--alter                                    Alter the number of partitions, replica assignment, and/or configuration for the topic.
--config <String: name=value>              A topic configuration override for the topic being created or altered. The following is a list of valid configurations:
                                             cleanup.policy
                                             compression.type
                                             delete.retention.ms
                                             file.delete.delay.ms
                                             flush.messages
                                             flush.ms
                                             follower.replication.throttled.replicas
                                             index.interval.bytes
                                             leader.replication.throttled.replicas
                                             max.message.bytes
                                             message.downconversion.enable
                                             message.format.version
                                             message.timestamp.difference.max.ms
                                             message.timestamp.type
                                             min.cleanable.dirty.ratio
                                             min.compaction.lag.ms
                                             min.insync.replicas
                                             preallocate
                                             retention.bytes
                                             retention.ms
                                             segment.bytes
                                             segment.index.bytes
                                             segment.jitter.ms
                                             segment.ms
                                             unclean.leader.election.enable
                                           See the Kafka documentation for full details on the topic configs.
--create                                   Create a new topic.
--delete                                   Delete a topic.
--delete-config <String: name>             A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option).
--describe                                 List details for the given topics.
--disable-rack-aware                       Disable rack-aware replica assignment.
--exclude-internal                         Exclude internal topics when running the list or describe command. Internal topics are listed by default.
--force                                    Suppress console prompts.
--help                                     Print usage information.
--if-exists                                If set when altering or deleting topics, the action will only execute if the topic exists.
--if-not-exists                            If set when creating topics, the action will only execute if the topic does not already exist.
--list                                     List all available topics.
--partitions <Integer: # of partitions>    The number of partitions for the topic being created or altered. (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected.)
--replica-assignment <String: broker_id_for_part1_replica1:broker_id_for_part1_replica2, broker_id_for_part2_replica1:broker_id_for_part2_replica2, ...>
                                           A list of manual partition-to-broker assignments for the topic being created or altered.
--replication-factor <Integer: replication factor>
                                           The replication factor for each partition in the topic being created.
--topic <String: topic>                    The topic to be created, altered or described. Can also accept a regular expression, except with the --create option.
--topics-with-overrides                    If set when describing topics, only show topics that have overridden configs.
--unavailable-partitions                   If set when describing topics, only show partitions whose leader is not available.
--under-replicated-partitions              If set when describing topics, only show under-replicated partitions.
--zookeeper <String: hosts>                REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple hosts can be given to allow fail-over.
2. Run the create command
kafka-topics.sh is the script for managing topics. It relies on ZooKeeper to create the topic, so the ZooKeeper connection string has to be supplied.
--create indicates that this invocation creates a topic
--topic the name of the topic to create
--partitions the number of partitions to create, usually sized according to the number of nodes
--replication-factor the number of replicas for each partition
[root@node01 bin]# kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --create --topic bobby --partitions 2 --replication-factor 2
Created topic "bobby".
3. List the created topic
[root@node01 bin]# kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --list --topic bobby
bobby
4. View the topic description
[root@node01 bin]# kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --describe --topic bobby
Topic:bobby PartitionCount:2 ReplicationFactor:2 Configs:
Topic: bobby Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: bobby Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
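In this output, Leader is the broker id currently serving the partition, Replicas is the full assignment, and Isr is the set of replicas in sync with the leader. If you later need more partitions, the same script can alter the topic (the count can only grow, and, as the help text warns, keyed messages may map to different partitions afterwards); a sketch:
[root@node01 bin]# kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --alter --topic bobby --partitions 3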
VI. Start a consumer
1. View the parameters of the console consumer script
Parameters marked REQUIRED are mandatory.
[root@ywxtdb ~]# kafka-console-consumer.sh
The console consumer is a tool that reads data from Kafka and outputs it to standard output.
Option                                     Description
------                                     -----------
--bootstrap-server <String: server to connect to>
                                           REQUIRED: The server(s) to connect to.
--consumer-property <String: consumer_prop>
                                           A mechanism to pass user-defined properties in the form key=value to the consumer.
--consumer.config <String: config file>    Consumer config properties file. Note that [consumer-property] takes precedence over this config.
--enable-systest-events                    Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)
--formatter <String: class>                The name of a class to use for formatting kafka messages for display. (default: kafka.tools.DefaultMessageFormatter)
--from-beginning                           If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.
--group <String: consumer group id>        The consumer group id of the consumer.
--isolation-level <String>                 Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted to read all messages. (default: read_uncommitted)
--key-deserializer <String: deserializer for key>
--max-messages <Integer: num_messages>     The maximum number of messages to consume before exiting. If not set, consumption is continual.
--offset <String: consume offset>          The offset id to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end. (default: latest)
--partition <Integer: partition>           The partition to consume from. Consumption starts from the end of the partition unless '--offset' is specified.
--property <String: prop>                  The properties to initialize the message formatter. Default properties include: print.timestamp=true|false, print.key=true|false, print.value=true|false, key.separator=<key.separator>, line.separator=<line.separator>, key.deserializer=<key.deserializer>, value.deserializer=<value.deserializer>. Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.' and 'value.deserializer.' prefixes to configure their deserializers.
--skip-message-on-error                    If there is an error when processing a message, skip it instead of halting.
--timeout-ms <Integer: timeout_ms>         If specified, exit if no message is available for consumption for the specified interval.
--topic <String: topic>                    The topic id to consume on.
--value-deserializer <String: deserializer for values>
--whitelist <String: whitelist>            Whitelist of topics to include for consumption.
2. Run the startup command
The consumer client does not depend on ZooKeeper; it connects via --bootstrap-server.
--topic the name of the topic
--group the consumer group to join, identified by its group name
[root@node01 bin]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bobby --group demo
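By default a group with no committed offsets starts from the latest message, so only data produced after the consumer attaches is shown. To replay the topic from the start, add --from-beginning; a sketch, assuming a fresh group demo1 with no committed offsets:
[root@node01 bin]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bobby --group demo1 --from-beginning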
3. Check the groups
[root@node01 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --list
demo
demo1
View the details of a specific group:
[root@node01 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --describe --group demo
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
bobby 0 16 16 0 consumer-1-74554efe-e529-4c25-ab7f-c0128e3c7f22 /192.168.91.128 consumer-1
bobby 1 25 25 0 consumer-1-74554efe-e529-4c25-ab7f-c0128e3c7f22 /192.168.91.128 consumer-1
[root@node01 kafka-logs]#
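LAG is LOG-END-OFFSET minus CURRENT-OFFSET; 0 means the group is fully caught up. The same script can also rewind a group's committed offsets so it re-reads old data (the group must have no active consumers); a sketch:
[root@node01 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --group demo --topic bobby --reset-offsets --to-earliest --execute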
VII. Start a producer
1. View the parameters of the console producer script
Parameters marked REQUIRED are mandatory.
[root@ywxtdb ~]# kafka-console-producer.sh
Read data from standard input and publish it to Kafka.
Option                                     Description
------                                     -----------
--batch-size <Integer: size>               Number of messages to send in a single batch if they are not being sent synchronously. (default: 200)
--broker-list <String: broker-list>        REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.
--compression-codec [String: compression-codec]
                                           The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. If specified without a value, it defaults to 'gzip'.
--line-reader <String: reader_class>       The class name of the class to use for reading lines from standard in. By default each line is read as a separate message. (default: kafka.tools.ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on send>   The max time that the producer will block for during a send request. (default: 60000)
--max-memory-bytes <Long: total memory in bytes>
                                           The total memory used by the producer to buffer records waiting to be sent to the server. (default: 33554432)
--max-partition-memory-bytes <Long: memory in bytes per partition>
                                           The buffer size allocated for a partition. When records are received which are smaller than this size the producer will attempt to optimistically group them together until this size is reached. (default: 16384)
--message-send-max-retries <Integer>       Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retries before the producer gives up and drops this message. (default: 3)
--metadata-expiry-ms <Long: metadata expiration interval>
                                           The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes. (default: 300000)
--producer-property <String: producer_prop>
                                           A mechanism to pass user-defined properties in the form key=value to the producer.
--producer.config <String: config file>    Producer config properties file. Note that [producer-property] takes precedence over this config.
--property <String: prop>                  A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader.
--request-required-acks <String: request required acks>
                                           The required acks of the producer requests. (default: 1)
--request-timeout-ms <Integer: request timeout ms>
                                           The ack timeout of the producer requests. Value must be non-negative and non-zero. (default: 1500)
--retry-backoff-ms <Integer>               Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. (default: 100)
--socket-buffer-size <Integer: size>       The size of the TCP RECV buffer. (default: 102400)
--sync                                     If set, message send requests to the brokers are sent synchronously, one at a time as they arrive.
--timeout <Integer: timeout_ms>            If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in ms. (default: 1000)
--topic <String: topic>                    REQUIRED: The topic id to produce messages to.
2. Run the command
--topic the name of the topic
--broker-list the Kafka broker nodes to connect to
After typing a message (here, 111) into the producer:
[root@node01 bin]# kafka-console-producer.sh --topic bobby --broker-list node3:9092
>111
You can see that the consumer side has received 111:
[root@node01 bin]# kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic bobby --group demo
111
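The messages sent above carry no key. The console tools can also produce and display keyed messages via --property flags; a sketch, assuming ':' as the key separator and a hypothetical new group demo2:
[root@node01 bin]# kafka-console-consumer.sh --bootstrap-server node1:9092 --topic bobby --group demo2 --property print.key=true
[root@node01 bin]# kafka-console-producer.sh --broker-list node3:9092 --topic bobby --property parse.key=true --property key.separator=:
>user1:hello
The consumer then prints the key and value separated by a tab: user1   hello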
One producer, one consumer.
One producer, two consumer groups: group demo has two consumers, group demo1 has one consumer.
(Screenshots: the producer; consumer 1 of group demo; consumer 2 of group demo; the consumer of group demo1.)
Conclusions:
1. Different groups can consume the same data; within a single group, a message is not consumed more than once.
2. Data is not duplicated across partitions; one partition cannot be consumed by more than one consumer in the same group at the same time, while a single consumer can consume multiple partitions.
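Both conclusions can be reproduced with the two-partition topic bobby created earlier; a sketch, with each command running in its own terminal:
# terminals 1 and 2: two consumers in group demo -- each is assigned exactly one partition
kafka-console-consumer.sh --bootstrap-server node1:9092 --topic bobby --group demo
kafka-console-consumer.sh --bootstrap-server node1:9092 --topic bobby --group demo
# terminal 3: a single consumer in group demo1 -- it is assigned both partitions
kafka-console-consumer.sh --bootstrap-server node1:9092 --topic bobby --group demo1
# terminal 4: the producer -- every message shows up once in demo and once in demo1
kafka-console-producer.sh --broker-list node3:9092 --topic bobby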