服务端
处理 post 请求
1
2
3
4
5
6
7
8
9
10
11
12
13 | from aiohttp import web
async def post_handler(request):
data = await request.post()
data_list = list(data.items())
print(data_list)
return web.Response(text="hhh")
app = web.Application()
app.add_routes([web.post('/post', post_handler)])
web.run_app(app)
|
服务端使用SSL验证
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 | from aiohttp import web
import ssl
def extract_cert(request):
peercert = request.transport.get_extra_info("peercert")
subject = peercert['subject']
ans = {}
for k in subject:
for k, v in k:
ans[k] = v
return ans
async def handle(request):
user = extract_cert(request)
text = f"Hello, {user['commonName']}"
return web.Response(text=text)
app = web.Application()
app.add_routes([web.get('/', handle),
web.get('/{name}', handle)])
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH, cafile='xxxca.pem') # 根CA证书
# ssl_context.verify_mode = ssl.CERT_OPTIONAL
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.load_cert_chain('xxx.crt.pem', 'xxx.key.pem') # 服务端证书
web.run_app(app, ssl_context=ssl_context)
|
使用WebSockets
aiohttp.web
直接提供WebSockets
支持
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 | from aiohttp import web
import aiohttp
app = web.Application()
async def websocket_handler(request):
ws = web.WebSocketResponse()
await ws.prepare(request)
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
received_message = msg.data
print(f"Received: {received_message}")
await ws.send_str(f"Received: {received_message}")
elif msg.type == aiohttp.WSMsgType.ERROR:
print(f"WebSocket Error: {ws.exception()}")
return ws
app.router.add_get('/', websocket_handler)
web.run_app(app,port=3000)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25 | import asyncio
import aiohttp
from aiohttp import web
async def handle(request):
ws = web.WebSocketResponse(
autoping=True, heartbeat=10.0, compress=True, max_msg_size=0)
print(request.match_info.get('name', "nocilantro"))
await ws.prepare(request)
for i in range(10):
await ws.send_str(str(i))
await asyncio.sleep(1)
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
data = msg.data
if data:
await ws.send_str('recevive' + data)
return ws
app = web.Application()
app.add_routes([web.get('/', handle),
web.get('/{name}', handle)])
web.run_app(app)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58 | import asyncio
import aiohttp
from aiohttp import web
fut_flag = asyncio.Future()
async def client():
async with aiohttp.ClientSession() as session:
async with session.ws_connect('ws://hq.sinajs.cn/wskt?list=s_sh000001',
heartbeat=1) as ws:
try:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(msg.data)
elif msg.type == aiohttp.WSMsgType.CLOSED:
print("ws close.")
break
elif msg.type == aiohttp.WSMsgType.ERROR:
print("error!")
break
except Exception as e:
print(f"ERROR: {type(e)}:{e}")
raise e
async def handle(request):
ws = web.WebSocketResponse(
autoping=True, heartbeat=10.0, compress=True, max_msg_size=0)
print(request.match_info.get('name', "nocilantro"))
await ws.prepare(request)
for i in range(10):
await ws.send_str(str(i))
await asyncio.sleep(1)
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
data = msg.data
if data:
await ws.send_str('recevive' + data)
return ws
async def server():
app = web.Application()
app.add_routes([web.get('/', handle)])
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner)
await site.start()
await fut_flag
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
server(),
client()
))
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 | import asyncio
import aiohttp
import collections
from aiohttp import web
fut_flag = asyncio.Future()
req_lists = set()
loop = asyncio.get_event_loop()
req_dict = collections.defaultdict(lambda: [])
async def client(list_str):
async with aiohttp.ClientSession() as session:
async with session.ws_connect(f"ws://hq.sinajs.cn/wskt?list={list_str}",
heartbeat=1) as ws:
try:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
for ws in req_dict[list_str]:
print(msg.data)
await ws.send_str(msg.data)
elif msg.type == aiohttp.WSMsgType.CLOSED:
print("ws close.")
break
elif msg.type == aiohttp.WSMsgType.ERROR:
print("error!")
break
except Exception as e:
print(f"ERROR: {type(e)}:{e}")
raise e
async def handle(request):
list_str = request.match_info.get('list', None)
if list_str is None or list_str in req_lists:
return
print(list_str)
ws = web.WebSocketResponse(
autoping=True, heartbeat=10.0, compress=True, max_msg_size=0)
await ws.prepare(request)
req_lists.add(list_str)
req_dict[list_str].append(ws)
loop.create_task(client(list_str))
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
data = msg.data
if data:
await ws.send_str('recevive' + data)
return ws
async def server():
app = web.Application()
app.add_routes([web.get('/list={list}', handle)])
runner = web.AppRunner(app)
await runner.setup()
site = web.TCPSite(runner)
await site.start()
await fut_flag
loop.run_until_complete(server())
|