訊息佇列(Message Queue,以下簡稱MQ)常用于異步系統的資料傳遞,若不用MQ,我們只能[在應用層]使用輪詢或介面回呼等方式處理,這在效率或耦合度上是難以讓人滿意的,當然我們也可以在系統間保持一個長連接,基于底層socket機制進行資料的實時收發,如果再將這部分功能獨立成一個中間件,供專案中所有系統使用,就是我們今天所指的MQ,
對比&選擇
以下以當前較為流行社區活躍度較高的兩個MQ——RabbitMQ和Kafka做一比較,順帶提一提redis,
簡單的小型系統可以使用redis,redis簡單易用,本身就提供了佇列結構,也支持發布訂閱模式,不過說到底redis是一個快取資料庫,主要職責并不是訊息佇列,缺少訊息可達(防丟失)、可靠性(分布式、集群)、異步、事務處理等特性,需要應用層額外處理,
RabbitMQ:erlang開發,單機吞吐高,但是它只支持集群模式,不支持分布式,可靠性依靠的是集群中分屬兩個不同節點的master queue和mirror queue同步資料,在master queue所在節點掛掉之后,系統把mirror queue提升為master queue,負責處理客戶端佇列操作請求,注意,mirror queue只做鏡像,設計目的不是為了承擔客戶端讀寫壓力,讀寫都走的master queue,這就有了單點性能瓶頸,RabbitMQ支持消費端pull和push模式,
Kafka:Scala開發,支持分布式,因此如果是相同佇列,集群吞吐量肯定是大于RabbitMQ的,Kafka只支持pull模式,pull有個缺點是,如果broker沒有可供消費的訊息,將導致consumer不斷輪詢,直到新訊息到達,為了避免這點,Kafka有個引數可以讓consumer阻塞知道新訊息到達(當然也可以阻塞直到訊息的數量達到某個特定的量這樣就可以批量獲取),如此個人認為在訊息傳輸不是很頻繁的場景下反而比push更好,即減少了輪詢次數,又不需要永遠占著一個連接,實時性也基本上能得到保障,RabbitMQ pull模式并不支持此機制,
其實對于吞吐量而言,除非我們預期有百萬級并發,否則兩者差別不大,另外對于上述RabbitMQ每個佇列的單點瓶頸,我們可以將一個佇列按一定邏輯拆分為多個佇列,在業務端將訊息分流,也能提高吞吐量,相比Kafka,RabbitMQ提供了較為完備的訊息路由、訊息到期洗掉/延遲/預定、訊息容錯機制,這些功能可不是短期內靠堆硬體能完成的,對此有要求的話,那么優選RabbitMQ沒錯了,由此我們也知道了為什么Kafka常用于日志系統,一是日志相對業務來說寫操作例外頻繁,可能一次請求會產生數十條日志,需要較高的吞吐量,且關聯日志一般都是跨系統跨業務的,無法進行細粒度拆分,限制了RabbitMQ提升吞吐量的空間;另外日志記錄對一致性實時性等要求不高,不需要什么策略,稍有丟失也無關大雅,無法體現RabbitMQ的優勢,
綜上所述,業務層建議使用RabbitMQ,
RabbitMQ概念及注意點
RabbitMQ主要概念:
Connection:在RabbitMQ中指的是AMQP 0-9-1 connection,它與底層的TCP鏈接是一一對應的,Channel:信道,用于訊息的傳遞,Queue:佇列,訊息通過交換機被投遞到這里,Exchange:交換機,用于訊息路由,通過routeKey決定將訊息投遞到哪個佇列,有四種模式(fanout、direct、topic、header),routeKey:路由鍵,DeadLetter:死信機制,
關于它們的介紹網上資料很多,這里就不贅述了,我們把注意點放到具體細節上,以下部分摘自RabbitMQ最佳實踐,建議先了解了上述RabbitMQ主要概念再看,
在RabbitMQ中,訊息確認分為發送方確認和消費方確認兩個確認環節,
- 發送端:
ConfirmListener:訊息是否到達exchange的回呼,需要實作兩個方法——handleAck、handleNack,通常來講,發送端只需要保證訊息能夠發送到exchange即可,而無需關注訊息是否被正確地投遞到了某個queue,這個是RabbitMQ和訊息的接收方需要考慮的事情,基于此,如果RabbitMQ找不到任何需要投遞的queue,那么依然會ack給發送方,此時發送方可以認為訊息已經正確投遞,而不用關心訊息沒有queue接收的問題,此時可以為exchange設定alternate-exchange,即表示rabbitmq將把無法投遞到任何queue的訊息發送到alternate-exchange指定的exchange中,此時該指定的exchange就是一個死信交換機(DLX,所以DLX與普通交換機并無不同,只不過路由的是一些無法處理的訊息而已),
ReturnListener:事實上,對于exchange存在但是卻找不到任何接收queue時,如果發送時設定了mandatory=true,那么在訊息被ack前將return給發送端,此時發送端可以創建一個ReturnListener用于接識訓傳的訊息,
需要注意的是,在發送訊息時如果exchange不存在,訊息會被直接丟棄,并且不會ack或者nack操作, - 消費端:訊息默認是直接ack的,即訊息到達消費方立即ack,而不管消費方業務處理是否成功,大部分情況我們需要業務處理完畢才認為此訊息被正確消費了,為此可以開啟手動確認模式,即有消費方自行決定何時應該ack,通過設定
autoAck=false開啟手動確認模式,
requeue:消費端nack或reject時設定,告知rq是否將訊息重新投遞,
默認情況下,queue中被拋棄的訊息將被直接丟掉,但是可以通過設定queue的x-dead-letter-exchange引數,將被拋棄的訊息發送到x-dead-letter-exchange中指定的exchange中,這樣的exchange成為DLX,
Lazy Queue:一個重要的設計目標是能夠支持更長的佇列,即支持更多的訊息存盤,惰性佇列會將接收到的訊息直接存入檔案系統中,而不管是持久化的或者是非持久化的,注意如果惰性佇列中存盤的是非持久化的訊息,記憶體的使用率會一直很穩定,但是重啟之后訊息一樣會丟失,
實測在topic模式下,例如test#是沒用的,無法匹配test1,需要配置為test.#,也許這是RabbitMQ所要求的規范吧,
在RabbitMQ中,使用一個還是多個exchange,似乎網上并沒有關于這方面的廣泛討論(可見性能上兩種方案并無顯著差別),so,我們就從不增加復雜度出發,保持一個exchange對應多個queue的簡單模式,或按業務劃分,
創建鏈接時可以提供一個ConnectionName,如newConnection(ConnectionName),然而ConnectionName似乎只是給人看的(比如在管理后臺),并不要求唯一性,
接下來我們聊下Connection和Channel,
Connection和Channel
為什么要將這兩個東西單獨拎出來講呢,因為RabbitMQ并沒有為我們提供一個開箱即用的鏈接復用組件,眾所周知,Connection這東西,創建和銷毀是一筆不小的開支,然而以官方提供的Java Client SDK為例,ConnectionFactory.newConnection()每次都會new一個新的Connection,SDK并沒有內置連接池,這塊作業就需要另外處理,
NIO里也有這兩個概念(不了解NIO的可參看博主以前寫的也談Reactor模式),而它們都有鏈接復用的意思,也許RabbitMQ就是參考了NIO呢,
而Channel是干嘛的?資料傳輸嘛,Connection就做了,有了Connection為什么還要Channel?其實Channel是為了在另外一個層面復用Connection———解決多執行緒資料并發傳輸的問題,直接操作Connection進行資料傳輸,當有多個執行緒同時操作時,很容易出現資料幀錯亂的情況,一個Connection可以create多個Channel,訊息的收發、路由等操作都是和某個Channel系結而非Connection,每個訊息都由一個Channel ID標識,然而,竊以為這種細節完全可以對使用者隱藏,暴露出來反而會增加復雜度,關于Channel的使用方式和注意事項,官方檔案給出了一些描述,具體編碼時需要考量,
As a rule of thumb, Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.
官方建議每個執行緒使用一個Channel,不同執行緒最好不要共享Channel,否則并發時容易產生資料幀交錯(同多個執行緒直接共用一個Connection一樣),這種情況下exchange會直接關閉下層Connection,
Channels consume resources,所以在一個行程中同時存在成百上千個打開狀態的Channel不是一個好注意,但是用完即關也不是一個好主意, A classic anti-pattern to be avoided is opening a channel for each published message. Channels are supposed to be reasonably long-lived and opening a new one is a network round-trip which makes this pattern extremely inefficient. 開辟一個新的channel需要一個網路的往返,這種模式是很低效的,
Consuming in one thread and publishing in another thread on a shared channel can be safe.
一個Connection上的多個Channel的調度是由一個
java.util.concurrent.ExecutorService負責的,我們可以使用ConnectionFactory#setSharedExecutor設定自定義調度器,
在消費者端,訊息Ack需要由接收訊息(received the delivery)的執行緒完成,否則可能會產生Channel級別的例外,并且Channel會被關閉,
總得來說,Channel也需要復用,但是數量可以比Connection多一兩個數量級,我們可以設計一個簡單的連接池方案PooledConnectionFactory,它是一個Connection容器,保持若干long-lived Connection提供給外部使用,而每個Connection又有自己的PooledChannelFactory,其中維持著一些long-lived Channel,Spring-boot提供了一個AMQP組件Spring AMQP,已經幫我們實作了類似的方案,并且還隱藏了Channel這個東東,但Spring的原罪——代碼碎片化,注解滿天飛——又增加了組件本身使用的復雜度,且無法掌控細節,當然它還提供了其它一些可有可無的特性,其實,我們只需要一個簡單的連接池而已,so,讓我們自己實作吧,
簡單連接池實作&使用
直接上代碼,先定義一個鏈接配置類:
@Component
data class ConnectionConfig(
@Value("\${rabbitmq.userName:guest}")
var userName: String,
@Value("\${rabbitmq.password:guest}")
var password: String,
@Value("\${rabbitmq.host:localhost}")
var host: String,
@Value("\${rabbitmq.port:5672}")
var port: Int,
@Value("\${rabbitmq.virtualHost:/}")
var virtualHost: String
)
配置項可在組態檔中配置,
工廠類,內部有個BlockingQueue存放PooledConnection實體,PooledConnection封裝了RabbitMQ的Connection,至于為啥要封裝一層稍后說,
@Component
class PooledConnectionFactory(@Autowired private val connectionConfig: ConnectionConfig,
@Value("\${rabbitmq.maxConnectionCount:5}")
private val maxConnectionCount: Int) {
private val _logger: Logger by lazy {
LoggerFactory.getLogger(PooledConnectionFactory::class.java)
}
private val _connQueue = ArrayBlockingQueue<PooledConnection>(maxConnectionCount)
//已創建了幾個connection
private val _connCreatedCount = AtomicInteger()
private val _factory by lazy {
buildConnectionFactory()
}
private fun buildConnectionFactory(): ConnectionFactory {
val factory = ConnectionFactory()
with(connectionConfig) {
factory.username = userName
factory.password = password
factory.virtualHost = virtualHost
factory.host = host
factory.port = port
}
return factory
}
@Throws(IOException::class, TimeoutException::class)
fun newConnection(): PooledConnection {
var conn = _connQueue.poll()
if (conn == null) {
if (_connCreatedCount.getAndIncrement() < maxConnectionCount) {
try {
conn = PooledConnection(_factory.newConnection(), _connQueue)
} catch (e: Exception) {
_connCreatedCount.decrementAndGet()
_logger.error("創建RabbitMQ連接出錯", e)
throw e
}
} else {
_connCreatedCount.decrementAndGet()
conn = _connQueue.take()
}
}
return conn
}
}
注意newConnection方法使用了AtomicInteger保證執行緒安全,
再來看PooledConnection,它實作了Closeable介面,而我是用kotlin寫的代碼,對于Closeable介面,kotlin提供了一個擴展函式use(),use函式會在代碼塊執行后自動關閉呼叫者(無論中間是否出現例外),類似于C#的using()操作,等會我們就會看到如何使用,
class PooledConnection(private val connection: Connection, private val container: BlockingQueue<PooledConnection>) : Closeable {
private val _logger: Logger by lazy {
LoggerFactory.getLogger(PooledConnection::class.java)
}
override fun close() {
val offered = container.offer(this)
if (!offered) {
val message = "RabbitMQ連接池已滿,無法釋放當前連接"
_logger.error(message)
throw IOException(message)
}
}
fun get() = connection
}
注意close()函式不是真的close,而是將Connection放回連接池,如果用的是RabbitMQ.Connection的話,就直接關閉了,
get()函式將RabbitMQ.Connection暴露出來供生產者和消費者使用,
That's all! 關于連接池的代碼就這么簡單,Channel池也可照貓畫虎,以此類推:)
使用的話,以生產端為例:
/**
* 發送訊息
*
* @param data 需要發送的資料
* @param exchange the name of the exchange sent to
* @param routeKey 路由鍵,用于exchange投遞訊息到佇列
*/
@Throws(IOException::class)
fun send(data: Any, exchange: String, routeKey: String) = factory.newConnection().use{
val conn = it.get()
val channel = conn.createChannel()
try {
val properties = AMQP.BasicProperties.Builder()
.contentType("application/json")
.deliveryMode(2) //訊息持久化,防處理之前丟失,默認1,
.build()
it.basicPublish(exchange, routeKey, properties, JSON.toJSONString(data).toByteArray())
}catch (e: Exception) {
logger.error(e.message)
throw e
} finally {
channel.close()
}
}
so easy! 注意use()的用法,
Channel池可以類似方式實作,
參考資料
RabbitMQ和Kafka到底怎么選?
Kafka與RabbitMQ區別
RabbitMQ發布訂閱實戰-實作延時重試佇列
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/1038.html
標籤:架構設計
