redisson 分布式锁lua脚本解析

简介

由于redis是单线程的,所以看lua脚本的时候只需要使用单线程的思维去看就行了,而且个人不建议使用lua脚本编写太复杂的业务,特别是有循环的,写得不好可能会使redis陷入卡顿,甚至死循环直接卡死无法运行其他命令。感兴趣可以自己试一下。

问题1 :为什么 redisson 实现分布式锁的数据类型是 hash 而不是 string。

我的理解:为了支持两个参数 ( 可重入计数 + 线程标识 ) ,如果不使用hash无法实现。其中这个hash里只有一个元素,不会出现两个元素。

非公平锁

实现方式 :hash + PubSub

加锁(tryLockInnerAsync)

--[[ 
    参数
    Collections.singletonList(
        this.getName()  -- key1
    ), 
    
    new Object[]{
        this.internalLockLeaseTime,  --arg1
        this.getLockName(threadId)}  --arg2
]]--
-- 如果锁对应的hash不存在
if (redis.call(\'exists\', KEYS[1]) == 0) 
then 
    -- 加锁成功,并且设置过期时间
    redis.call(\'hset\', KEYS[1], ARGV[2], 1); 
    redis.call(\'pexpire\', KEYS[1], ARGV[1]); 
    return nil; 
end; 
-- 如果锁存在
if (redis.call(\'hexists\', KEYS[1], ARGV[2]) == 1) 
then 
    -- 进行计数+1 (为了可重入)
    redis.call(\'hincrby\', KEYS[1], ARGV[2], 1); 
    redis.call(\'pexpire\', KEYS[1], ARGV[1]); 
    return nil; 
end;
-- 获取剩余过期时间
return redis.call(\'pttl\', KEYS[1]);

解锁(unlockInnerAsync)

--[[
    参数
    Arrays.asList(
        this.getName(), --key1
        this.getChannelName() --key2
    ), 
    new Object[]{
        LockPubSub.unlockMessage, --arg1
        this.internalLockLeaseTime, --arg2
        this.getLockName(threadId) --arg3
    }
]]--
​
-- 锁对应的hash不存在
if (redis.call(\'exists\', KEYS[1]) == 0) 
then 
    -- 通知抢锁。
    redis.call(\'publish\', KEYS[2], ARGV[1]); 
    --结束
    return 1; 
end;
-- 如果锁不存在,不处理
if (redis.call(\'hexists\', KEYS[1], ARGV[3]) == 0) 
then 
    return nil;
end; 
--对其中的元素进行计数-1 实现可重入
local counter = redis.call(\'hincrby\', KEYS[1], ARGV[3], -1); 
-- 如果此时还有计数
if (counter > 0) 
then 
    -- 刷新过期时间
    redis.call(\'pexpire\', KEYS[1], ARGV[2]); 
    return 0; 
else 
    -- 解锁,通知其他线程争抢锁。
    redis.call(\'del\', KEYS[1]); 
    redis.call(\'publish\', KEYS[2], ARGV[1]); 
    return 1; 
end; 
return nil;

公平锁

实现方式 :list + zset + hash + PubSub

zset用于处理过期时间

list用于处理先后顺序

加锁 (tryLockInnerAsync)

--[[ 
    参数
    Arrays.asList(
        this.getName(), --key1
        this.threadsQueueName, --key2
        this.timeoutSetName), --key3
        
    new Object[]{
        this.internalLockLeaseTime, --arg1
        this.getLockName(threadId), --arg2
        currentTime + 5000L, --arg3
        currentTime}  --arg4
]]--
while true do 
    -- 取出队列中队头元素
    local firstThreadId2 = redis.call(\'lindex\', KEYS[2], 0);
    
    --队列中没有元素了就结束
    if firstThreadId2 == false 
    then 
        break; 
    end; 
​
    -- 从zset中获取对应元素的过期时间
    local timeout = tonumber(redis.call(\'zscore\', KEYS[3], firstThreadId2));
    -- 已到过期时间
    if timeout <= tonumber(ARGV[4]) 
    then 
        -- 从过期zset和队列中移除
        redis.call(\'zrem\', KEYS[3], firstThreadId2); 
        redis.call(\'lpop\', KEYS[2]); 
    else 
        break; --直至没有过期元素就结束
    end; 
end;
​
-- 没有获取锁
if (redis.call(\'exists\', KEYS[1]) == 0) 
    and (
        (redis.call(\'exists\', KEYS[2]) == 0) -- 队列中没有元素
        or (redis.call(\'lindex\', KEYS[2], 0) == ARGV[2]) -- 或者元素在队列的头部
    ) 
then 
    --从set和队列中移除,并且加锁成功
    redis.call(\'lpop\', KEYS[2]); 
    redis.call(\'zrem\', KEYS[3], ARGV[2]); 
    redis.call(\'hset\', KEYS[1], ARGV[2], 1); 
    redis.call(\'pexpire\', KEYS[1], ARGV[1]); 
    return nil; 
end; 
​
--  如果已经获取到了锁,那么就进行计数+1 表示重入。
if (redis.call(\'hexists\', KEYS[1], ARGV[2]) == 1) 
then 
    redis.call(\'hincrby\', KEYS[1], ARGV[2], 1); 
    redis.call(\'pexpire\', KEYS[1], ARGV[1]); 
    return nil; 
end; 
​
-- 获取队列的队头元素
local firstThreadId = redis.call(\'lindex\', KEYS[2], 0); 
local ttl; 
​
-- 如果元素不存在并且不是加锁的元素
-- (判断是否是刚加锁成功的) *只用于计算超时时间
if firstThreadId ~= false and firstThreadId ~= ARGV[2] 
then 
    -- 计算剩余的时间 (zset获取后计算)
    ttl = tonumber(redis.call(\'zscore\', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);
else 
    -- 获取剩余时间 (直接获取)
    ttl = redis.call(\'pttl\', KEYS[1]);
end; 
​
--计算超时时间
local timeout = ttl + tonumber(ARGV[3]);
-- 向set中保存超时时间
if redis.call(\'zadd\', KEYS[3], timeout, ARGV[2]) == 1 
then 
    -- 保存成功向队列尾部添加
    redis.call(\'rpush\', KEYS[2], ARGV[2]);
end; 
return ttl;

解锁(unlockInnerAsync)

--[[
    Arrays.asList(
        this.getName(),  --key1
        this.threadsQueueName,  --key2
        this.timeoutSetName,  --key3
        this.getChannelName()  --key4
    ), 
    new Object[]{
        LockPubSub.unlockMessage,  --arg1
        this.internalLockLeaseTime,  --arg2
        this.getLockName(threadId),  --arg3
        System.currentTimeMillis --arg4
    }
]]--
​
-- 与加锁过程相同,作用是清除过期的等待者
while true do 
    local firstThreadId2 = redis.call(\'lindex\', KEYS[2], 0);
    if firstThreadId2 == false then 
        break;
    end; 
    local timeout = tonumber(redis.call(\'zscore\', KEYS[3], firstThreadId2));
    if timeout <= tonumber(ARGV[4]) then 
        redis.call(\'zrem\', KEYS[3], firstThreadId2); 
        redis.call(\'lpop\', KEYS[2]); 
    else 
        break;
    end; 
end;
​
--如果锁对应的hash不存在
if (redis.call(\'exists\', KEYS[1]) == 0) then 
    -- 并且队列中有元素,发消息通知下一个线程竞争锁
    local nextThreadId = redis.call(\'lindex\', KEYS[2], 0);
    -- 队列中还有元素
    if nextThreadId ~= false then 
        redis.call(\'publish\', KEYS[4] .. \':\' .. nextThreadId, ARGV[1]); 
    end; 
    --结束
    return 1; 
end;
​
-- 如果锁不存在 直接结束
if (redis.call(\'hexists\', KEYS[1], ARGV[3]) == 0) then 
    return nil;
end; 
--对持有的锁进行计数-1,表示可重入
local counter = redis.call(\'hincrby\', KEYS[1], ARGV[3], -1); 
-- 如果大于0说明还有持有,刷新持有锁的过期时间。
if (counter > 0) then 
    redis.call(\'pexpire\', KEYS[1], ARGV[2]); 
    --结束
    return 0; 
end; 
-- 解锁
redis.call(\'del\', KEYS[1]); 
-- 获取队列的队头元素
local nextThreadId = redis.call(\'lindex\', KEYS[2], 0); 
-- 如果还有元素,通知下一个线程抢锁。
if nextThreadId ~= false then 
    redis.call(\'publish\', KEYS[4] .. \':\' .. nextThreadId, ARGV[1]); 
end; 
--结束
return 1;