Redis分布式鎖如何自動續(xù)期的實現(xiàn)
Redis 實現(xiàn)分布式鎖
- 指定一個 key 作為鎖標記,存入 Redis 中,指定一個 唯一的用戶標識作為 value。
- 當 key 不存在時才能設(shè)置值,確保同一時間只有一個客戶端進程獲得鎖,滿足互斥性特性。
- 設(shè)置一個過期時間,防止因系統(tǒng)異常導致沒能刪除這個 key,滿足防死鎖特性。
- 當處理完業(yè)務(wù)之后需要清除這個 key 來釋放鎖,清除 key 時需要校驗 value 值,需要滿足只有加鎖的人才能釋放鎖 。
問題
如果這個鎖的過期時間是30秒,但是業(yè)務(wù)運行超過了30秒,比如40秒,當業(yè)務(wù)運行到30秒的時候,鎖過期了,其他客戶端拿到了這個鎖,怎么辦
我們可以設(shè)置一個合理的過期時間,讓業(yè)務(wù)能夠在這個時間內(nèi)完成業(yè)務(wù)邏輯,但LockTime的設(shè)置原本就很不容易。
- LockTime設(shè)置過小,鎖自動超時的概率就會增加,鎖異常失效的概率也就會增加;
- LockTime設(shè)置過大,萬一服務(wù)出現(xiàn)異常無法正常釋放鎖,那么出現(xiàn)這種異常鎖的時間也就越長。
我們只能通過經(jīng)驗去配置,一個可以接受的值,基本上是這個服務(wù)歷史上的平均耗時再增加一定的buff??傮w來說,設(shè)置一個合理的過期時間并不容易
我們也可以不設(shè)置過期時間,讓業(yè)務(wù)運行結(jié)束后解鎖,但是如果客戶端出現(xiàn)了異常結(jié)束了或宕機了,那么這個鎖就無法解鎖,變成死鎖;
自動續(xù)期
我們可以先給鎖設(shè)置一個LockTime,然后啟動一個守護線程,讓守護線程在一段時間后,重新去設(shè)置這個鎖的LockTime。
看起來很簡單,但實現(xiàn)起來并不容易
- 和釋放鎖的情況一樣,我們需要先判斷持有鎖客戶端是否有變化。否則會造成無論誰持有鎖,守護線程都會去重新設(shè)置鎖的LockTime。
- 守護線程要在合理的時間再去重新設(shè)置鎖的LockTime,否則會造成資源的浪費。不能動不動就去續(xù)。
- 如果持有鎖的線程已經(jīng)處理完業(yè)務(wù)了,那么守護線程也應(yīng)該被銷毀。不能業(yè)務(wù)運行結(jié)束了,守護者還在那里繼續(xù)運行,浪費資源。
看門狗
Redisson的看門狗機制就是這種機制實現(xiàn)自動續(xù)期的
Redissson tryLock
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); // 1.嘗試獲取鎖 Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } // 申請鎖的耗時如果大于等于最大等待時間,則申請鎖失敗. time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(threadId); return false; } current = System.currentTimeMillis(); /** * 2.訂閱鎖釋放事件,并通過 await 方法阻塞等待鎖釋放,有效的解決了無效的鎖申請浪費資源的問題: * 基于信息量,當鎖被其它資源占用時,當前線程通過 Redis 的 channel 訂閱鎖的釋放事件,一旦鎖釋放會發(fā)消息通知待等待的線程進行競爭. * * 當 this.await 返回 false,說明等待時間已經(jīng)超出獲取鎖最大等待時間,取消訂閱并返回獲取鎖失敗. * 當 this.await 返回 true,進入循環(huán)嘗試獲取鎖. */ RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); // await 方法內(nèi)部是用 CountDownLatch 來實現(xiàn)阻塞,獲取 subscribe 異步執(zhí)行的結(jié)果(應(yīng)用了 Netty 的 Future) if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.onComplete((res, e) -> { if (e == null) {unsubscribe(subscribeFuture, threadId); } }); } acquireFailed(threadId); return false; } try { // 計算獲取鎖的總耗時,如果大于等于最大等待時間,則獲取鎖失敗. time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(threadId); return false; } /** * 3.收到鎖釋放的信號后,在最大等待時間之內(nèi),循環(huán)一次接著一次的嘗試獲取鎖 * 獲取鎖成功,則立馬返回 true, * 若在最大等待時間之內(nèi)還沒獲取到鎖,則認為獲取鎖失敗,返回 false 結(jié)束循環(huán) */ while (true) { long currentTime = System.currentTimeMillis(); // 再次嘗試獲取鎖 ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return true; } // 超過最大等待時間則返回 false 結(jié)束循環(huán),獲取鎖失敗 time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(threadId); return false; } /** * 6.阻塞等待鎖(通過信號量(共享鎖)阻塞,等待解鎖消息): */ currentTime = System.currentTimeMillis(); if (ttl >= 0 && ttl < time) { //如果剩余時間(ttl)小于wait time ,就在 ttl 時間內(nèi),從Entry的信號量獲取一個許可(除非被中斷或者一直沒有可用的許可)。 getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } else { //則就在wait time 時間范圍內(nèi)等待可以通過信號量 getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } // 更新剩余的等待時間(最大等待時間-已經(jīng)消耗的阻塞時間) time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(threadId); return false; } } } finally { // 7.無論是否獲得鎖,都要取消訂閱解鎖消息 unsubscribe(subscribeFuture, threadId); } return get(tryLockAsync(waitTime, leaseTime, unit)); }
- 嘗試獲取鎖,返回 null 則說明加鎖成功,返回一個數(shù)值,則說明已經(jīng)存在該鎖,ttl 為鎖的剩余存活時間。
- 如果此時客戶端 2 進程獲取鎖失敗,那么使用客戶端 2 的線程 id(其實本質(zhì)上就是進程 id)通過 Redis 的 channel 訂閱鎖釋放的事件。如果等待的過程中一直未等到鎖的釋放事件通知,當超過最大等待時間則獲取鎖失敗,返回 false,也就是第 39 行代碼。如果等到了鎖的釋放事件的通知,則開始進入一個不斷重試獲取鎖的循環(huán)。
- 循環(huán)中每次都先試著獲取鎖,并得到已存在的鎖的剩余存活時間。如果在重試中拿到了鎖,則直接返回。如果鎖當前還是被占用的,那么等待釋放鎖的消息,具體實現(xiàn)使用了信號量 Semaphore 來阻塞線程,當鎖釋放并發(fā)布釋放鎖的消息后,信號量的 release() 方法會被調(diào)用,此時被信號量阻塞的等待隊列中的一個線程就可以繼續(xù)嘗試獲取鎖了。
- 當鎖正在被占用時,等待獲取鎖的進程并不是通過一個 while(true) 死循環(huán)去獲取鎖,而是利用了 Redis 的發(fā)布訂閱機制,通過 await 方法阻塞等待鎖的進程,有效的解決了無效的鎖申請浪費資源的問題。
看門狗如何自動續(xù)期
Redisson看門狗機制, 只要客戶端加鎖成功,就會啟動一個 Watch Dog。
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
leaseTime 必須是 -1 才會開啟 Watch Dog 機制,如果需要開啟 Watch Dog 機制就必須使用默認的加鎖時間為 30s。
如果你自己自定義時間,超過這個時間,鎖就會自定釋放,并不會自動續(xù)期。
續(xù)期原理
續(xù)期原理其實就是用lua腳本,將鎖的時間重置為30s
private void scheduleExpirationRenewal(long threadId) { ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { oldEntry.addThreadId(threadId); } else { entry.addThreadId(threadId); renewExpiration(); } } protected RFuture<Boolean> renewExpirationAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
Watch Dog 機制其實就是一個后臺定時任務(wù)線程,獲取鎖成功之后,會將持有鎖的線程放入到一個 RedissonLock.EXPIRATION_RENEWAL_MAP里面,然后每隔 10 秒 (internalLockLeaseTime / 3) 檢查一下,如果客戶端 還持有鎖 key(判斷客戶端是否還持有 key,其實就是遍歷 EXPIRATION_RENEWAL_MAP 里面線程 id 然后根據(jù)線程 id 去 Redis 中查,如果存在就會延長 key 的時間),那么就會不斷的延長鎖 key 的生存時間。
如果服務(wù)宕機了,Watch Dog 機制線程也就沒有了,此時就不會延長 key 的過期時間,到了 30s 之后就會自動過期了,其他線程就可以獲取到鎖。
到此這篇關(guān)于Redis分布式鎖如何自動續(xù)期的實現(xiàn)的文章就介紹到這了,更多相關(guān)Redis分布式鎖自動續(xù)期內(nèi)容請搜索本站以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持本站!
版權(quán)聲明:本站文章來源標注為YINGSOO的內(nèi)容版權(quán)均為本站所有,歡迎引用、轉(zhuǎn)載,請保持原文完整并注明來源及原文鏈接。禁止復(fù)制或仿造本網(wǎng)站,禁止在非www.sddonglingsh.com所屬的服務(wù)器上建立鏡像,否則將依法追究法律責任。本站部分內(nèi)容來源于網(wǎng)友推薦、互聯(lián)網(wǎng)收集整理而來,僅供學習參考,不代表本站立場,如有內(nèi)容涉嫌侵權(quán),請聯(lián)系alex-e#qq.com處理。