1. Symptom

Data cannot be committed to the chain. The orderer reports: "Rejecting deliver request because of consenter error"
The peer reports: "Got error &{SERVICE_UNAVAILABLE}"

2. Locating the problem

  • Kafka log: (screenshot omitted)
  • Orderer log: (screenshots omitted)
    Comparing the orderer logs shows that the following lines are missing for the pubchain channel:
    [channel: pubchain] Channel consumer set up successfully
    [channel: pubchain] Start phase completed successfully

The orderer therefore appears to be stuck inside the setupChannelConsumerForChannel() function (screenshot omitted).
Pushing the inference one step further, it is most likely stuck on the connection between the orderer and the Kafka cluster. Since the testchainid channel (the system channel) connects to Kafka without any problem, the suspicion is that the pubchain data kept in Kafka is in a bad state.
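
To check this suspicion independently of the orderer, the partition-consumer setup that setupChannelConsumerForChannel() performs can be reproduced with a few lines of sarama code. The following is only a standalone sketch; the broker address and the start offset (the value the orderer tries to resume from, found in section 3.1 below) are assumptions to adjust to the actual deployment.

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Assumed values: one Kafka broker address and the offset the orderer resumes from.
	brokers := []string{"192.168.1.209:11092"}
	const topic = "pubchain"
	const partition int32 = 0
	startFrom := int64(21)

	consumer, err := sarama.NewConsumer(brokers, sarama.NewConfig())
	if err != nil {
		log.Fatalf("cannot create consumer: %v", err)
	}
	defer consumer.Close()

	// This is the same call the orderer makes inside setupChannelConsumerForChannel().
	pc, err := consumer.ConsumePartition(topic, partition, startFrom)
	if err != nil {
		// With a bad start offset this prints the same error the orderer logs:
		// "The requested offset is outside the range of offsets maintained by the server ..."
		log.Fatalf("ConsumePartition failed: %v", err)
	}
	defer pc.Close()
	log.Printf("partition consumer for %s/%d set up successfully at offset %d", topic, partition, startFrom)
}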

3. Troubleshooting approach

3.1 Analyze how fabric 1.0.0 works in Kafka mode.

Goal: with data continuously flowing in, find out why zookeeper & kafka stop working after the all-in-one host is rebooted, and work out a fix.
Debugging steps:
1) Add debug log statements to setupChannelConsumerForChannel in fabric/orderer/kafka/chain.go to make the analysis easier:

// Sets up the partition consumer for a channel using the given retry options.
func setupChannelConsumerForChannel(retryOptions localconfig.Retry, haltChan chan struct{}, parentConsumer sarama.Consumer, channel channel, startFrom int64) (sarama.PartitionConsumer, error) {
        var err error
        var channelConsumer sarama.PartitionConsumer

        logger.Infof("[channel: %s][partition: %d] Setting up the channel consumer for this channel (start offset: %d)...", channel.topic(), channel.partition(), startFrom)

        retryMsg := "Connecting to the Kafka cluster setupChannelConsumerForChannel zyd***"
        setupChannelConsumer := newRetryProcess(retryOptions, haltChan, channel, retryMsg, func() error {
                channelConsumer, err = parentConsumer.ConsumePartition(channel.topic(), channel.partition(), startFrom)
                logger.Infof("zouyudi$$$$$$$ setupChannelConsumerForChannel channel:%s, partition:%d, EXPECTED STARTFROM:%d, error info: %s", channel.topic(), channel.partition(), startFrom, err)
                return err
        })

        return channelConsumer, setupChannelConsumer.retry()
}

2) While data is flowing in, force-power-off the all-in-one server, restart it, and inspect the orderer log once it is back up.

$ docker ps 
# find the container id of the orderer
$ cat /var/lib/docker/containers/{containerid}/{containerid}-json.log | grep "Retrieved metadata"
{"log":"\u001b[36m2018-08-15 01:49:02.363 UTC [orderer/multichain] newChainSupport -\u003e DEBU 0ea\u001b[0m [channel: pubchain] Retrieved metadata for tip of chain (blockNumber=9, lastConfig=1, lastConfigSeq=2): value:\"\\010\\024\" \n","stream":"stderr","time":"2018-08-15T01:49:02.36809063Z"}
$ cat /var/lib/docker/containers/{containerid}/{containerid}-json.log | grep "start offset"
{"log":"2018-08-15 01:30:26.824 UTC [orderer/kafka] setupChannelConsumerForChannel -\u003e INFO 205\u001b[0m [channel: pubchain][partition: 0] Setting up the channel consumer for this channel (start offset: 21)...\n","stream":"stderr","time":"2018-08-15T01:30:26.825394083Z"}
$ cat /var/lib/docker/containers/{containerid}/{containerid}-json.log | grep "setupChannelConsumerForChannel"
{"log":"2018-08-15 01:30:36.825 UTC [orderer/kafka] func1 -\u003e INFO 221\u001b[0m zouyudi$$$$$$$ setupChannelConsumerForChannel channel:pubchain, partition:0, EXPECTED STARTFROM:21, error info: kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition.\n","stream":"stderr","time":"2018-08-15T01:30:36.826834332Z"}
# The tip of the chain is block 9, and the orderer expects to resume consuming pubchain at offset 21 (N_EXPECTED). The error says this offset is outside the range of offsets that Kafka maintains for the topic/partition.
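
Where does 21 come from? The orderer stores the last Kafka offset it consumed in each block's metadata and resumes from that value plus one. The value:"\010\024" in the block-9 metadata is a serialized KafkaMetadata protobuf; the sketch below (assuming Fabric's protos/orderer package and the standard protobuf library are importable) decodes it to a last persisted offset of 20, which is why the expected start offset is 21.

package main

import (
	"fmt"
	"log"

	"github.com/golang/protobuf/proto"
	ab "github.com/hyperledger/fabric/protos/orderer"
)

func main() {
	// value:"\010\024" from the block-9 metadata, written as raw bytes (octal \010 = 0x08, \024 = 0x14).
	raw := []byte{0x08, 0x14}

	md := &ab.KafkaMetadata{}
	if err := proto.Unmarshal(raw, md); err != nil {
		log.Fatalf("cannot unmarshal KafkaMetadata: %v", err)
	}

	// Prints 20; the orderer will try to consume starting at LastOffsetPersisted + 1 = 21.
	fmt.Println("last offset persisted:", md.LastOffsetPersisted)
}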

$ docker exec -it kafka0 bash
$ cd /opt/kafka
$ bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.209:11092 --from-beginning --topic pubchain --zookeeper 192.168.1.209:2181
# Press CTRL+C; the consumer then prints the total number of messages in the channel's topic: "Processed a total of {N_ACTUAL} messages"

#### Workaround: feed simple junk messages (a, b, c, d, ...) into the topic until the total number of messages in the topic/partition reaches N_EXPECTED ####
$ docker exec -it kafka0 bash
$ cd /opt/kafka
$ bin/kafka-console-producer.sh --broker-list 192.168.1.209:11092 --topic pubchain
zyd
zouyudi
a^Hzy
zydab
zydc
zyd18
zyd19
zyd20
zyd21

# Once the message count in the topic/partition reaches N_EXPECTED, the orderer reconnects on its next retry. According to fabric/orderer/kafka/retry.go and the Kafka retry settings in orderer.yaml, the retry interval is 5 s during the first 10 minutes and 5 minutes after that (up to 12 hours), so if you are in a hurry you can simply restart the orderer by hand.
$ docker restart {orderer}

# Check the orderer log again: the consumer channel to Kafka is now established normally, new transactions are committed to the chain, and the consenter errors no longer appear. The only remaining error when re-submitting a record that already exists is an application-level chaincode error, e.g.: Internal server failure: Hiro response with error: rpc error: code = Unknown desc = CreateAndSendTransactionProposal returned error: invoke Endorser returned error: Transaction processor (192.168.1.109:7051) returned error 'rpc error: code = Unknown desc = chaincode error (status: 500, message: CODE50005.The usrMember already exists: UsrMembers:5760170)' for proposal: {{5163ba556beaec2d2a3a41aa56dbdd0ee52e197c7e24e3d5b7973dda8870ade0 [106 84 88 91 166 157 191 45 81 1 201 33 30 9 137 62 32 31 2
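
If typing junk into kafka-console-producer by hand feels too error-prone, the padding can also be done programmatically. The sketch below is only an illustration under the same assumptions as above (broker address, topic name, and the expected offset taken from the orderer log); it publishes filler messages until the partition's newest offset reaches N_EXPECTED, which is what the manual workaround achieves (the orderer simply skips messages it cannot parse, which is why junk messages are harmless).

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	brokers := []string{"192.168.1.209:11092"} // assumed broker address
	const topic = "pubchain"
	const nExpected = int64(21) // N_EXPECTED: the start offset the orderer logged

	cfg := sarama.NewConfig()
	cfg.Producer.Return.Successes = true // required for SyncProducer

	client, err := sarama.NewClient(brokers, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Newest offset = the offset the next message will receive (N_ACTUAL).
	newest, err := client.GetOffset(topic, 0, sarama.OffsetNewest)
	if err != nil {
		log.Fatal(err)
	}

	producer, err := sarama.NewSyncProducerFromClient(client)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	for off := newest; off < nExpected; off++ {
		msg := &sarama.ProducerMessage{Topic: topic, Value: sarama.StringEncoder(fmt.Sprintf("padding-%d", off))}
		if _, o, err := producer.SendMessage(msg); err != nil {
			log.Fatal(err)
		} else {
			log.Printf("wrote padding message at offset %d", o)
		}
	}
}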

3.2 Query the minimum and maximum offsets of the Kafka topic.

docker exec -it kafka0 bash
cd /opt/kafka

# minimum offset
root@82c90a0a1bbe:/opt/kafka# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.1.209:9092 -topic pubchain --time -2
pubchain:0:0
# maximum offset
root@82c90a0a1bbe:/opt/kafka# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.1.209:9092 -topic pubchain --time -1
pubchain:0:32
root@82c90a0a1bbe:/opt/kafka# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.1.209:10092 -topic pubchain --time -1
pubchain:0:32
root@82c90a0a1bbe:/opt/kafka# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.1.209:11092 -topic pubchain --time -1
pubchain:0:32
root@82c90a0a1bbe:/opt/kafka# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.1.209:12092 -topic pubchain --time -1
pubchain:0:32
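
The same minimum/maximum query can be done programmatically with sarama's Client.GetOffset, which is also the range the orderer's consumer validates the requested start offset against. A minimal sketch, with the broker address assumed:

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	client, err := sarama.NewClient([]string{"192.168.1.209:9092"}, sarama.NewConfig()) // assumed broker address
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	oldest, err := client.GetOffset("pubchain", 0, sarama.OffsetOldest) // equivalent to --time -2
	if err != nil {
		log.Fatal(err)
	}
	newest, err := client.GetOffset("pubchain", 0, sarama.OffsetNewest) // equivalent to --time -1
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("pubchain/0 offsets: oldest=%d newest=%d\n", oldest, newest)
	// A requested start offset outside this range is what makes the broker return
	// "The requested offset is outside the range of offsets maintained by the server ...".
}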

A hard power-off affects the Kafka offsets: the most recent messages were still only in memory at the moment of the crash and had not yet been fsynced to disk, so after the restart some messages in the topic/partition are lost.
The key parameter to change is KAFKA_LOG_FLUSH_INTERVAL_MESSAGES=1, i.e. flush to disk after every message. The compose template (deployer/fabric/roles/compose-base/templates/zookaf-base.yaml.j2) with this setting applied:


version: '2.1'

services:

  zookeeper:
    image: {{ zookeeper.image }}
    restart: always
    environment:
      - ZOO_SERVERS={% for node in zookeeper.nodes %}server.{{ node.id }}={{ node.host }}:{{ node.peer_port }}:{{ node.election_port }} {% endfor %}

    logging:
      driver: "json-file"
      options:
        max-size: "100m"
        max-file: "10"
    network_mode: host

  kafka:
    image: {{ kafka.image }}
    restart: always
    environment:
      - KAFKA_ZOOKEEPER_CONNECT={% for node in zookeeper.nodes %}{{ node.host }}:{{ node.service_port }},{% endfor %}

      - KAFKA_ADVERTISED_HOST_NAME={{ inventory_hostname }}
      - KAFKA_MESSAGE_MAX_BYTES=103809024 # 99 * 1024 * 1024 B
      - KAFKA_REPLICA_FETCH_MAX_BYTES=103809024 # 99 * 1024 * 1024 B
      - KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE=false
      - KAFKA_LOG_DIRS=/data/kafka-logs
      - KAFKA_LOG_RETENTION_MS=-1
      - KAFKA_MIN_INSYNC_REPLICAS=4
      - KAFKA_LOG_FLUSH_INTERVAL_MS=1
      - KAFKA_LOG_FLUSH_OFFSET_CHECKPOINT_INTERVAL_MS=1
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=4
      - KAFKA_REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS=1
      - KAFKA_LOG_FLUSH_INTERVAL_MESSAGES=1
      - KAFKA_DEFAULT_REPLICATION_FACTOR=4
    logging:
      driver: "json-file"
      options:
        max-size: "100m"
        max-file: "10"
                                                                                                                                                                                                                                                                                                                                                                                                           
"deployer/fabric/roles/compose-base/templates/zookaf-base.yaml.j2" 41L, 1327C            
docker exec -it zookeeper0 bash

root@ubuntu:/zookeeper-3.4.9# cat /conf/zoo.cfg 
clientPort=2182
dataDir=/data
dataLogDir=/datalog
tickTime=2000
initLimit=5
syncLimit=2
server.1=192.168.1.109:2888:3888
server.2=192.168.1.109:2889:3889
server.3=192.168.1.109:2890:3890
root@ubuntu:/zookeeper-3.4.9# 
root@ubuntu:/zookeeper-3.4.9# ls /data
myid  version-2
root@ubuntu:/zookeeper-3.4.9# cat /data/myid 
2
root@ubuntu:/zookeeper-3.4.9# ls /datalog/          
version-2
root@ubuntu:/zookeeper-3.4.9# ll /datalog/version-2/
total 60
drwxr-xr-x 2 zookeeper zookeeper     4096 Aug  6 01:50 ./
drwxr-xr-x 3 zookeeper zookeeper     4096 Aug  1 08:40 ../
-rw-r--r-- 1 zookeeper zookeeper 67108880 Aug  1 08:41 log.100000001
-rw-r--r-- 1 zookeeper zookeeper 67108880 Aug  2 01:27 log.200000001
-rw-r--r-- 1 zookeeper zookeeper 67108880 Aug  2 09:05 log.300000001
-rw-r--r-- 1 zookeeper zookeeper 67108880 Aug  3 02:01 log.400000001
-rw-r--r-- 1 zookeeper zookeeper 67108880 Aug  6 09:21 log.500000001
root@ubuntu:/zookeeper-3.4.9# ll /data/version-2/
total 36
drwxr-xr-x 2 zookeeper zookeeper 4096 Aug  6 01:50 ./
drwxr-xr-x 3 zookeeper zookeeper 4096 Aug  1 08:40 ../
-rw-r--r-- 1 zookeeper zookeeper    1 Aug  6 01:50 acceptedEpoch
-rw-r--r-- 1 zookeeper zookeeper    1 Aug  6 01:50 currentEpoch
-rw-r--r-- 1 zookeeper zookeeper  296 Aug  1 08:40 snapshot.0
-rw-r--r-- 1 zookeeper zookeeper 3981 Aug  2 01:26 snapshot.100000044
-rw-r--r-- 1 zookeeper zookeeper 3981 Aug  2 09:05 snapshot.200000021
-rw-r--r-- 1 zookeeper zookeeper 3981 Aug  3 02:01 snapshot.300000038
-rw-r--r-- 1 zookeeper zookeeper 3981 Aug  6 01:50 snapshot.40000003a

root@ubuntu:/zookeeper-3.4.9# cat /data/version-2/acceptedEpoch 
5
root@ubuntu:/zookeeper-3.4.9# cat /data/version-2/currentEpoch 
5


bin/zkCli.sh

[zk: localhost:2181(CONNECTED) 0] ls /
[controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, config]


[zk: localhost:2181(CONNECTED) 3] get /controller_epoch
8
cZxid = 0x100000025
ctime = Wed Aug 01 08:40:49 UTC 2018
mZxid = 0x50000000a
mtime = Mon Aug 06 01:51:00 UTC 2018
pZxid = 0x100000025
cversion = 0
dataVersion = 7
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 1
numChildren = 0


[zk: localhost:2181(CONNECTED) 4] get /controller
{"version":1,"brokerid":2,"timestamp":"1533520260357"}
cZxid = 0x500000009
ctime = Mon Aug 06 01:51:00 UTC 2018
mZxid = 0x500000009
mtime = Mon Aug 06 01:51:00 UTC 2018
pZxid = 0x500000009
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x2650cee11cb0000
dataLength = 54
numChildren = 0

[zk: localhost:2181(CONNECTED) 15] ls /brokers
[ids, topics, seqid]
[zk: localhost:2181(CONNECTED) 6] get /brokers
null
cZxid = 0x100000008
ctime = Wed Aug 01 08:40:46 UTC 2018
mZxid = 0x100000008
mtime = Wed Aug 01 08:40:46 UTC 2018
pZxid = 0x100000020
cversion = 3
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 3

[zk: localhost:2181(CONNECTED) 16] get /brokers/ids
null
cZxid = 0x10000000a
ctime = Wed Aug 01 08:40:46 UTC 2018
mZxid = 0x10000000a
mtime = Wed Aug 01 08:40:46 UTC 2018
pZxid = 0x500000017
cversion = 36
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 4
[zk: localhost:2181(CONNECTED) 19] ls /brokers/ids
[0, 1, 2, 3]
[zk: localhost:2181(CONNECTED) 20] get /brokers/ids/0
{"jmx_port":-1,"timestamp":"1533520261153","endpoints":["PLAINTEXT://192.168.1.109:9092"],"host":"192.168.1.109","version":2,"port":9092}
cZxid = 0x500000014
ctime = Mon Aug 06 01:51:01 UTC 2018
mZxid = 0x500000014
mtime = Mon Aug 06 01:51:01 UTC 2018
pZxid = 0x500000014
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x3650cee11d70001
dataLength = 137
numChildren = 0
[zk: localhost:2181(CONNECTED) 21] get /brokers/ids/1
{"jmx_port":-1,"timestamp":"1533520260892","endpoints":["PLAINTEXT://192.168.1.109:10092"],"host":"192.168.1.109","version":2,"port":10092}
cZxid = 0x50000000d
ctime = Mon Aug 06 01:51:00 UTC 2018
mZxid = 0x50000000d
mtime = Mon Aug 06 01:51:00 UTC 2018
pZxid = 0x50000000d
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x3650cee11d70000
dataLength = 139
numChildren = 0
[zk: localhost:2181(CONNECTED) 22] get /brokers/ids/2
{"jmx_port":-1,"timestamp":"1533520261541","endpoints":["PLAINTEXT://192.168.1.109:11092"],"host":"192.168.1.109","version":2,"port":11092}
cZxid = 0x500000017
ctime = Mon Aug 06 01:51:01 UTC 2018
mZxid = 0x500000017
mtime = Mon Aug 06 01:51:01 UTC 2018
pZxid = 0x500000017
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x2650cee11cb0000
dataLength = 139
numChildren = 0
[zk: localhost:2181(CONNECTED) 23] get /brokers/ids/3
{"jmx_port":-1,"timestamp":"1533520261178","endpoints":["PLAINTEXT://192.168.1.109:12092"],"host":"192.168.1.109","version":2,"port":12092}
cZxid = 0x500000013
ctime = Mon Aug 06 01:51:01 UTC 2018
mZxid = 0x500000013
mtime = Mon Aug 06 01:51:01 UTC 2018
pZxid = 0x500000013
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x3650cee11d70002
dataLength = 139
numChildren = 0


[zk: localhost:2181(CONNECTED) 17] get /brokers/topics
null
cZxid = 0x10000000c
ctime = Wed Aug 01 08:40:46 UTC 2018
mZxid = 0x10000000c
mtime = Wed Aug 01 08:40:46 UTC 2018
pZxid = 0x10000003f
cversion = 2
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 2
[zk: localhost:2181(CONNECTED) 24] ls /brokers/topics
[testchainid, pubchain]
[zk: localhost:2181(CONNECTED) 25] get /brokers/topics/testchainid
{"version":1,"partitions":{"0":[0,2,3]}}
cZxid = 0x100000036
ctime = Wed Aug 01 08:40:57 UTC 2018
mZxid = 0x100000036
mtime = Wed Aug 01 08:40:57 UTC 2018
pZxid = 0x100000039
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 40
numChildren = 1
[zk: localhost:2181(CONNECTED) 26] ls /brokers/topics/testchainid
[partitions]
[zk: localhost:2181(CONNECTED) 27] get /brokers/topics/testchainid/partitions
null
cZxid = 0x100000039
ctime = Wed Aug 01 08:40:57 UTC 2018
mZxid = 0x100000039
mtime = Wed Aug 01 08:40:57 UTC 2018
pZxid = 0x10000003a
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
[zk: localhost:2181(CONNECTED) 28] ls /brokers/topics/testchainid/partitions
[0]
[zk: localhost:2181(CONNECTED) 29] ls /brokers/topics/testchainid/partitions/0
[state]
[zk: localhost:2181(CONNECTED) 32] get /brokers/topics/testchainid/partitions/0     
null
cZxid = 0x10000003a
ctime = Wed Aug 01 08:40:57 UTC 2018
mZxid = 0x10000003a
mtime = Wed Aug 01 08:40:57 UTC 2018
pZxid = 0x10000003b
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1

[zk: localhost:2181(CONNECTED) 30] ls /brokers/topics/testchainid/partitions/0/state
[]
[zk: localhost:2181(CONNECTED) 33] get /brokers/topics/testchainid/partitions/0/state
{"controller_epoch":8,"leader":0,"version":1,"leader_epoch":7,"isr":[3,0,2]}
cZxid = 0x10000003b
ctime = Wed Aug 01 08:40:57 UTC 2018
mZxid = 0x50000001a
mtime = Mon Aug 06 01:51:03 UTC 2018
pZxid = 0x10000003b
cversion = 0
dataVersion = 14
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 76
numChildren = 0
[zk: localhost:2181(CONNECTED) 0] ls /brokers/topics/pubchain
[partitions]
[zk: localhost:2181(CONNECTED) 1] get /brokers/topics/pubchain
{"version":1,"partitions":{"0":[0,3,1]}}
cZxid = 0x10000003f
ctime = Wed Aug 01 08:41:26 UTC 2018
mZxid = 0x10000003f
mtime = Wed Aug 01 08:41:26 UTC 2018
pZxid = 0x100000042
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 40
numChildren = 1



[zk: localhost:2181(CONNECTED) 18] get /brokers/seqid
null
cZxid = 0x100000020
ctime = Wed Aug 01 08:40:46 UTC 2018
mZxid = 0x100000020
mtime = Wed Aug 01 08:40:46 UTC 2018
pZxid = 0x100000020
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0
[zk: localhost:2181(CONNECTED) 3] get /brokers/topics/pubchain/partitions
null
cZxid = 0x100000042
ctime = Wed Aug 01 08:41:26 UTC 2018
mZxid = 0x100000042
mtime = Wed Aug 01 08:41:26 UTC 2018
pZxid = 0x100000043
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
[zk: localhost:2181(CONNECTED) 6] ls /brokers/topics/pubchain/partitions/0
[state]
[zk: localhost:2181(CONNECTED) 7] ls /brokers/topics/pubchain/partitions/0/state
[]
[zk: localhost:2181(CONNECTED) 8] get /brokers/topics/pubchain/partitions/0      
null
cZxid = 0x100000043
ctime = Wed Aug 01 08:41:26 UTC 2018
mZxid = 0x100000043
mtime = Wed Aug 01 08:41:26 UTC 2018
pZxid = 0x100000044
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
[zk: localhost:2181(CONNECTED) 9] get /brokers/topics/pubchain/partitions/0/state
{"controller_epoch":8,"leader":0,"version":1,"leader_epoch":6,"isr":[3,1,0]}
cZxid = 0x100000044
ctime = Wed Aug 01 08:41:26 UTC 2018
mZxid = 0x500000018
mtime = Mon Aug 06 01:51:01 UTC 2018
pZxid = 0x100000044
cversion = 0
dataVersion = 10
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 76
numChildren = 0



[zk: localhost:2181(CONNECTED) 8] get /zookeeper

cZxid = 0x0
ctime = Thu Jan 01 00:00:00 UTC 1970
mZxid = 0x0
mtime = Thu Jan 01 00:00:00 UTC 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1

[zk: localhost:2181(CONNECTED) 9] ls /admin
[delete_topics]
[zk: localhost:2181(CONNECTED) 3] get /admin
null
cZxid = 0x10000001b
ctime = Wed Aug 01 08:40:46 UTC 2018
mZxid = 0x10000001b
mtime = Wed Aug 01 08:40:46 UTC 2018
pZxid = 0x10000001d
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
[zk: localhost:2181(CONNECTED) 4] get /admin/delete_topics
null
cZxid = 0x10000001d
ctime = Wed Aug 01 08:40:46 UTC 2018
mZxid = 0x10000001d
mtime = Wed Aug 01 08:40:46 UTC 2018
pZxid = 0x10000001d
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0


[zk: localhost:2181(CONNECTED) 10] ls /isr_change_notification
[]
[zk: localhost:2181(CONNECTED) 6] get /isr_change_notification
null
cZxid = 0x100000021
ctime = Wed Aug 01 08:40:46 UTC 2018
mZxid = 0x100000021
mtime = Wed Aug 01 08:40:46 UTC 2018
pZxid = 0x50000001c
cversion = 8
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0


[zk: localhost:2181(CONNECTED) 11] ls /consumers
[]
[zk: localhost:2181(CONNECTED) 7] get /consumers
null
cZxid = 0x100000004
ctime = Wed Aug 01 08:40:46 UTC 2018
mZxid = 0x100000004
mtime = Wed Aug 01 08:40:46 UTC 2018
pZxid = 0x100000004
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0


[zk: localhost:2181(CONNECTED) 12] ls /config
[changes, clients, topics]
[zk: localhost:2181(CONNECTED) 8] get /config
null
cZxid = 0x100000011
ctime = Wed Aug 01 08:40:46 UTC 2018
mZxid = 0x100000011
mtime = Wed Aug 01 08:40:46 UTC 2018
pZxid = 0x100000017
cversion = 3
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 3


[zk: localhost:2181(CONNECTED) 13] ls /config/changes
[]
[zk: localhost:2181(CONNECTED) 9] get /config/changes
null
cZxid = 0x100000013
ctime = Wed Aug 01 08:40:46 UTC 2018
mZxid = 0x100000013
mtime = Wed Aug 01 08:40:46 UTC 2018
pZxid = 0x100000013
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0

[zk: localhost:2181(CONNECTED) 14] ls /config/clients
[]
[zk: localhost:2181(CONNECTED) 10] get  /config/clients
null
cZxid = 0x100000017
ctime = Wed Aug 01 08:40:46 UTC 2018
mZxid = 0x100000017
mtime = Wed Aug 01 08:40:46 UTC 2018
pZxid = 0x100000017
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 0

[zk: localhost:2181(CONNECTED) 15] ls /config/topics 
[testchainid, pubchain]
[zk: localhost:2181(CONNECTED) 11] get /config/topics 
null
cZxid = 0x100000015
ctime = Wed Aug 01 08:40:46 UTC 2018
mZxid = 0x100000015
mtime = Wed Aug 01 08:40:46 UTC 2018
pZxid = 0x10000003e
cversion = 2
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 2



[zk: localhost:2181(CONNECTED) 16] ls /config/topics/testchainid
[]
[zk: localhost:2181(CONNECTED) 12] get /config/topics/testchainid
{"version":1,"config":{}}
cZxid = 0x100000035
ctime = Wed Aug 01 08:40:57 UTC 2018
mZxid = 0x100000035
mtime = Wed Aug 01 08:40:57 UTC 2018
pZxid = 0x100000035
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 25
numChildren = 0

[zk: localhost:2181(CONNECTED) 17] ls /config/topics/pubchain   
[]
[zk: localhost:2181(CONNECTED) 13] get /config/topics/pubchain   
{"version":1,"config":{}}
cZxid = 0x10000003e
ctime = Wed Aug 01 08:41:26 UTC 2018
mZxid = 0x10000003e
mtime = Wed Aug 01 08:41:26 UTC 2018
pZxid = 0x10000003e
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 25
numChildren = 0


3.3 Enter the kafka container and inspect the topics; the information matches what is recorded in zookeeper.

root@ubuntu:~# docker exec -it kafka0 bash
root@ubuntu:~# cd /opt/kafka
root@580a4814f6d1:/opt/kafka# bin/kafka-topics.sh --list --zookeeper 192.168.1.109:2181
pubchain
testchainid
## Because KAFKA_DEFAULT_REPLICATION_FACTOR=3, each topic has 3 replicas
root@580a4814f6d1:/opt/kafka# bin/kafka-topics.sh --describe --zookeeper 192.168.1.109:2181 --topic pubchain
Topic:pubchain	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: pubchain	Partition: 0	Leader: 0	Replicas: 0,3,1	Isr: 0,1,3
root@580a4814f6d1:/opt/kafka# bin/kafka-topics.sh --describe --zookeeper 192.168.1.109:2181 --topic testchainid
Topic:testchainid	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: testchainid	Partition: 0	Leader: 0	Replicas: 0,2,3	Isr: 0,2,3

### Analysis: Kafka is currently configured with only 3 replicas, so for pubchain there is one broker (kafka2) holding no copy of the data. Test: docker stop brokers 0, 1 and 3 and observe the consumer.
root@070beb32f85f:/opt/kafka# bin/kafka-console-producer.sh --broker-list 192.168.1.109:11092 --topic pubchain
root@070beb32f85f:/opt/kafka# bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.109:11092 --from-beginning --topic pubchain --zookeeper 192.168.1.109:2181
### With brokers 0, 1 and 3 stopped, the orderer log immediately reports: processMessagesToBlocks -> ERRO 285 [channel: pubchain] Error during consumption: kafka: error while consuming pubchain/0: kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes.
 
# With only broker 3 running, KAFKA_MIN_INSYNC_REPLICAS=2 cannot be satisfied (the ISR contains only 1 replica), and the orderer reports: [orderer/kafka] Enqueue -> ERRO 17bd [channel: pubchain] cannot enqueue envelope = kafka server: Messages are rejected since there are fewer in-sync replicas than required.
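
The Enqueue failure above can be reproduced outside the orderer with a plain sarama producer configured with the same acknowledgement level the orderer uses (RequiredAcks = WaitForAll, see 3.5 below). A minimal sketch, with the broker address assumed to be the one surviving broker:

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	// Same setting as the orderer: the partition leader must wait for all in-sync replicas.
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Return.Successes = true // required for SyncProducer

	producer, err := sarama.NewSyncProducer([]string{"192.168.1.109:12092"}, cfg) // assumed: broker 3
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	_, _, err = producer.SendMessage(&sarama.ProducerMessage{
		Topic: "pubchain",
		Value: sarama.StringEncoder("test"),
	})
	if err != nil {
		// With only one broker alive and min.insync.replicas=2 this fails with:
		// "kafka server: Messages are rejected since there are fewer in-sync replicas than required."
		log.Printf("send rejected as expected: %v", err)
		return
	}
	log.Println("message accepted (enough in-sync replicas)")
}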

3.4 The Kafka configuration in the yaml file is as follows

      - KAFKA_ADVERTISED_HOST_NAME={{ inventory_hostname }}
      - KAFKA_MESSAGE_MAX_BYTES=103809024 # 99 * 1024 * 1024 B
      - KAFKA_REPLICA_FETCH_MAX_BYTES=103809024 # 99 * 1024 * 1024 B
      - KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE=false
      - KAFKA_LOG_DIRS=/data/kafka-logs
      - KAFKA_LOG_RETENTION_MS=-1
      - KAFKA_MIN_INSYNC_REPLICAS=2
      - KAFKA_DEFAULT_REPLICATION_FACTOR=3

As long as at least one follower remains in the ISR, Kafka can guarantee that committed data is not lost; but if every replica of a partition goes down, that guarantee no longer holds. In that situation there are two possible strategies:
1. Wait for one of the replicas in the ISR to come back and elect it as leader [KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE=false]
2. Elect the first replica that comes back (not necessarily a member of the ISR) as leader [the default: KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE=true]

Recommended further reading: "In-depth analysis of Kafka data reliability", an excellent article, and "The role of ZooKeeper in Kafka", which describes the ephemeral znodes that Kafka registers in ZooKeeper.

3.5 fabric/orderer/config.go contains the following setting:

// Set the level of acknowledgement reliability needed from the broker.
// WaitForAll means that the partition leader will wait till all ISRs got
// the message before sending back an ACK to the sender.
brokerConfig.Producer.RequiredAcks = sarama.WaitForAll

// In sarama, WaitForAll is defined as:
// The minimum number of in-sync replicas is configured on the broker via
// the `min.insync.replicas` configuration key.
WaitForAll RequiredAcks = -1
root@bc-p1:~# docker exec -it kafka0 bash
root@36b7d45ecbdc:/# cd /opt/kafka/
root@36b7d45ecbdc:/opt/kafka# bin/kafka-topics.sh --describe --zookeeper 172.16.141.61:2181 --topic pubchain
Topic:pubchain	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: pubchain	Partition: 0	Leader: 1	Replicas: 1,0,2	Isr: 1,0,2
root@36b7d45ecbdc:/opt/kafka# bin/kafka-topics.sh --describe --zookeeper 172.16.141.61:2181 --topic testchainid
Topic:testchainid	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: testchainid	Partition: 0	Leader: 2	Replicas: 2,3,0	Isr: 2,3,0

3.6 Set up a standalone zookeeper and kafka environment.

cd /home/zyd/zookeeper-3.4.12/
./bin/zkServer.sh start-foreground conf/zoo1.cfg
./bin/zkServer.sh start-foreground conf/zoo2.cfg
./bin/zkServer.sh start-foreground conf/zoo3.cfg


cd /home/zyd/kafka_2.11-2.0.0
./bin/kafka-server-start.sh ./config/server.properties

cd /home/zyd/zookeeper-3.4.12/
./bin/zkCli.sh -server 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

[zk: 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183(CONNECTED) 0] ls /
[cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]
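
The znodes listed by zkCli.sh can also be walked programmatically, which is convenient for scripted checks of /brokers/ids and /brokers/topics after a restart. A small sketch, assuming the github.com/samuel/go-zookeeper/zk client library:

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/samuel/go-zookeeper/zk"
)

func main() {
	conn, _, err := zk.Connect([]string{"127.0.0.1:2181", "127.0.0.1:2182", "127.0.0.1:2183"}, 5*time.Second)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Equivalent of "ls /" in zkCli.sh.
	children, _, err := conn.Children("/")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("root znodes:", children)

	// Equivalent of "get /brokers/ids/<id>": shows which brokers are currently registered.
	ids, _, err := conn.Children("/brokers/ids")
	if err != nil {
		log.Fatal(err)
	}
	for _, id := range ids {
		data, _, err := conn.Get("/brokers/ids/" + id)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("broker %s: %s\n", id, data)
	}
}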

