发布/订阅
可以有多个订阅者,它们同时收到相同的消息
简介
发布:
| ➜ ~ redis-cli
127.0.0.1:6379> PUBLISH news hello
(integer) 0
127.0.0.1:6379> PUBLISH news hello
(integer) 1
127.0.0.1:6379>
|
订阅:
| ➜ ~ redis-cli
127.0.0.1:6379> SUBSCRIBE news
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news"
3) (integer) 1
1) "message"
2) "news"
3) "hello"
|
第一次发布时,订阅客户端尚未启动,返回 0,第二次发布时订阅客户端启动,返回 1,订阅客户端收到消息
1
2
3
4
5
6
7
8
9
10
11
12 | # 订阅
import redis
import time
r = redis.Redis("127.0.0.1")
p = r.pubsub()
p.subscribe('my-first-channel', 'my-second-channel')
cnt = 0
while True:
msg = p.get_message()
cnt += 1
if msg is not None:
print(cnt, msg)
|
| # 发布
import redis
r = redis.Redis("127.0.0.1")
r.publish('my-second-channel', '2222')
r.publish('my-first-channel', '111134')
|
订阅输出结果:
| 2 {'type': 'subscribe', 'pattern': None, 'channel': b'my-first-channel', 'data': 1}
3 {'type': 'subscribe', 'pattern': None, 'channel': b'my-second-channel', 'data': 2}
360282 {'type': 'message', 'pattern': None, 'channel': b'my-second-channel', 'data': b'2222'}
360284 {'type': 'message', 'pattern': None, 'channel': b'my-first-channel', 'data': b'111134'}
|
模式匹配订阅
| ➜ ~ redis-cli
127.0.0.1:6379> PUBLISH news2 hello
(integer) 1
127.0.0.1:6379>
|
| ➜ ~ redis-cli
127.0.0.1:6379> PSUBSCRIBE news*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "news*"
3) (integer) 1
1) "pmessage"
2) "news*"
3) "news2"
4) "hello"
|
aioredis
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24 | # 订阅
import asyncio
import aioredis
async def main():
redis = await aioredis.create_redis_pool('redis://localhost')
ch1, ch2 = await redis.subscribe('channel:1', 'channel:2')
assert isinstance(ch1, aioredis.Channel)
assert isinstance(ch2, aioredis.Channel)
async def reader(channel):
async for message in channel.iter():
print("Got message:", message)
loop = asyncio.get_running_loop()
loop.create_task(reader(ch1))
loop.create_task(reader(ch2))
await asyncio.sleep(30)
redis.close()
await redis.wait_closed()
asyncio.run(main())
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 | # 订阅
import asyncio
import aioredis
async def main():
redis = await aioredis.create_redis_pool("redis://localhost")
ch = await redis.subscribe('test_ch')
async for message in ch[0].iter():
print(type(message))
print("Got trader msg:", message)
redis.close()
await redis.wait_closed()
asyncio.run(main())
# <class 'bytes'>
# Got trader msg: b'Hello'
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 | # 发布
import asyncio
import aioredis
async def main():
redis = await aioredis.create_redis_pool('redis://localhost')
await redis.publish('channel:1', 'Hello')
await redis.publish('channel:2', 'World')
redis.close()
await redis.wait_closed()
asyncio.run(main())
|
注意
发布订阅是不区分 db 的,redis 使用 db1 发布的消息,redis 使用 db5 的连接也能订阅到