Redis實作分布式鎖的原理
前面講了Redis在實際業務場景中的應用,那么下面再來了解一下Redisson功能性場景的應用,也就是大家經常使用的分布式鎖的實作場景,
-
引入redisson依賴
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.16.0</version> </dependency> -
撰寫簡單的測驗代碼
public class RedissonTest { private static RedissonClient redissonClient; static { Config config=new Config(); config.useSingleServer().setAddress("redis://192.168.221.128:6379"); redissonClient=Redisson.create(config); } public static void main(String[] args) throws InterruptedException { RLock rLock=redissonClient.getLock("updateOrder"); //最多等待100秒、上鎖10s以后自動解鎖 if(rLock.tryLock(100,10,TimeUnit.SECONDS)){ System.out.println("獲取鎖成功"); } Thread.sleep(2000); rLock.unlock(); redissonClient.shutdown(); } }
Redisson分布式鎖的實作原理
你們會發現,通過redisson,非常簡單就可以實作我們所需要的功能,當然這只是redisson的冰山一角,redisson最強大的地方就是提供了分布式特性的常用工具類,使得原本作為協調單機多執行緒并發程式的并發程式的工具包獲得了協調分布式多級多執行緒并發系統的能力,降低了程式員在分布式環境下解決分布式問題的難度,下面分析一下RedissonLock的實作原理
RedissonLock.tryLock
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
//通過tryAcquire方法嘗試獲取鎖
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) { //表示成功獲取到鎖,直接回傳
return true;
}
//省略部分代碼....
}
tryAcquire
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
//leaseTime就是租約時間,就是redis key的過期時間,
if (leaseTime != -1) { //如果設定過期時間
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {//如果沒設定了過期時間,則從配置中獲取key超時時間,默認是30s過期
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
//當tryLockInnerAsync執行結束后,觸發下面回呼
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) { //說明出現例外,直接回傳
return;
}
// lock acquired
if (ttlRemaining == null) { //表示第一次設定鎖鍵
if (leaseTime != -1) { //表示設定過超時時間,更新internalLockLeaseTime,并回傳
internalLockLeaseTime = unit.toMillis(leaseTime);
} else { //leaseTime=-1,啟動Watch Dog
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
tryLockInnerAsync
通過lua腳本來實作加鎖的操作
-
判斷lock鍵是否存在,不存在直接呼叫hset存盤當前執行緒資訊并且設定過期時間,回傳nil,告訴客戶端直接獲取到鎖,
-
判斷lock鍵是否存在,存在則將重入次數加1,并重新設定過期時間,回傳nil,告訴客戶端直接獲取到鎖,
-
被其它執行緒已經鎖定,回傳鎖有效期的剩余時間,告訴客戶端需要等待,
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
關于Lua腳本,我們稍后再解釋,
unlock釋放鎖流程
釋放鎖的流程,腳本看起來會稍微復雜一點
-
如果lock鍵不存在,通過
publish指令發送一個訊息表示鎖已經可用, -
如果鎖不是被當前執行緒鎖定,則回傳nil
-
由于支持可重入,在解鎖時將重入次數需要減1
-
如果計算后的重入次數>0,則重新設定過期時間
-
如果計算后的重入次數<=0,則發訊息說鎖已經可用
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
RedissonLock有競爭的情況
有競爭的情況在redis端的lua腳本是相同的,只是不同的條件執行不同的redis命令,當通過tryAcquire發現鎖被其它執行緒申請時,需要進入等待競爭邏輯中
-
this.await回傳false,說明等待時間已經超出獲取鎖最大等待時間,取消訂閱并回傳獲取鎖失敗
-
this.await回傳true,進入回圈嘗試獲取鎖,
繼續看RedissonLock.tryLock后半部分代碼如下:
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
//省略部分代碼
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
// 訂閱監聽redis訊息,并且創建RedissonLockEntry
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// 阻塞等待subscribe的future的結果物件,如果subscribe方法呼叫超過了time,說明已經超過了客戶端設定的最大wait time,則直接回傳false,取消訂閱,不再繼續申請鎖了,
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) { //取消訂閱
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId); //表示搶占鎖失敗
return false; //回傳false
}
try {
//判斷是否超時,如果等待超時,回傳獲的鎖失敗
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
//通過while回圈再次嘗試競爭鎖
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(waitTime, leaseTime, unit, threadId); //競爭鎖,回傳鎖超時時間
// lock acquired
if (ttl == null) { //如果超時時間為null,說明獲得鎖成功
return true;
}
//判斷是否超時,如果超時,表示獲取鎖失敗
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 通過信號量(共享鎖)阻塞,等待解鎖訊息. (減少申請鎖呼叫的頻率)
// 如果剩余時間(ttl)小于wait time ,就在 ttl 時間內,從Entry的信號量獲取一個許可(除非被中斷或者一直沒有可用的許可),
// 否則就在wait time 時間范圍內等待可以通過信號量
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 更新等待時間(最大等待時間-已經消耗的阻塞時間)
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) { //獲取鎖失敗
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId); //取消訂閱
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
鎖過期了怎么辦?
一般來說,我們去獲得分布式鎖時,為了避免死鎖的情況,我們會對鎖設定一個超時時間,但是有一種情況是,如果在指定時間內當前執行緒沒有執行完,由于鎖超時導致鎖被釋放,那么其他執行緒就會拿到這把鎖,從而導致一些故障,
為了避免這種情況,Redisson引入了一個Watch Dog機制,這個機制是針對分布式鎖來實作鎖的自動續約,簡單來說,如果當前獲得鎖的執行緒沒有執行完,那么Redisson會自動給Redis中目標key延長超時時間,
默認情況下,看門狗的續期時間是30s,也可以通過修改Config.lockWatchdogTimeout來另行指定,
@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return tryLock(waitTime, -1, unit); //leaseTime=-1
}
實際上,當我們通過tryLock方法沒有傳遞超時時間時,默認會設定一個30s的超時時間,避免出現死鎖的問題,
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else { //當leaseTime為-1時,leaseTime=internalLockLeaseTime,默認是30s,表示當前鎖的過期時間,
//this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) { //說明出現例外,直接回傳
return;
}
// lock acquired
if (ttlRemaining == null) { //表示第一次設定鎖鍵
if (leaseTime != -1) { //表示設定過超時時間,更新internalLockLeaseTime,并回傳
internalLockLeaseTime = unit.toMillis(leaseTime);
} else { //leaseTime=-1,啟動Watch Dog
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
由于默認設定了一個30s的過期時間,為了防止過期之后當前執行緒還未執行完,所以通過定時任務對過期時間進行續約,
- 首先,會先判斷在expirationRenewalMap中是否存在了entryName,這是個map結構,主要還是判斷在這個服務實體中的加鎖客戶端的鎖key是否存在,
- 如果已經存在了,就直接回傳;主要是考慮到RedissonLock是可重入鎖,
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {// 第一次加鎖的時候會呼叫,內部會啟動WatchDog
entry.addThreadId(threadId);
renewExpiration();
}
}
定義一個定時任務,該任務中呼叫renewExpirationAsync方法進行續約,
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//用到了時間輪機制
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// renewExpirationAsync續約租期
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);//每次間隔租期的1/3時間執行
ee.setTimeout(task);
}
執行Lua腳本,對指定的key進行續約,
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
Lua腳本
Lua是一個高效的輕量級腳本語言(和JavaScript類似),用標準C語言撰寫并以源代碼形式開放, 其設計目的是為了嵌入應用程式中,從而為應用程式提供靈活的擴展和定制功能,Lua在葡萄牙語中是“月亮”的意思,它的logo形式衛星,寓意是Lua是一個“衛星語言”,能夠方便地嵌入到其他語言中使用;其實在很多常見的框架中,都有嵌入Lua腳本的功能,比如OpenResty、Redis等,
使用Lua腳本的好處:
-
減少網路開銷,在Lua腳本中可以把多個命令放在同一個腳本中運行
-
原子操作,redis會將整個腳本作為一個整體執行,中間不會被其他命令插入,換句話說,撰寫腳本的程序中無需擔心會出現競態條件
-
復用性,客戶端發送的腳本會永遠存盤在redis中,這意味著其他客戶端可以復用這一腳本來完成同樣的邏輯
Lua的下載和安裝
Lua是一個獨立的腳本語言,所以它有專門的編譯執行工具,下面簡單帶大家安裝一下,
-
下載Lua原始碼包: https://www.lua.org/download.html
https://www.lua.org/ftp/lua-5.4.3.tar.gz
-
安裝步驟如下
tar -zxvf lua-5.4.3.tar.gz cd lua-5.4.3 make linux make install
如果報錯,說找不到readline/readline.h, 可以通過yum命令安裝
yum -y install readline-devel ncurses-devel
最后,直接輸入lua命令即可進入lua的控制臺,Lua腳本有自己的語法、變數、邏輯運算子、函式等,這塊我就不在這里做過多的說明,用過JavaScript的同學,應該只需要花幾個小時就可以全部學完,簡單演示兩個案例如下,
array = {"Lua", "mic"}
for i= 0, 2 do
print(array[i])
end
array = {"mic", "redis"}
for key,value in ipairs(array)
do
print(key, value)
end
Redis與Lua
Redis中集成了Lua的編譯和執行器,所以我們可以在Redis中定義Lua腳本去執行,同時,在Lua腳本中,可以直接呼叫Redis的命令,來操作Redis中的資料,
redis.call(‘set’,'hello','world')
local value=https://www.cnblogs.com/mic112/archive/2021/10/21/redis.call(‘get’,’hello’)
redis.call 函式的回傳值就是redis命令的執行結果,前面我們介紹過redis的5中型別的資料回傳的值的型別也都不一樣,redis.call函式會將這5種型別的回傳值轉化對應的Lua的資料型別
在很多情況下我們都需要腳本可以有回傳值,畢竟這個腳本也是一個我們所撰寫的命令集,我們可以像呼叫其他redis內置命令一樣呼叫我們自己寫的腳本,所以同樣redis會自動將腳本返回值的Lua資料型別轉化為Redis的回傳值型別, 在腳本中可以使用return 陳述句將值回傳給redis客戶端,通過return陳述句來執行,如果沒有執行return,默認回傳為nil,
Redis中執行Lua腳本相關的命令
撰寫完腳本后最重要的就是在程式中執行腳本,Redis提供了EVAL命令可以使開發者像呼叫其他Redis內置命令一樣呼叫腳本,
EVAL命令-執行腳本
[EVAL] [腳本內容] [key引數的數量] [key …] [arg …]
可以通過key和arg這兩個引數向腳本中傳遞資料,他們的值可以在腳本中分別使用KEYS和ARGV 這兩個型別的全域變數訪問,
比如我們通過腳本實作一個set命令,通過在redis客戶端中呼叫,那么執行的陳述句是:
eval "return redis.call('set',KEYS[1],ARGV[1])" 1 lua hello
上述腳本相當于使用Lua腳本呼叫了Redis的set命令,存盤了一個key=lua,value=https://www.cnblogs.com/mic112/archive/2021/10/21/hello到Redis中,
EVALSHA命令
考慮到我們通過eval執行lua腳本,腳本比較長的情況下,每次呼叫腳本都需要把整個腳本傳給redis,比較占用帶寬,為了解決這個問題,redis提供了EVALSHA命令允許開發者通過腳本內容的SHA1摘要來執行腳本,該命令的用法和EVAL一樣,只不過是將腳本內容替換成腳本內容的SHA1摘要
-
Redis在執行EVAL命令時會計算腳本的SHA1摘要并記錄在腳本快取中
-
執行EVALSHA命令時Redis會根據提供的摘要從腳本快取中查找對應的腳本內容,如果找到了就執行腳本,否則回傳“NOSCRIPT No matching script,Please use EVAL”
# 將腳本加入快取并生成sha1命令
script load "return redis.call('get','lua')"
# ["13bd040587b891aedc00a72458cbf8588a27df90"]
# 傳遞sha1的值來執行該命令
evalsha "13bd040587b891aedc00a72458cbf8588a27df90" 0
Redisson執行Lua腳本
通過lua腳本來實作一個訪問頻率限制功能,
思路,定義一個key,key中包含ip地址, value為指定時間內的訪問次數,比如說是10秒內只能訪問3次,
-
定義Lua腳本,
local times=redis.call('incr',KEYS[1]) -- 如果是第一次進來,設定一個過期時間 if times == 1 then redis.call('expire',KEYS[1],ARGV[1]) end -- 如果在指定時間內訪問次數大于指定次數,則回傳0,表示訪問被限制 if times > tonumber(ARGV[2]) then return 0 end -- 回傳1,允許被訪問 return 1 -
定義controller,提供訪問測驗方法
@RestController public class RedissonController { @Autowired RedissonClient redissonClient; private final String LIMIT_LUA= "local times=redis.call('incr',KEYS[1])\n" + "if times == 1 then\n" + " redis.call('expire',KEYS[1],ARGV[1])\n" + "end\n" + "if times > tonumber(ARGV[2]) then\n" + " return 0\n" + "end\n" + "return 1"; @GetMapping("/lua/{id}") public String lua(@PathVariable("id") Integer id) throws ExecutionException, InterruptedException { List<Object> keys= Arrays.asList("LIMIT:"+id); RFuture<Object> future=redissonClient.getScript(). evalAsync(RScript.Mode.READ_WRITE,LIMIT_LUA, RScript.ReturnType.INTEGER,keys,10,3); return future.get().toString(); } }
需要注意,上述腳本執行的時候會有問題,因為redis默認的序列化方式導致value的值在傳遞到腳本中時,轉成了物件型別,需要修改redisson.yml檔案,增加codec的序列化方式,
-
application.yml
spring: redis: redisson: file: classpath:redisson.yml -
redisson.yml
singleServerConfig: address: redis://192.168.221.128:6379 codec: !<org.redisson.codec.JsonJacksonCodec> {}
Lua腳本的原子性
redis的腳本執行是原子的,即腳本執行期間Redis不會執行其他命令,所有的命令必須等待腳本執行完以后才能執行,為了防止某個腳本執行時間程序導致Redis無法提供服務,Redis提供了lua-time-limit引數限制腳本的最長運行時間,默認是5秒鐘,
非事務性操作
當腳本運行時間超過這個限制后,Redis將開始接受其他命令但不會執行(以確保腳本的原子性),而是回傳BUSY的錯誤,下面演示一下這種情況,
打開兩個客戶端視窗,在第一個視窗中執行lua腳本的死回圈
eval "while true do end" 0
在第二個視窗中運行get lua,會得到如下的例外,
(error) BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.
我們會發現執行結果是Busy, 接著我們通過script kill 的命令終止當前執行的腳本,第二個視窗的顯示又恢復正常了,
存在事務性操作
如果當前執行的Lua腳本對Redis的資料進行了修改(SET、DEL等),那么通過SCRIPT KILL 命令是不能終止腳本運行的,因為要保證腳本運行的原子性,如果腳本執行了一部分終止,那就違背了腳本原子性的要求,最終要保證腳本要么都執行,要么都不執行
同樣打開兩個視窗,第一個視窗運行如下命令
eval "redis.call('set','name','mic') while true do end" 0
在第二個視窗運行
get lua
結果一樣,仍然是busy,但是這個時候通過script kill命令,會發現報錯,沒辦法kill,
(error) UNKILLABLE Sorry the script already executed write commands against the dataset. You can either wait the script termination or kill the server in a hard way using the SHUTDOWN NOSAVE command.
遇到這種情況,只能通過shutdown nosave命令來強行終止redis,
shutdown nosave和shutdown的區別在于 shutdown nosave不會進行持久化操作,意味著發生在上一次快照后的資料庫修改都會丟失,
Redisson的Lua腳本
了解了lua之后,我們再回過頭來看看Redisson的Lua腳本,就不難理解了,
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
Redis中的Pub/Sub機制
下面是Redisson中釋放鎖的代碼,在代碼中我們發現一個publish的指令redis.call('publish', KEYS[2], ARGV[1]),這個指令是干啥的呢?
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
Redis提供了一組命令可以讓開發者實作“發布/訂閱”模式(publish/subscribe) . 該模式同樣可以實作行程間的訊息傳遞,它的實作原理是:
-
發布/訂閱模式包含兩種角色,分別是發布者和訂閱者,訂閱者可以訂閱一個或多個頻道,而發布者可以向指定的頻道發送訊息,所有訂閱此頻道的訂閱者都會收到該訊息
-
發布者發布訊息的命令是PUBLISH, 用法是
PUBLISH channel message比如向channel.1發一條訊息:hello
PUBLISH channel.1 “hello”
這樣就實作了訊息的發送,該命令的回傳值表示接收到這條訊息的訂閱者數量,因為在執行這條命令的時候還沒有訂閱者訂閱該頻道,所以回傳為0. 另外值得注意的是訊息發送出去不會持久化,如果發送之前沒有訂閱者,那么后續再有訂閱者訂閱該頻道,之前的訊息就收不到了
訂閱者訂閱訊息的命令是:
SUBSCRIBE channel [channel …]
該命令同時可以訂閱多個頻道,比如訂閱channel.1的頻道:SUBSCRIBE channel.1,執行SUBSCRIBE命令后客戶端會進入訂閱狀態,
一般情況下,我們不會用pub/sub來做訊息發送機制,畢竟有這么多MQ技術在,
關注[跟著Mic學架構]公眾號,獲取更多精品原創

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/330023.html
標籤:其他
上一篇:ArrayList和Vector
