While recently debugging a Spark Streaming + Kafka job, I ran into a connection error. The task log is shown below.
2019-06-22 15:36:47,516 | INFO | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | Kafka version : 1.0.1 | org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:109)
2019-06-22 15:36:47,516 | INFO | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | Kafka commitId : c0518aa65f25317e | org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:110)
2019-06-22 15:36:47,520 | INFO | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | executor: Set key.deserializer to org.apache.kafka.common.serialization.ByteArrayDeserializer, earlier value: | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2019-06-22 15:36:47,520 | INFO | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | executor: Set value.deserializer to org.apache.kafka.common.serialization.ByteArrayDeserializer, earlier value: | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2019-06-22 15:36:47,520 | INFO | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | executor: Set auto.offset.reset to none, earlier value: | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2019-06-22 15:36:47,520 | INFO | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | executor: Set group.id to spark-kafka-source-be3ec2f0-7031-4057-a255-1eb47dee271b--2020720649-executor, earlier value: | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2019-06-22 15:36:47,521 | INFO | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | executor: Set enable.auto.commit to false, earlier value: | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2019-06-22 15:36:47,521 | INFO | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | executor: Set receive.buffer.bytes to 65536 | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2019-06-22 15:36:47,537 | INFO | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | Starting new streaming query. | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
2019-06-22 15:36:47,614 | WARN | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | [Consumer clientId=consumer-1, groupId=spark-kafka-source-be3ec2f0-7031-4057-a255-1eb47dee271b--2020720649-driver-0] Connection to node -1 could not be established. Broker may not be available. | org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:241)
2019-06-22 15:36:47,617 | WARN | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | [Consumer clientId=consumer-1, groupId=spark-kafka-source-be3ec2f0-7031-4057-a255-1eb47dee271b--2020720649-driver-0] Connection to node -2 could not be established. Broker may not be available. | org.apache.kafka.common.utils.LogContext$KafkaLogger.warn(LogContext.java:241)
Just before the error, the ConsumerConfig values are printed, and bootstrap.servers shows exactly the broker list we specified.
2019-06-22 15:36:47,463 | INFO | [stream execution thread for [id = c2783ada-0f4f-4d9e-b99a-e016472cc182, runId = 2ab8955b-6f56-4d29-a114-61f0b2503f5b]] | ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [10.80.123.9:9092, 10.80.123.17:9092, 10.80.123.20:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
Kafka Tool, which connects through ZooKeeper, could reach the cluster without problems, so we checked /etc/hosts and found that the brokers actually resolve to 10.80.124.x addresses. A small Java test program confirmed that 10.80.124.x does work. Combined with the source-code analysis below, we concluded that the bootstrap.servers list we specified was wrong.
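The test program was essentially a connectivity probe along these lines (a minimal sketch; the address 10.80.124.9:9092 stands in for one of the real broker addresses from /etc/hosts):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class BrokerConnectivityTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.80.124.9:9092"); // address from /etc/hosts
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // listTopics() forces a metadata fetch, so it fails fast if none of
            // the bootstrap brokers is reachable.
            Map<String, List<PartitionInfo>> topics = consumer.listTopics();
            System.out.println("Connected; visible topics: " + topics.keySet());
        }
    }
}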
Next, let's walk through the source code to see how NetworkClient works.
Spark Streaming ships with two kinds of input sources:
- Basic sources, such as file systems and socket connections
- Advanced sources, such as Kafka, Flume, and Kinesis, which require extra dependency artifacts
For the Kafka case, the package [class] dependency chain looks like this:
spark-streaming-kafka-010 [Subscribe]
  + kafka-clients [KafkaConsumer]
    + kafka-clients [ConsumerNetworkClient]
      + kafka-clients [NetworkClient]
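For reference, the Subscribe strategy at the top of this chain is typically wired into a DStream job as follows (a minimal sketch; the broker address, group id, and topic name are placeholders, not values from our job):

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

// jssc is an already-created JavaStreamingContext
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "10.80.124.9:9092"); // placeholder broker address
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "debug-group"); // placeholder group id

JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(Arrays.asList("test-topic"), kafkaParams));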
Subscribe's onStart is invoked with the specified TopicPartitions and offsets and returns an object implementing the Consumer interface. Initializing that object creates the NetworkClient:
private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
    try {
        // ... (earlier initialization elided)
        int heartbeatIntervalMs = config.getInt("heartbeat.interval.ms");
        // NetworkClient owns the actual socket connections to the brokers.
        NetworkClient netClient = new NetworkClient(
                new Selector(config.getLong("connections.max.idle.ms"), this.metrics, this.time,
                        metricGrpPrefix, channelBuilder, logContext),
                this.metadata, clientId, 100,
                config.getLong("reconnect.backoff.ms"), config.getLong("reconnect.backoff.max.ms"),
                config.getInt("send.buffer.bytes"), config.getInt("receive.buffer.bytes"),
                config.getInt("request.timeout.ms"),
                ClientDnsLookup.forConfig(config.getString("client.dns.lookup")),
                this.time, true, new ApiVersions(), throttleTimeSensor, logContext);
        // ConsumerNetworkClient adds consumer-level request handling on top of NetworkClient.
        this.client = new ConsumerNetworkClient(logContext, netClient, this.metadata, this.time,
                this.retryBackoffMs, config.getInt("request.timeout.ms"), heartbeatIntervalMs);
        OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(
                config.getString("auto.offset.reset").toUpperCase(Locale.ROOT));
        this.coordinator = this.groupId == null ? null : new ConsumerCoordinator(logContext, this.client,
                this.groupId, maxPollIntervalMs, sessionTimeoutMs,
                new Heartbeat(this.time, sessionTimeoutMs, heartbeatIntervalMs, maxPollIntervalMs,
                        this.retryBackoffMs),
                this.assignors, this.metadata, this.subscriptions, this.metrics, metricGrpPrefix,
                this.time, this.retryBackoffMs, enableAutoCommit, config.getInt("auto.commit.interval.ms"),
                this.interceptors, config.getBoolean("exclude.internal.topics"),
                config.getBoolean("internal.leave.group.on.close"));
        this.fetcher = new Fetcher(logContext, this.client, config.getInt("fetch.min.bytes"),
                config.getInt("fetch.max.bytes"), config.getInt("fetch.max.wait.ms"),
                config.getInt("max.partition.fetch.bytes"), config.getInt("max.poll.records"),
                config.getBoolean("check.crcs"), this.keyDeserializer, this.valueDeserializer,
                this.metadata, this.subscriptions, this.metrics, metricsRegistry.fetcherMetrics,
                this.time, this.retryBackoffMs, this.requestTimeoutMs, isolationLevel);
        // ... (remaining initialization elided)
    } catch (Throwable var24) {
        this.close(0L, true);
        throw new KafkaException("Failed to construct kafka consumer", var24);
    }
}
NetworkClient is responsible for maintaining connections to, and communicating with, the Kafka cluster nodes (Node). Its poll method drives all network I/O:
public List<ClientResponse> poll(long timeout, long now) {
    this.ensureActive();
    if (!this.abortedSends.isEmpty()) {
        // Sends aborted before reaching the wire are completed immediately.
        List<ClientResponse> responses = new ArrayList();
        this.handleAbortedSends(responses);
        this.completeResponses(responses);
        return responses;
    } else {
        // Kick off a metadata refresh if one is due.
        long metadataTimeout = this.metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, new long[]{metadataTimeout, (long)this.defaultRequestTimeoutMs}));
        } catch (IOException var10) {
            this.log.error("Unexpected error during I/O", var10);
        }
        long updatedNow = this.time.milliseconds();
        List<ClientResponse> responses = new ArrayList();
        this.handleCompletedSends(responses, updatedNow);
        this.handleCompletedReceives(responses, updatedNow);
        // Disconnected nodes are processed here on every poll.
        this.handleDisconnections(responses, updatedNow);
        this.handleConnections();
        this.handleInitiateApiVersionRequests(updatedNow);
        this.handleTimedOutRequests(responses, updatedNow);
        this.completeResponses(responses);
        return responses;
    }
}
When a node we are connecting to drops the connection, it stays recorded in the Selectable object; every subsequent poll picks it up in handleDisconnections and logs the failure.
private void handleDisconnections(List<ClientResponse> responses, long now) {
    Iterator var4 = this.selector.disconnected().entrySet().iterator();
    while(var4.hasNext()) {
        Entry<String, ChannelState> entry = (Entry)var4.next();
        String node = (String)entry.getKey();
        this.log.debug("Node {} disconnected.", node);
        this.processDisconnection(responses, node, now, (ChannelState)entry.getValue());
    }
    // Any disconnection invalidates our view of the cluster, so request fresh metadata.
    if (this.selector.disconnected().size() > 0) {
        this.metadataUpdater.requestUpdate();
    }
}
processDisconnection then dispatches on the channel state; the NOT_CONNECTED branch is what produces the warning we saw in the task log.
private void processDisconnection(List<ClientResponse> responses, String nodeId, long now, ChannelState disconnectState) {
    this.connectionStates.disconnected(nodeId, now);
    this.apiVersions.remove(nodeId);
    this.nodesNeedingApiVersionsFetch.remove(nodeId);
    switch(disconnectState.state()) {
    case AUTHENTICATION_FAILED:
        AuthenticationException exception = disconnectState.exception();
        this.connectionStates.authenticationFailed(nodeId, now, exception);
        this.metadataUpdater.handleAuthenticationFailure(exception);
        this.log.error("Connection to node {} ({}) failed authentication due to: {}", new Object[]{nodeId, disconnectState.remoteAddress(), exception.getMessage()});
        break;
    case AUTHENTICATE:
        this.log.warn("Connection to node {} ({}) terminated during authentication. This may happen due to any of the following reasons: (1) Authentication failed due to invalid credentials with brokers older than 1.0.0, (2) Firewall blocking Kafka TLS traffic (eg it may only allow HTTPS traffic), (3) Transient network issue.", nodeId, disconnectState.remoteAddress());
        break;
    case NOT_CONNECTED:
        // This is exactly the warning from our task log.
        this.log.warn("Connection to node {} ({}) could not be established. Broker may not be available.", nodeId, disconnectState.remoteAddress());
    }
    // All in-flight requests to the dead node are cancelled.
    Iterator var8 = this.inFlightRequests.clearAll(nodeId).iterator();
    while(var8.hasNext()) {
        NetworkClient.InFlightRequest request = (NetworkClient.InFlightRequest)var8.next();
        this.log.trace("Cancelled request {} {} with correlation id {} due to node {} being disconnected", new Object[]{request.header.apiKey(), request.request, request.header.correlationId(), nodeId});
        if (!request.isInternalRequest) {
            responses.add(request.disconnected(now, disconnectState.exception()));
        } else if (request.header.apiKey() == ApiKeys.METADATA) {
            this.metadataUpdater.handleDisconnection(request.destination);
        }
    }
}
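The warning is easy to reproduce outside Spark: point bootstrap.servers at an address where no broker is listening and poll. A minimal sketch (the 10.80.123.9 address, group id, and topic name are placeholders mirroring our misconfiguration):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class NotConnectedRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "10.80.123.9:9092"); // no broker listens here
        props.put("group.id", "repro-group");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            // Each poll drives NetworkClient.poll(); the NOT_CONNECTED branch above
            // logs "Connection to node -1 ... Broker may not be available." repeatedly.
            for (int i = 0; i < 5; i++) {
                consumer.poll(1000L);
            }
        }
    }
}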
One detail in the log confirms the diagnosis: node ids -1 and -2 in the warnings are the synthetic ids Kafka assigns to the bootstrap servers before any cluster metadata has been fetched, so the connections that fail are exactly the addresses we passed in bootstrap.servers. Pointing bootstrap.servers at the correct 10.80.124.x addresses resolves the error.

End of article.