主頁 > 軟體設計 > RabbitMQ入門指南

RabbitMQ入門指南

2020-09-10 05:55:35 軟體設計

訊息佇列(Message Queue,以下簡稱MQ)常用于異步系統的資料傳遞,若不用MQ,我們只能[在應用層]使用輪詢或介面回呼等方式處理,這在效率或耦合度上是難以讓人滿意的,當然我們也可以在系統間保持一個長連接,基于底層socket機制進行資料的實時收發,如果再將這部分功能獨立成一個中間件,供專案中所有系統使用,就是我們今天所指的MQ,


對比&選擇

以下以當前較為流行社區活躍度較高的兩個MQ——RabbitMQKafka做一比較,順帶提一提redis

簡單的小型系統可以使用redis,redis簡單易用,本身就提供了佇列結構,也支持發布訂閱模式,不過說到底redis是一個快取資料庫,主要職責并不是訊息佇列,缺少訊息可達(防丟失)、可靠性(分布式、集群)、異步、事務處理等特性,需要應用層額外處理,

RabbitMQ:erlang開發,單機吞吐高,但是它只支持集群模式,不支持分布式,可靠性依靠的是集群中分屬兩個不同節點的master queuemirror queue同步資料,在master queue所在節點掛掉之后,系統把mirror queue提升為master queue,負責處理客戶端佇列操作請求,注意,mirror queue只做鏡像,設計目的不是為了承擔客戶端讀寫壓力,讀寫都走的master queue,這就有了單點性能瓶頸,RabbitMQ支持消費端pull和push模式,

Kafka:Scala開發,支持分布式,因此如果是相同佇列,集群吞吐量肯定是大于RabbitMQ的,Kafka只支持pull模式,pull有個缺點是,如果broker沒有可供消費的訊息,將導致consumer不斷輪詢,直到新訊息到達,為了避免這點,Kafka有個引數可以讓consumer阻塞知道新訊息到達(當然也可以阻塞直到訊息的數量達到某個特定的量這樣就可以批量獲取),如此個人認為在訊息傳輸不是很頻繁的場景下反而比push更好,即減少了輪詢次數,又不需要永遠占著一個連接,實時性也基本上能得到保障,RabbitMQ pull模式并不支持此機制,

其實對于吞吐量而言,除非我們預期有百萬級并發,否則兩者差別不大,另外對于上述RabbitMQ每個佇列的單點瓶頸,我們可以將一個佇列按一定邏輯拆分為多個佇列,在業務端將訊息分流,也能提高吞吐量,相比Kafka,RabbitMQ提供了較為完備的訊息路由、訊息到期洗掉/延遲/預定、訊息容錯機制,這些功能可不是短期內靠堆硬體能完成的,對此有要求的話,那么優選RabbitMQ沒錯了,由此我們也知道了為什么Kafka常用于日志系統,一是日志相對業務來說寫操作例外頻繁,可能一次請求會產生數十條日志,需要較高的吞吐量,且關聯日志一般都是跨系統跨業務的,無法進行細粒度拆分,限制了RabbitMQ提升吞吐量的空間;另外日志記錄對一致性實時性等要求不高,不需要什么策略,稍有丟失也無關大雅,無法體現RabbitMQ的優勢,

綜上所述,業務層建議使用RabbitMQ,


RabbitMQ概念及注意點

RabbitMQ主要概念:

  1. Connection:在RabbitMQ中指的是AMQP 0-9-1 connection,它與底層的TCP鏈接是一一對應的,
  2. Channel:信道,用于訊息的傳遞,
  3. Queue:佇列,訊息通過交換機被投遞到這里,
  4. Exchange:交換機,用于訊息路由,通過routeKey決定將訊息投遞到哪個佇列,有四種模式(fanout、direct、topic、header),
  5. routeKey:路由鍵,
  6. DeadLetter:死信機制,

關于它們的介紹網上資料很多,這里就不贅述了,我們把注意點放到具體細節上,以下部分摘自RabbitMQ最佳實踐,建議先了解了上述RabbitMQ主要概念再看,

在RabbitMQ中,訊息確認分為發送方確認和消費方確認兩個確認環節,

  • 發送端:
    ConfirmListener:訊息是否到達exchange的回呼,需要實作兩個方法——handleAckhandleNack,通常來講,發送端只需要保證訊息能夠發送到exchange即可,而無需關注訊息是否被正確地投遞到了某個queue,這個是RabbitMQ和訊息的接收方需要考慮的事情,基于此,如果RabbitMQ找不到任何需要投遞的queue,那么依然會ack給發送方,此時發送方可以認為訊息已經正確投遞,而不用關心訊息沒有queue接收的問題,此時可以為exchange設定alternate-exchange,即表示rabbitmq將把無法投遞到任何queue的訊息發送到alternate-exchange指定的exchange中,此時該指定的exchange就是一個死信交換機(DLX,所以DLX與普通交換機并無不同,只不過路由的是一些無法處理的訊息而已),
    ReturnListener:事實上,對于exchange存在但是卻找不到任何接收queue時,如果發送時設定了mandatory=true,那么在訊息被ack前將return給發送端,此時發送端可以創建一個ReturnListener用于接識訓傳的訊息,
    需要注意的是,在發送訊息時如果exchange不存在,訊息會被直接丟棄,并且不會ack或者nack操作,
  • 消費端:訊息默認是直接ack的,即訊息到達消費方立即ack,而不管消費方業務處理是否成功,大部分情況我們需要業務處理完畢才認為此訊息被正確消費了,為此可以開啟手動確認模式,即有消費方自行決定何時應該ack,通過設定autoAck=false開啟手動確認模式,
    requeue:消費端nack或reject時設定,告知rq是否將訊息重新投遞,
    默認情況下,queue中被拋棄的訊息將被直接丟掉,但是可以通過設定queue的x-dead-letter-exchange引數,將被拋棄的訊息發送到x-dead-letter-exchange中指定的exchange中,這樣的exchange成為DLX,

Lazy Queue:一個重要的設計目標是能夠支持更長的佇列,即支持更多的訊息存盤,惰性佇列會將接收到的訊息直接存入檔案系統中,而不管是持久化的或者是非持久化的,注意如果惰性佇列中存盤的是非持久化的訊息,記憶體的使用率會一直很穩定,但是重啟之后訊息一樣會丟失,

實測在topic模式下,例如test#是沒用的,無法匹配test1,需要配置為test.#,也許這是RabbitMQ所要求的規范吧,

在RabbitMQ中,使用一個還是多個exchange,似乎網上并沒有關于這方面的廣泛討論(可見性能上兩種方案并無顯著差別),so,我們就從不增加復雜度出發,保持一個exchange對應多個queue的簡單模式,或按業務劃分,

創建鏈接時可以提供一個ConnectionName,如newConnection(ConnectionName),然而ConnectionName似乎只是給人看的(比如在管理后臺),并不要求唯一性,

接下來我們聊下Connection和Channel,


Connection和Channel

為什么要將這兩個東西單獨拎出來講呢,因為RabbitMQ并沒有為我們提供一個開箱即用的鏈接復用組件,眾所周知,Connection這東西,創建和銷毀是一筆不小的開支,然而以官方提供的Java Client SDK為例,ConnectionFactory.newConnection()每次都會new一個新的Connection,SDK并沒有內置連接池,這塊作業就需要另外處理,
NIO里也有這兩個概念(不了解NIO的可參看博主以前寫的也談Reactor模式),而它們都有鏈接復用的意思,也許RabbitMQ就是參考了NIO呢,

而Channel是干嘛的?資料傳輸嘛,Connection就做了,有了Connection為什么還要Channel?其實Channel是為了在另外一個層面復用Connection———解決多執行緒資料并發傳輸的問題,直接操作Connection進行資料傳輸,當有多個執行緒同時操作時,很容易出現資料幀錯亂的情況,一個Connection可以create多個Channel,訊息的收發、路由等操作都是和某個Channel系結而非Connection,每個訊息都由一個Channel ID標識,然而,竊以為這種細節完全可以對使用者隱藏,暴露出來反而會增加復雜度,關于Channel的使用方式和注意事項,官方檔案給出了一些描述,具體編碼時需要考量,

As a rule of thumb, Applications should prefer using a Channel per thread instead of sharing the same Channel across multiple threads.
官方建議每個執行緒使用一個Channel,不同執行緒最好不要共享Channel,否則并發時容易產生資料幀交錯(同多個執行緒直接共用一個Connection一樣),這種情況下exchange會直接關閉下層Connection,
Channels consume resources,所以在一個行程中同時存在成百上千個打開狀態的Channel不是一個好注意,但是用完即關也不是一個好主意, A classic anti-pattern to be avoided is opening a channel for each published message. Channels are supposed to be reasonably long-lived and opening a new one is a network round-trip which makes this pattern extremely inefficient. 開辟一個新的channel需要一個網路的往返,這種模式是很低效的,

Consuming in one thread and publishing in another thread on a shared channel can be safe.

一個Connection上的多個Channel的調度是由一個java.util.concurrent.ExecutorService負責的,我們可以使用ConnectionFactory#setSharedExecutor設定自定義調度器,

在消費者端,訊息Ack需要由接收訊息(received the delivery)的執行緒完成,否則可能會產生Channel級別的例外,并且Channel會被關閉,

總得來說,Channel也需要復用,但是數量可以比Connection多一兩個數量級,我們可以設計一個簡單的連接池方案PooledConnectionFactory,它是一個Connection容器,保持若干long-lived Connection提供給外部使用,而每個Connection又有自己的PooledChannelFactory,其中維持著一些long-lived Channel,Spring-boot提供了一個AMQP組件Spring AMQP,已經幫我們實作了類似的方案,并且還隱藏了Channel這個東東,但Spring的原罪——代碼碎片化,注解滿天飛——又增加了組件本身使用的復雜度,且無法掌控細節,當然它還提供了其它一些可有可無的特性,其實,我們只需要一個簡單的連接池而已,so,讓我們自己實作吧,


簡單連接池實作&使用

直接上代碼,先定義一個鏈接配置類:

@Component
data class ConnectionConfig(
    @Value("\${rabbitmq.userName:guest}")
    var userName: String, 
    @Value("\${rabbitmq.password:guest}")
    var password: String,
    @Value("\${rabbitmq.host:localhost}")
    var host: String,
    @Value("\${rabbitmq.port:5672}")
    var port: Int,
    @Value("\${rabbitmq.virtualHost:/}")
    var virtualHost: String
)

配置項可在組態檔中配置,

工廠類,內部有個BlockingQueue存放PooledConnection實體,PooledConnection封裝了RabbitMQ的Connection,至于為啥要封裝一層稍后說,

@Component
class PooledConnectionFactory(@Autowired private val connectionConfig: ConnectionConfig,
                              @Value("\${rabbitmq.maxConnectionCount:5}")
                              private val maxConnectionCount: Int) {
    private val _logger: Logger by lazy {
        LoggerFactory.getLogger(PooledConnectionFactory::class.java)
    }

    private val _connQueue = ArrayBlockingQueue<PooledConnection>(maxConnectionCount)

    //已創建了幾個connection
    private val _connCreatedCount = AtomicInteger()

    private val _factory by lazy {
        buildConnectionFactory()
    }

    private fun buildConnectionFactory(): ConnectionFactory {
        val factory = ConnectionFactory()
        with(connectionConfig) {
            factory.username = userName
            factory.password = password
            factory.virtualHost = virtualHost
            factory.host = host
            factory.port = port
        }
        return factory
    }

    @Throws(IOException::class, TimeoutException::class)
    fun newConnection(): PooledConnection {
        var conn = _connQueue.poll()
        if (conn == null) {
            if (_connCreatedCount.getAndIncrement() < maxConnectionCount) {
                try {
                    conn = PooledConnection(_factory.newConnection(), _connQueue)
                } catch (e: Exception) {
                    _connCreatedCount.decrementAndGet()
                    _logger.error("創建RabbitMQ連接出錯", e)
                    throw e
                }
            } else {
                _connCreatedCount.decrementAndGet()
                conn = _connQueue.take()
            }
        }
        return conn
    }
}

注意newConnection方法使用了AtomicInteger保證執行緒安全,

再來看PooledConnection,它實作了Closeable介面,而我是用kotlin寫的代碼,對于Closeable介面,kotlin提供了一個擴展函式use(),use函式會在代碼塊執行后自動關閉呼叫者(無論中間是否出現例外),類似于C#的using()操作,等會我們就會看到如何使用,

class PooledConnection(private val connection: Connection, private val container: BlockingQueue<PooledConnection>) : Closeable {
    private val _logger: Logger by lazy {
        LoggerFactory.getLogger(PooledConnection::class.java)
    }

    override fun close() {
        val offered = container.offer(this)
        if (!offered) {
            val message = "RabbitMQ連接池已滿,無法釋放當前連接"
            _logger.error(message)
            throw IOException(message)
        }
    }

    fun get() = connection
}

注意close()函式不是真的close,而是將Connection放回連接池,如果用的是RabbitMQ.Connection的話,就直接關閉了,
get()函式將RabbitMQ.Connection暴露出來供生產者和消費者使用,

That's all! 關于連接池的代碼就這么簡單,Channel池也可照貓畫虎,以此類推:)

使用的話,以生產端為例:

    /**
     * 發送訊息
     *
     * @param data 需要發送的資料
     * @param exchange the name of the exchange sent to
     * @param routeKey 路由鍵,用于exchange投遞訊息到佇列
     */
    @Throws(IOException::class)
    fun send(data: Any, exchange: String, routeKey: String) = factory.newConnection().use{
        val conn = it.get()
        val channel = conn.createChannel()
        try {
            val properties = AMQP.BasicProperties.Builder()
                .contentType("application/json")
                .deliveryMode(2) //訊息持久化,防處理之前丟失,默認1,
                .build()
            it.basicPublish(exchange, routeKey, properties, JSON.toJSONString(data).toByteArray())
        }catch (e: Exception) {
            logger.error(e.message)
            throw e
        } finally {
            channel.close()
        }            
    }

so easy! 注意use()的用法,

Channel池可以類似方式實作,


參考資料

RabbitMQ和Kafka到底怎么選?
Kafka與RabbitMQ區別
RabbitMQ發布訂閱實戰-實作延時重試佇列

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/1038.html

標籤:架構設計

上一篇:Linux系統環境基于Docker搭建Jenkins實戰

下一篇:.net core學習筆記,組件篇:服務的注冊與發現(Consul)初篇(一)

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 面試突擊第一季,第二季,第三季

    第一季必考 https://www.bilibili.com/video/BV1FE411y79Y?from=search&seid=15921726601957489746 第二季分布式 https://www.bilibili.com/video/BV13f4y127ee/?spm_id_fro ......

    uj5u.com 2020-09-10 05:35:24 more
  • 第三單元作業總結

    1.前言 這應該是本學期最后一次寫作業總結了吧。總體來說,對作業的節奏也差不多掌握了,作業做起來的效率也更高了。雖然和之前的作業一樣,作業中都要用到新的知識,但是相比之前,更加懂得了如何利用工具以及資料。雖然之間卡過殼,但總體而言,這幾次作業還算完成的比較好。 2.作業程序總結 相比前兩個單元,此單 ......

    uj5u.com 2020-09-10 05:35:41 more
  • 北航OO(2020)第四單元博客作業暨課程總結博客

    北航OO(2020)第四單元博客作業暨課程總結博客 本單元作業的架構設計 在本單元中,由于UML圖具有比較清晰的樹形結構,因此我對其中需要進行查詢操作的元素進行了包裝,在樹的父節點中存盤所有孩子的參考。考慮到性能問題,我采用了快取機制,一次查詢后盡可能快取已經遍歷過的資訊,以減少遍歷次數。 本單元我 ......

    uj5u.com 2020-09-10 05:35:48 more
  • BUAA_OO_第四單元

    一、UML決議器設計 ? 先看下題目:第四單元實作一個基于JDK 8帶有效性檢查的UML(Unified Modeling Language)類圖,順序圖,狀態圖分析器 MyUmlInteraction,實際上我們要建立一個有向圖模型,UML中的物件(元素)可能與同級元素連接,也可與低級元素相連形成 ......

    uj5u.com 2020-09-10 05:35:54 more
  • 6.1邏輯運算子

    邏輯運算子 1. && 短路與 運算式1 && 運算式2 01.運算式1為true并且運算式2也為true 整體回傳為true 02.運算式1為false,將不會執行運算式2 整體回傳為false 03.只要有一個運算式為false 整體回傳為false 2. || 短路或 運算式1 || 運算式2 ......

    uj5u.com 2020-09-10 05:35:56 more
  • BUAAOO 第四單元 & 課程總結

    1. 第四單元:StarUml檔案決議 本單元采用了圖模型決議UML。 UML檔案可以抽象為圖、子圖、邊的邏輯結構。 在實作中,圖的節點包括類、介面、屬性,子圖包括狀態圖、順序圖等。 采用了三次遍歷UML元素的方法建圖,第一遍遍歷建點,第二、三次遍歷設定屬性、連邊,實作圖物件的初始化。這里借鑒了一些 ......

    uj5u.com 2020-09-10 05:36:06 more
  • 談談我對C# 多型的理解

    面向物件三要素:封裝、繼承、多型。 封裝和繼承,這兩個比較好理解,但要理解多型的話,可就稍微有點難度了。今天,我們就來講講多型的理解。 我們應該經常會看到面試題目:請談談對多型的理解。 其實呢,多型非常簡單,就一句話:呼叫同一種方法產生了不同的結果。 具體實作方式有三種。 一、多載 多載很簡單。 p ......

    uj5u.com 2020-09-10 05:36:09 more
  • Python 資料驅動工具:DDT

    背景 python 的unittest 沒有自帶資料驅動功能。 所以如果使用unittest,同時又想使用資料驅動,那么就可以使用DDT來完成。 DDT是 “Data-Driven Tests”的縮寫。 資料:http://ddt.readthedocs.io/en/latest/ 使用方法 dd. ......

    uj5u.com 2020-09-10 05:36:13 more
  • Python里面的xlrd模塊詳解

    那我就一下面積個問題對xlrd模塊進行學習一下: 1.什么是xlrd模塊? 2.為什么使用xlrd模塊? 3.怎樣使用xlrd模塊? 1.什么是xlrd模塊? ?python操作excel主要用到xlrd和xlwt這兩個庫,即xlrd是讀excel,xlwt是寫excel的庫。 今天就先來說一下xl ......

    uj5u.com 2020-09-10 05:36:28 more
  • 當我們創建HashMap時,底層到底做了什么?

    jdk1.7中的底層實作程序(底層基于陣列+鏈表) 在我們new HashMap()時,底層創建了默認長度為16的一維陣列Entry[ ] table。當我們呼叫map.put(key1,value1)方法向HashMap里添加資料的時候: 首先,呼叫key1所在類的hashCode()計算key1 ......

    uj5u.com 2020-09-10 05:36:38 more
最新发布
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:20:47 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:20:25 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:20:17 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:20:10 more
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:19:44 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:19:07 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:18:57 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:18:49 more
  • 05單件模式

    #經典的單件模式 public class Singleton { private static Singleton uniqueInstance; //一個靜態變數持有Singleton類的唯一實體。 // 其他有用的實體變數寫在這里 //構造器宣告為私有,只有Singleton可以實體化這個類! ......

    uj5u.com 2023-04-19 08:42:51 more
  • 【架構與設計】常見微服務分層架構的區別和落地實踐

    軟體工程的方方面面都遵循一個最基本的道理:沒有銀彈,架構分層模型更是如此,每一種都有各自優缺點,所以請根據不同的業務場景,并遵循簡單、可演進這兩個重要的架構原則選擇合適的架構分層模型即可。 ......

    uj5u.com 2023-04-19 08:42:41 more