Skip to content

服务端

处理 post 请求

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
from aiohttp import web


async def post_handler(request):
    """Print the submitted form fields and answer with a fixed text body.

    The POST parameters are read from ``request``, dumped to stdout as a
    list of ``(name, value)`` pairs, and the reply is always ``"hhh"``.
    """
    form = await request.post()
    pairs = [*form.items()]
    print(pairs)
    return web.Response(text="hhh")


# Build the application, expose the form handler on POST /post, and serve it
# with aiohttp's default host/port settings.
app = web.Application()
app.router.add_post('/post', post_handler)
web.run_app(app)

服务端使用SSL验证

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
from aiohttp import web
import ssl
def extract_cert(request):
    """Flatten the subject of the client's TLS certificate into a dict.

    The peer certificate is read from the request's transport via
    ``get_extra_info("peercert")``. Its ``subject`` entry is a sequence of
    relative distinguished names, each itself a sequence of
    ``(field_name, value)`` pairs; all pairs are merged into one mapping
    (a later duplicate field name overwrites an earlier one).

    Args:
        request: Object exposing a ``transport`` attribute (an aiohttp
            request in this file).

    Returns:
        dict: ``{field_name: value}`` for the certificate subject, or an
        empty dict when the transport is gone, no certificate was
        presented, or the certificate carries no subject.
    """
    transport = request.transport
    if transport is None:
        # The connection has already been dropped; nothing to inspect.
        return {}

    peercert = transport.get_extra_info("peercert")
    if not peercert:
        # None (no certificate) or {} (no usable certificate data).
        return {}

    return {
        field_name: value
        for rdn in peercert.get('subject', ())
        for field_name, value in rdn
    }
async def handle(request):
    """Greet the caller by the ``commonName`` of its client certificate.

    The certificate subject is obtained through ``extract_cert``; a subject
    without a ``commonName`` entry makes the lookup raise ``KeyError``.
    """
    subject = extract_cert(request)
    return web.Response(text=f"Hello, {subject['commonName']}")
# Serve the greeting handler on "/" and "/{name}".
app = web.Application()
app.router.add_get('/', handle)
app.router.add_get('/{name}', handle)

# TLS context for the server side; client certificates are checked against
# the CA bundle given as ``cafile``.
ssl_context = ssl.create_default_context(
    purpose=ssl.Purpose.CLIENT_AUTH,
    cafile='xxxca.pem',  # root CA certificate
)
# A client certificate is mandatory here; ssl.CERT_OPTIONAL is the
# alternative setting if certificates should not be required.
ssl_context.verify_mode = ssl.CERT_REQUIRED
# Server certificate and its private key.
ssl_context.load_cert_chain(certfile='xxx.crt.pem', keyfile='xxx.key.pem')
web.run_app(app, ssl_context=ssl_context)

使用WebSockets

aiohttp.web 直接提供对 WebSocket 的支持。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
from aiohttp import web
import aiohttp

# Application instance on which the WebSocket route is registered further down.
app = web.Application()

async def websocket_handler(request):
    """Upgrade the request to a WebSocket and echo every text message.

    Each text frame is printed and sent back prefixed with ``"Received: "``.
    Error frames are reported on stdout together with the socket's stored
    exception; every other frame type is ignored. The socket object is
    returned once the message stream ends.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for frame in ws:
        kind = frame.type
        if kind == aiohttp.WSMsgType.ERROR:
            print(f"WebSocket Error: {ws.exception()}")
            continue
        if kind != aiohttp.WSMsgType.TEXT:
            continue
        received_message = frame.data
        print(f"Received: {received_message}") 
        await ws.send_str(f"Received: {received_message}")

    return ws

# Expose the echo handler on GET / and serve on port 3000.
app.add_routes([web.get('/', websocket_handler)])
web.run_app(app, port=3000)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio
import aiohttp
from aiohttp import web


async def handle(request):
    """Count to the client over a WebSocket, then echo its text messages.

    Prints the ``name`` path parameter (``"nocilantro"`` when the route has
    none), completes the WebSocket handshake, pushes the strings ``"0"``
    through ``"9"`` one second apart, and afterwards answers every
    non-empty text frame with ``'recevive'`` followed by the frame's data.
    """
    ws = web.WebSocketResponse(
        autoping=True,
        heartbeat=10.0,
        compress=True,
        max_msg_size=0,
    )
    print(request.match_info.get('name', "nocilantro"))
    await ws.prepare(request)

    # Initial countdown: one number per second.
    for tick in map(str, range(10)):
        await ws.send_str(tick)
        await asyncio.sleep(1)

    # Echo phase: only non-empty text frames get a reply.
    async for msg in ws:
        if msg.type != aiohttp.WSMsgType.TEXT:
            continue
        payload = msg.data
        if payload:
            await ws.send_str('recevive' + payload)
    return ws


# Serve the WebSocket handler on "/" and on "/{name}" with aiohttp's
# default host/port settings.
app = web.Application()
app.router.add_get('/', handle)
app.router.add_get('/{name}', handle)
web.run_app(app)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import asyncio
import aiohttp
from aiohttp import web


# Future that server() awaits after starting the site; nothing in this snippet
# ever resolves it, so awaiting it keeps server() pending.
# NOTE(review): it is created at import time, outside a running event loop —
# presumably it binds to the same loop that asyncio.get_event_loop() returns
# further down; confirm on the Python version in use.
fut_flag = asyncio.Future()


async def client():
    """Print every text message received from the upstream quote WebSocket.

    Connects to the hard-coded ``hq.sinajs.cn`` endpoint with a one-second
    heartbeat. Text frames are printed; a CLOSED or ERROR frame is reported
    and ends the loop. Any exception raised while reading is printed and
    then re-raised to the caller.
    """
    url = 'ws://hq.sinajs.cn/wskt?list=s_sh000001'
    async with aiohttp.ClientSession() as session, \
            session.ws_connect(url, heartbeat=1) as ws:
        try:
            async for msg in ws:
                kind = msg.type
                if kind == aiohttp.WSMsgType.TEXT:
                    print(msg.data)
                    continue
                if kind == aiohttp.WSMsgType.CLOSED:
                    print("ws close.")
                    break
                if kind == aiohttp.WSMsgType.ERROR:
                    print("error!")
                    break
        except Exception as e:
            print(f"ERROR: {type(e)}:{e}")
            raise e


async def handle(request):
    """Count to the client over a WebSocket, then echo its text messages.

    Prints the ``name`` path parameter (``"nocilantro"`` when the route has
    none), completes the WebSocket handshake, pushes the strings ``"0"``
    through ``"9"`` one second apart, and afterwards answers every
    non-empty text frame with ``'recevive'`` followed by the frame's data.
    """
    ws = web.WebSocketResponse(
        autoping=True,
        heartbeat=10.0,
        compress=True,
        max_msg_size=0,
    )
    print(request.match_info.get('name', "nocilantro"))
    await ws.prepare(request)

    # Initial countdown: one number per second.
    for tick in map(str, range(10)):
        await ws.send_str(tick)
        await asyncio.sleep(1)

    # Echo phase: only non-empty text frames get a reply.
    async for msg in ws:
        if msg.type != aiohttp.WSMsgType.TEXT:
            continue
        payload = msg.data
        if payload:
            await ws.send_str('recevive' + payload)
    return ws


async def server():
    """Start the WebSocket site and keep it running until ``fut_flag`` resolves.

    Builds an application with ``handle`` on ``/``, starts a ``TCPSite`` on
    aiohttp's default host and port, and then waits on the module-level
    ``fut_flag`` future. Whenever the wait ends — the future resolves, the
    task is cancelled, or startup fails after ``runner.setup()`` — the
    runner is cleaned up so the listening socket and open connections are
    released instead of being leaked.
    """
    app = web.Application()
    app.add_routes([web.get('/', handle)])
    runner = web.AppRunner(app)
    await runner.setup()
    try:
        site = web.TCPSite(runner)
        await site.start()
        # Park here; the site keeps serving in the background meanwhile.
        await fut_flag
    finally:
        # Counterpart of runner.setup(): stops the site(s) and frees resources.
        await runner.cleanup()


# Run the local WebSocket server and the upstream client concurrently on one
# event loop. gather() completes only when both coroutines have finished, and
# server() waits on fut_flag, so this call blocks until the process is
# interrupted or one of the coroutines raises.
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
    server(),
    client()
))
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import asyncio
import aiohttp
import collections
from aiohttp import web


# Future awaited by server() to keep the site alive; nothing in this snippet
# resolves it.
fut_flag = asyncio.Future()
# Quote-list identifiers recorded by handle() when it starts an upstream client.
req_lists = set()
# Event loop shared by the server and the upstream client tasks.
loop = asyncio.get_event_loop()
# Quote-list identifier -> WebSocket responses registered for that list.
req_dict = collections.defaultdict(list)


async def client(list_str):
    """Relay upstream quote messages to every WebSocket registered for a list.

    Opens a WebSocket to ``hq.sinajs.cn`` for ``list_str`` (one-second
    heartbeat) and forwards each text frame to the sockets stored in
    ``req_dict[list_str]``. A CLOSED or ERROR frame from upstream is
    reported and ends the relay.

    Args:
        list_str: Quote-list identifier; used both in the upstream URL and
            as the key into ``req_dict``.

    Raises:
        Exception: Anything raised while reading or forwarding is printed
            and then re-raised unchanged.
    """
    url = f"ws://hq.sinajs.cn/wskt?list={list_str}"
    async with aiohttp.ClientSession() as session:
        # Named ``upstream`` so the subscriber loop below cannot rebind it.
        async with session.ws_connect(url, heartbeat=1) as upstream:
            try:
                async for msg in upstream:
                    if msg.type == aiohttp.WSMsgType.TEXT:
                        # Iterate over a snapshot: handlers may register or
                        # drop sockets while this coroutine is suspended in
                        # send_str().
                        for subscriber in list(req_dict[list_str]):
                            if subscriber.closed:
                                # Writing to a closed socket raises and would
                                # end the relay for everyone else.
                                continue
                            print(msg.data)
                            await subscriber.send_str(msg.data)
                    elif msg.type == aiohttp.WSMsgType.CLOSED:
                        print("ws close.")
                        break
                    elif msg.type == aiohttp.WSMsgType.ERROR:
                        print("error!")
                        break
            except Exception as e:
                print(f"ERROR: {type(e)}:{e}")
                raise


async def handle(request):
    """Subscribe a WebSocket client to the quote list named in the URL.

    The socket is appended to ``req_dict[list_str]`` so the upstream relay
    (``client``) can forward quotes to it. The relay task is started only
    for the first subscriber of a list; later subscribers of the same list
    share it instead of being turned away with an invalid ``None`` result.
    While connected, every non-empty text frame from the client is answered
    with ``'recevive'`` followed by the frame's data.

    Args:
        request: Incoming request; the ``list`` path parameter selects the
            quote list.

    Returns:
        The ``WebSocketResponse`` once the client's message stream ends.

    Raises:
        web.HTTPBadRequest: If the ``list`` path parameter is missing.
    """
    list_str = request.match_info.get('list', None)
    if list_str is None:
        # A handler must not return None; answer with a proper 400 instead.
        raise web.HTTPBadRequest(text="missing 'list' path parameter")
    print(list_str)

    ws = web.WebSocketResponse(
        autoping=True, heartbeat=10.0, compress=True, max_msg_size=0)
    await ws.prepare(request)
    req_dict[list_str].append(ws)

    if list_str not in req_lists:
        # First subscriber for this list: start the single upstream relay.
        req_lists.add(list_str)
        relay = loop.create_task(client(list_str))
        # Forget the list once the relay ends so a later request restarts it.
        relay.add_done_callback(lambda _task: req_lists.discard(list_str))

    try:
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                data = msg.data
                if data:
                    await ws.send_str('recevive' + data)
    finally:
        # Unregister on disconnect so the relay stops writing to this socket.
        req_dict[list_str].remove(ws)
    return ws


async def server():
    """Start the subscription site and keep it running until ``fut_flag`` resolves.

    Builds an application with ``handle`` on ``/list={list}``, starts a
    ``TCPSite`` on aiohttp's default host and port, and then waits on the
    module-level ``fut_flag`` future. Whenever the wait ends — the future
    resolves, the task is cancelled, or startup fails after
    ``runner.setup()`` — the runner is cleaned up so the listening socket
    and open connections are released instead of being leaked.
    """
    app = web.Application()
    app.add_routes([web.get('/list={list}', handle)])
    runner = web.AppRunner(app)
    await runner.setup()
    try:
        site = web.TCPSite(runner)
        await site.start()
        # Park here; the site keeps serving in the background meanwhile.
        await fut_flag
    finally:
        # Counterpart of runner.setup(): stops the site(s) and frees resources.
        await runner.cleanup()


# Block on server(), which starts the site and then waits on fut_flag; the
# upstream client tasks that handle() schedules run on this same loop.
loop.run_until_complete(server())