原始碼地址
https://gitee.com/bseaworkspace/study_java_web/tree/master/springIntegrationJMS
說明
- JMS ActiveMQ 服務器有兩種方式
- 下載((http://activemq.apache.org/download.html))ActiveMQ 壓縮包,解壓以后運行其bin目錄下面的activemq.bat檔案啟動activeMQ,
- 直接使用內置的 ActiveMQ(EmbbededActiveMQ),這個直接隨著Spring啟動,自動啟動一個內置的ActiveMQ JMS 服務,
- 本案例,使用了內置的ActiveMQ,所以不需要額外下載和啟動ActiveMQ
代碼使用方式
提供了三個測驗用例,分別測驗了
- GatewayDemo 結合SpringIntegration的channel-adapter 實作點對點的JMS通信實列
uses the DemoBean service, which will echo the response and
upper-casing it.
- ChannelAdapterDemo 結合SpringIntegration的inbound-gateway和outbound-gateway 實作點對點的JMS通信實列
will simply echo the response
- AggregatingDemo .結合SpringIntegration的inbound-gateway和outbound-gateway 實作點對點和訂閱者模式的JMS通信實列
uses a JMS Topic; and aggregates the responses from two inbound
gateways, which invoke a flow that upper-cases the response; the
aggregation returns a list of responses

本實列 使用到的Spring Integration知識
- Spring Integration 主要解決的問題是不同系統之間互動的問題,通過異步訊息驅動來達到系統互動時系統之間的松耦合,
- Spring Inegration 主要三部分組成 Message(訊息), Channel(傳遞訊息的管道), Message EndPoint(正在處理訊息的組件)
腦圖地址:
http://naotu.baidu.com/file/272eb7e9ff11511f831c394baeb76302?token=06add75ef19ec173

- Message 一共由兩部分組成,我們一般把資訊放在payload上,

- 每個Message都必須通過Channel來傳遞,訊息發送者發送訊息到通道(Channel),訊息接受者從通道(Channel)接收訊息,
MessageChannel 有兩大子介面,分別是PollableChannel (可輪詢)和SubscribableChannel(可訂閱),我們所有的訊息通道類都是現實這兩個介面,

- 配接器(Channel Adapter)可以設定channel的方向, 比如 message是 包裹,那么channel就是快遞員負責運送包裹,配接器就是包裹的快遞站,快遞員是要把包裹放到本地的收件人家里,還是要幫忙把包裹寄到外地出去,都是快遞站決定的,所以配接器決定了channel的方向,
通道配接器(Channel
Adapter)是一種連接外部系統或傳輸協議的端點(EndPoint),可以分為入站(inbound)和出站(outbound),
通道配接器是單向的,入站通道配接器只支持接收訊息,出站通道配接器只支持輸出訊息,Spring Integration內置了如下的配接器:
RabbitMQ、Feed、File、FTP/SFTP、Gemfire、HTTP、TCP/UDP、JDBC、JPA、JMS、Mail、MongoDB、Redis、RMI
Twitter、XMPP、WebServices(SOAP、REST)、WebSocket

- 訊息網關(Gateway)類似于Adapter,但是提供了雙向的請求/回傳集成方式,也分為入站(inbound)和出站(outbound),
Spring Integration 對回應的Adapter都提供了Gateway,
ActiveMQ的相關概念
1.Destination
目的地,JMS Provider(訊息中間件)維護,用于對Message進行管理的物件,
MessageProducer需要指定Destination才能發送訊息,MessageConsumer需要指定Destination才能接收訊息,
2.Producer
訊息生成者(客戶端,生成訊息),負責發送Message到目的地,應用介面為MessageProducer
3.Consumer【Receiver】
訊息消費者(處理訊息),負責從目的地中消費【處理|監聽|訂閱】Message,應用介面為MessageConsumer
4.Message
訊息(Message),訊息封裝一次通信的內容,常見型別有:StreamMessage、BytesMessage、TextMessage、ObjectMessage、MapMessage,
5.ConnectionFactory 鏈接工廠, 用于創建鏈接的工廠型別, 注意,不能和 JDBC 中的 ConnectionFactory 混 淆,
6.Connection 鏈接. 用于建立訪問 ActiveMQ 連接的型別, 由鏈接工廠創建. 注意,不能和 JDBC 中的 Connection 混淆,
7.Session 會話, 一次持久有效有狀態的訪問. 由鏈接創建. 是具體操作訊息的基礎支撐,
8.Queue&Topic
Queue是佇列目的地,Topic是主題目的地,都是Destination的子介面,
Queue特點:佇列中的訊息,默認只能有唯一的一個消費者處理,
Topic特點:主題中的訊息,會發送給所有的消費者同時處理,只有在訊息可以重復處理的業務場景中可使用,
9.PTP
Point to Point,點對點訊息模型,就是基于Queue實作的訊息處理方式,
10.PUB&SUB
Publish&Subscribe,訊息的發布/訂閱模型,是基于Topic實作的訊息處理方式
原始碼決議
- 關于DefaultMessageListenerContainer
在Spring框架中使用JMS傳遞訊息有兩種方式:JMS template和message listener container,前者用于同步收發訊息,后者用于異步收發訊息,
message listener container系結連接類工廠(connection factory)、JMS Destination、JNDI Destination決議器和message listener bean,
Spring提供了兩種message listener container:DefaultMessageListenerContainer和SimpleMessageListenerContainer,兩種message listener container都允許指定數量的并發監聽執行緒,只有DefaultMessageListenerContainer可以在允許期間動態調整監聽執行緒的數量,另外,DefaultMessageListenerContainer允許和XA Transactions的集成,對于使用本地事務管理器和不需要基于可變負載的執行緒、會話、連接調整的簡單訊息傳遞應用,使用SimpleMessageListenerContainer,對于使用外部事務管理器或XA事務的訊息傳遞應用,使用DefaultMessageListenerContainer,
message listener container的配置類似JmsTemplate,使用JNDI訪問連接工廠和JMS Destinations,
或直接使用JMS提供者的native 連接工廠和JMS destination類,
package com.xsz;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.context.support.GenericXmlApplicationContext;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
/**
* @author Gunnar Hillert
* @author Gary Russell
* @author Artem Bilan
*/
public class AggregatorDemoTest {
private final static String[] configFilesGatewayDemo = {
"/META-INF/spring/integration/common.xml",
"/META-INF/spring/integration/aggregation.xml"
};
@Test
public void testGatewayDemo() throws InterruptedException {
System.setProperty("spring.profiles.active", "testCase");
final GenericXmlApplicationContext applicationContext = new GenericXmlApplicationContext(
configFilesGatewayDemo);
Map<String, DefaultMessageListenerContainer> containers = applicationContext
.getBeansOfType(DefaultMessageListenerContainer.class);
// wait for containers to subscribe before sending a message.
containers.values().forEach(c -> {
int n = 0;
while (n++ < 100 && !c.isRegisteredWithDestination()) {
try {
Thread.sleep(100);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (!c.isRegisteredWithDestination()) {
throw new IllegalStateException("Container failed to subscribe to topic");
}
});
final MessageChannel stdinToJmsOutChannel = applicationContext.getBean("stdinToJmsOutChannel", MessageChannel.class);
stdinToJmsOutChannel.send(MessageBuilder.withPayload("jms test").build());
final QueueChannel queueChannel = applicationContext.getBean("queueChannel", QueueChannel.class);
@SuppressWarnings("unchecked")
Message<List<String>> reply = (Message<List<String>>) queueChannel.receive(20_000);
Assert.assertNotNull(reply);
List<String> out = reply.getPayload();
Assert.assertEquals("[JMS TEST, JMS TEST]", out.toString());
applicationContext.close();
}
}
Spring Integration系列
《1》Hello World專案
https://blog.csdn.net/h356363/article/details/112076788
《2》 http專案
https://blog.csdn.net/h356363/article/details/112120991
更多資料關注微信公眾平臺

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/247186.html
標籤:其他
下一篇:簡單連接資料庫(swing)
