主頁 > 後端開發 > 從原始碼層面深度剖析Redisson實作分布式鎖的原理(全程干貨,注意收藏)

從原始碼層面深度剖析Redisson實作分布式鎖的原理(全程干貨,注意收藏)

2021-10-22 06:19:24 後端開發

Redis實作分布式鎖的原理

前面講了Redis在實際業務場景中的應用,那么下面再來了解一下Redisson功能性場景的應用,也就是大家經常使用的分布式鎖的實作場景,

  • 引入redisson依賴

    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>3.16.0</version>
    </dependency>
    
  • 撰寫簡單的測驗代碼

    public class RedissonTest {
    
        private static RedissonClient redissonClient;
    
        static {
            Config config=new Config();
            config.useSingleServer().setAddress("redis://192.168.221.128:6379");
            redissonClient=Redisson.create(config);
        }
    
        public static void main(String[] args) throws InterruptedException {
            RLock rLock=redissonClient.getLock("updateOrder");
            //最多等待100秒、上鎖10s以后自動解鎖
            if(rLock.tryLock(100,10,TimeUnit.SECONDS)){
                System.out.println("獲取鎖成功");
            }
            Thread.sleep(2000);
            rLock.unlock();
    
            redissonClient.shutdown();
        }
    }
    
    

Redisson分布式鎖的實作原理

你們會發現,通過redisson,非常簡單就可以實作我們所需要的功能,當然這只是redisson的冰山一角,redisson最強大的地方就是提供了分布式特性的常用工具類,使得原本作為協調單機多執行緒并發程式的并發程式的工具包獲得了協調分布式多級多執行緒并發系統的能力,降低了程式員在分布式環境下解決分布式問題的難度,下面分析一下RedissonLock的實作原理

RedissonLock.tryLock

@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    //通過tryAcquire方法嘗試獲取鎖
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) { //表示成功獲取到鎖,直接回傳
        return true;
    }
	//省略部分代碼....
}

tryAcquire

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    //leaseTime就是租約時間,就是redis key的過期時間,
    if (leaseTime != -1) { //如果設定過期時間
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {//如果沒設定了過期時間,則從配置中獲取key超時時間,默認是30s過期
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                               TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    //當tryLockInnerAsync執行結束后,觸發下面回呼
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) { //說明出現例外,直接回傳
            return;
        }
        // lock acquired
        if (ttlRemaining == null) { //表示第一次設定鎖鍵
            if (leaseTime != -1) { //表示設定過超時時間,更新internalLockLeaseTime,并回傳
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else { //leaseTime=-1,啟動Watch Dog
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

tryLockInnerAsync

通過lua腳本來實作加鎖的操作

  1. 判斷lock鍵是否存在,不存在直接呼叫hset存盤當前執行緒資訊并且設定過期時間,回傳nil,告訴客戶端直接獲取到鎖,

  2. 判斷lock鍵是否存在,存在則將重入次數加1,并重新設定過期時間,回傳nil,告訴客戶端直接獲取到鎖,

  3. 被其它執行緒已經鎖定,回傳鎖有效期的剩余時間,告訴客戶端需要等待,

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                          "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "return redis.call('pttl', KEYS[1]);",
                          Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

關于Lua腳本,我們稍后再解釋,

unlock釋放鎖流程

釋放鎖的流程,腳本看起來會稍微復雜一點

  1. 如果lock鍵不存在,通過publish指令發送一個訊息表示鎖已經可用,

  2. 如果鎖不是被當前執行緒鎖定,則回傳nil

  3. 由于支持可重入,在解鎖時將重入次數需要減1

  4. 如果計算后的重入次數>0,則重新設定過期時間

  5. 如果計算后的重入次數<=0,則發訊息說鎖已經可用

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                          "return nil;" +
                          "end; " +
                          "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                          "if (counter > 0) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                          "return 0; " +
                          "else " +
                          "redis.call('del', KEYS[1]); " +
                          "redis.call('publish', KEYS[2], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          "return nil;",
                          Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

RedissonLock有競爭的情況

有競爭的情況在redis端的lua腳本是相同的,只是不同的條件執行不同的redis命令,當通過tryAcquire發現鎖被其它執行緒申請時,需要進入等待競爭邏輯中

  1. this.await回傳false,說明等待時間已經超出獲取鎖最大等待時間,取消訂閱并回傳獲取鎖失敗

  2. this.await回傳true,進入回圈嘗試獲取鎖,

繼續看RedissonLock.tryLock后半部分代碼如下:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
//省略部分代碼
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        current = System.currentTimeMillis();
       // 訂閱監聽redis訊息,并且創建RedissonLockEntry
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
      // 阻塞等待subscribe的future的結果物件,如果subscribe方法呼叫超過了time,說明已經超過了客戶端設定的最大wait time,則直接回傳false,取消訂閱,不再繼續申請鎖了,
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) { //取消訂閱
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(waitTime, unit, threadId); //表示搶占鎖失敗
            return false; //回傳false
        }
        try {
            //判斷是否超時,如果等待超時,回傳獲的鎖失敗
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
            //通過while回圈再次嘗試競爭鎖
            while (true) { 
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId); //競爭鎖,回傳鎖超時時間
                // lock acquired
                if (ttl == null) { //如果超時時間為null,說明獲得鎖成功
                    return true;
                }
                //判斷是否超時,如果超時,表示獲取鎖失敗
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }

                // 通過信號量(共享鎖)阻塞,等待解鎖訊息.  (減少申請鎖呼叫的頻率)
				// 如果剩余時間(ttl)小于wait time ,就在 ttl 時間內,從Entry的信號量獲取一個許可(除非被中斷或者一直沒有可用的許可),
				// 否則就在wait time 時間范圍內等待可以通過信號量
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
                // 更新等待時間(最大等待時間-已經消耗的阻塞時間)
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) { //獲取鎖失敗
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId); //取消訂閱
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

鎖過期了怎么辦?

一般來說,我們去獲得分布式鎖時,為了避免死鎖的情況,我們會對鎖設定一個超時時間,但是有一種情況是,如果在指定時間內當前執行緒沒有執行完,由于鎖超時導致鎖被釋放,那么其他執行緒就會拿到這把鎖,從而導致一些故障,

為了避免這種情況,Redisson引入了一個Watch Dog機制,這個機制是針對分布式鎖來實作鎖的自動續約,簡單來說,如果當前獲得鎖的執行緒沒有執行完,那么Redisson會自動給Redis中目標key延長超時時間,

默認情況下,看門狗的續期時間是30s,也可以通過修改Config.lockWatchdogTimeout來另行指定,

@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
    return tryLock(waitTime, -1, unit);  //leaseTime=-1
}

實際上,當我們通過tryLock方法沒有傳遞超時時間時,默認會設定一個30s的超時時間,避免出現死鎖的問題,

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime != -1) { 
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else { //當leaseTime為-1時,leaseTime=internalLockLeaseTime,默認是30s,表示當前鎖的過期時間,
        
        //this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                                               TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) { //說明出現例外,直接回傳
            return;
        }
        // lock acquired
        if (ttlRemaining == null) { //表示第一次設定鎖鍵
            if (leaseTime != -1) { //表示設定過超時時間,更新internalLockLeaseTime,并回傳
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else { //leaseTime=-1,啟動Watch Dog
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}

由于默認設定了一個30s的過期時間,為了防止過期之后當前執行緒還未執行完,所以通過定時任務對過期時間進行續約,

  • 首先,會先判斷在expirationRenewalMap中是否存在了entryName,這是個map結構,主要還是判斷在這個服務實體中的加鎖客戶端的鎖key是否存在,
  • 如果已經存在了,就直接回傳;主要是考慮到RedissonLock是可重入鎖,
protected void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {// 第一次加鎖的時候會呼叫,內部會啟動WatchDog
        entry.addThreadId(threadId);
        renewExpiration();
    
    }
}

定義一個定時任務,該任務中呼叫renewExpirationAsync方法進行續約,

private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
	//用到了時間輪機制
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // renewExpirationAsync續約租期
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }

                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);//每次間隔租期的1/3時間執行

    ee.setTimeout(task);
}

執行Lua腳本,對指定的key進行續約,

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          "return 0;",
                          Collections.singletonList(getRawName()),
                          internalLockLeaseTime, getLockName(threadId));
}

Lua腳本

Lua是一個高效的輕量級腳本語言(和JavaScript類似),用標準C語言撰寫并以源代碼形式開放, 其設計目的是為了嵌入應用程式中,從而為應用程式提供靈活的擴展和定制功能,Lua在葡萄牙語中是“月亮”的意思,它的logo形式衛星,寓意是Lua是一個“衛星語言”,能夠方便地嵌入到其他語言中使用;其實在很多常見的框架中,都有嵌入Lua腳本的功能,比如OpenResty、Redis等,

使用Lua腳本的好處:

  1. 減少網路開銷,在Lua腳本中可以把多個命令放在同一個腳本中運行

  2. 原子操作,redis會將整個腳本作為一個整體執行,中間不會被其他命令插入,換句話說,撰寫腳本的程序中無需擔心會出現競態條件

  3. 復用性,客戶端發送的腳本會永遠存盤在redis中,這意味著其他客戶端可以復用這一腳本來完成同樣的邏輯

Lua的下載和安裝

Lua是一個獨立的腳本語言,所以它有專門的編譯執行工具,下面簡單帶大家安裝一下,

  • 下載Lua原始碼包: https://www.lua.org/download.html

    https://www.lua.org/ftp/lua-5.4.3.tar.gz

  • 安裝步驟如下

    tar -zxvf lua-5.4.3.tar.gz
    cd lua-5.4.3
    make linux
    make install
    

如果報錯,說找不到readline/readline.h, 可以通過yum命令安裝

yum -y install readline-devel ncurses-devel

最后,直接輸入lua命令即可進入lua的控制臺,Lua腳本有自己的語法、變數、邏輯運算子、函式等,這塊我就不在這里做過多的說明,用過JavaScript的同學,應該只需要花幾個小時就可以全部學完,簡單演示兩個案例如下,

array = {"Lua", "mic"}
for i= 0, 2 do
   print(array[i])
end
array = {"mic", "redis"}

for key,value in ipairs(array)
do
   print(key, value)
end

Redis與Lua

Redis中集成了Lua的編譯和執行器,所以我們可以在Redis中定義Lua腳本去執行,同時,在Lua腳本中,可以直接呼叫Redis的命令,來操作Redis中的資料,

redis.call(‘set’,'hello','world')

local value=https://www.cnblogs.com/mic112/archive/2021/10/21/redis.call(‘get’,’hello’) 

redis.call 函式的回傳值就是redis命令的執行結果,前面我們介紹過redis的5中型別的資料回傳的值的型別也都不一樣,redis.call函式會將這5種型別的回傳值轉化對應的Lua的資料型別

在很多情況下我們都需要腳本可以有回傳值,畢竟這個腳本也是一個我們所撰寫的命令集,我們可以像呼叫其他redis內置命令一樣呼叫我們自己寫的腳本,所以同樣redis會自動將腳本返回值的Lua資料型別轉化為Redis的回傳值型別, 在腳本中可以使用return 陳述句將值回傳給redis客戶端,通過return陳述句來執行,如果沒有執行return,默認回傳為nil,

Redis中執行Lua腳本相關的命令

撰寫完腳本后最重要的就是在程式中執行腳本,Redis提供了EVAL命令可以使開發者像呼叫其他Redis內置命令一樣呼叫腳本,

EVAL命令-執行腳本

[EVAL] [腳本內容] [key引數的數量] [key …] [arg …]

可以通過key和arg這兩個引數向腳本中傳遞資料,他們的值可以在腳本中分別使用KEYSARGV 這兩個型別的全域變數訪問,

比如我們通過腳本實作一個set命令,通過在redis客戶端中呼叫,那么執行的陳述句是:

eval "return redis.call('set',KEYS[1],ARGV[1])" 1 lua hello

上述腳本相當于使用Lua腳本呼叫了Redis的set命令,存盤了一個key=lua,value=https://www.cnblogs.com/mic112/archive/2021/10/21/hello到Redis中,

EVALSHA命令

考慮到我們通過eval執行lua腳本,腳本比較長的情況下,每次呼叫腳本都需要把整個腳本傳給redis,比較占用帶寬,為了解決這個問題,redis提供了EVALSHA命令允許開發者通過腳本內容的SHA1摘要來執行腳本,該命令的用法和EVAL一樣,只不過是將腳本內容替換成腳本內容的SHA1摘要

  1. Redis在執行EVAL命令時會計算腳本的SHA1摘要并記錄在腳本快取中

  2. 執行EVALSHA命令時Redis會根據提供的摘要從腳本快取中查找對應的腳本內容,如果找到了就執行腳本,否則回傳“NOSCRIPT No matching script,Please use EVAL”

# 將腳本加入快取并生成sha1命令
script load "return redis.call('get','lua')"
# ["13bd040587b891aedc00a72458cbf8588a27df90"]
# 傳遞sha1的值來執行該命令
evalsha "13bd040587b891aedc00a72458cbf8588a27df90" 0

Redisson執行Lua腳本

通過lua腳本來實作一個訪問頻率限制功能,

思路,定義一個key,key中包含ip地址, value為指定時間內的訪問次數,比如說是10秒內只能訪問3次,

  • 定義Lua腳本,

    local times=redis.call('incr',KEYS[1])
    -- 如果是第一次進來,設定一個過期時間
    if times == 1 then
       redis.call('expire',KEYS[1],ARGV[1])
    end
    -- 如果在指定時間內訪問次數大于指定次數,則回傳0,表示訪問被限制
    if times > tonumber(ARGV[2]) then
       return 0
    end
    -- 回傳1,允許被訪問
    return 1
    
  • 定義controller,提供訪問測驗方法

    @RestController
    public class RedissonController {
        @Autowired
        RedissonClient redissonClient;
    
        private final String LIMIT_LUA=
            "local times=redis.call('incr',KEYS[1])\n" +
            "if times == 1 then\n" +
            "   redis.call('expire',KEYS[1],ARGV[1])\n" +
            "end\n" +
            "if times > tonumber(ARGV[2]) then\n" +
            "   return 0\n" +
            "end\n" +
            "return 1";
    
        @GetMapping("/lua/{id}")
        public String lua(@PathVariable("id") Integer id) throws ExecutionException, InterruptedException {
            List<Object> keys= Arrays.asList("LIMIT:"+id);
            RFuture<Object> future=redissonClient.getScript().
                evalAsync(RScript.Mode.READ_WRITE,LIMIT_LUA, RScript.ReturnType.INTEGER,keys,10,3);
            return future.get().toString();
        }
    
    }
    

需要注意,上述腳本執行的時候會有問題,因為redis默認的序列化方式導致value的值在傳遞到腳本中時,轉成了物件型別,需要修改redisson.yml檔案,增加codec的序列化方式,

  • application.yml

    spring:
      redis:
        redisson:
          file: classpath:redisson.yml
    
  • redisson.yml

    singleServerConfig:
      address: redis://192.168.221.128:6379
    
    codec: !<org.redisson.codec.JsonJacksonCodec> {}
    

Lua腳本的原子性

redis的腳本執行是原子的,即腳本執行期間Redis不會執行其他命令,所有的命令必須等待腳本執行完以后才能執行,為了防止某個腳本執行時間程序導致Redis無法提供服務,Redis提供了lua-time-limit引數限制腳本的最長運行時間,默認是5秒鐘,

非事務性操作

當腳本運行時間超過這個限制后,Redis將開始接受其他命令但不會執行(以確保腳本的原子性),而是回傳BUSY的錯誤,下面演示一下這種情況,

打開兩個客戶端視窗,在第一個視窗中執行lua腳本的死回圈

eval "while true do end" 0

在第二個視窗中運行get lua,會得到如下的例外,

(error) BUSY Redis is busy running a script. You can only call SCRIPT KILL or SHUTDOWN NOSAVE.

我們會發現執行結果是Busy, 接著我們通過script kill 的命令終止當前執行的腳本,第二個視窗的顯示又恢復正常了,

存在事務性操作

如果當前執行的Lua腳本對Redis的資料進行了修改(SET、DEL等),那么通過SCRIPT KILL 命令是不能終止腳本運行的,因為要保證腳本運行的原子性,如果腳本執行了一部分終止,那就違背了腳本原子性的要求,最終要保證腳本要么都執行,要么都不執行

同樣打開兩個視窗,第一個視窗運行如下命令

eval "redis.call('set','name','mic') while true do end" 0

在第二個視窗運行

get lua

結果一樣,仍然是busy,但是這個時候通過script kill命令,會發現報錯,沒辦法kill,

(error) UNKILLABLE Sorry the script already executed write commands against the dataset. You can either wait the script termination or kill the server in a hard way using the SHUTDOWN NOSAVE command.

遇到這種情況,只能通過shutdown nosave命令來強行終止redis,

shutdown nosave和shutdown的區別在于 shutdown nosave不會進行持久化操作,意味著發生在上一次快照后的資料庫修改都會丟失,

Redisson的Lua腳本

了解了lua之后,我們再回過頭來看看Redisson的Lua腳本,就不難理解了,

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                          "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                          "end; " +
                          "return redis.call('pttl', KEYS[1]);",
                          Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

Redis中的Pub/Sub機制

下面是Redisson中釋放鎖的代碼,在代碼中我們發現一個publish的指令redis.call('publish', KEYS[2], ARGV[1]),這個指令是干啥的呢?

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                          "return nil;" +
                          "end; " +
                          "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                          "if (counter > 0) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                          "return 0; " +
                          "else " +
                          "redis.call('del', KEYS[1]); " +
                          "redis.call('publish', KEYS[2], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          "return nil;",
                          Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

Redis提供了一組命令可以讓開發者實作“發布/訂閱”模式(publish/subscribe) . 該模式同樣可以實作行程間的訊息傳遞,它的實作原理是:

  • 發布/訂閱模式包含兩種角色,分別是發布者和訂閱者,訂閱者可以訂閱一個或多個頻道,而發布者可以向指定的頻道發送訊息,所有訂閱此頻道的訂閱者都會收到該訊息

  • 發布者發布訊息的命令是PUBLISH, 用法是

    PUBLISH channel message
    

    比如向channel.1發一條訊息:hello

    PUBLISH channel.1 “hello”
    

這樣就實作了訊息的發送,該命令的回傳值表示接收到這條訊息的訂閱者數量,因為在執行這條命令的時候還沒有訂閱者訂閱該頻道,所以回傳為0. 另外值得注意的是訊息發送出去不會持久化,如果發送之前沒有訂閱者,那么后續再有訂閱者訂閱該頻道,之前的訊息就收不到了

訂閱者訂閱訊息的命令是:

SUBSCRIBE channel [channel …]

該命令同時可以訂閱多個頻道,比如訂閱channel.1的頻道:SUBSCRIBE channel.1,執行SUBSCRIBE命令后客戶端會進入訂閱狀態,

一般情況下,我們不會用pub/sub來做訊息發送機制,畢竟有這么多MQ技術在,
關注[跟著Mic學架構]公眾號,獲取更多精品原創

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/330023.html

標籤:其他

上一篇:ArrayList和Vector

下一篇:如何用精確的通用型別制作一個型別腳本還原器

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more