概念理解:

消息读取方面

kafka中的消息,如果被读取后,可以被重复读取;

如果是被group A的用户读取了,其他组的用户group B组的用户可以读取;

如果被组里user1读取了,组内其他成员B,在从头开始读取时,可以读取到该数据;

Consumer group

http://www.cnblogs.com/huxi2b/p/6223228.html

个人认为,理解consumer group记住下面这三个特性就好了:

consumer group下可以有一个或多个consumer instance,consumer instance可以是一个进程,也可以是一个线程

group.id是一个字符串,唯一标识一个consumer group

consumer group下订阅的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)

http://www.cnblogs.com/huxi2b/p/6061110.html

http://www.cnblogs.com/huxi2b/p/6223228.html

本文着重讨论了一下新版本的consumer group的内部设计原理,特别是consumer group与coordinator之间的交互过程,希望对各位有所帮助。

 

每次group进行rebalance之后,generation号都会加1,表示group进入到了一个新的版本,如下图所示: Generation 1时group有3个成员,随后成员2退出组,coordinator触发rebalance,consumer group进入Generation 2,之后成员4加入,再次触发rebalance,group进入Generation 3.

 

group与coordinator共同使用它来完成group的rebalance。目前kafka提供了5个协议来处理与consumer group coordination相关的问题:

 1、Heartbeat请求:consumer需要定期给coordinator发送心跳来表明自己还活着

2、LeaveGroup请求:主动告诉coordinator我要离开consumer group

3、SyncGroup请求:group leader把分配方案告诉组内所有成员

4、JoinGroup请求:成员请求加入组

5、DescribeGroup请求:显示组的所有信息,包括成员信息,协议名称,分配方案,订阅信息等。通常该请求是给管理员使用

Coordinator在rebalance的时候主要用到了前面4种请求。

 

组成员崩溃(member failure)

可以说离开组是主动地发起rebalance;而崩溃则是被动地发起rebalance。okay,

 

难道每次变更,generation都是会加1?

有相同的generation时,就会触发rebalance;

 

Kafka在zk上的存储

slider拉起的kafka在zookeeper中的存储:存储在目录/kafka/client下;

zk的目录下:

 ./zkCli.sh   -server  dcp11

/kafka/client

[zk: dcp11(CONNECTED) 10] ls /kafka/client/kafkajiu0522

[consumers, cluster, config, controller, isr_change_notification, admin, brokers, controller_epoch]

 

ls /kafka/client/kafkajiu0522/brokers/topics   ----该kafka中的topic信息;

ls /kafka/client/kafkajiu0522/admin/delete_topics  ---该kafka中删除的topic信息;

 

 

 

kafka_server_jaas.conf文件的配置内容:

KafkaServer {

    com.sun.security.auth.module.Krb5LoginModule required

    useKeyTab=true

    storeKey=true

    useTicketCache=false

    serviceName=kafka

    keyTab="/etc/security/keytabs/kafkadcp18.keytab"

    principal="kafka/dcp18@DCP.COM";

};

 

KafkaClient {

    com.sun.security.auth.module.Krb5LoginModule required

useTicketCache=true;

};

 

Client {

    com.sun.security.auth.module.Krb5LoginModule required

    useKeyTab=true

    storeKey=true

    useTicketCache=false

    serviceName=zookeeper

    keyTab="/etc/security/keytabs/kafkadcp18.keytab"

    principal="kafka/dcp18@DCP.COM";

};

命令

关于kafka启动停止、topic的增查删

创建topic:

./bin/kafka-topics --zookeeper DCP185:2181,DCP186:2181,DCP187:2181/kafka --topic test2 --replication-factor 1 --partitions 2 --create

     bin/kafka-create-topic.sh   --replica 2 --partition 8 --topic test  --zookeeper 192.168.197.170:2181,192.168.197.171:2181

创建名为testtopic 8个分区分别存放数据,数据备份总共2

删除topic:

/data/data1/confluent-3.0.0/bin/kafka-topics --zookeeper DCP185:2181,DCP186:2181,DCP187:2181/kafka  --delete  --topic  topicdf02175

 

查看topic列表:

/data/data1/confluent-3.0.0/bin/kafka-topics --zookeeper DCP185:2181,DCP186:2181,DCP187:2181/kafka --list

 ./kafka-topics --zookeeper  DCP187:2181/kafkakerberos   --list

 

启动

nohup sh kafka-server-start ../etc/kafka/server.properties &

    ( 先用kerberos用户登录到环境中:kinit  -kt /root/kafka.keytab  kafka@DCP.COM   );

 

停止:./kafka-server-stop  ../etc/kafka/server.properties &

 slider上创建topic命令注意和kafkaserver的创建有点不同:

 ./kafka-topics   --zookeeper  DCP185:2181,DCP186:2181,DCP187:2181/kafka/client/kafka04061  --topic  topic04102  --replication-factor 1 -partitions 1 -create

 

生产消费:

kerberos认证的,需要加上producer、consumer的对应配置文件:

./kafka-console-producer --broker-list DCP187:9092 --topic test2 --producer.config ../etc/kafka/producer.properties

选择用bootstrap方式消费的:

./kafka-console-consumer.sh  --from-beginning --topic topic05221 --new-consumer --consumer.config ../config/consumer.properties --bootstrap-server  dcp11:9092

 

选择用zookeeper方式消费的:

./kafka-console-consumer --zookeeper DCP185:2181,DCP186:2181,DCP187:2181/kafka --from-beginning --topic test2 --new-consumer --consumer.config ./etc/kafka/consumer.properties --bootstrap-server DCP187:9092

 

kafka不同消费方式后,记录存储的数据文件一样:

无kerberos认证的

用zookeeper方式消费:./kafka-console-consumer.sh  --zookeeper DCP185:2181,DCP186:2181,DCP187:2181/kafka/client/kafka04112    --from-beginning --topic topic1;

用bootstrap-server方式消费:./kafka-console-consumer.sh     --bootstrap-server DCP186:39940   --topic topic1     --from-beginning,   --bootstrap-servers必须是kafka集群中的其他一台;

存储的数据文件是:topic1-1和__consumer_offsets-0

存储文件,消费到数据后,每个分区存放一个文件目录;

关于acl权限:设置、查看、删除

给普通用户设置读写权限:

给client用户赋producer写权限:

./kafka-acls --authorizer-properties zookeeper.connect=DCP185:2181,DCP186:2181,DCP187:2181/kafka --add --allow-principal User:client --producer --topic test1

给client用户赋consumer读权限:

./kafka-acls --authorizer-properties zookeeper.connect=ai185:2181,ai186:2181,ai187:2181/kafka1017 --add --allow-principal User:client --consumer --topic test --group test-consumer-group 

删除所有权限

./kafka-acls --authorizer-properties zookeeper.connect=dcp18:2181,dcp16:2181,dcp19:2181/kafkakerberos --remove   --producer --topic topicout05054

查看acl

./kafka-acls --authorizer-properties zookeeper.connect=DCP185:2181,DCP186:2181,DCP187:2181/kafka --list --topic test1

kafka不区分hostname;

 

常见问题及解决方法

启动报错

第一种错误

2017-02-17 17:25:29,224] FATAL Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)

kafka.common.KafkaException: Failed to acquire lock on file .lock in /var/log/kafka-logs. A Kafka instance in another process or thread is using this directory.

    at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:100)

    at kafka.log.LogManager$$anonfun$lockLogDirs$1.apply(LogManager.scala:97)

    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)

    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)

    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)

    at scala.collection.AbstractTraversable.map(Traversable.scala:104)

    at kafka.log.LogManager.lockLogDirs(LogManager.scala:97)

    at kafka.log.LogManager.<init>(LogManager.scala:59)

    at kafka.server.KafkaServer.createLogManager(KafkaServer.scala:609)

    at kafka.server.KafkaServer.startup(KafkaServer.scala:183)

    at io.confluent.support.metrics.SupportedServerStartable.startup(SupportedServerStartable.java:100)

    at io.confluent.support.metrics.SupportedKafka.main(SupportedKafka.java:49)

 

解决方法:Failed to acquire lock on file .lock in /var/log/kafka-logs.--问题原因是有其他的进程在使用kafka,ps -ef|grep kafka,杀掉使用该目录的进程即可;

第二种错误:对index文件无权限

 

把文件的权限更改为正确的用户名和用户组即可;

 目录/var/log/kafka-logs/,其中__consumer_offsets-29是偏移量;

第三种生产消费报错:jaas连接有问题

kafka_client_jaas.conf文件配置有问题

16环境上

/opt/dataload/filesource_wangjuan/conf下kafka_client_jaas.conf

 

KafkaClient {

    com.sun.security.auth.module.Krb5LoginModule required

    useKeyTab=true

    storeKey=true

    keyTab="/home/client/keytabs/client.keytab"

        serviceName="kafka"

    principal="client/dcp@DCP.COM";

};

 

生产报错

第一种:生产者向topic发送消息失败:

[2017-03-09 09:16:00,982] [ERROR] [startJob_Worker-10] [DCPKafkaProducer.java line:62] produceR向topicdf02211发送信息出现异常

org.apache.kafka.common.KafkaException: Failed to construct kafka producer

        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335)

原因是配置文件:kafka_client_jaas.conf中配置有问题,keyTab的路径不对,导致的;

第二种:生产消费报错: Failed to construct kafka producer

报错关键信息:Failed to construct kafka producer

解决方法:配置文件问题:KafkaClient中serviceName应该是kafka,之前配置成了zookeeper;重启后,就好了;

配置文件如下:

KafkaServer {

    com.sun.security.auth.module.Krb5LoginModule required

    useKeyTab=true

    storeKey=true

    useTicketCache=false

    serviceName=kafka

    keyTab="/etc/security/keytabs/kafka.service.keytab"

    principal="kafka/dcp16@DCP.COM";

};

KafkaClient {

    com.sun.security.auth.module.Krb5LoginModule required

    useKeyTab=true

    storeKey=true

    serviceName=kafka

    keyTab="/etc/security/keytabs/kafka.service.keytab"

    principal="kafka/dcp16@DCP.COM";

};

Client {

    com.sun.security.auth.module.Krb5LoginModule required

    useKeyTab=true

    storeKey=true

    useTicketCache=false

    serviceName=zookeeper

    keyTab="/etc/security/keytabs/kafka.service.keytab"

    principal="kafka/dcp16@DCP.COM";

};

 

 

问题描述:

 

[kafka@DCP16 bin]$ ./kafka-console-producer   --broker-list DCP16:9092 --topic topicin050511  --producer.config ../etc/kafka/producer.properties

org.apache.kafka.common.KafkaException: Failed to construct kafka producer

    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:335)

    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:188)

    at kafka.producer.NewShinyProducer.<init>(BaseProducer.scala:40)

    at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:45)

    at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)

Caused by: org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Conflicting serviceName values found in JAAS and Kafka configs value in JAAS file zookeeper, value in Kafka config kafka

    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)

    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)

    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)

    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:277)

    ... 4 more

Caused by: java.lang.IllegalArgumentException: Conflicting serviceName values found in JAAS and Kafka configs value in JAAS file zookeeper, value in Kafka config kafka

    at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:305)

    at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:103)

    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:45)

    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)

    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)

    ... 7 more

[kafka@DCP16 bin]$ ./kafka-console-producer   --broker-list DCP16:9092 --topic topicin050511  --producer.config ../etc/kafka/producer.properties

 

消费时报错: ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)

 

[root@DCP16 bin]# ./kafka-console-consumer --zookeeper dcp18:2181,dcp16:2181,dcp19:2181/kafkakerberos --from-beginning --topic topicout050511 --new-consumer --consumer.config ../etc/kafka/consumer.properties --bootstrap-server DCP16:9092

[2017-05-07 22:24:37,479] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer

    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)

    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587)

    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569)

    at kafka.consumer.NewShinyConsumer.<init>(BaseConsumer.scala:53)

    at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:64)

    at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:51)

    at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user

    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:86)

    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:70)

    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:83)

    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:623)

    ... 6 more

Caused by: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner  authentication information from the user

    at com.sun.security.auth.module.Krb5LoginModule.promptForPass(Krb5LoginModule.java:899)

    at com.sun.security.auth.module.Krb5LoginModule.attemptAuthentication(Krb5LoginModule.java:719)

    at com.sun.security.auth.module.Krb5LoginModule.login(Krb5LoginModule.java:584)

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

    at java.lang.reflect.Method.invoke(Method.java:606)

    at javax.security.auth.login.LoginContext.invoke(LoginContext.java:762)

    at javax.security.auth.login.LoginContext.access$000(LoginContext.java:203)

    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:690)

    at javax.security.auth.login.LoginContext$4.run(LoginContext.java:688)

    at java.security.AccessController.doPrivileged(Native Method)

    at javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:687)

    at javax.security.auth.login.LoginContext.login(LoginContext.java:595)

    at org.apache.kafka.common.security.authenticator.AbstractLogin.login(AbstractLogin.java:69)

    at org.apache.kafka.common.security.kerberos.KerberosLogin.login(KerberosLogin.java:110)

    at org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:46)

    at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:68)

    at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:78)

 

衍生问题:

kafka生产消息就会报错:

[2017-05-07 23:17:16,240] ERROR Error when sending message to topic topicin050511 with key: null, value: 0 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

 

把KafkaClient更改为如下的配置,就可以 了:

KafkaClient {

    com.sun.security.auth.module.Krb5LoginModule required

   useTicketCache=true;

};

 

 

 

消费报错

第一种错误:replication factor: 1 larger than available brokers: 0

消费时报错:Error while executing topic command : replication factor: 1 larger than available brokers: 0

解决办法:/confluent-3.0.0/bin  下重启daemon

./kafka-server-stop  -daemon   ../etc/kafka/server.properties

./kafka-server-start  -daemon   ../etc/kafka/server.properties

   

然后zk重启;sh zkCli.sh -server ai186;

/usr/hdp/2.4.2.0-258/zookeeper/bin/zkCli.sh   --脚本的目录

如果还报错,可以查看配置文件中下面的配置:

zookeeper.connect=dcp18:2181/kafkakerberos;  --是group名称

 

 

第二种错误:TOPIC_AUTHORIZATION_FAILED

./bin/kafka-console-consumer --zookeeper DCP185:2181,DCP186:2181,DCP187:2181/kafka --from-beginning --topic wangjuan_topic1 --new-consumer --consumer.config ./etc/kafka/consumer.properties --bootstrap-server DCP187:9092

[2017-03-02 13:44:38,398] WARN The configuration zookeeper.connection.timeout.ms = 6000 was supplied but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)

[2017-03-02 13:44:38,575] WARN Error while fetching metadata with correlation id 1 : {wangjuan_topic1=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)

[2017-03-02 13:44:38,677] WARN Error while fetching metadata with correlation id 2 : {wangjuan_topic1=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)

[2017-03-02 13:44:38,780] WARN Error while fetching metadata with correlation id 3 : {wangjuan_topic1=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)

 

 

解决方法:配置文件中下面的参数中的User的U必须是大写;

super.users=User:kafka

或者有可能是server.properties中的adver.listen的IP是不对的,有可能是代码中写死的IP;

 

第三种错误的可能的解决方法:

无法消费,则查看kafka的启动日志中的报错信息:日志文件的所属组不对,应该是hadoop;

或者,查看kafka对应的zookeeper的配置后缀,是否已经更改,如果更改了,则topic需要重新生成才行;

 

第三种错误:消费的tomcat报错:

[2017-04-01 06:37:21,823] [INFO] [Thread-5] [AbstractCoordinator.java line:542] Marking the coordinator DCP187:9092 (id: 2147483647 rack: null) dead for group test-consumer-group

[2017-04-01 06:37:21,825] [WARN] [Thread-5] [ConsumerCoordinator.java line:476] Auto offset commit failed for group test-consumer-group: Commit offsets failed with retriable exception. You should retry committing offsets.

更改代码中,tomcat的心跳超时时间如下:

 

没有改之前的:;

./webapps/web/WEB-INF/classes/com/ai/bdx/dcp/hadoop/service/impl/DCPKafkaConsumer.class;

重启后,日志中显示:

[2017-04-01 10:14:56,167] [INFO] [Thread-5] [AbstractCoordinator.java line:542] Marking the coordinator DCP187:9092 (id: 2147483647 rack: null) dead for group test-consumer-group

[2017-04-01 10:14:56,286] [INFO] [Thread-5] [AbstractCoordinator.java line:505] Discovered coordinator DCP187:9092 (id: 2147483647 rack: null) for group test-consumer-group.

 

创建topic时错误

创建topic时报错:

[2017-04-10 10:32:23,776] WARN SASL configuration failed: javax.security.auth.login.LoginException: Checksum failed Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn)

Exception in thread "main" org.I0Itec.zkclient.exception.ZkAuthFailedException: Authentication failure

    at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:946)

    at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:923)

    at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1230)

    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:156)

    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:130)

    at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:75)

    at kafka.utils.ZkUtils$.apply(ZkUtils.scala:57)

    at kafka.admin.TopicCommand$.main(TopicCommand.scala:54)

    at kafka.admin.TopicCommand.main(TopicCommand.scala)

问题定位:是jaas文件有问题:

解决方法:server.properties文件中的super.user要和jaas文件中的keytab的principle一致;

server.properties:super.users=User:client

kafka_server_jaas.conf文件改为:

 

KafkaServer {

    com.sun.security.auth.module.Krb5LoginModule required

    useKeyTab=true

    storeKey=true

    useTicketCache=false

    serviceName=kafka

    keyTab="/data/data1/confluent-3.0.0/kafka.keytab"

    principal="kafka@DCP.COM";

};

 

KafkaClient {

    com.sun.security.auth.module.Krb5LoginModule required

    useKeyTab=true

    storeKey=true

    keyTab="/home/client/client.keytab"

    principal="client/DCP187@DCP.COM";

};

 

Client {

    com.sun.security.auth.module.Krb5LoginModule required

    useKeyTab=true

    storeKey=true

    useTicketCache=false

    serviceName=zookeeper

    keyTab="/home/client/client.keytab"

    principal="client/DCP187@DCP.COM";

};

 

kafka的配置文件:appConfig.json和resources.json配置项说明

两个文件的作用:

appConfig.json:可以覆盖在 metainfo.json中定义的配置值;当你需要提供新的运行时间方差时,这些可能会被用到;

resources.json:规定属于应用的每个部件类型所需要的yarn资源;

 

两个文件的配置项含义,下面的3个配置项没有查询到,其他的都已经确定;

appConfig.json文件中的:

   "site.global.app_user": "${USER_NAME}",                    

----?应用中使用的用户?我们环境中是client --官网和百度都没有找到其意义

   "site.broker.instance.name": "${USER}/${CLUSTER_NAME}",    

---broker的实例名称??官网上和官网都没有;72上没有配置具体值

   "site.server.port": "${KAFKA_BROKER.ALLOCATED_PORT}{PER_CONTAINER}",  

   ---KAFKA_BROKER.ALLOCATED_PORT}{PER_CONTAINER---官网上和百度都没有找到;

 

appConfig.json和resources.json各个配置项的含义分别如下:

appConfig.json文件的各项配置:

appConfig.json

 

{

     "components": {

         "broker": {},

         "slider-appmaster": {

             "slider.hdfs.keytab.dir":

"/user/client/.slider/keytabs/client",    ----keytab的目录;固定的,如果不存在,新建下即可;

             "slider.am.login.keytab.name": "client.keytab",             

        ---keytab文件,该文件必须关联所有kafka server和slider server的principle;

             "slider.keytab.principal.name": "client/dcp72@DCP.COM"      

         ---本机上,该keytab对应的principle

         }

     },

     "global": {

         "site.server.log.segment.bytes": "1073741824",                  

     ---- 一个log文件最大值,如果超过则切换到新的log文件

         "system_configs": "broker",                                

----slider的变量:发送给容器的配置类型列表,例如core-site,hdfs-site,hbase-site;

         "site.global.app_user": "${USER_NAME}",                    

----应用中使用的用户?不知道在哪里配置的

         "site.global.kafka_version": "kafka_3.0.0",                

---kafka的版本:confluent-3.0.0

         "site.broker.instance.name": "${USER}/${CLUSTER_NAME}",    

---实例名称??官网上没有

         "site.server.port":

"${KAFKA_BROKER.ALLOCATED_PORT}{PER_CONTAINER}",          ---?没有找到

         "site.global.pid_file": "${AGENT_WORK_ROOT}/app/run/koya.pid",  

      ---容器的进程id:ps -ef |grep  containername

         "site.server.num.network.threads": "3",                         

   ---处理网络请求的线程数

         "site.server.log.retention.check.interval.ms": "300000",        

    --日志保持检查间隔时间,默认300000ms ,5分钟

         "site.broker.xms_val": "819m",                                  

  

--??下面网址中没有找到:http://slider.incubator.apache.org/docs/slider_specs/application_instance_configuration.html#variable-naming-convention

         "site.server.delete.topic.enable": "true",                      

    ---能够删除kafka开关,如果设置为false,则删除失败;

         "java_home": "/usr/jdk64/jdk1.7.0_67",                          

     ---slider的变量:java的家目录,和环境中的一致

         "site.server.num.recovery.threads.per.data.dir": "1",           

     ---每个用来日志恢复的数据目录的线程数---需要更加精简化

         "site.server.log.dirs":

"/data/mfsdata/kafkashared/kafka022701/${@//site/global/app_container_tag}",

  ---数据存储的目录:kafka022701是命名随意,而且每个服务的该目录是唯一的,不可以重复;

         "site.server.num.partitions": "1",               

---每个topic的默认分区数,默认值为1,需要更多时,可以改大该值;

         "site.server.num.io.threads": "8",               

----做磁盘I/O操作的线程数

         "site.broker.zookeeper": "dcp187:2181,dcp186:2181,dcp185:2181", 

    ---broker所在的zookeeper集群

         "site.server.log.retention.hours": "168",                 

----每个日志最短保存时间,超过可删除

         "site.server.socket.request.max.bytes": "104857600",      

---一个socket可以接受的最大请求大小

         "site.global.app_install_dir": "${AGENT_WORK_ROOT}/app/install",

    ---应用安装的家目录:是应用的根目录;其中AGENT_WORK_ROOT是:container_work_dirs,即容器的工作目录;

         "site.global.app_root":

"${AGENT_WORK_ROOT}/app/install/kafka_2.11-0.10.1.1",    

--是应用的根目录;其中AGENT_WORK_ROOT是:container_work_dirs,即容器的工作目录;

         "site.server.socket.send.buffer.bytes": "102400",          

---socket 服务器使用的发送信息缓冲区

         "site.server.socket.receive.buffer.bytes": "102400",       

---socket 服务器使用的接收信息缓冲区

         "application.def":

".slider/package/KOYA/slider-kafka-app-package-0.91.0-incubating.zip"   

  ---slider的变量:应用程序定义包的位置,例如/slider/hbase_v096.zip

                      "create.default.zookeeper.node"              

-----可选配置项:应用是否需要默认的zk节点;我们配置文件中缺少该配置,该值是什么不清楚;yes/no?还是true/false?后续待验证;---

 

     },

     "metadata": {},

"schema": "http://example.org/specification/v2.0.0"

}

 

 

resources.json文件的各项配置项:

{

     "components": {

         "slider-appmaster": {},

         "KAFKA_BROKER": {

             "yarn.memory": "1024",          

--组件实例所需要的内存数量,单位为MB;该值必须比分配给任何JVM的堆(heap)

大小大;因为一个jvm会比单独使用一个heap使用更多的内存;

             "yarn.role.priority": "1",            

--该组件的唯一的优先级:它为独立组件类型提供唯一的索引;

             "yarn.container.failure.threshold": "10",   

---在一个失败窗口中,一个组件可以失败的次数;如果设置为0,则为不可以失败;

             "yarn.vcores": "1",              --所需要的虚拟核心的数量:vcore:virtual

  core;

             "yarn.component.instances": "3",          ---

             "yarn.container.failure.window.hours": "1"   

--容器失败的窗口,默认值为6;如果窗口大小有更改,该值则必须明确的配置下;如果失败超过了窗口,则失败次数需要重置;

         }

     },

     "global": {},

     "metadata": {},

"schema": "http://example.org/specification/v2.0.0"

}

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐