I have been using Kafka for a while and picked up its core mechanics along the way: sequential reads and writes, zero-copy, partitioning, watermarks, and so on. But I had never looked closely at how the Kafka consumer side actually works, so I used the holiday to dig in. The environment is as follows:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.12.RELEASE</version>
        <relativePath/>
    </parent>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.5.1</version>
    </dependency>

We all know how to use it: add some configuration to the yml file and an annotation:

spring:
  kafka:
    bootstrap-servers: 10.255.200.214:9092
    consumer:
      group-id: ${spring.application.name}_consumer
      enable-auto-commit: true
      auto-commit-interval: 5000
      auto-offset-reset: latest  #earliest
      max-poll-records: 1
    producer:
      client-id: ${spring.application.name}_producer
      retries: 3
      batch-size: 1048576
      buffer-memory: 6291456
      acks: all
      compression-type: gzip
    listener:
      type: batch
      concurrency: 5

```java
@KafkaListener(topics = {"topic"}, containerFactory = "kafkaListenerContainerFactory")
public void single(ConsumerRecord<String, String> consumerRecord) {
    // handle a single record
}
```

Let's first look at how Kafka is wired into Spring.

The wiring lives in Spring Boot's autoconfigure module.

KafkaAutoConfiguration imports KafkaAnnotationDrivenConfiguration:

@Import({ KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {
    
}

KafkaAnnotationDrivenConfiguration declares the nested EnableKafkaConfiguration class (annotated with @EnableKafka), and it also defines the kafkaListenerContainerFactory bean, a ConcurrentKafkaListenerContainerFactory (the concrete implementation of AbstractKafkaListenerContainerFactory):

class KafkaAnnotationDrivenConfiguration {
    @Bean
	@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
	ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
			ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
			ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
		ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
		// the configurer maps the properties from ConcurrentKafkaListenerContainerFactoryConfigurer onto the factory, so the factory picks up all configured settings plus the record interceptor, message converter, error handler, etc.
		configurer.configure(factory, kafkaConsumerFactory
				.getIfAvailable(() -> new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
		return factory;
	}
	
    @Configuration(proxyBeanMethods = false)
	@EnableKafka
	@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
	static class EnableKafkaConfiguration {

	}
}

@EnableKafka imports KafkaListenerConfigurationSelector:

@Import(KafkaListenerConfigurationSelector.class)
public @interface EnableKafka {
}

KafkaListenerConfigurationSelector in turn imports KafkaBootstrapConfiguration:

@Order
public class KafkaListenerConfigurationSelector implements DeferredImportSelector {

	@Override
	public String[] selectImports(AnnotationMetadata importingClassMetadata) {
		return new String[] { KafkaBootstrapConfiguration.class.getName() };
	}

}

KafkaBootstrapConfiguration registers two bean definitions via registerBeanDefinitions: KafkaListenerAnnotationBeanPostProcessor and KafkaListenerEndpointRegistry:

public class KafkaBootstrapConfiguration implements ImportBeanDefinitionRegistrar {

	@Override
	public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
		if (!registry.containsBeanDefinition(
				KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)) {

			registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME,
					new RootBeanDefinition(KafkaListenerAnnotationBeanPostProcessor.class));
		}

		if (!registry.containsBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)) {
			registry.registerBeanDefinition(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
					new RootBeanDefinition(KafkaListenerEndpointRegistry.class));
		}
	}

}
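
In a Spring Boot application this import chain fires automatically. In a plain Spring application you would trigger it yourself by putting @EnableKafka on a configuration class; a minimal sketch (the class name is made up):

```java
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;

@Configuration
@EnableKafka // pulls in KafkaListenerConfigurationSelector -> KafkaBootstrapConfiguration
public class ManualKafkaConfig {
    // consumerFactory / kafkaListenerContainerFactory beans would be declared here as well
}
```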

KafkaListenerAnnotationBeanPostProcessor implements BeanPostProcessor, so the method to look at is postProcessAfterInitialization (triggered after each bean has been instantiated and initialized). This is where the methods annotated with @KafkaListener are discovered, assembled into endpoints, and started:

public class KafkaListenerAnnotationBeanPostProcessor<K, V>
		implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {
    @Override
	public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
		if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
			Class<?> targetClass = AopUtils.getTargetClass(bean);
			Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
			final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
			final List<Method> multiMethods = new ArrayList<>();
			Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
					(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
						Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
						return (!listenerMethods.isEmpty() ? listenerMethods : null);
					});
			if (hasClassLevelListeners) {
				Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
						(ReflectionUtils.MethodFilter) method ->
								AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
				multiMethods.addAll(methodsWithHandler);
			}
			if (annotatedMethods.isEmpty()) {
				this.nonAnnotatedClasses.add(bean.getClass());
				this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
			}
			else {
				// Non-empty set of methods
				for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
					Method method = entry.getKey();
					for (KafkaListener listener : entry.getValue()) {
						processKafkaListener(listener, method, bean, beanName);
					}
				}
				this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
							+ beanName + "': " + annotatedMethods);
			}
			if (hasClassLevelListeners) {
				processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
			}
		}
		return bean;
	}		    
}

From here the story splits into two branches: one traces how postProcessAfterInitialization gets invoked, the other traces how the @KafkaListener methods are loaded.

How postProcessAfterInitialization gets invoked

BeanPostProcessor is an extension interface exposed by the Spring IoC container:

public interface BeanPostProcessor {
    // invoked before a bean's initialization callbacks (applies to every bean)
    @Nullable
	default Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
		return bean;
	}
	// invoked after a bean's initialization callbacks (applies to every bean)
	@Nullable
	default Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
		return bean;
	}
}
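
To make the hook concrete, here is a minimal sketch of a custom BeanPostProcessor (the class name is hypothetical); KafkaListenerAnnotationBeanPostProcessor plugs into exactly this extension point:

```java
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;

@Component
public class LoggingBeanPostProcessor implements BeanPostProcessor {

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        // runs once for every bean after its init callbacks, just like the Kafka processor
        System.out.println("initialized bean: " + beanName);
        return bean; // must return the bean (or a wrapper) so the container keeps it
    }
}
```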

public abstract class AbstractAutowireCapableBeanFactory extends AbstractBeanFactory implements AutowireCapableBeanFactory {
   // applyBeanPostProcessorsAfterInitialization is where postProcessAfterInitialization gets invoked
	@Override
	public Object applyBeanPostProcessorsAfterInitialization(Object existingBean, String beanName) throws BeansException {
		Object result = existingBean;
		// fetch all registered BeanPostProcessors (this is the key point)
		for (BeanPostProcessor processor : getBeanPostProcessors()) {
			// invoke postProcessAfterInitialization on each one in turn
			Object current = processor.postProcessAfterInitialization(result, beanName);
			if (current == null) {
				return result;
			}
			result = current;
		}
		return result;
	}
	
	// applyBeanPostProcessorsAfterInitialization is called from three methods: resolveBeforeInstantiation, initializeBean, and postProcessObjectFromFactoryBean
	
	protected Object resolveBeforeInstantiation(String beanName, RootBeanDefinition mbd) {
	    bean = applyBeanPostProcessorsAfterInitialization(bean, beanName);
	}
	// initialize the bean (every bean passes through here)
	protected Object initializeBean(String beanName, Object bean, @Nullable RootBeanDefinition mbd) {
		// handle the Aware callbacks
		invokeAwareMethods(beanName, bean);

	    // before-initialization post-processing
		wrappedBean = applyBeanPostProcessorsBeforeInitialization(wrappedBean, beanName);

		// run the init methods
		invokeInitMethods(beanName, wrappedBean, mbd);
		
		// after-initialization post-processing (our entry point)
		wrappedBean = applyBeanPostProcessorsAfterInitialization(wrappedBean, beanName);

		return wrappedBean;
	}
	protected Object postProcessObjectFromFactoryBean(Object object, String beanName) {
		return applyBeanPostProcessorsAfterInitialization(object, beanName);
	}
	
	// let's follow initializeBean; it is called from configureBean, the public initializeBean overload, and doCreateBean
	

	public Object configureBean(Object existingBean, String beanName) throws BeansException {
	    BeanWrapper bw = new BeanWrapperImpl(existingBean);
		initBeanWrapper(bw);
		populateBean(beanName, bd, bw);
		return initializeBean(beanName, existingBean, bd);
	}
	
	public Object initializeBean(Object existingBean, String beanName) {
		return initializeBean(beanName, existingBean, null);
	}
	// create the bean instance
	protected Object doCreateBean(String beanName, RootBeanDefinition mbd, @Nullable Object[] args)
			throws BeanCreationException {
		BeanWrapper instanceWrapper = null;
		if (mbd.isSingleton()) {
			instanceWrapper = this.factoryBeanInstanceCache.remove(beanName);
		}
		if (instanceWrapper == null) {
			instanceWrapper = createBeanInstance(beanName, mbd, args);
		}
		Object bean = instanceWrapper.getWrappedInstance();
		// Initialize the bean instance.
		Object exposedObject = bean;
		try {
		    // populate the bean's properties; dependent beans get resolved (and created) here
			populateBean(beanName, mbd, instanceWrapper);
			// initialize the bean; this is where the BeanPostProcessors fire
			exposedObject = initializeBean(beanName, exposedObject, mbd);
		}
		// Register bean as disposable.
		try {
			registerDisposableBeanIfNecessary(beanName, bean, mbd);
		}
	}
	// populate bean properties
	protected void populateBean(String beanName, RootBeanDefinition mbd, @Nullable BeanWrapper bw) {
	
		PropertyValues pvs = (mbd.hasPropertyValues() ? mbd.getPropertyValues() : null);
        // resolve values according to the autowire mode; dependent beans are registered here as well
		int resolvedAutowireMode = mbd.getResolvedAutowireMode();
		if (resolvedAutowireMode == AUTOWIRE_BY_NAME || resolvedAutowireMode == AUTOWIRE_BY_TYPE) {
			MutablePropertyValues newPvs = new MutablePropertyValues(pvs);
			// Add property values based on autowire by name if applicable.
			if (resolvedAutowireMode == AUTOWIRE_BY_NAME) {
				autowireByName(beanName, mbd, bw, newPvs);
			}
			// Add property values based on autowire by type if applicable.
			if (resolvedAutowireMode == AUTOWIRE_BY_TYPE) {
				autowireByType(beanName, mbd, bw, newPvs);
			}
			pvs = newPvs;
		}
		boolean hasInstAwareBpps = hasInstantiationAwareBeanPostProcessors();
	 
		boolean needsDepCheck = (mbd.getDependencyCheck() != AbstractBeanDefinition.DEPENDENCY_CHECK_NONE);
		PropertyDescriptor[] filteredPds = null;
		if (hasInstAwareBpps) {
	
			// let InstantiationAwareBeanPostProcessors post-process the property values
			for (BeanPostProcessor bp : getBeanPostProcessors()) {
				if (bp instanceof InstantiationAwareBeanPostProcessor) {
					InstantiationAwareBeanPostProcessor ibp = (InstantiationAwareBeanPostProcessor) bp;
					PropertyValues pvsToUse = ibp.postProcessProperties(pvs, bw.getWrappedInstance(), beanName);
					if (pvsToUse == null) {
						if (filteredPds == null) {
							filteredPds = filterPropertyDescriptorsForDependencyCheck(bw, mbd.allowCaching);
						}
						pvsToUse = ibp.postProcessPropertyValues(pvs, filteredPds, bw.getWrappedInstance(), beanName);
						if (pvsToUse == null) {
							return;
						}
					}
					pvs = pvsToUse;
				}
			}
		}
		if (needsDepCheck) {
		    // dependency check on the remaining properties
			if (filteredPds == null) {
				filteredPds = filterPropertyDescriptorsForDependencyCheck(bw, mbd.allowCaching);
			}
			checkDependencies(beanName, mbd, filteredPds, pvs);
		}

		if (pvs != null) {
			applyPropertyValues(beanName, mbd, bw, pvs);
		}
	}

}

Tracing configureBean and the public initializeBean upward, both are eventually reached from BeanConfigurerSupport.configureBean:

public class BeanConfigurerSupport implements BeanFactoryAware, InitializingBean, DisposableBean {
    public void configureBean(Object beanInstance) {
        BeanWiringInfo bwi = this.beanWiringInfoResolver.resolveWiringInfo(beanInstance);
        try {
			String beanName = bwi.getBeanName();
			if (bwi.indicatesAutowiring() || (bwi.isDefaultBeanName() && beanName != null &&
					!beanFactory.containsBean(beanName))) {
				// Perform autowiring (also applying standard factory / post-processor callbacks).
				beanFactory.autowireBeanProperties(beanInstance, bwi.getAutowireMode(), bwi.getDependencyCheck());
				beanFactory.initializeBean(beanInstance, (beanName != null ? beanName : ""));
			}
			else {
				// Perform explicit wiring based on the specified bean definition.
				beanFactory.configureBean(beanInstance, (beanName != null ? beanName : ""));
			}
		}
    }
    
}

doCreateBean is reached from more places: AbstractAutowireCapableBeanFactory.createBean, AbstractBeanFactory.createBean, and BeanDefinitionValueResolver.resolveInnerBean. Putting the flow together: getBean -> doCreateBean -> populateBean -> initializeBean -> applyBeanPostProcessorsAfterInitialization -> postProcessAfterInitialization.

At this point we know how KafkaListenerAnnotationBeanPostProcessor gets invoked.

Now for the second branch: how the @KafkaListener methods are loaded.
@Override
	public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
		if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
			Class<?> targetClass = AopUtils.getTargetClass(bean);
			//find class-level @KafkaListener annotations
			Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
			final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
			final List<Method> multiMethods = new ArrayList<>();
			//find the methods annotated with @KafkaListener
			Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
			        //the callback passed to the introspector
					(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
					    //the callback collects the @KafkaListener annotations on each method
						Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
						return (!listenerMethods.isEmpty() ? listenerMethods : null);
					});
			if (hasClassLevelListeners) {
			    //class-level listeners dispatch to the methods annotated with @KafkaHandler
				Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
						(ReflectionUtils.MethodFilter) method ->
								AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
				multiMethods.addAll(methodsWithHandler);
			}
            //when annotatedMethods is non-empty (the empty branch is elided here)
			for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
				Method method = entry.getKey();
				for (KafkaListener listener : entry.getValue()) {
				    //process each @KafkaListener
					processKafkaListener(listener, method, bean, beanName);
				}
			}
			//class-level listeners get special handling
			if (hasClassLevelListeners) {
				processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
			}
		}
		return bean;
	}
	//collects the @KafkaListener (including repeatable @KafkaListeners) annotations on a method
	private Set<KafkaListener> findListenerAnnotations(Method method) {
		Set<KafkaListener> listeners = new HashSet<>();
		KafkaListener ann = AnnotatedElementUtils.findMergedAnnotation(method, KafkaListener.class);
		if (ann != null) {
			listeners.add(ann);
		}
		KafkaListeners anns = AnnotationUtils.findAnnotation(method, KafkaListeners.class);
		if (anns != null) {
			listeners.addAll(Arrays.asList(anns.value()));
		}
		return listeners;
	}
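
For reference, a hedged sketch of the two shapes this scan is looking for (the topic and class names are made up): a method-level listener like the one at the top of this post, or a class-level listener whose @KafkaHandler methods are selected by payload type:

```java
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@KafkaListener(topics = "demo-class-topic") // class-level: picked up via classLevelListeners
public class ClassLevelListener {

    @KafkaHandler
    public void onString(String payload) {
        // handles String payloads
    }

    @KafkaHandler(isDefault = true)
    public void onAnythingElse(Object payload) {
        // fallback for other payload types
    }
}
```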

Let's see what processKafkaListener does:

	protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
	    //checkProxy deals with the case where the bean is a JDK dynamic proxy
		Method methodToUse = checkProxy(method, bean);
		MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
		endpoint.setMethod(methodToUse);
		processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
	}
	protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
			Object bean, Object adminTarget, String beanName) {

		String beanRef = kafkaListener.beanRef();
		if (StringUtils.hasText(beanRef)) {
			this.listenerScope.addListener(beanRef, bean);
		}
		// populate the MethodKafkaListenerEndpoint
		endpoint.setBean(bean);
		endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
		// pull the configured topic/partition settings out of the @KafkaListener annotation
		endpoint.setId(getEndpointId(kafkaListener));
		endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
		//explicit partition assignments
		endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
		//there may be multiple topics
		endpoint.setTopics(resolveTopics(kafkaListener));
		//topicPattern subscribes to topics matching a regular expression
		endpoint.setTopicPattern(resolvePattern(kafkaListener));
		endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
		String group = kafkaListener.containerGroup();
		if (StringUtils.hasText(group)) {
			Object resolvedGroup = resolveExpression(group);
			if (resolvedGroup instanceof String) {
				endpoint.setGroup((String) resolvedGroup);
			}
		}
		
		String concurrency = kafkaListener.concurrency();
		if (StringUtils.hasText(concurrency)) {
		    //set the number of concurrent consumer threads
			endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
		}
		String autoStartup = kafkaListener.autoStartup();
		if (StringUtils.hasText(autoStartup)) {
			endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
		}
		//resolve the remaining extra properties
		resolveKafkaProperties(endpoint, kafkaListener.properties());
		endpoint.setSplitIterables(kafkaListener.splitIterables());

		KafkaListenerContainerFactory<?> factory = null;
		//look the container factory up in the IoC container by bean name; this is the ConcurrentKafkaListenerContainerFactory defined in KafkaAnnotationDrivenConfiguration
		String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
		if (StringUtils.hasText(containerFactoryBeanName)) {
			Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
			try {
				factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
			}
			catch (NoSuchBeanDefinitionException ex) {
				// rethrown as a BeanInitializationException (details omitted here)
			}
		}

		endpoint.setBeanFactory(this.beanFactory);
		String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
		if (StringUtils.hasText(errorHandlerBeanName)) {
			endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
		}
		//finally the fully built endpoint is handed to the KafkaListenerEndpointRegistrar instance (registrar)
		this.registrar.registerEndpoint(endpoint, factory);
		if (StringUtils.hasText(beanRef)) {
			this.listenerScope.removeListener(beanRef);
		}
	}
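
Since nearly every annotation attribute passes through resolveExpression*, they can all be supplied as property placeholders or SpEL. A hedged example (the property keys are assumptions, not Spring-defined names):

```java
@KafkaListener(
        id = "orders-listener",
        topics = "#{'${app.kafka.topics:orders}'.split(',')}", // SpEL + placeholder, resolved at registration
        groupId = "${app.kafka.group-id:orders-group}",
        concurrency = "${app.kafka.concurrency:3}",
        autoStartup = "${app.kafka.auto-startup:true}")
public void onOrder(ConsumerRecord<String, String> record) {
    // business logic
}
```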

Next, what happens once the endpoint is registered with the KafkaListenerEndpointRegistrar?

public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
    public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
	
		//wrap the endpoint and factory in a KafkaListenerEndpointDescriptor
		KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
		synchronized (this.endpointDescriptors) {
		    //startImmediately is true once the context has been refreshed
			if (this.startImmediately) { // Register and start immediately
				// register and start the container right away
				this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
						resolveContainerFactory(descriptor), true);
			}
			else {
				//during startup the descriptor is just queued; the registrar's
				//afterPropertiesSet() later registers all queued endpoints with the registry
				this.endpointDescriptors.add(descriptor);
			}
		}
	}
}
public class KafkaListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware,
		ApplicationListener<ContextRefreshedEvent> {
	public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
			boolean startImmediately) {
		String id = endpoint.getId();
		synchronized (this.listenerContainers) {
		    //everything gets wrapped into a MessageListenerContainer created through the factory (the ConcurrentKafkaListenerContainerFactory), which initializes all the parameters; the concrete type is ConcurrentMessageListenerContainer
			MessageListenerContainer container = createListenerContainer(endpoint, factory);
			this.listenerContainers.put(id, container);
			if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
				List<MessageListenerContainer> containerGroup;
				//containers can be collected under a named containerGroup bean: reuse the List bean if it already exists, otherwise register a new singleton List
				if (this.applicationContext.containsBean(endpoint.getGroup())) {
					containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
				}
				else {
					containerGroup = new ArrayList<MessageListenerContainer>();
					this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
				}
				containerGroup.add(container);
			}
			if (startImmediately) {
			    //start it immediately
				startIfNecessary(container);
			}
		}
	}
	//create a MessageListenerContainer
	protected MessageListenerContainer createListenerContainer(KafkaListenerEndpoint endpoint,
			KafkaListenerContainerFactory<?> factory) {
        //createListenerContainer is implemented in AbstractKafkaListenerContainerFactory; via ConcurrentKafkaListenerContainerFactory it ends up creating a ConcurrentMessageListenerContainer
		MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint);

		int containerPhase = listenerContainer.getPhase();
		if (listenerContainer.isAutoStartup() &&
				containerPhase != AbstractMessageListenerContainer.DEFAULT_PHASE) {  // a custom phase value
			if (this.phase != AbstractMessageListenerContainer.DEFAULT_PHASE && this.phase != containerPhase) {
				throw new IllegalStateException("Encountered phase mismatch between container "
						+ "factory definitions: " + this.phase + " vs " + containerPhase);
			}
			this.phase = listenerContainer.getPhase();
		}

		return listenerContainer;
	}
	private void startIfNecessary(MessageListenerContainer listenerContainer) {
		if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
		 //so ultimately this is ConcurrentMessageListenerContainer.start()
			listenerContainer.start();
		}
	}
}
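
A useful side effect of this design: every container created here is addressable through the KafkaListenerEndpointRegistry bean, so listeners can be stopped and restarted at runtime. A small sketch (the listener id is an assumption and must match the @KafkaListener id):

```java
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Service;

@Service
public class ListenerControl {

    private final KafkaListenerEndpointRegistry registry;

    public ListenerControl(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    public void stop(String listenerId) {
        MessageListenerContainer container = registry.getListenerContainer(listenerId);
        if (container != null) {
            container.stop(); // stops all child KafkaMessageListenerContainers
        }
    }

    public void start(String listenerId) {
        MessageListenerContainer container = registry.getListenerContainer(listenerId);
        if (container != null && !container.isRunning()) {
            container.start();
        }
    }
}
```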


public class ConcurrentKafkaListenerContainerFactory<K, V>
		extends AbstractKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<K, V>, K, V> {
	@Override
	protected ConcurrentMessageListenerContainer<K, V> createContainerInstance(KafkaListenerEndpoint endpoint) {
		TopicPartitionOffset[] topicPartitions = endpoint.getTopicPartitionsToAssign();
		if (topicPartitions != null && topicPartitions.length > 0) {
			ContainerProperties properties = new ContainerProperties(topicPartitions);
			//the constructor chains up through AbstractMessageListenerContainer
			return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
		}
		else {
			Collection<String> topics = endpoint.getTopics();
			if (!topics.isEmpty()) {
				ContainerProperties properties = new ContainerProperties(topics.toArray(new String[0]));
				return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
			}
			else {
				ContainerProperties properties = new ContainerProperties(endpoint.getTopicPattern());
				return new ConcurrentMessageListenerContainer<>(getConsumerFactory(), properties);
			}
		}
	}
	@Override
	protected void initializeContainer(ConcurrentMessageListenerContainer<K, V> instance,
			KafkaListenerEndpoint endpoint) {
        //first delegate to initializeContainer in the abstract base class
		super.initializeContainer(instance, endpoint);
		//set the concurrency: the @KafkaListener value wins, otherwise the factory-level setting applies
		if (endpoint.getConcurrency() != null) {
			instance.setConcurrency(endpoint.getConcurrency());
		}
		else if (this.concurrency != null) {
			instance.setConcurrency(this.concurrency);
		}
	}
}
//the key methods of the abstract factory
public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMessageListenerContainer<K, V>, K, V>
		implements KafkaListenerContainerFactory<C>, ApplicationEventPublisherAware, InitializingBean,
			ApplicationContextAware {
  	@Override
	public C createListenerContainer(KafkaListenerEndpoint endpoint) {
	    //createContainerInstance (implemented by ConcurrentKafkaListenerContainerFactory) returns a ConcurrentMessageListenerContainer
		C instance = createContainerInstance(endpoint);
		JavaUtils.INSTANCE
				.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
		if (endpoint instanceof AbstractKafkaListenerEndpoint) {
			configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
		}
        //wire the endpoint into the container and set the message converter (there is a subtlety here: the listener method is wrapped in an adapter)
		endpoint.setupListenerContainer(instance, this.messageConverter);
		initializeContainer(instance, endpoint);
		customizeContainer(instance);
		return instance;
	}
	//initialize the given container, copying the factory configuration into it
	protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) {
		ContainerProperties properties = instance.getContainerProperties();
		BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern",
				"messageListener", "ackCount", "ackTime", "subBatchPerPartition", "kafkaConsumerProperties");
		JavaUtils.INSTANCE
				.acceptIfNotNull(this.afterRollbackProcessor, instance::setAfterRollbackProcessor)
				.acceptIfCondition(this.containerProperties.getAckCount() > 0, this.containerProperties.getAckCount(),
						properties::setAckCount)
				.acceptIfCondition(this.containerProperties.getAckTime() > 0, this.containerProperties.getAckTime(),
						properties::setAckTime)
				.acceptIfNotNull(this.containerProperties.getSubBatchPerPartition(),
						properties::setSubBatchPerPartition)
				.acceptIfNotNull(this.errorHandler, instance::setGenericErrorHandler)
				.acceptIfNotNull(this.missingTopicsFatal, instance.getContainerProperties()::setMissingTopicsFatal);
		if (endpoint.getAutoStartup() != null) {
			instance.setAutoStartup(endpoint.getAutoStartup());
		}
		else if (this.autoStartup != null) {
			instance.setAutoStartup(this.autoStartup);
		}
		instance.setRecordInterceptor(this.recordInterceptor);
		JavaUtils.INSTANCE
				.acceptIfNotNull(this.phase, instance::setPhase)
				.acceptIfNotNull(this.applicationContext, instance::setApplicationContext)
				.acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)
				.acceptIfNotNull(endpoint.getGroupId(), instance.getContainerProperties()::setGroupId)
				.acceptIfNotNull(endpoint.getClientIdPrefix(), instance.getContainerProperties()::setClientId)
				.acceptIfNotNull(endpoint.getConsumerProperties(),
						instance.getContainerProperties()::setKafkaConsumerProperties);
	}

	private void customizeContainer(C instance) {
		if (this.containerCustomizer != null) {
			this.containerCustomizer.configure(instance);
		}
	}
}
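
This also shows what happens when you declare your own kafkaListenerContainerFactory bean: the @ConditionalOnMissingBean in KafkaAnnotationDrivenConfiguration backs off, and your factory drives createListenerContainer instead. A hedged sketch of such an override:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(5);      // default when the endpoint sets no concurrency
        factory.setBatchListener(true); // the programmatic equivalent of listener.type: batch
        return factory;
    }
}
```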

Executing start(): AbstractMessageListenerContainer.start() implements Lifecycle.start() and calls the abstract doStart(), which is implemented by ConcurrentMessageListenerContainer:

public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
    	@Override
	protected void doStart() {
		if (!isRunning()) {
			checkTopics();
			ContainerProperties containerProperties = getContainerProperties();
			TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
			//if concurrency exceeds the number of explicitly assigned partitions, cap it at the partition count (extra threads would sit idle)
			if (topicPartitions != null && this.concurrency > topicPartitions.length) {
				this.concurrency = topicPartitions.length;
			}
			setRunning(true);
            //build one KafkaMessageListenerContainer per concurrency slot
			for (int i = 0; i < this.concurrency; i++) {
				KafkaMessageListenerContainer<K, V> container =
						constructContainer(containerProperties, topicPartitions, i);
				String beanName = getBeanName();
				container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
				container.setApplicationContext(getApplicationContext());
				if (getApplicationEventPublisher() != null) {
					container.setApplicationEventPublisher(getApplicationEventPublisher());
				}
				container.setClientIdSuffix(this.concurrency > 1 || this.alwaysClientIdSuffix ? "-" + i : "");
				container.setGenericErrorHandler(getGenericErrorHandler());
				container.setAfterRollbackProcessor(getAfterRollbackProcessor());
				container.setRecordInterceptor(getRecordInterceptor());
				container.setInterceptBeforeTx(isInterceptBeforeTx());
				container.setEmergencyStop(() -> {
					stop(() -> {
						// NOSONAR
					});
					publishContainerStoppedEvent();
				});
				if (isPaused()) {
					container.pause();
				}
				//this ends up in KafkaMessageListenerContainer.doStart()
				container.start();
				this.containers.add(container);
			}
		}
	}
}

public abstract class AbstractMessageListenerContainer<K, V>
		implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware,
			ApplicationContextAware {
	@Override
	public final void start() {
		checkGroupId();
		synchronized (this.lifecycleMonitor) {
			if (!isRunning()) {
				Assert.state(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
						() -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
				doStart();
			}
		}
	}		    
}

public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
		extends AbstractMessageListenerContainer<K, V> {
    @Override
	protected void doStart() {
		if (isRunning()) {
			return;
		}
		if (this.clientIdSuffix == null) { // stand-alone container
			checkTopics();
		}
		ContainerProperties containerProperties = getContainerProperties();
		//checkAckMode: with manual commits, the default ackTime is 5 seconds
		checkAckMode(containerProperties);

		Object messageListener = containerProperties.getMessageListener();
		//default consumer executor: a SimpleAsyncTaskExecutor (an AsyncListenableTaskExecutor)
		if (containerProperties.getConsumerTaskExecutor() == null) {
			SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
					(getBeanName() == null ? "" : getBeanName()) + "-C-");
			containerProperties.setConsumerTaskExecutor(consumerExecutor);
		}
		GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
		//infer the listener type
		ListenerType listenerType = determineListenerType(listener);
		//build a ListenerConsumer for that ListenerType
		this.listenerConsumer = new ListenerConsumer(listener, listenerType);
		setRunning(true);
		this.startLatch = new CountDownLatch(1);
		//submit the consumer loop as an async task
		this.listenerConsumerFuture = containerProperties
				.getConsumerTaskExecutor()
				.submitListenable(this.listenerConsumer);
		try {
			if (!this.startLatch.await(containerProperties.getConsumerStartTimout().toMillis(), TimeUnit.MILLISECONDS)) {
				this.logger.error("Consumer thread failed to start - does the configured task executor "
						+ "have enough threads to support all containers and concurrency?");
			    //waiting too long publishes a failure event through Spring
				publishConsumerFailedToStart();
			}
		}
		catch (@SuppressWarnings(UNUSED) InterruptedException e) {
			Thread.currentThread().interrupt();
		}
	}		    
}
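
Because doStart() only creates the SimpleAsyncTaskExecutor when no executor is configured, you can supply your own through the factory's ContainerProperties. A hedged sketch building on the factory bean above (the thread-name prefix is an assumption):

```java
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

public class ConsumerExecutorConfig {

    public ConcurrentKafkaListenerContainerFactory<String, String> factoryWithNamedThreads(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // the ListenerConsumer threads will be named "order-consumer-N" instead of "<beanName>-C-N"
        factory.getContainerProperties().setConsumerTaskExecutor(
                new SimpleAsyncTaskExecutor("order-consumer-"));
        return factory;
    }
}
```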

Finally, let's look at the ListenerConsumer itself, the Runnable that drives the poll loop:

private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {
    @Override
	public void run() {
		ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());
		//publish the ConsumerStartingEvent
		publishConsumerStartingEvent();
		this.consumerThread = Thread.currentThread();
		setupSeeks();
		KafkaUtils.setConsumerGroupId(this.consumerGroupId);
		this.count = 0;
		this.last = System.currentTimeMillis();
		initAssignedPartitions();
		//publish the ConsumerStartedEvent
		publishConsumerStartedEvent();
		Throwable exitThrowable = null;
		while (isRunning()) {
			try {
			    //the key call
				pollAndInvoke();
			}
			catch (Exception e) {
				//various exception handling (WakeupException, fencing, auth failures, ...)
			}
			
		}
		wrapUp(exitThrowable);
	}
	//this is the core consumption logic
	protected void pollAndInvoke() {
		if (!this.autoCommit && !this.isRecordAck) {
		    //handle pending offset commits when not auto-committing
			processCommits();
		}
		//fix up transactional offsets if needed
		fixTxOffsetsIfNeeded();
		//honor the idle-between-polls setting
		idleBetweenPollIfNecessary();
		if (this.seeks.size() > 0) {
		    //apply requested seeks (offset moves)
			processSeeks();
		}
		//pause the consumer if requested
		pauseConsumerIfNecessary();
		this.lastPoll = System.currentTimeMillis();
		if (!isRunning()) {
			return;
		}
		this.polling.set(true);
		//doPoll also checks rebalance commits
		ConsumerRecords<K, V> records = doPoll();
		if (!this.polling.compareAndSet(true, false) && records != null) {
			/*
			 * There is a small race condition where wakeIfNecessary was called between
			 * exiting the poll and before we reset the boolean.
			 */
			if (records.count() > 0) {
				this.logger.debug(() -> "Discarding polled records, container stopped: " + records.count());
			}
			return;
		}
		//resume the consumer if necessary
		resumeConsumerIfNeccessary();
		debugRecords(records);
		if (records != null && records.count() > 0) {
			savePositionsIfNeeded(records);
			if (this.containerProperties.getIdleEventInterval() != null) {
				this.lastReceive = System.currentTimeMillis();
			}
			//finally invoke the listener
			invokeListener(records);
		}
		else {
			checkIdle();
		}
	}
	private void invokeListener(final ConsumerRecords<K, V> records) {
		if (this.isBatchListener) {
			invokeBatchListener(records);
		}
		else {
			invokeRecordListener(records);
		}
	}
	//batch consumption
	private void invokeBatchListener(final ConsumerRecords<K, V> records) {
		List<ConsumerRecord<K, V>> recordList = null;
		if (!this.wantsFullRecords) {
			recordList = createRecordList(records);
		}
		if (this.wantsFullRecords || recordList.size() > 0) {
			if (this.transactionTemplate != null) {
				invokeBatchListenerInTx(records, recordList);
			}
			else {
				doInvokeBatchListener(records, recordList);
			}
		}
	}
	//record-at-a-time consumption
	private void invokeRecordListener(final ConsumerRecords<K, V> records) {
		if (this.transactionTemplate != null) {
			invokeRecordListenerInTx(records);
		}
		else {
			doInvokeWithRecords(records);
		}
	}
	private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
		Iterator<ConsumerRecord<K, V>> iterator = records.iterator();
	    //record-at-a-time consumption iterates over the polled batch
		while (iterator.hasNext()) {
			if (this.stopImmediate && !isRunning()) {
				break;
			}
			final ConsumerRecord<K, V> record = checkEarlyIntercept(iterator.next());
			if (record == null) {
				continue;
			}
			this.logger.trace(() -> "Processing " + ListenerUtils.recordToString(record));
			doInvokeRecordListener(record, iterator);
			if (this.nackSleep >= 0) {
				handleNack(records, record);
				break;
			}
		}
	}
}
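
The isBatchListener branch corresponds to listener.type: batch in the YAML at the top. For completeness, a hedged sketch of the batch-style listener this path invokes (the topic name is made up):

```java
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class BatchOrderListener {

    // with a batch container factory, the whole polled batch arrives in one call
    @KafkaListener(topics = "demo-batch-topic", containerFactory = "kafkaListenerContainerFactory")
    public void onBatch(List<ConsumerRecord<String, String>> records) {
        for (ConsumerRecord<String, String> record : records) {
            // process each record
        }
    }
}
```

That completes the path from the @KafkaListener annotation all the way down to the running poll loop.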