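This article walks through the implementation of a Spring RabbitMQ consumer step by step: from consumer startup, to receiving messages, to executing the consumption logic.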
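Contents
- 1. How the consumer starts up
- 1.1 Bootstrap configuration class
- 1.2 Core logic for creating consumers
- 1.3 PS: How does Spring handle a BeanPostProcessor?
- 2. How RabbitMQ messages are consumed
- 2.1 SimpleMessageListenerContainer
- 2.2 BlockingQueueConsumer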
1. How the consumer starts up
1.1 Bootstrap configuration class
RabbitBootstrapConfiguration creates the RabbitListenerAnnotationBeanPostProcessor bean:
@Configuration
public class RabbitBootstrapConfiguration {

    @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() {
        return new RabbitListenerAnnotationBeanPostProcessor();
    }

    .....
}
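1.2 Core logic for creating consumers
The core logic lives in RabbitListenerAnnotationBeanPostProcessor and runs during Spring bean initialization.
For every message listener, a corresponding MessageListenerContainer is created (the default implementation is SimpleMessageListenerContainer).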
// A BeanPostProcessor that creates the message listeners after each bean has been created
public class RabbitListenerAnnotationBeanPostProcessor
        implements BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware,
        SmartInitializingSingleton {

    ......

    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        // Use reflection to find the methods annotated with @RabbitListener (cached per target class)
        final TypeMetadata metadata = this.typeCache.computeIfAbsent(targetClass, this::buildMetadata);
        for (ListenerMethod lm : metadata.listenerMethods) {
            for (RabbitListener rabbitListener : lm.annotations) {
                // Create a MethodRabbitListenerEndpoint and register it with the RabbitListenerEndpointRegistrar
                processAmqpListener(rabbitListener, lm.method, bean, beanName);
            }
        }
        if (metadata.handlerMethods.length > 0) {
            processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName);
        }
        return bean;
    }

    protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) {
        Method methodToUse = checkProxy(method, bean);
        MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint();
        endpoint.setMethod(methodToUse);
        processListener(endpoint, rabbitListener, bean, methodToUse, beanName);
    }

    // Core logic for creating a RabbitMQ consumer
    protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
            Object adminTarget, String beanName) {
        endpoint.setBean(bean);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        endpoint.setId(getEndpointId(rabbitListener));
        // resolveQueues also takes care of creating the queues
        endpoint.setQueueNames(resolveQueues(rabbitListener));
        .......
        // registerEndpoint() is where the MessageListenerContainer gets created;
        // its default implementation is SimpleMessageListenerContainer
        this.registrar.registerEndpoint(endpoint, factory);
    }

    ......
}
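To make the scan-and-register idea concrete without pulling in Spring, here is a minimal JDK-only sketch. Everything in it is hypothetical and only loosely mirrors the real classes: `QueueListener` stands in for @RabbitListener, `ListenerEndpoint` for MethodRabbitListenerEndpoint, and `ListenerScanner` for the post-processor together with its registrar. It skips proxies, caching, queue declaration, and container creation.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for @RabbitListener: marks a method as a queue listener.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface QueueListener {
    String queue();
}

// Hypothetical stand-in for MethodRabbitListenerEndpoint: bean + method + queue name.
final class ListenerEndpoint {
    final Object bean;
    final Method method;
    final String queue;

    ListenerEndpoint(Object bean, Method method, String queue) {
        this.bean = bean;
        this.method = method;
        this.queue = queue;
    }

    // Invokes the annotated method with a message payload.
    Object invoke(String payload) {
        try {
            return method.invoke(bean, payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Listener invocation failed", e);
        }
    }
}

// Hypothetical stand-in for the post-processor plus its registrar.
final class ListenerScanner {
    private final List<ListenerEndpoint> endpoints = new ArrayList<>();

    // Called once per initialized bean, in the spirit of postProcessAfterInitialization.
    Object postProcessAfterInitialization(Object bean) {
        for (Method m : bean.getClass().getDeclaredMethods()) {
            QueueListener ann = m.getAnnotation(QueueListener.class);
            if (ann != null) {
                m.setAccessible(true);
                endpoints.add(new ListenerEndpoint(bean, m, ann.queue()));
            }
        }
        return bean; // the bean itself is returned unchanged
    }

    List<ListenerEndpoint> endpoints() {
        return endpoints;
    }
}

// Example bean with one annotated listener method and one ordinary method.
final class OrderHandler {
    @QueueListener(queue = "orders")
    String onOrder(String payload) {
        return "handled:" + payload;
    }

    String notAListener(String payload) {
        return payload;
    }
}

final class ListenerScannerDemo {
    public static void main(String[] args) {
        ListenerScanner scanner = new ListenerScanner();
        scanner.postProcessAfterInitialization(new OrderHandler());
        ListenerEndpoint endpoint = scanner.endpoints().get(0);
        System.out.println(endpoint.queue + " -> " + endpoint.invoke("order-1"));
    }
}
```

The point of the sketch is the shape of the hook: the processor never changes the bean, it only records which methods should later receive messages.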
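1.3 PS: How does Spring handle a BeanPostProcessor?
Most readers already know the Spring bean initialization flow, but here is a quick recap.
Call chain: getBean -> doGetBean -> createBean -> doCreateBean -> initializeBean
-> applyBeanPostProcessorsBeforeInitialization -> invokeInitMethods -> applyBeanPostProcessorsAfterInitialization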
public abstract class AbstractAutowireCapableBeanFactory extends AbstractBeanFactory
        implements AutowireCapableBeanFactory {

    // Create a bean instance and apply post-processors
    protected Object createBean(String beanName, RootBeanDefinition mbd, Object[] args) throws BeanCreationException {
        // Various preparation steps
        ......
        // Finally, call doCreateBean
        Object beanInstance = doCreateBean(beanName, mbdToUse, args);
        if (logger.isDebugEnabled()) {
            logger.debug("Finished creating instance of bean '" + beanName + "'");
        }
        return beanInstance;
    }

    protected Object doCreateBean(final String beanName, final RootBeanDefinition mbd, final Object[] args)
            throws BeanCreationException {
        ......
        // Initialize the bean instance.
        Object exposedObject = bean;
        try {
            populateBean(beanName, mbd, instanceWrapper);
            if (exposedObject != null) {
                // Call initializeBean
                exposedObject = initializeBean(beanName, exposedObject, mbd);
            }
        }
        catch (Throwable ex) {
            .....
        }
    }

    // Initialize the bean instance
    protected Object initializeBean(final String beanName, final Object bean, RootBeanDefinition mbd) {
        ......
        if (mbd == null || !mbd.isSynthetic()) {
            wrappedBean = applyBeanPostProcessorsBeforeInitialization(wrappedBean, beanName);
        }
        try {
            invokeInitMethods(beanName, wrappedBean, mbd);
        }
        catch (Throwable ex) {
            throw new BeanCreationException(
                    (mbd != null ? mbd.getResourceDescription() : null),
                    beanName, "Invocation of init method failed", ex);
        }
        if (mbd == null || !mbd.isSynthetic()) {
            wrappedBean = applyBeanPostProcessorsAfterInitialization(wrappedBean, beanName);
        }
        return wrappedBean;
    }
}
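The ordering inside initializeBean (before-callbacks, then the init method, then after-callbacks) can be reproduced with a toy factory. This is a sketch under stated assumptions, not Spring's implementation: `MiniPostProcessor`, `Initializable`, and `MiniBeanFactory` are hypothetical names invented for illustration, and there is no dependency injection, no synthetic-bean check, and no error wrapping.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified counterpart of a bean post-processor.
interface MiniPostProcessor {
    default Object beforeInit(Object bean, String name) { return bean; }
    default Object afterInit(Object bean, String name) { return bean; }
}

// Hypothetical counterpart of an init callback on the bean itself.
interface Initializable {
    void init();
}

final class MiniBeanFactory {
    private final List<MiniPostProcessor> postProcessors = new ArrayList<>();

    void addPostProcessor(MiniPostProcessor p) {
        postProcessors.add(p);
    }

    // Same order as the excerpt above: before-callbacks -> init -> after-callbacks.
    Object initializeBean(String name, Object bean) {
        Object wrapped = bean;
        for (MiniPostProcessor p : postProcessors) {
            wrapped = p.beforeInit(wrapped, name);
        }
        if (wrapped instanceof Initializable) {
            ((Initializable) wrapped).init();
        }
        for (MiniPostProcessor p : postProcessors) {
            wrapped = p.afterInit(wrapped, name);
        }
        return wrapped;
    }
}

final class InitOrderDemo {
    // Runs one bean through the factory and records every callback in order.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        MiniBeanFactory factory = new MiniBeanFactory();
        factory.addPostProcessor(new MiniPostProcessor() {
            @Override
            public Object beforeInit(Object bean, String name) {
                events.add("before:" + name);
                return bean;
            }

            @Override
            public Object afterInit(Object bean, String name) {
                events.add("after:" + name);
                return bean;
            }
        });
        Initializable bean = () -> events.add("init");
        factory.initializeBean("orderHandler", bean);
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [before:orderHandler, init, after:orderHandler]
    }
}
```

Because the after-callback runs last, a processor such as RabbitListenerAnnotationBeanPostProcessor sees a bean whose init method has already completed.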
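2. How RabbitMQ messages are consumed
2.1 SimpleMessageListenerContainer
As described above, consumer startup creates a SimpleMessageListenerContainer. When the container starts, it creates one instance of its inner class AsyncMessageProcessingConsumer per configured concurrent consumer (the class implements Runnable, and its core field is a BlockingQueueConsumer). The run() method of AsyncMessageProcessingConsumer receives messages in a while loop and invokes the consumption logic implemented in our @RabbitListener-annotated methods.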
@Override
protected void doStart() throws Exception {
    ......
    super.doStart();
    synchronized (this.consumersMonitor) {
        if (this.consumers != null) {
            throw new IllegalStateException("A stopped container should not have consumers");
        }
        // Create as many BlockingQueueConsumers as the configured concurrency
        int newConsumers = initializeConsumers();
        ......
        Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
        for (BlockingQueueConsumer consumer : this.consumers) {
            AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
            processors.add(processor);
            // Run the AsyncMessageProcessingConsumer, which polls the queue for messages
            // and executes the consumption logic
            getTaskExecutor().execute(processor);
            if (getApplicationEventPublisher() != null) {
                getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
            }
        }
        for (AsyncMessageProcessingConsumer processor : processors) {
            FatalListenerStartupException startupException = processor.getStartupException();
            if (startupException != null) {
                throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
            }
        }
    }
}
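The start-up pattern shown in doStart(), one polling task per consumer submitted to an executor, can be sketched with plain java.util.concurrent. The names `MiniListenerContainer`, `pollLoop`, and `deliver` are hypothetical, and the sketch assumes a single shared in-memory queue instead of one BlockingQueueConsumer per task; it also leaves out acknowledgements, recovery, and startup-exception checks.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for a listener container.
final class MiniListenerContainer {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final ExecutorService executor;
    private final int concurrency;
    private final Consumer<String> listener;
    private volatile boolean running;

    MiniListenerContainer(int concurrency, Consumer<String> listener) {
        this.concurrency = concurrency;
        this.listener = listener;
        this.executor = Executors.newFixedThreadPool(concurrency);
    }

    // Starts one polling task per configured consumer.
    void start() {
        running = true;
        for (int i = 0; i < concurrency; i++) {
            executor.execute(this::pollLoop);
        }
    }

    // Receive loop: poll with a timeout so the running flag is rechecked regularly.
    private void pollLoop() {
        while (running) {
            try {
                String message = queue.poll(100, TimeUnit.MILLISECONDS);
                if (message != null) {
                    listener.accept(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void deliver(String message) {
        queue.add(message);
    }

    void stop() {
        running = false;
        executor.shutdownNow();
    }
}

final class MiniContainerDemo {
    // Delivers `count` messages to a container with two consumers; returns how many were handled.
    static int run(int count) {
        AtomicInteger handled = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(count);
        MiniListenerContainer container = new MiniListenerContainer(2, m -> {
            handled.incrementAndGet();
            done.countDown();
        });
        container.start();
        for (int i = 0; i < count; i++) {
            container.deliver("msg-" + i);
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            container.stop();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("handled=" + run(10));
    }
}
```

Polling with a timeout rather than blocking indefinitely is what lets each loop notice a stop request, which is the same reason nextMessage takes a timeout argument in the excerpt in section 2.2.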
2.2 BlockingQueueConsumer
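BlockingQueueConsumer decouples message reception from message consumption. On one side, it takes the messages delivered on the Channel and puts them into its BlockingQueue field, queue. On the other side, it is polled by AsyncMessageProcessingConsumer, which takes messages from that queue and runs the consumption logic.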
// Take a message from the queue
public Message nextMessage(long timeout) throws InterruptedException, ShutdownSignalException {
    ......
    Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
    if (message == null && this.cancelled.get()) {
        throw new ConsumerCancelledException();
    }
    return message;
}

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
        throws IOException {
    ......
    try {
        // If the BlockingQueueConsumer has been marked as stopping, enqueue with a timed offer:
        // it waits up to shutdownTimeout for space and returns false if the queue is still full
        if (BlockingQueueConsumer.this.abortStarted > 0) {
            // If offer fails, send basic.nack to tell the broker the message was not consumed,
            // then send basic.cancel to cancel the subscription so that the broker stops
            // delivering messages to this consumer
            if (!BlockingQueueConsumer.this.queue.offer(
                    new Delivery(consumerTag, envelope, properties, body, this.queue),
                    BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
                RabbitUtils.setPhysicalCloseRequired(getChannel(), true);
                // Defensive - should never happen
                BlockingQueueConsumer.this.queue.clear();
                getChannel().basicNack(envelope.getDeliveryTag(), true, true);
                getChannel().basicCancel(consumerTag);
                try {
                    getChannel().close();
                }
                catch (TimeoutException e) {
                    // no-op
                }
            }
        }
        else {
            // If the BlockingQueueConsumer is not marked as stopping, enqueue with put,
            // which blocks until space becomes available when the queue is full
            BlockingQueueConsumer.this.queue
                    .put(new Delivery(consumerTag, envelope, properties, body, this.queue));
        }
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
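The two enqueue paths in handleDelivery rely on standard java.util.concurrent.BlockingQueue behavior: a timed offer gives up and returns false once its timeout expires, while put blocks until a slot is free. The following JDK-only sketch illustrates just that difference; the class name `OfferVersusPutDemo`, the queue capacity of 1, and the 50 ms delays are arbitrary choices for illustration and have nothing to do with Spring AMQP's actual settings.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class OfferVersusPutDemo {

    // Timed offer on a full queue: waits up to the timeout, then returns false.
    static boolean offerToFullQueue(long timeoutMillis) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first"); // the queue is now at capacity
        try {
            return queue.offer("second", timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // put() on a full queue: blocks until another thread removes an element.
    static String putThenDrain() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.add("first");
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(50);
                queue.take(); // removes "first" and frees the only slot
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.put("second"); // blocks until the consumer thread has taken "first"
            consumer.join();
            return queue.peek();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("offer on full queue: " + offerToFullQueue(50));
        System.out.println("head after put:      " + putThenDrain());
    }
}
```

Seen this way, the design choice in the excerpt is reasonable: during normal operation back-pressure through put is acceptable, but during shutdown the delivery thread must not block forever, so a bounded wait followed by nack and cancel is used instead.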
Please credit the source when reposting. Link to this article: https://www.uj5u.com/qita/430275.html
