Python基于异步库实现Redis分布式锁

本文最后更新于:2022年7月9日 凌晨

什么是分布式锁?

分布式锁用于控制分布式系统中不同进程共用访问共享资源的一种锁实现。在同一进程中,可以很轻易的使用锁。但是在不同进程中,就需要利用到一些中间件比如Redis实现锁的机制,以防止干扰,保证一致性。

需要用到Redis哪些特征?

分布式锁主要用到Redis的setnx以及expire。
setnx用于设置一个key,如果该key不存在,则设置成功。反之亦然。
expire用于给一个key添加一个过期时间,当过期时,Redis会将该key删除。

实现

在这里,我使用的是Python分布式Redis库‘aioredis’。

导入相关库:

1
2
3
4
5
import asyncio
import uuid
import aioredis
import math
import time

首先我们实现一个上锁的函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
async def acquire_lock(app, lock_name, acquire_timeout=10, lock_timeout=10):
# 生成随机身份值
identifier = str(uuid.uuid4())
# 锁的key值
lock_name = 'lock:' + lock_name
# 锁的过期时间
lock_timeout = int(math.ceil(lock_timeout))

# 超时则不再尝试上锁
end_acquire_time = time.time() + acquire_timeout
# 从连接池获取redis对象
redis = aioredis.Redis(connection_pool=app.ctx.redis_pool)
while time.time() < end_acquire_time:
# 上锁成功
if await redis.set(
name=lock_name, # 锁名
value=identifier, # 锁值
ex=lock_timeout, # 过期时间
nx=True): # 如果key存在则不上锁
return identifier
# 上锁失败
await asyncio.sleep(1)
return False

这里首先会随机生成一个identifier,为什么需要这个值?这个值主要是用于对锁的持有者进行一个认证。
举个例子,当你对key‘test’上锁并设置了过期时间5秒,并且在代码执行完成后对该锁进行释放。然而这时候你的程序却执行了10秒,在你的key过期后,已经有其他的进程拿到了‘test’的锁。如果这时你的程序还是直接delete这个key的话,就会将别人的锁进行释放。因此这里需要一个identifier值,当你在进行delete前,会进行一个判断,如果这个key的identifier值与你的相同,则可以删除。反之不能删除。
如果上锁成功,该函数会返回identifier的值。反之,函数会sleep一秒,然后继续尝试上锁,默认时间是10秒。如果超过acquire_timeout,则会返回False.


接着我们再写一个释放锁的函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
async def release_lock(app, lock_name, identifier):
# 锁名
lock_name = 'lock:' + lock_name
redis = aioredis.Redis(connection_pool=app.ctx.redis_pool)

# 开启pipeline
pipe = redis.pipeline()
while True:
try:
# 开启对key的watch
await pipe.watch(lock_name)

# 开启事务
pipe.multi()
await pipe.delete(lock_name)
await pipe.execute() # 执行事务

# 关闭watch
await pipe.unwatch()
break
except Exception as e:
print(f"ERROR:{e}")
continue

# 关闭pipeline
await pipe.close()
return True

这里的pipeline是什么?pipeline是Redis的一个机制,用于将多个set等指令合并,减少RTT(Round Time Trip)的影响。
multi则相当于事务,保证操作的原子性,但是不能实现回滚。因此这里还用了watch,watch会监听一个key,当pipe.execute时,如果这个key发现了变动,则execute的命令不会执行。

至此,上锁和解锁的函数已经完成。
通过这两个函数实现了分布式锁的上锁并返回identifier、设置锁的过期时间、上锁失败后的重试以及解锁的时候防止误解其他进程的锁。


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!