原创

【redis】客户端缓存实例

前言

Redis是最流行的kv数据库,可以极大的减少数据库压力。支持不同的数据结构。我将重点介绍一个Redis 6.0发布的特性,并解释Python的Async是如何与其一起使用的 ,可以在这里看到更多细节:client-side caching.

什么是客户端缓存(client-side caching)

我们这里假设你有一个程序要从Redis中获取数据,通常你需要连到redis服务器,设置键值或者请求获取键值,redis服务处理,再接收响应。通常,这样的处理方式是很快,但是如果服务器很繁忙的情况,或者说网络带宽占满的情况,形势可能会急转直下,如何更快的处理数据呢?

本地程序从Redis中获取或者设置值的时候,本地的内存中也会保留着一份副本。如果多台机器,同时获取了redis数据,如何能够保持一致呢?这时候,就需要有一种方法来知道,值是否过期,或者有人覆盖了它。

从6.0开始,Redis提供了一种方法来简化这一过程。应用程序可以订阅更改,订阅后可以接收到变更的消息提示。有两种不同地策略可以用:

  • 普通模式:redis记住连接的客户端访问了哪些key,在这些key对应的value被修改的时候,通知客户端key对应的值失效了,让客户端重新获取。
  • 广播模式:服务器不记住客户端访问的key,客户端订阅key前缀,并且会自动收到修订的通知消息。

你可以根据实际情况来选择对应的策略如果想要连接更多内容,可以点这里查看 Redis documentation .

本文使用了 broadcasting mode.

现在开始操作

安装redis服务器

先安装一台redis服务器

docker run --name redis -p 6379:6379 -d --rm redis:6-alpine

上面的命令,新建了一个简单的redis服务器,下面是python代码:

import asyncio

from redis.asyncio import BlockingConnectionPool, Redis

class ClientSideCache:
    def __init__(self, redis_host):
        self.__pool = BlockingConnectionPool(host=redis_host, decode_responses=True)

    def __await__(self):
        return self.init().__await__()

    async def init(self):
        self._pool = await Redis(connection_pool=self.__pool)
        return self

    async def set(self, key, value):
        await self._pool.set(key, value)

    async def get(self, key):
        return await self._pool.get(key)

async def main():
    client = await ClientSideCache("localhost")
    await client.set("my_key", "my_value")
    print(await client.get("my_key"))

if __name__ == "__main__":
    asyncio.run(main())

运行后,检查下是否生效了:

> docker exec -it redis redis-cli monitor
OK
1652658570.800467 [0 172.17.0.1:64786] "SET" "my_key" "my_value"
1652658570.804626 [0 172.17.0.1:64786] "GET" "my_key"

把数据存到本地:

class ClientSideCache:
    def __init__(self, redis_host):
        self._local_cache = {}

    ...

    async def set(self, key, value):
        self._local_cache[key] = value
        await self._pool.set(key, value)

    async def get(self, key):
        if key in self._local_cache:
            return self._local_cache[key]
        value = await self._pool.get(key)
        if value is not None:
            self._local_cache[key] = value
        return value

变量_local_cache 就是客户端级别的缓存。现在检查下,客户端程序是否有对redis的请求:

async def main(): client = await ClientSideCache("localhost")
    await client.set("my_key", "my_value")
    await client.get("my_key")
> docker exec -it redis redis-cli monitor
OK
1652659187.314014 [0 172.17.0.1:64830] "SET" "my_key" "my_value"

接下来:

  1. 打开Redis 连接。
  2. 执行CLIENT ID命令去获取一个ID
  3. 执行CLIENT TRACKING on REDIRECT {client_id} BCAST PREFIX '' 告诉Redis 这个客户端订阅了value更改的通知。
  4. 执行SUBSCRIBE __redis__:invalidate,订阅无效的消息

现在,你可以打开一个新连接并设置一个带有值的key。

_1) "message"
2) "__redis__:invalidate"
3) 1) "key"_

基本上,以上做法应该足以保证内存副本中的一致了。在Python中实现:

class ClientSideCache:
    ...
    async def init(self):
        self._pool = await Redis(connection_pool=self.__pool)
        asyncio.create_task(self._listen_invalidate())
        return self

    async def _listen_invalidate(self):
        pubsub = self._pool.pubsub()
        await pubsub.execute_command(b"CLIENT", b"ID")
        client_id = await pubsub.connection.read_response()
        await pubsub.execute_command(f"CLIENT TRACKING on REDIRECT {client_id} BCAST")
        await pubsub.connection.read_response()
        await pubsub.subscribe("__redis__:invalidate")

        while True:
            message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=0.1)
            if message is None or not message.get("data"):
                continue
            key = message["data"][0]
            del self._local_cache[key]
    ...

init方法中,你需要创建一个任务。
asyncio.create_task(self._listen_invalidate()).

在此任务中,创建一个PubSub对象并且使用它,这样你就可以使用一个连接来打开客户端跟踪和订阅的消息。基本上就这样就行了。

下面是完整代码:

import asyncio

from aioredis import BlockingConnectionPool, Redis


class ClientSideCache:
    def __init__(self, redis_host):
        self._local_cache = {}
        self.__pool = BlockingConnectionPool(host=redis_host, decode_responses=True)

    def __await__(self):
        return self.init().__await__()

    async def init(self):
        self._pool = await Redis(connection_pool=self.__pool)
        asyncio.create_task(self._listen_invalidate())
        return self

    async def _listen_invalidate(self):
        pubsub = self._pool.pubsub()
        await pubsub.execute_command(b"CLIENT", b"ID")
        client_id = await pubsub.connection.read_response()
        await pubsub.execute_command(
            f"CLIENT TRACKING on REDIRECT {client_id} BCAST"
        )
        await pubsub.connection.read_response()
        await pubsub.subscribe("__redis__:invalidate")

        while True:
            message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=0.1)
            if message is None or not message.get("data"):
                continue
            key = message["data"][0]
            if key in self._local_cache:
                del self._local_cache[key]

    async def set(self, key, value):
        self._local_cache[key] = value
        await self._pool.set(key, value)

    async def get(self, key):
        if key in self._local_cache:
            return self._local_cache[key]
        value = await self._pool.get(key)
        if value is not None:
            self._local_cache[key] = value
        return value

要验证效果,下面开始生成10个values:

for i in {1..10}
do
  docker exec redis redis-cli set key_${i} value_${i}
done

在运行Redis的监视器的终端中执行如下命令:

docker exec -it redis redis-cli monitor

然后运行如下脚本:

async def main():
    client = await ClientSideCache("localhost")
    # getting values and store it in local cache
    for i in range(1, 11):
        print(await client.get(f"key_{i}"))

    input("Stop! Press enter to continue")
    # second block
    for i in range(1, 11):
        print(await client.get(f"key_{i}"))

输入一些数据,改变redis的一些值。修改key_5的值,把key_6的值设置为过期:

> docker exec -it redis redis-cli set key_5 new
OK
> docker exec -it redis redis-cli expire key_6 1
(integer) 1

返回到python中,并且按回车,结束程序。之后,将会在redis监视器中会看下以下输出:

> docker exec -it redis redis-cli monitor
OK
 "CLIENT" "ID"
 "CLIENT" "TRACKING" "on" "REDIRECT" "1361" "BCAST"
 "SUBSCRIBE" "__redis__:invalidate"
 "GET" "key_1"  
 "GET" "key_2"  
 "GET" "key_3"    
 "GET" "key_4"    
 "GET" "key_5"    
 "GET" "key_6"    
 "GET" "key_7"
 "GET" "key_8"
 "GET" "key_9"
 "GET" "key_10"
 # here we did manual commands to change 2 keys
 "set" "key_5" "new"
 "expire" "key_6" "10"
 # here we pressed enter for our script
 "GET" "key_5"
 "GET" "key_6"

可以看到,只有两个请求。这是因为这些key已经失效了。其他key是从应用的内存中存储的。

温馨提示:
本文最后更新于 2023年01月19日,已超过 18 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我
正文到此结束
本文目录