13 Spring Messaging

felix.shao 2025-03-16

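13.1 Using JMS Standalone

Using JMS standalone

 The steps are as follows.

  1. Implement the sender.
  2. Implement the receiver.
  3. Test: start the sender first, then start the receiver.

 See the com.stu.spring.context.chapter13.jms package for the sample code.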

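13.2 Integrating ActiveMQ with Spring

Integrating ActiveMQ with Spring

 The steps are as follows.

  1. Write the Spring configuration file.
  2. Implement the sender.
  3. Implement the receiver.

 See the com.stu.spring.context.chapter13.springjms package for the sample code. Running it currently fails with a class conflict; it probably has to run as a standalone project, so this is set aside for now.
 There is also a better way to receive messages continuously than polling in a loop: a message listener. The code is omitted here.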

13.3 Source Code Analysis

13.3.1 JmsTemplate

JmsTemplate class hierarchy
JmsTemplate
    JmsDestinationAccessor
        JmsAccessor
            Object
            InitializingBean
    JmsOperations

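 Its afterPropertiesSet() method performs no real initialization (it only verifies that a ConnectionFactory has been set), so we trace the send method instead.

JmsTemplate send method

 The code closely resembles JdbcTemplate.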

public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations {
    public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException {
		execute(session -> {
			doSend(session, destination, messageCreator);
			return null;
		}, false);
	}
}
 1. Extracting the common code
public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations {
    public <T> T execute(SessionCallback<T> action, boolean startConnection) throws JmsException {
		Assert.notNull(action, "Callback object must not be null");
		Connection conToClose = null;
		Session sessionToClose = null;
		try {
			Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
					obtainConnectionFactory(), this.transactionalResourceFactory, startConnection);
			if (sessionToUse == null) {
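				// Create the connection.
				conToClose = createConnection();
				// Create a session from the connection.
				sessionToClose = createSession(conToClose);
				// Start the connection only if requested. Starting enables message
				// delivery, which receiving needs and sending does not.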
				if (startConnection) {
					conToClose.start();
				}
				sessionToUse = sessionToClose;
			}
			if (logger.isDebugEnabled()) {
				logger.debug("Executing callback on JMS Session: " + sessionToUse);
			}
			// Invoke the callback.
			return action.doInJms(sessionToUse);
		}
		catch (JMSException ex) {
			throw convertJmsAccessException(ex);
		}
		finally {
			// Close the session.
			JmsUtils.closeSession(sessionToClose);
			// Release the connection.
			ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection);
		}
	}
}
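
 The shape of execute() (acquire the resource, run the callback, translate the exception, always clean up) can be sketched without any JMS dependency. This is a minimal model, not Spring's API: FakeSession, SessionAction and lastSession are hypothetical names invented for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this models the shape of execute(), not Spring's API.
class CallbackTemplateSketch {

    /** Stand-in for a JMS Session: a resource that must be closed after use. */
    static class FakeSession implements AutoCloseable {
        final List<String> sent = new ArrayList<>();
        boolean closed = false;

        void send(String text) throws Exception {
            if (closed) {
                throw new Exception("session already closed");
            }
            sent.add(text);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Stand-in for SessionCallback<T>: the only part that varies per operation. */
    interface SessionAction<T> {
        T doInSession(FakeSession session) throws Exception;
    }

    /** Most recently created session, kept only so the example can inspect it. */
    static FakeSession lastSession;

    /** The fixed part: open the resource, run the callback, translate errors, always close. */
    static <T> T execute(SessionAction<T> action) {
        FakeSession session = new FakeSession();
        lastSession = session;
        try {
            return action.doInSession(session);
        }
        catch (Exception ex) {
            // Plays the role of convertJmsAccessException: checked -> unchecked.
            throw new IllegalStateException("operation failed", ex);
        }
        finally {
            session.close();
        }
    }

    public static void main(String[] args) {
        Integer count = execute(session -> {
            session.send("hello");
            return session.sent.size();
        });
        System.out.println("sent=" + count + ", closed=" + lastSession.closed);
    }
}
```

 Callers supply only the lambda; resource handling and exception translation live in one place, which is the same division of labor as in JdbcTemplate.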
 2. Sending a message

 Looking at the send method, the sending logic is delegated to doSend.

public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations {
    public void send(final Destination destination, final MessageCreator messageCreator) throws JmsException {
		execute(session -> {
			doSend(session, destination, messageCreator);
			return null;
		}, false);
	}

    protected void doSend(Session session, Destination destination, MessageCreator messageCreator)
			throws JMSException {

		Assert.notNull(messageCreator, "MessageCreator must not be null");
		MessageProducer producer = createProducer(session, destination);
		try {
			Message message = messageCreator.createMessage(session);
			if (logger.isDebugEnabled()) {
				logger.debug("Sending created message: " + message);
			}
			doSend(producer, message);
			// Check commit - avoid commit call within a JTA transaction.
			if (session.getTransacted() && isSessionLocallyTransacted(session)) {
				// Transacted session created by this template -> commit.
				JmsUtils.commitIfNecessary(session);
			}
		}
		finally {
			JmsUtils.closeMessageProducer(producer);
		}
	}
}
 3. Receiving a message
public class JmsTemplate extends JmsDestinationAccessor implements JmsOperations {
    public Message receive(Destination destination) throws JmsException {
		return receiveSelected(destination, null);
	}

    public Message receiveSelected(final Destination destination, @Nullable final String messageSelector) throws JmsException {
		return execute(session -> doReceive(session, destination, messageSelector), true);
	}

    protected Message doReceive(Session session, Destination destination, @Nullable String messageSelector)
			throws JMSException {

		return doReceive(session, createConsumer(session, destination, messageSelector));
	}

    protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException {
		try {
			// Use transaction timeout (if available).
			long timeout = getReceiveTimeout();
			ConnectionFactory connectionFactory = getConnectionFactory();
			JmsResourceHolder resourceHolder = null;
			if (connectionFactory != null) {
				resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager.getResource(connectionFactory);
			}
			if (resourceHolder != null && resourceHolder.hasTimeout()) {
				timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis());
			}
			Message message = receiveFromConsumer(consumer, timeout);
			if (session.getTransacted()) {
				// Commit necessary - but avoid commit call within a JTA transaction.
				if (isSessionLocallyTransacted(session)) {
					// Transacted session created by this template -> commit.
					JmsUtils.commitIfNecessary(session);
				}
			}
			else if (isClientAcknowledge(session)) {
				// Manually acknowledge message, if any.
				if (message != null) {
					message.acknowledge();
				}
			}
			return message;
		}
		finally {
			JmsUtils.closeMessageConsumer(consumer);
		}
	}
}

public abstract class JmsDestinationAccessor extends JmsAccessor {
    protected Message receiveFromConsumer(MessageConsumer consumer, long timeout) throws JMSException {
		if (timeout > 0) {
			return consumer.receive(timeout);
		}
		else if (timeout < 0) {
			return consumer.receiveNoWait();
		}
		else {
			return consumer.receive();
		}
	}
}
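
 The three-way branch in receiveFromConsumer gives the receive timeout three meanings: a positive value waits that many milliseconds, a negative value does not wait at all, and zero blocks indefinitely (JmsDestinationAccessor exposes these as RECEIVE_TIMEOUT_NO_WAIT = -1 and RECEIVE_TIMEOUT_INDEFINITE_WAIT = 0). The sketch below reproduces the same mapping on a java.util.concurrent.BlockingQueue; the queue is only an assumed stand-in for a MessageConsumer.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// A BlockingQueue stands in for the MessageConsumer; names are illustrative only.
class ReceiveTimeoutSketch {

    /**
     * positive timeout -> wait up to that many milliseconds,
     * negative timeout -> return immediately,
     * zero             -> block until a message arrives.
     * Returns null if no message was obtained or the thread was interrupted.
     */
    static String receive(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            if (timeoutMillis > 0) {
                return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            }
            else if (timeoutMillis < 0) {
                return queue.poll();
            }
            else {
                return queue.take();
            }
        }
        catch (InterruptedException ex) {
            // Restore the interrupt flag and report "nothing received".
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        System.out.println("no-wait on empty queue: " + receive(queue, -1));
        queue.add("hello");
        System.out.println("indefinite wait with a message present: " + receive(queue, 0));
    }
}
```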

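13.3.2 Listener Containers

Listener containers

 Spring provides the following message listener container types.

  • SimpleMessageListenerContainer: the simplest listener container. It works with a fixed number of JMS sessions and does not take part in externally managed transactions.
  • DefaultMessageListenerContainer: covers what SimpleMessageListenerContainer offers and adds transaction support (functionally; as the class hierarchy below shows, it does not extend that class).
  • serversession.ServerSessionMessageListenerContainer: the most powerful listener container (from older Spring versions). Like DefaultMessageListenerContainer it supports transactions, and it additionally allows JMS sessions to be managed dynamically.

 The analysis below uses DefaultMessageListenerContainer as the example to show how a message listener container is implemented.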

DefaultMessageListenerContainer class hierarchy
DefaultMessageListenerContainer
    AbstractPollingMessageListenerContainer
        AbstractMessageListenerContainer
            AbstractJmsListeningContainer
                JmsDestinationAccessor
                    JmsAccessor
                        Object
                        InitializingBean
                BeanNameAware
                    Aware
                DisposableBean
                SmartLifecycle
                    Lifecycle
                    Phased
            MessageListenerContainer
                SmartLifecycle
                    Lifecycle
                    Phased

 DefaultMessageListenerContainer initialization
public abstract class AbstractJmsListeningContainer extends JmsDestinationAccessor
		implements BeanNameAware, DisposableBean, SmartLifecycle {
    public void afterPropertiesSet() {
		// Validate the connectionFactory.
		super.afterPropertiesSet();
		// Validate the configuration.
		validateConfiguration();
		// Initialize. The two calls above are simple validations; the core logic is here.
		initialize();
	}

    public void initialize() throws JmsException {
		try {
			// lifecycleMonitor synchronizes lifecycle state changes.
			synchronized (this.lifecycleMonitor) {
				this.active = true;
				this.lifecycleMonitor.notifyAll();
			}
			doInitialize();
		}
		catch (JMSException ex) {
			synchronized (this.sharedConnectionMonitor) {
				ConnectionFactoryUtils.releaseConnection(this.sharedConnection, getConnectionFactory(), this.autoStartup);
				this.sharedConnection = null;
			}
			throw convertJmsAccessException(ex);
		}
	}
}

public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer {
    protected void doInitialize() throws JMSException {
		synchronized (this.lifecycleMonitor) {
			// concurrentConsumers: the container may create several sessions and message consumers to receive messages.
			// With several consumers working concurrently, messages may be processed in a different order than they were sent.
			for (int i = 0; i < this.concurrentConsumers; i++) {
				scheduleNewInvoker();
			}
		}
	}
}
 1. scheduleNewInvoker

 Concurrent processing.

public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer {
    private void scheduleNewInvoker() {
		AsyncMessageListenerInvoker invoker = new AsyncMessageListenerInvoker();
		if (rescheduleTaskIfNecessary(invoker)) {
			// This should always be true, since we're only calling this when active.
			this.scheduledInvokers.add(invoker);
		}
	}
}

public abstract class AbstractJmsListeningContainer extends JmsDestinationAccessor
		implements BeanNameAware, DisposableBean, SmartLifecycle {
    protected final boolean rescheduleTaskIfNecessary(Object task) {
		if (this.running) {
			try {
				doRescheduleTask(task);
			}
			catch (RuntimeException ex) {
				logRejectedTask(task, ex);
				this.pausedTasks.add(task);
			}
			return true;
		}
		else if (this.active) {
			this.pausedTasks.add(task);
			return true;
		}
		else {
			return false;
		}
	}

    protected void doRescheduleTask(Object task) {
		throw new UnsupportedOperationException(
				ClassUtils.getShortName(getClass()) + " does not support rescheduling of tasks");
	}
}

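 One invoker task is scheduled per configured concurrent consumer. rescheduleTaskIfNecessary, shown above, hands the task to doRescheduleTask when the container is running, parks it in pausedTasks when the container is active but not yet running, and rejects it otherwise. The base doRescheduleTask only throws UnsupportedOperationException; DefaultMessageListenerContainer overrides it to submit the task to its task executor.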
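
 The effect of concurrentConsumers can be modelled with plain JDK classes: several polling tasks share one queue, each message is handled by exactly one task, and the handling order is no longer guaranteed to match the send order. This is a sketch under assumptions, not the container's code: the queue replaces the broker, and consumeAll is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative model only: a shared queue plays the broker, pool threads play the invokers.
class ConcurrentConsumersSketch {

    /** Runs the given number of polling tasks and returns the messages in handling order. */
    static List<Integer> consumeAll(int concurrentConsumers, int messageCount) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messageCount; i++) {
            queue.add(i);
        }
        List<Integer> handled = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(messageCount);
        ExecutorService executor = Executors.newFixedThreadPool(concurrentConsumers);

        // One polling task per consumer, like the loop in doInitialize().
        for (int i = 0; i < concurrentConsumers; i++) {
            executor.execute(() -> {
                try {
                    while (done.getCount() > 0) {
                        Integer message = queue.poll(50, TimeUnit.MILLISECONDS);
                        if (message != null) {
                            handled.add(message);
                            done.countDown();
                        }
                    }
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            done.await(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
        finally {
            executor.shutdownNow();
        }
        synchronized (handled) {
            return new ArrayList<>(handled);
        }
    }

    public static void main(String[] args) {
        // The printed order can vary from run to run when more than one consumer is used.
        System.out.println(consumeAll(3, 10));
    }
}
```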

 2. AsyncMessageListenerInvoker processing
public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer {
    private class AsyncMessageListenerInvoker implements SchedulingAwareRunnable {
        public void run() {
			// Concurrency control.
			synchronized (lifecycleMonitor) {
				activeInvokerCount++;
				lifecycleMonitor.notifyAll();
			}
			boolean messageReceived = false;
			try {
				// Branch on the per-task message limit (maxMessagesPerTask).
				// A negative value means no limit: keep receiving messages.
				if (maxMessagesPerTask < 0) {
					messageReceived = executeOngoingLoop();
				}
				else {
					int messageCount = 0;
					// Bounded case: leave the loop once the limit is reached.
					while (isRunning() && messageCount < maxMessagesPerTask) {
						// invokeListener holds the core logic.
						messageReceived = (invokeListener() || messageReceived);
						messageCount++;
					}
				}
			}
			catch (Throwable ex) {
				// Clean up, including closing the session.
				clearResources();
				if (!this.lastMessageSucceeded) {
					// We failed more than once in a row or on startup -
					// wait before first recovery attempt.
					waitBeforeRecoveryAttempt();
				}
				this.lastMessageSucceeded = false;
				boolean alreadyRecovered = false;
				synchronized (recoveryMonitor) {
					if (this.lastRecoveryMarker == currentRecoveryMarker) {
						handleListenerSetupFailure(ex, false);
						recoverAfterListenerSetupFailure();
						currentRecoveryMarker = new Object();
					}
					else {
						alreadyRecovered = true;
					}
				}
				if (alreadyRecovered) {
					handleListenerSetupFailure(ex, true);
				}
			}
			finally {
				synchronized (lifecycleMonitor) {
					decreaseActiveInvokerCount();
					lifecycleMonitor.notifyAll();
				}
				if (!messageReceived) {
					this.idleTaskExecutionCount++;
				}
				else {
					this.idleTaskExecutionCount = 0;
				}
				synchronized (lifecycleMonitor) {
					if (!shouldRescheduleInvoker(this.idleTaskExecutionCount) || !rescheduleTaskIfNecessary(this)) {
						// We're shutting down completely.
						scheduledInvokers.remove(this);
						if (logger.isDebugEnabled()) {
							logger.debug("Lowered scheduled invoker count: " + scheduledInvokers.size());
						}
						lifecycleMonitor.notifyAll();
						clearResources();
					}
					else if (isRunning()) {
						int nonPausedConsumers = getScheduledConsumerCount() - getPausedTaskCount();
						if (nonPausedConsumers < 1) {
							logger.error("All scheduled consumers have been paused, probably due to tasks having been rejected. " +
									"Check your thread pool configuration! Manual recovery necessary through a start() call.");
						}
						else if (nonPausedConsumers < getConcurrentConsumers()) {
							logger.warn("Number of scheduled consumers has dropped below concurrentConsumers limit, probably " +
									"due to tasks having been rejected. Check your thread pool configuration! Automatic recovery " +
									"to be triggered by remaining consumers.");
						}
					}
				}
			}
		}
    }
}
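
 The bounded branch of run() is easy to misread: messageCount is incremented on every receive attempt, whether or not a message arrived, and the idle counter is reset only when at least one attempt succeeded. A minimal sketch of that bookkeeping follows, assuming an in-memory queue in place of the JMS consumer; the class and the pollAndHandle method are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model of one invoker; an in-memory queue replaces the JMS consumer.
class BoundedInvokerSketch {
    final Queue<String> queue = new ArrayDeque<>();
    final int maxMessagesPerTask;
    int handled = 0;
    int idleTaskExecutionCount = 0;

    BoundedInvokerSketch(int maxMessagesPerTask) {
        this.maxMessagesPerTask = maxMessagesPerTask;
    }

    /** One task execution: make up to maxMessagesPerTask receive attempts, then return. */
    boolean runOnce() {
        boolean messageReceived = false;
        int messageCount = 0;
        while (messageCount < maxMessagesPerTask) {
            // Same accumulation as in run(): true once any attempt has succeeded.
            messageReceived = (pollAndHandle() || messageReceived);
            messageCount++;   // counts attempts, not received messages
        }
        if (messageReceived) {
            idleTaskExecutionCount = 0;
        }
        else {
            idleTaskExecutionCount++;
        }
        return messageReceived;
    }

    /** Stand-in for invokeListener(): returns false when nothing was available. */
    private boolean pollAndHandle() {
        String message = queue.poll();
        if (message == null) {
            return false;
        }
        handled++;
        return true;
    }

    public static void main(String[] args) {
        BoundedInvokerSketch invoker = new BoundedInvokerSketch(2);
        invoker.queue.add("a");
        invoker.queue.add("b");
        invoker.queue.add("c");
        while (invoker.runOnce()) {
            System.out.println("handled so far: " + invoker.handled);
        }
        System.out.println("idle executions: " + invoker.idleTaskExecutionCount);
    }
}
```

 Returning after a bounded number of attempts is what lets the container reschedule the task and, through shouldRescheduleInvoker, retire invokers that have been idle for too long.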
  2.1 executeOngoingLoop
public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer {
    private boolean executeOngoingLoop() throws JMSException {
        boolean messageReceived = false;
        boolean active = true;
        while (active) {
            synchronized (lifecycleMonitor) {
                boolean interrupted = false;
                boolean wasWaiting = false;
                // The container is still active but has been stopped temporarily: wait for a restart.
                while ((active = isActive()) && !isRunning()) {
                    if (interrupted) {
                        throw new IllegalStateException("Thread was interrupted while waiting for " +
                                "a restart of the listener container, but container is still stopped");
                    }
                    if (!wasWaiting) {
                        // Not waiting yet, so this is the first pass: decrease the active invoker count.
                        decreaseActiveInvokerCount();
                    }
                    // Enter the waiting state until the container is resumed.
                    wasWaiting = true;
                    try {
                        // Block in wait() until another thread calls notify() or notifyAll().
                        lifecycleMonitor.wait();
                    }
                    catch (InterruptedException ex) {
                        // Re-interrupt current thread, to allow other threads to react.
                        Thread.currentThread().interrupt();
                        interrupted = true;
                    }
                }
                if (wasWaiting) {
                    activeInvokerCount++;
                }
                if (scheduledInvokers.size() > maxConcurrentConsumers) {
                    active = false;
                }
            }
            // Normal processing path.
            if (active) {
                messageReceived = (invokeListener() || messageReceived);
            }
        }
        return messageReceived;
    }
}

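 Under normal operation the inner while loop (which waits for a restart) is skipped, and execution goes straight to invokeListener() to receive a message and invoke the listener.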
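
 The pause and resume mechanism of executeOngoingLoop rests on a monitor, two flags and wait()/notifyAll(). The sketch below isolates that part. It is a simplified model, not the container's code: the activeInvokerCount bookkeeping and the interrupt handling of the original are left out, and all names (PausableLoopSketch, awaitRunning, demo) are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of the active/running flags guarded by lifecycleMonitor.
class PausableLoopSketch {
    private final Object lifecycleMonitor = new Object();
    private boolean active = true;     // initialized and not shut down
    private boolean running = false;   // started and not stopped
    final AtomicInteger iterations = new AtomicInteger();

    void start() {
        synchronized (lifecycleMonitor) {
            running = true;
            lifecycleMonitor.notifyAll();   // wake workers parked in wait()
        }
    }

    void shutdown() {
        synchronized (lifecycleMonitor) {
            active = false;
            running = false;
            lifecycleMonitor.notifyAll();
        }
    }

    /** Blocks while active but stopped; returns false once the container is shut down. */
    private boolean awaitRunning() throws InterruptedException {
        synchronized (lifecycleMonitor) {
            while (active && !running) {
                lifecycleMonitor.wait();
            }
            return active;
        }
    }

    /** Worker loop in the role of executeOngoingLoop(). */
    void loop() {
        try {
            while (awaitRunning()) {
                iterations.incrementAndGet();   // stand-in for invokeListener()
                Thread.sleep(1);
            }
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs stopped -> started -> shut down; returns {iterations while stopped, final iterations}. */
    static int[] demo() {
        PausableLoopSketch container = new PausableLoopSketch();
        Thread worker = new Thread(container::loop);
        worker.start();
        try {
            Thread.sleep(100);   // the worker cannot iterate: running is still false
            int whileStopped = container.iterations.get();
            container.start();
            long deadline = System.currentTimeMillis() + 2000;
            while (container.iterations.get() == 0 && System.currentTimeMillis() < deadline) {
                Thread.sleep(1);
            }
            container.shutdown();
            worker.join(2000);
            return new int[] {whileStopped, container.iterations.get()};
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("while stopped: " + result[0] + ", after start: " + result[1]);
    }
}
```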

  2.2 invokeListener

 Message handling.

public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer {
    private boolean invokeListener() throws JMSException {
        this.currentReceiveThread = Thread.currentThread();
        try {
            // Initialize resources: on first use this creates the session and consumer.
            initResourcesIfNecessary();
            boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
            // Flag that the message was processed successfully.
            this.lastMessageSucceeded = true;
            return messageReceived;
        }
        finally {
            this.currentReceiveThread = null;
        }
    }
}

public abstract class AbstractPollingMessageListenerContainer extends AbstractMessageListenerContainer {
    protected boolean receiveAndExecute(
			Object invoker, @Nullable Session session, @Nullable MessageConsumer consumer)
			throws JMSException {

		if (this.transactionManager != null) {
			// Execute receive within transaction.
			TransactionStatus status = this.transactionManager.getTransaction(this.transactionDefinition);
			boolean messageReceived;
			try {
				messageReceived = doReceiveAndExecute(invoker, session, consumer, status);
			}
			catch (JMSException | RuntimeException | Error ex) {
				rollbackOnException(this.transactionManager, status, ex);
				throw ex;
			}
			this.transactionManager.commit(status);
			return messageReceived;
		}

		else {
			// Execute receive outside of transaction.
			return doReceiveAndExecute(invoker, session, consumer, null);
		}
	}
}

 Note the transaction commit and rollback handling here.
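
 The control flow of receiveAndExecute (begin, run the work, roll back and rethrow on failure, otherwise commit) can be shown with a recording stub. RecordingTx is a hypothetical stand-in for the transaction manager, and passing null for it models the branch without a transaction manager.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical stand-ins; only the commit/rollback control flow mirrors receiveAndExecute.
class TransactionalReceiveSketch {

    /** Records transaction events instead of talking to a real resource. */
    static class RecordingTx {
        final List<String> events = new ArrayList<>();

        void begin() { events.add("begin"); }
        void commit() { events.add("commit"); }
        void rollback() { events.add("rollback"); }
    }

    /** tx may be null, in which case the work runs outside any transaction. */
    static boolean receiveAndExecute(RecordingTx tx, BooleanSupplier work) {
        if (tx == null) {
            return work.getAsBoolean();
        }
        tx.begin();
        boolean messageReceived;
        try {
            messageReceived = work.getAsBoolean();
        }
        catch (RuntimeException | Error ex) {
            // Roll back, then let the caller see the original failure.
            tx.rollback();
            throw ex;
        }
        // Reached for both outcomes: message received or not.
        tx.commit();
        return messageReceived;
    }

    public static void main(String[] args) {
        RecordingTx tx = new RecordingTx();
        receiveAndExecute(tx, () -> true);
        System.out.println(tx.events);
    }
}
```

 As in the original method, the commit also runs when no message was received; only an exception leads to a rollback.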

  2.3 doReceiveAndExecute

 This method works both with and without a transaction.

public abstract class AbstractPollingMessageListenerContainer extends AbstractMessageListenerContainer {
    protected boolean doReceiveAndExecute(Object invoker, @Nullable Session session,
			@Nullable MessageConsumer consumer, @Nullable TransactionStatus status) throws JMSException {

		Connection conToClose = null;
		Session sessionToClose = null;
		MessageConsumer consumerToClose = null;
		try {
			Session sessionToUse = session;
			boolean transactional = false;
			if (sessionToUse == null) {
				sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
						obtainConnectionFactory(), this.transactionalResourceFactory, true);
				transactional = (sessionToUse != null);
			}
			if (sessionToUse == null) {
				Connection conToUse;
				if (sharedConnectionEnabled()) {
					conToUse = getSharedConnection();
				}
				else {
					conToUse = createConnection();
					conToClose = conToUse;
					conToUse.start();
				}
				sessionToUse = createSession(conToUse);
				sessionToClose = sessionToUse;
			}
			MessageConsumer consumerToUse = consumer;
			if (consumerToUse == null) {
				consumerToUse = createListenerConsumer(sessionToUse);
				consumerToClose = consumerToUse;
			}
			// Receive a message.
			Message message = receiveMessage(consumerToUse);
			if (message != null) {
				if (logger.isDebugEnabled()) {
					logger.debug("Received message of type [" + message.getClass() + "] from consumer [" +
							consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" +
							sessionToUse + "]");
				}
				// Template method: gives subclasses a hook after a message is received and before it is processed. Empty by default in this class.
				messageReceived(invoker, sessionToUse);
				boolean exposeResource = (!transactional && isExposeListenerSession() &&
						!TransactionSynchronizationManager.hasResource(obtainConnectionFactory()));
				if (exposeResource) {
					TransactionSynchronizationManager.bindResource(
							obtainConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse));
				}
				try {
					// Invoke the listener.
					doExecuteListener(sessionToUse, message);
				}
				catch (Throwable ex) {
					if (status != null) {
						if (logger.isDebugEnabled()) {
							logger.debug("Rolling back transaction because of listener exception thrown: " + ex);
						}
						status.setRollbackOnly();
					}
					handleListenerException(ex);
					// Rethrow JMSException to indicate an infrastructure problem
					// that may have to trigger recovery...
					if (ex instanceof JMSException) {
						throw (JMSException) ex;
					}
				}
				finally {
					if (exposeResource) {
						TransactionSynchronizationManager.unbindResource(obtainConnectionFactory());
					}
				}
				// Indicate that a message has been received.
				return true;
			}
			else {
				if (logger.isTraceEnabled()) {
					logger.trace("Consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") +
							"session [" + sessionToUse + "] did not receive a message");
				}
				// Handle the case where no message was received.
				noMessageReceived(invoker, sessionToUse);
				// Nevertheless call commit, in order to reset the transaction timeout (if any).
				if (shouldCommitAfterNoMessageReceived(sessionToUse)) {
					commitIfNecessary(sessionToUse, null);
				}
				// Indicate that no message has been received.
				return false;
			}
		}
		finally {
			JmsUtils.closeMessageConsumer(consumerToClose);
			JmsUtils.closeSession(sessionToClose);
			ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
		}
	}
}
  2.4 doExecuteListener

 Invoking the listener.

public abstract class AbstractMessageListenerContainer extends AbstractJmsListeningContainer
		implements MessageListenerContainer {
    protected void doExecuteListener(Session session, Message message) throws JMSException {
		if (!isAcceptMessagesWhileStopping() && !isRunning()) {
			if (logger.isWarnEnabled()) {
				logger.warn("Rejecting received message because of the listener container " +
						"having been stopped in the meantime: " + message);
			}
			rollbackIfNecessary(session);
			throw new MessageRejectedWhileStoppingException();
		}

		try {
			invokeListener(session, message);
		}
		catch (JMSException | RuntimeException | Error ex) {
			rollbackOnExceptionIfNecessary(session, ex);
			throw ex;
		}
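		// Commit on the messaging side. Two transactions can be involved: the database
		// transaction and the message acknowledgment (ACK).
		// The ACK tells the broker that the message was received successfully, so the broker
		// can delete it; otherwise the message is redelivered and received again.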
		commitIfNecessary(session, message);
	}

    protected void invokeListener(Session session, Message message) throws JMSException {
		Object listener = getMessageListener();

		if (listener instanceof SessionAwareMessageListener) {
			doInvokeListener((SessionAwareMessageListener) listener, session, message);
		}
		else if (listener instanceof MessageListener) {
			doInvokeListener((MessageListener) listener, message);
		}
		else if (listener != null) {
			throw new IllegalArgumentException(
					"Only MessageListener and SessionAwareMessageListener supported: " + listener);
		}
		else {
			throw new IllegalStateException("No message listener specified - see property 'messageListener'");
		}
	}

    protected void doInvokeListener(MessageListener listener, Message message) throws JMSException {
		listener.onMessage(message);
	}
}

 After several layers of delegation, the container finally obtains the listener and calls listener.onMessage(message), which runs the user-defined listener logic.
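
 The type-based dispatch in invokeListener can be reproduced in a few lines. PlainListener and SessionAwareListener below are hypothetical stand-ins for MessageListener and SessionAwareMessageListener, with strings in place of the JMS Message and Session types, so the sketch runs on the JDK alone.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the two listener contracts; strings replace Message and Session.
class ListenerDispatchSketch {

    /** Plays the role of MessageListener. */
    interface PlainListener {
        void onMessage(String message);
    }

    /** Plays the role of SessionAwareMessageListener: it also receives the session. */
    interface SessionAwareListener {
        void onMessage(String message, String session);
    }

    /** Same branch order as invokeListener: session-aware first, then plain, then errors. */
    static void invokeListener(Object listener, String session, String message) {
        if (listener instanceof SessionAwareListener) {
            ((SessionAwareListener) listener).onMessage(message, session);
        }
        else if (listener instanceof PlainListener) {
            ((PlainListener) listener).onMessage(message);
        }
        else if (listener != null) {
            throw new IllegalArgumentException("Unsupported listener type: " + listener);
        }
        else {
            throw new IllegalStateException("No message listener specified");
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        PlainListener plain = message -> log.add("plain:" + message);
        SessionAwareListener aware = (message, session) -> log.add("aware:" + message + "@" + session);
        invokeListener(plain, "s1", "hello");
        invokeListener(aware, "s1", "hello");
        System.out.println(log);
    }
}
```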

Last Updated 3/16/2025, 9:52:57 PM