【redis】客户端缓存实例
前言
Redis是最流行的kv数据库,可以极大的减少数据库压力。支持不同的数据结构。我将重点介绍一个Redis 6.0发布的特性,并解释Python的Async是如何与其一起使用的 ,可以在这里看到更多细节:client-side caching.
什么是客户端缓存(client-side caching)
我们这里假设你有一个程序要从Redis中获取数据,通常你需要连到redis服务器,设置键值或者请求获取键值,redis服务处理,再接收响应。通常,这样的处理方式是很快,但是如果服务器很繁忙的情况,或者说网络带宽占满的情况,形势可能会急转直下,如何更快的处理数据呢?
本地程序从Redis中获取或者设置值的时候,本地的内存中也会保留着一份副本。如果多台机器,同时获取了redis数据,如何能够保持一致呢?这时候,就需要有一种方法来知道,值是否过期,或者有人覆盖了它。
从6.0开始,Redis提供了一种方法来简化这一过程。应用程序可以订阅更改,订阅后可以接收到变更的消息提示。有两种不同地策略可以用:
- 普通模式:redis记住连接的客户端访问了哪些key,在这些key对应的value被修改的时候,通知客户端key对应的值失效了,让客户端重新获取。
- 广播模式:服务器不记住客户端访问的key,客户端订阅key前缀,并且会自动收到修订的通知消息。
你可以根据实际情况来选择对应的策略如果想要连接更多内容,可以点这里查看 Redis documentation .
本文使用了 broadcasting mode.
现在开始操作
安装redis服务器
先安装一台redis服务器
docker run --name redis -p 6379:6379 -d --rm redis:6-alpine
上面的命令,新建了一个简单的redis服务器,下面是python代码:
import asyncio
from redis.asyncio import BlockingConnectionPool, Redis
class ClientSideCache:
def __init__(self, redis_host):
self.__pool = BlockingConnectionPool(host=redis_host, decode_responses=True)
def __await__(self):
return self.init().__await__()
async def init(self):
self._pool = await Redis(connection_pool=self.__pool)
return self
async def set(self, key, value):
await self._pool.set(key, value)
async def get(self, key):
return await self._pool.get(key)
async def main():
client = await ClientSideCache("localhost")
await client.set("my_key", "my_value")
print(await client.get("my_key"))
if __name__ == "__main__":
asyncio.run(main())
运行后,检查下是否生效了:
> docker exec -it redis redis-cli monitor
OK
1652658570.800467 [0 172.17.0.1:64786] "SET" "my_key" "my_value"
1652658570.804626 [0 172.17.0.1:64786] "GET" "my_key"
把数据存到本地:
class ClientSideCache:
def __init__(self, redis_host):
self._local_cache = {}
...
async def set(self, key, value):
self._local_cache[key] = value
await self._pool.set(key, value)
async def get(self, key):
if key in self._local_cache:
return self._local_cache[key]
value = await self._pool.get(key)
if value is not None:
self._local_cache[key] = value
return value
变量_local_cache
就是客户端级别的缓存。现在检查下,客户端程序是否有对redis的请求:
async def main(): client = await ClientSideCache("localhost")
await client.set("my_key", "my_value")
await client.get("my_key")
> docker exec -it redis redis-cli monitor
OK
1652659187.314014 [0 172.17.0.1:64830] "SET" "my_key" "my_value"
接下来:
- 打开Redis 连接。
- 执行
CLIENT ID
命令去获取一个ID
。 - 执行
CLIENT TRACKING on REDIRECT {client_id} BCAST PREFIX ''
告诉Redis 这个客户端订阅了value更改的通知。 - 执行
SUBSCRIBE __redis__:invalidate
,订阅无效的消息
现在,你可以打开一个新连接并设置一个带有值的key。
_1) "message"
2) "__redis__:invalidate"
3) 1) "key"_
基本上,以上做法应该足以保证内存副本中的一致了。在Python中实现:
class ClientSideCache:
...
async def init(self):
self._pool = await Redis(connection_pool=self.__pool)
asyncio.create_task(self._listen_invalidate())
return self
async def _listen_invalidate(self):
pubsub = self._pool.pubsub()
await pubsub.execute_command(b"CLIENT", b"ID")
client_id = await pubsub.connection.read_response()
await pubsub.execute_command(f"CLIENT TRACKING on REDIRECT {client_id} BCAST")
await pubsub.connection.read_response()
await pubsub.subscribe("__redis__:invalidate")
while True:
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=0.1)
if message is None or not message.get("data"):
continue
key = message["data"][0]
del self._local_cache[key]
...
在init
方法中,你需要创建一个任务。asyncio.create_task(self._listen_invalidate()).
在此任务中,创建一个PubSub
对象并且使用它,这样你就可以使用一个连接来打开客户端跟踪和订阅的消息。基本上就这样就行了。
下面是完整代码:
import asyncio
from aioredis import BlockingConnectionPool, Redis
class ClientSideCache:
def __init__(self, redis_host):
self._local_cache = {}
self.__pool = BlockingConnectionPool(host=redis_host, decode_responses=True)
def __await__(self):
return self.init().__await__()
async def init(self):
self._pool = await Redis(connection_pool=self.__pool)
asyncio.create_task(self._listen_invalidate())
return self
async def _listen_invalidate(self):
pubsub = self._pool.pubsub()
await pubsub.execute_command(b"CLIENT", b"ID")
client_id = await pubsub.connection.read_response()
await pubsub.execute_command(
f"CLIENT TRACKING on REDIRECT {client_id} BCAST"
)
await pubsub.connection.read_response()
await pubsub.subscribe("__redis__:invalidate")
while True:
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=0.1)
if message is None or not message.get("data"):
continue
key = message["data"][0]
if key in self._local_cache:
del self._local_cache[key]
async def set(self, key, value):
self._local_cache[key] = value
await self._pool.set(key, value)
async def get(self, key):
if key in self._local_cache:
return self._local_cache[key]
value = await self._pool.get(key)
if value is not None:
self._local_cache[key] = value
return value
要验证效果,下面开始生成10个values:
for i in {1..10}
do
docker exec redis redis-cli set key_${i} value_${i}
done
在运行Redis的监视器的终端中执行如下命令:
docker exec -it redis redis-cli monitor
然后运行如下脚本:
async def main():
client = await ClientSideCache("localhost")
# getting values and store it in local cache
for i in range(1, 11):
print(await client.get(f"key_{i}"))
input("Stop! Press enter to continue")
# second block
for i in range(1, 11):
print(await client.get(f"key_{i}"))
输入一些数据,改变redis的一些值。修改key_5的值,把key_6的值设置为过期:
> docker exec -it redis redis-cli set key_5 new
OK
> docker exec -it redis redis-cli expire key_6 1
(integer) 1
返回到python中,并且按回车,结束程序。之后,将会在redis监视器中会看下以下输出:
> docker exec -it redis redis-cli monitor
OK
"CLIENT" "ID"
"CLIENT" "TRACKING" "on" "REDIRECT" "1361" "BCAST"
"SUBSCRIBE" "__redis__:invalidate"
"GET" "key_1"
"GET" "key_2"
"GET" "key_3"
"GET" "key_4"
"GET" "key_5"
"GET" "key_6"
"GET" "key_7"
"GET" "key_8"
"GET" "key_9"
"GET" "key_10"
# here we did manual commands to change 2 keys
"set" "key_5" "new"
"expire" "key_6" "10"
# here we pressed enter for our script
"GET" "key_5"
"GET" "key_6"
可以看到,只有两个请求。这是因为这些key已经失效了。其他key是从应用的内存中存储的。
本文来自:【redis】客户端缓存实例-小码农,转载请保留本条链接,感谢!
- 本文标签: redis python cache
- 本文链接: https://djc8.cn/archives/redis-client-cache-instance.html
- 版权声明: 本文由小码农原创发布,转载请遵循《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》许可协议授权