Skip to content

发布/订阅

可以有多个订阅者,它们同时收到相同的消息

简介

发布:

1
2
3
4
5
6
➜  ~ redis-cli
127.0.0.1:6379> PUBLISH news hello
(integer) 0
127.0.0.1:6379> PUBLISH news hello
(integer) 1
127.0.0.1:6379>

订阅:

1
2
3
4
5
6
7
8
9
➜  ~ redis-cli
127.0.0.1:6379> SUBSCRIBE news
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news"
3) (integer) 1
1) "message"
2) "news"
3) "hello"

第一次发布时,订阅客户端尚未启动,返回 0,第二次发布时订阅客户端启动,返回 1,订阅客户端收到消息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
# 订阅
import redis
import time
r = redis.Redis("127.0.0.1")
p = r.pubsub()
p.subscribe('my-first-channel', 'my-second-channel')
cnt = 0
while True:
    msg = p.get_message()
    cnt += 1
    if msg is not None:
        print(cnt, msg)
1
2
3
4
5
6
# 发布
import redis

r = redis.Redis("127.0.0.1")
r.publish('my-second-channel', '2222')
r.publish('my-first-channel', '111134')

订阅输出结果:

1
2
3
4
2 {'type': 'subscribe', 'pattern': None, 'channel': b'my-first-channel', 'data': 1}
3 {'type': 'subscribe', 'pattern': None, 'channel': b'my-second-channel', 'data': 2}
360282 {'type': 'message', 'pattern': None, 'channel': b'my-second-channel', 'data': b'2222'}
360284 {'type': 'message', 'pattern': None, 'channel': b'my-first-channel', 'data': b'111134'}

模式匹配订阅

1
2
3
4
➜  ~ redis-cli
127.0.0.1:6379> PUBLISH news2 hello
(integer) 1
127.0.0.1:6379>
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
➜  ~ redis-cli
127.0.0.1:6379> PSUBSCRIBE news*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "news*"
3) (integer) 1
1) "pmessage"
2) "news*"
3) "news2"
4) "hello"

aioredis

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 订阅
import asyncio
import aioredis


async def main():
    redis = await aioredis.create_redis_pool('redis://localhost')

    ch1, ch2 = await redis.subscribe('channel:1', 'channel:2')
    assert isinstance(ch1, aioredis.Channel)
    assert isinstance(ch2, aioredis.Channel)

    async def reader(channel):
        async for message in channel.iter():
            print("Got message:", message)
    loop = asyncio.get_running_loop()
    loop.create_task(reader(ch1))
    loop.create_task(reader(ch2))

    await asyncio.sleep(30)
    redis.close()
    await redis.wait_closed()

asyncio.run(main())
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
# 订阅
import asyncio
import aioredis


async def main():
    redis = await aioredis.create_redis_pool("redis://localhost")

    ch = await redis.subscribe('test_ch')

    async for message in ch[0].iter():
        print(type(message))
        print("Got trader msg:", message)
    redis.close()
    await redis.wait_closed()

asyncio.run(main())
# <class 'bytes'>
# Got trader msg: b'Hello'
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
# 发布
import asyncio
import aioredis


async def main():
    redis = await aioredis.create_redis_pool('redis://localhost')

    await redis.publish('channel:1', 'Hello')
    await redis.publish('channel:2', 'World')

    redis.close()
    await redis.wait_closed()

asyncio.run(main())

注意

发布订阅是不区分 db 的,redis 使用 db1 发布的消息,redis 使用 db5 的连接也能订阅到