Kafka 简介

kafka 是一款分布式消息发布和订阅的系统,具有高性能和高吞吐率。

Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。

这里写图片描述

核心概念:

  • Broker
    Kafka集群包含一个或多个服务器,这种服务器被称为broker

  • Topic
    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

  • Partition
    Partition是物理上的概念,每个Topic包含一个或多个Partition.

  • Producer
    负责发布消息到Kafka broker

  • Consumer
    消息消费者,向Kafka broker读取消息的客户端。

  • Consumer Group
    每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)

  • 消息的发布(publish)称作producer,消息的订阅(subscribe)称作consumer,中间的存储阵列称作broker。

  • 多个broker协同合作,producer、consumer和broker三者之间通过zookeeper来协调请求和转发。

  • producer产生和推送(push)数据到broker,consumer从broker拉取(pull)数据并进行处理。

  • broker端不维护数据的消费状态,提升了性能。

  • 直接使用磁盘进行存储,线性读写,速度快:避免了数据在JVM内存和系统内存之间的复制,减少耗性能的创建对象和垃圾回收。

  • Kafka使用scala编写,可以运行在JVM上。

Kafaka 安装

可以按照 CHD5.11 离线安装或者升级 Spark2.x 详细步骤 教程进行安装。

角色分配:

这里写图片描述

安装出现如下问题及解决方案:

WARN Log Found a corrupted index file, /var/local/kafka/data/__consumer_offsets-33/00000000000000000000.index, deleting and rebuilding index...


上午10点10:04.842分	FATAL	KafkaServer	
Fatal error during KafkaServer startup. Prepare to shutdown
kafka.common.InconsistentBrokerIdException: Configured broker.id 118 doesn't match stored broker.id 145 in meta.properties. If you moved your data, make sure your configured broker.id matches. If you intend to create a new broker, you should remove all data in your data directories (log.dirs).
	at kafka.server.KafkaServer.getBrokerId(KafkaServer.scala:648)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:187)
	at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:37)
	at kafka.Kafka$.main(Kafka.scala:67)
	at com.cloudera.kafka.wrap.Kafka$.main(Kafka.scala:76)
	at com.cloudera.kafka.wrap.Kafka.main(Kafka.scala)
	
	
// 解决方案	
vi /opt/cloudera/parcels/KAFKA/etc/kafka/conf.dist/server.properties
看到:	
log.dirs=/tmp/kafka-logs

删除 log.dirs 目录

结果发现 /tmp/kafka-logs没有,全部在/tmp目录下,故执行 rm -rf /tmp/kafka*

在删除log 文件, rm -Rf /var/log/kafka*


// 然后重新安装kafka就可以了

Kafka 简单使用

Kafka command-line tools are located in /usr/bin:

kafka-topics

// 创建 topic test
# kafka-topics --create --zookeeper master:2181 --replication-factor 2 --partitions 4 --topic test
Created topic "test".

// 查看 topics
# kafka-topics --zookeeper master:2181 --list
test

// 删除 topic test
kafka-topics --delete --zookeeper master:2181 --topic test

kafka-console-consumer

Read data from a Kafka topic and write it to standard output. For example:

// 在slave1 上启动 consumer
kafka-console-consumer --zookeeper slave1:2181 --topic test

kafka-console-producer

Read data from standard output and write it to a Kafka topic. For example:

# kafka-console-producer --broker-list slave1:9092,slave2:9092 --topic test

然后在屏幕上输入数据,在consumer端可以接收到数据。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐