Python-TCP编程-UDP编程-SocketServer-IO各种概念及多路复用-asyncio-学习笔记

发布于:2025-07-18 ⋅ 阅读:(54) ⋅ 点赞:(0)

欠4前年的一份笔记 ,献给今后的自己。

网络编程

Socket介绍
Socket套接字

Python中提供socket.py标准库,非常底层的接口库。
Socket是一种通用的网络编程接口,和网络层次没有一一对应的关系。
协议族

AF表示Address Family,用于socket()第一个参数
在这里插入图片描述

TCP编程

Socket编程,需要两端,一般来说需要一个服务端、一个客户端,服务端称为Server,客户端称为Client

TCP服务端编程

服务器端编程步骤

  • 创建Socket对象
  • 绑定IP地址Address和端口Port。bind()方法
    IPv4地址为一个二元组(IP地址字符串,Port)
  • 开始监听,将在指定的IP的端口上监听。listen()方法
  • 获取用于传送数据的Socket对象
    socket.accept() -> (socket object, address info)
    accept方法阻塞等待客户端建立连接,返回一个新的Socket对象和客户端地址的二元组
    地址是远程客户端的地址,IPv4中它是一个二元组(clientaddr, port)
    1、接收数据 : recv(bufsize[, flags]) 使用缓冲区接收数据
    2、发送数据 : send(bytes) 发送数据

在这里插入图片描述

问题

两次绑定同一个监听端口会怎么样?

import socket

s = socket.socket()  # 创建socket对象
s.bind(('127.0.0.1', 9999))  # 一个二元组

s.listen()  # 开始监听
# 开启一个连接

s1, info = s.accept()  # 阻塞直到和客户端成功建立连接,返回一个socket对象和客户端地址

# 使用缓冲区获取数据

data = s1.recv(1024)

print(data, info)

s1.send(b'magedu.com')

# 开启另外一个连接
s2, _ = s.accept()

data = s2.recv(1024)

s2.send(b'hello python')

s.close()

上例accept和recv是阻塞的,主线程经常被阻塞住而不能工作。怎么办?

练习一一写一个群聊程序

需求分析

聊天工具是CS程序,C是每一个客户端,S是服务器端。
服务器应该具有的功能:
启动服务,包括绑定地址和端口,监听
建立连接,能和多个客户端建立连接
接收不同用户的信息
分发,将接收的某个用户的信息转发到已连接的所有客户端
停止服务
记录连接的客户端

代码实现

服务端应该对应一个类

class ChatServer:
    def __init__(self, ip, port):  # 启动服务
        self.sock = socket.socket()
        self.addr = (ip, port)

    def start(self):  # 启动监听
        pass

    def accept(self):  # 多人连接
        pass

    def recv(self):  # 接收客户端数据
        pass

    def stop(self):  # 停止服务
        pass

在此基础上,扩展完成

import logging
import socket
import threading
import datetime

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")


class ChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):  # 启动服务
        self.sock = socket.socket()
        self.addr = (ip, port)
        self.clients = {}  # 客户端

    def start(self):  # 启动监听
        self.sock.bind(self.addr)  # $B
        self.sock.listen()  # 监听
        # accept会阻塞主线程,所以开一个新线程
        threading.Thread(target=self.accept).start()

    def accept(self):  # 多人连接
        while True:
            sock, client = self.sock.accept()  # 阻塞
            self.clients[client] = sock  # 添加到客户端字典
            # 准备接收数据,recv是阻塞的,开启新的线程
            threading.Thread(target=self.recv, args=(sock, client)).start()

    def recv(self, sock: socket.socket, client):  # 接收客户端数据
        while True:
            data = sock.recv(1024)  # 阻塞到数据到来
            msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client, data.decode())

            logging.info(msg)
            msg = msg.encode()
            for s in self.clients.values():
                s.send(msg)

    def stop(self):  # 停止服务
        for s in self.clients.values():
            s.close()
        self.sock.close()


cs = ChatServer()
cs.start

在此基础上,扩展完成

import threading
import datetime
import logging
import socket

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")


class ChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):  # 启动服务
        logging.info('Connecting to Chat Server')
        self.sock = socket.socket()
        self.addr = (ip, port)
        self.clients = {}  # 客户端
        self.event = threading.Event()

    def start(self):  # 启动监听
        self.sock.bind(self.addr)  # 378
        self.sock.listen()  # 监听
        # accept会阻塞主线程,所以开一个新线程
        threading.Thread(target=self.accept).start()

    def accept(self):  # 多人连接
        while not self.event.is_set():
            sock, client = self.sock.accept()  # BE%
            self.clients[client] = sock  # 添加到客户端字典
            # 准备接收数据,recv是阻塞的,开启新的线程
            threading.Thread(target=self.recv, args=(sock, client)).start()

    def recv(self, sock: socket.socket, client):  # 接收客户端数据
        while not self.event.is_set():
            data = sock.recv(1024)  # 阻塞到数据到来
            msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client, data.decode())

            logging.info(msg)
            msg = msg.encode()
            for s in self.clients.values():
                s.send(msg)

    def stop(self):  # 停止服务
        for s in self.clients.values():
            s.close()
        self.sock.close()
        self.event.set()


cs = ChatServer()
cs.start()

while True:
    cmd = input('>>').strip()
    if cmd == 'quit':
        cs.stop()
        threading.Event().wait(3)
        break

基本功能完成,但是有问题。使用Event改进。

import logging
import socket

import threading
import datetime

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")


class ChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):  # 启动服务
        self.sock = socket.socket()
        self.addr = (ip, port)
        self.clients = {}  # 客户端
        self.event = threading.Event()

    def start(self):  # 启动监听
        self.sock.bind(self.addr)  #绑定 
        self.sock.listen()  # 监听
        # accept会阻塞主线程,所以开一个新线程
        threading.Thread(target=self.accept).start()

    def accept(self):  # 多人连接
        while not self.event.is_set():
            sock, client = self.sock.accept()  # BE
            self.clients[client] = sock  # 添加到客户端字典
            # 准备接收数据,recv是阻塞的,开启新的线程
            threading.Thread(target=self.recv, args=(sock, client)).start()

    def recv(self, sock: socket.socket, client):  # 接收客户端数据
        while not self.event.is_set():
            data = sock.recv(1024)  # 阻塞到数据到来
            msg = "{:%Y/%m/%d %H:%M:%S} {}: {}:{}\n{}\n".format(datetime.datetime.now(), *client, data.decode())
            logging.info(msg)
            msg = msg.encode()
            for s in self.clients.values():
                s.send(msg)

    def stop(self):  # 停止服务
        for s in self.clients.values():
            s.close()
        self.sock.close()
        self.event.set()


cs = ChatServer()
cs.start()

while True:
    cmd = input('>>').strip()
    if cmd == 'quit':
        cs.stop()
        threading.Event().wait(3)
        break

这一版基本能用了,测试通过。但是还有要完善的地方。

例如各种异常的判断,客户端断开连接后字典中的移除客户端数据等。

客户端主动断开带来的问题

服务端知道自己何时断开,如果客户端断开,服务器不知道。

所以,好的做法是,客户端断开发出特殊消息通知服务器端断开连接。但是,如果客户端主动断开,服务端主动发

送一个空消息,超时返回异常,捕获异常并清理连接。

即使 客户端提供了断开命令,也不能保证客户端会使用它断开连接。但是还是要增加这个退出功能。

增加客户端退出命令

import logging
import socket

import threading
import datetime

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")

class ChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):  # 启动服务
        self.sock = socket.socket()
        self.addr = (ip, port)
        self.clients = {}  # 客户端
        self.event = threading.Event()

    def start(self):  # 启动监听
        self.sock.bind(self.addr)  # 绑定
        self.sock.listen()  # 监听
        # accept会阻塞主线程,所以开一个新线程
        threading.Thread(target=self.accept).start()

    def accept(self):  # 多人连接
        while not self.event.is_set():
            sock, client = self.sock.accept()  # BE
            self.clients[client] = sock  # 添加到客户端字典
            # 准备接收数据,recv是阻塞的,开启新的线程
            threading.Thread(target=self.recv, args=(sock, client)).start()

    def recv(self, sock: socket.socket, client):  # 接收客户端数据
        while not self.event.is_set():
            data = sock.recv(1024)  # 阻塞到数据到来

            msg = data.decode().strip()
            # 客户端退出命令
            if msg == 'quit':
                self.clients.pop(client)
                sock.close()
                logging.info('{} quits'.format(client))
                break

            msg = "{:%Y/%m/%d %H:%M:%S} {}: {}:{}\n{}\n".format(datetime.datetime.now(), *client, data.decode())
            logging.info(msg)
            msg = msg.encode()
            for s in self.clients.values():
                s.send(msg)

    def stop(self):  # 停止服务
        for s in self.clients.values():
            s.close()
        self.sock.close()
        self.event.set()


cs = ChatServer()
cs.start()

while True:
    cmd = input('>>').strip()
    if cmd == 'quit':
        cs.stop()
        threading.Event().wait(3)
        break

    logging.info(threading.enumerate())  # 用来观察断开后线程的变化

程序还有瑕疵,但是业务功能基本完成了
socket常用方法

在这里插入图片描述

MakeFile

socket-makefile(mode=‘r’, buffering=None, *, encoding=None, errors=None, newline=None)
创建一个与该套接字相关连的文件对象,将recv方法看做读方法,将send方法看做写方法。

# 使用makefile简单例子
import socket

sockserver = socket.socket()

ip = '127.0.0.1'

port = 9999

addr = (ip, port)

sockserver.bind(addr)

sockserver.listen()

print(' - ' * 30)
s, _ = sockserver.accept()

print(' - ' * 30)

f = s.makefile(mode='rw')

line = f.read(10)  # 阻塞等
print(' - ' * 30)
print(line)
f.write('Return your msg: {}'.format(line))
f.flush()

上例不能循环接收消息,修改一下

# 使用makefile简单例子
import socket
import threading

sockserver = socket.socket()

ip = '127.0.0.1'

port = 9999

addr = (ip, port)

sockserver.bind(addr)

sockserver.listen()

print(' - ' * 30)

event = threading.Event()


def accept(sock: socket.socket, e: threading.Event):
    s, _ = sock.accept()
    f = s.makefile(mode='rw')
    while True:
        line = f.readline()
        print(line)
        if line.strip() == "quit":  # 注意要发quit\n
            break
        f.write('Return your msg: (}'.format(line))
        f.flush()
    f.close()
    sock.close()
    e.wait(3)


t = threading.Thread(target=accept, args=(sockserver, event))
t.start()
t.join()
print(sockserver)

makefile练习
使用makefile改写群聊类

import logging
import socket
import threading
import datetime
import socket

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")


class ChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):  # 启动服务
        self.sock = socket.socket()
        self.addr = (ip, port)
        self.clients = {}  # 客户端
        self.event = threading.Event()

    def start(self):  # 启动监听
        self.sock.bind(self.addr)  # 绑定
        self.sock.listen()  # 监听
        # accept会阻塞主线程,所以开一个新线程
        threading.Thread(target=self.accept).start()

    def accept(self):  # 多人连接
        while not self.event.is_set():
            sock, client = self.sock.accept()  # 阻塞
            # 准备接收数据,recv是阻塞的,开启新的线程
            f = sock.makefile('rw')  # 支持读写
            self.clients[client] = f  # 添加到客户端字典
            threading.Thread(target=self.recv, args=(f, client), name='recv').start()

    def recv(self, f, client):  # 接收客户端数据
        while not self.event.is_set():
            data = f.readline()  # 阻塞到换行符
            msg = data.strip()
            # 客户端退出命令
            if msg == 'quit':
                self.clients.pop(client)
                f.close()
                logging.info('{}  quits'.format(client))
                break

            msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client, data)
            logging.info(msg)
            for s in self.clients.values():
                s.write(msg)
                s.flush()

    def stop(self):  # 停止服务
        for s in self.clients.values():
            s.close()
        self.sock.close()
        self.event.set()


cs = ChatServer()
cs.start()

while True:
    cmd = input('>>').strip()
    if cmd == 'quit':
        cs.stop()
        threading.Event().wait(3)
        break

logging.info(threading.enumerate())  # 用来观察断开后线程的变化

上例完成了基本功能,但是,如果客户端主动断开,或者readline出现异常,就不会从clients中移除作废的
socket。可以使用异常处理解决这个问题。

ChatServer实验用完整代码

注意,这个代码为实验用,代码中瑕疵还有很多。Socket太底层了,实际开发中很少使用这么底层的接口。
增加一些异常处理。

import logging
import socket
import threading
import datetime
import socket

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")


class ChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):  # 启动服务
        self.sock = socket.socket()
        self.addr = (ip, port)
        self.clients = {}  # 客户端
        self.event = threading.Event()

    def start(self):  # 启动监听
        self.sock.bind(self.addr)  # 绑定
        self.sock.listen()  # 监听
        # accept会阻塞主线程,所以开一个新线程
        threading.Thread(target=self.accept).start()

    def accept(self):  # 多人连接
        while not self.event.is_set():
            sock, client = self.sock.accept()  # 阻塞
            # 准备接收数据,recv是阻塞的,开启新的线程
            f = sock.makefile('rw')  # 支持读写
            self.clients[client] = f  # 添加到客户端字典
            threading.Thread(target=self.recv, args=(f, client), name='recv').start()

    def recv(self, f, client):  # 接收客户端数据
        while not self.event.is_set():
            try:
                data = f.readline()  # 阻塞到换行符
            except Exception as e:
                logging.error(e)
                data = 'quit'

            msg = data.strip()
            # 客户端退出命令
            if msg == 'quit':
                self.clients.pop(client)
                f.close()
                logging.info('{}  quits'.format(client))
                break

            msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client, data)
            logging.info(msg)
            for s in self.clients.values():
                s.write(msg)
                s.flush()

    def stop(self):  # 停止服务
        for s in self.clients.values():
            s.close()
        self.sock.close()
        self.event.set()


def main():
    cs = ChatServer()
    cs.start()

    while True:
        cmd = input('>>').strip()
        if cmd == 'quit':
            cs.stop()
            threading.Event().wait(3)
            break

    logging.info(threading.enumerate())  # 用来观察断开后线程的变化


if __name__ == '__main__':
    main()

TCP客户端编程

客户端编程步骤
  • 创建Socket对象
  • 连接到远端服务端的ip和port,connect()方法
  • 传输数据
    使用send、recv方法发送、接收数据
  • 关闭连接,释放资源
import socket

client = socket.socket()
ipaddr = ('127.0.0.1', 9999)
client.connect(ipaddr)  # 直接连接服务器
client.send(b'abcd\n')
data = client.recv(1024)  # 阻塞等待
print(data)
client.close()

开始编写客户端类

import logging
import socket
import threading
import datetime
import socket

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")


class ChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):  # 启动服务
        self.sock = socket.socket()
        self.addr = (ip, port)
        self.clients = {}  # 客户端
        self.event = threading.Event()

    def start(self):  # 启动监听
        self.sock.bind(self.addr)  # 绑定
        self.sock.listen()  # 监听
        # accept会阻塞主线程,所以开一个新线程
        threading.Thread(target=self.accept).start()

    def accept(self):  # 多人连接
        while not self.event.is_set():
            sock, client = self.sock.accept()  # 阻塞
            # 准备接收数据,recv是阻塞的,开启新的线程
            f = sock.makefile('rw')  # 支持读写
            self.clients[client] = f  # 添加到客户端字典
            threading.Thread(target=self.recv, args=(f, client), name='recv').start()

    def recv(self, f, client):  # 接收客户端数据
        while not self.event.is_set():
            try:
                data = f.readline()  # 阻塞到换行符
            except Exception as e:
                logging.error(e)
                data = 'quit'

            msg = data.strip()
            # 客户端退出命令
            if msg == 'quit':
                self.clients.pop(client)
                f.close()
                logging.info('{}  quits'.format(client))
                break

            msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client, data)
            logging.info(msg)
            for s in self.clients.values():
                s.write(msg)
                s.flush()

    def stop(self):  # 停止服务
        for s in self.clients.values():
            s.close()
        self.sock.close()
        self.event.set()


def main():
    cs = ChatServer()
    cs.start()

    while True:
        cmd = input('>>').strip()
        if cmd == 'quit':
            cs.stop()
            threading.Event().wait(3)
            break

    logging.info(threading.enumerate())  # 用来观察断开后线程的变化


if __name__ == '__main__':
    main()

在这里插入图片描述

在这里插入图片描述

同样,这样的客户端还是有些问题的,仅用于测试。

UDP编程

测试命令

> netstat -anp udp | find “9988” # windows查找udp是否启动端口
$ echo “123abc” | nc -u 127.0.0.1 9988 # linux下发给服务端数据\

UDP服务端编程

UDP服务端编程流程

在这里插入图片描述

  • 创建socket对象。socket.SOCK_DGRAM
  • 绑定IP和Port,bind0方法
  • 传输数据
    接收数据,socket.recvfrom(bufsize[, flagsl),获得一个二元组(string, address)
    发送数据,socket.sendto(string, address)发给某地址某信息
  • 释放资源
import socket

server = socket.socket(type=socket.SOCK_DGRAM)
server.bind(('0.0.0.0', 9999))  # 立即绑定一个udp端口
data = server.recv(1024)  # 阻塞等待数据
data = server.recvfrom(1024)  # 阻塞等待数据(value,(ip,port))
server.sendto(b'7', ('192.168.142.1', 10000))
server.close()

UDP客户端编程流程

  • 创建socket对象。 socket.SOCK_DGRAM
  • 发送数据,socket. sendto(string, address)发给某地址某信息
  • 接收数据,socket.recvfrom(bufsize[, flags]),获得一个二元组(string, address)
  • 释放资源
import socket

client = socket.socket(type=socket.SOCK_DGRAM)
raddr = ('192.168.142.1', 10000)
client.connect(raddr)
client.sendto(b'8', raddr)
client.send(b'9')
data = client.recvfrom(1024)  # 阻塞等待数据(value,(ip,port))
data = client.recv(1024)  # 阻塞等待数据
client.close() 

注意:UDP是无连接协议,所以可以只有任何一端,例如客户端数据发往服务端,服务端存在与否无所谓。

UDP编程中bind、connect、send、sendto、recv、recvfrom方法使用

UDP的socket对象创建后,是没有占用本地地址和端口的。

在这里插入图片描述

练习——UDP版群聊
UDP版群聊服务端代码

import socket


# 服务端类的基本架构
class ChatUDPServer:
    def __init__(self, ip='127.0.0.1', port=9999):
        self.addr = (ip, port)
        self.sock = socket.socket(type=socket.SOCK_DGRAM)

    def start(self):
        self.sock.bind(self.addr)  # 立即绑定
        self.sock.recvfrom(1024)  # 阻塞接收数据

    def stop(self):
        self.sock.close()

在上面代码的基础之上扩充

import socket
import threading
import datetime
import logging
from tkinter.font import names

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"

logging.basicConfig(format=FORMAT, level=logging.INFO)


class ChatUDPServer:
    def __init__(self, ip='127.0.0.1', port=9999):
        self.addr = (ip, port)
        self.sock = socket.socket(type=socket.SOCK_DGRAM)
        self.clients = set()  # 记录客户端
        self.event = threading.Event()

    def start(self):
        self.sock.bind(self.addr)  # 立即绑定
        # 启动线程
        threading.Thread(target=self.recv, name='recv').start()

    def recv(self):
        while not self.event.is_set():
            data, raddr = self.sock.recvfrom(1024)  # 阻塞接收数据
            if data.strip() == b'quit':
                # 有可能发来数据的不在clients中
                if raddr in self.clients:
                    self.clients.remove(raddr)
                logging.info('{} leaving'.format(raddr))
                continue

            self.clients.add(raddr)

            msg = '{}. from {}: {}'.format(data.decode(), *raddr)
            logging.info(msg)
            msg = msg.encode()

            for c in self.clients:
                self.sock.sendto(msg, c)  # 不保证对方能够收到

    def stop(self):
        for c in self.clients:
            self.sock.sendto(b'bye', c)
        self.sock.close()
        self.event.set()


def main():
    cs = ChatUDPServer()
    cs.start()
    while True:
        cmd = input(">>>")
        if cmd.strip() == 'quit':
            cs.stop()
            break

        logging.info(threading.enumerate())

        logging.info(cs.clients)


if __name__ == '__main__':
    main()

UDP群聊客户端代码

import threading
import socket
import logging

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message) s"

logging.basicConfig(format=FORMAT, level=logging.INFO)


class ChatUdpClient:
    def __init__(self, rip='127.0.0.1', rport=9999):
        self.sock = socket.socket(type=socket.SOCK_DGRAM)
        self.raddr = (rip, rport)
        self.event = threading.Event()

    def start(self):
        self.sock.connect(self.raddr)  # 占用本地地址和端口,设置远端地址和端口
        threading.Thread(target=self.recv, name='recv').start()

    def recv(self):
        while not self.event.is_set():
            data, raddr = self.sock.recvfrom(1024)
            msg = '{}. from {}:{}'.format(data.decode(), *raddr)
            logging.info(msg)

    def send(self, msg: str):
        self.sock.sendto(msg.encode(), self.raddr)

    def stop(self):
        self.sock.close()
        self.event.set()


def main():
    cc1 = ChatUdpClient()
    cc2 = ChatUdpClient()
    cc1.start()
    cc2.start()

    print(cc1.sock)
    print(cc2.sock)

    while True:
        cmd = input('Input your words >>')
        if cmd.strip() == 'quit':
            cc1.stop()
            cc2.stop()
            break
        cc1.send(cmd)
        cc2.send(cmd)


if __name__ == '__main__':
    main()

上面的例子并不完善,如果客户端断开了,服务端不知道。每一个服务端还需要对所有客户端发送数据,包括已经断开的客户端。

代码改进

服务端代码改进

加一个ack机制和心跳heartbeat。心跳,就是一端定时发往另一端的信息,一般每次数据越少越好。心跳时间间
隔约定好就行。ack即响应,一端收到另一端的消息后返回的信息。

心跳机制

1、一般来说是客户端定时发往服务端的,服务端并不需要ack回复客户端,只需要记录该客户端还活着就行了。
2、如果是服务端定时发往客户端的,一般需要客户端ack响应来表示活着,如果没有收到ack的客户端,服务端
移除其信息。这种实现较为复杂,用的较少。
3、也可以双向都发心跳的,用的更少。

在服务器端代码中使用第一种机制改进

import socket
import threading
import datetime
import logging
from tkinter.font import names

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"

logging.basicConfig(format=FORMAT, level=logging.INFO)


class ChatUDPServer:
    def __init__(self, ip='127.0.0.1', port=9999,interval=10):
        self.addr = (ip, port)
        self.sock = socket.socket(type=socket.SOCK_DGRAM)
        self.clients = set()  # 记录客户端
        self.event = threading.Event()
        self.interval = interval  # 默认10秒,超时就要移除对应的客户端


    def start(self):
        self.sock.bind(self.addr)  # 立即绑定
        # 启动线程
        threading.Thread(target=self.recv, name='recv').start()

    def recv(self):
        while not self.event.is_set():
            localset = set() # 清理超时
            data, raddr = self.sock.recvfrom(1024)  # 阻塞接收数据
            current = datetime.datetime.now().timestamp()  # float
            if data.strip() == b'^hb^':  # 心跳信息 if
                self.clients[raddr] = current
                continue
            elif data.strip() == b'quit':
                # 有可能发来数据的不在clients中
                self.clients.pop(raddr, None)
                logging.info('{} leaving'.format(raddr))


                continue
            # 有信息来就更新时间
            # 什么时候比较心跳时间呢?发送信息的时候,反正要遍历一遍
            self.clients[raddr] = current

            msg = '{}. from {}:{}'.format(data.decode(), *raddr)
            logging.info(msg)
            msg = msg.encode()

            for c, stamp in self.clients.items():
                if current - stamp > self.interval:
                    localset.add(c)
                else:
                    self.sock.sendto(msg, c)   # 不保证对方能够收到

            for c in localset:
                self.clients.pop(c)

    def stop(self):
        for c in self.clients:
            self.sock.sendto(b'bye', c)
        self.sock.close()
        self.event.set()


def main():
    cs = ChatUDPServer()
    cs.start()
    while True:
        cmd = input(">>>")
        if cmd.strip() == 'quit':
            cs.stop()
            break

        logging.info(threading.enumerate())

        logging.info(cs.clients)


if __name__ == '__main__':
    main()

客户端代码改进
增加定时发送心跳代码

import threading
import socket
import logging

from pydantic.v1 import parse_file_as

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"

logging.basicConfig(format=FORMAT, level=logging.INFO)


class ChatUdpClient:
    def __init__(self, rip='127.0.0.1', rport=9999):
        self.sock = socket.socket(type=socket.SOCK_DGRAM)
        self.raddr = (rip, rport)
        self.event = threading.Event()

    def start(self):
        self.sock.connect(self.raddr)  # 占用本地地址和端口,设置远端地址和端口
        threading.Thread(target=self._sendhb, name='heartbeat', daemon=True).start()
        threading.Thread(target=self.recv, name='recv').start()

    def _sendhb(self):# 心跳
        while not self.event.wait(5):
            self.send(' ^hb^')

    def recv(self):
        while not self.event.is_set():
            data, raddr = self.sock.recvfrom(1024)
            msg = '{}. from {}:{}'.format(data.decode(), *raddr)
            logging.info(msg)

    def send(self, msg: str):
        print(msg)
        self.sock.sendto(msg.encode(), self.raddr)

    def stop(self):
        self.send('quit')  # 通知服务端退出
        self.sock.close()
        self.event.set()

def main():
    cc1 = ChatUdpClient()
    cc2 = ChatUdpClient()
    cc1.start()
    cc2.start()

    print(cc1.sock)
    print(cc2.sock)

    while True:
        cmd = input('Input your words >>')
        if cmd.strip() == 'quit':
            cc1.stop()
            cc2.stop()
            break
        cc1.send(cmd)
        cc2.send(cmd)


if __name__ == '__main__':
    main()

UDP协议应用

UDP是无连接协议,它基于以下假设:网络足够好 消息不会丢包 包不会乱序
但是,即使是在局域网,也不能保证不丢包,而且包的到达不一定有序。
应用场景 视频、音频传输,一般来说,丢些包,问题不大,最多丢些图像、听不清话语,可以重新发话语来解决。
海量采集数据,例如传感器发来的数据,丢几十、几百条数据也没有关系。DNS协议,数据内容小,一个包就能
查询到结果,不存在乱序,丟包,重新请求解析。
一般来说,UDP性能优于TCP,但是可靠性要求高的场合的还是要选择TCP协议。

SocketServer

socket编程过于底层,编程虽然有套路,但是想要写出健壮的代码还是比较困难的,所以很多语言都对socket底层

AP进行封装,Python的封装就是——socketserver模块。它是网络服务编程框架,便于企业级快速开发。

类的继承关系

在这里插入图片描述

SocketServer简化了网络服务器的编写。

它有4个同步类:TCPServer ,UDPServer , UnixStreamServer,UnixDatagramServer。
2个Mixin类:ForkingMixIn 和 ThreadingMixIn类,用来支持异步。
class ForkingUDPServer(ForkingMixin, UDPServer): pass
class ForkingTCPServer(ForkingMixIn, TCPServer): pass
class ThreadingUDPServerThreadingMixIn, UDPServer): pass
class ThreadingTCPServerThreadingMixIn, TCPServer): pass
fork是创建多进程,thread是创建多线程

编程接口
socketserver.BaseServer(server_address, RequestHandlerClass)

需要提供服务器绑定的地址信息,和用于处理请求的RequestHandlerClass类。

RequestHandlerClass类必须是BaseRequestHandler类的子类,在BaseServer中代码如下:

import threading


# BaseServer代码
class BaseServer:
    def __init__(self, server_address, RequestHandlerClass):
        """Constructor. May be extended, do not override."""
        self.server_address = server_address
        self.RequestHandlerClass = RequestHandlerClass
        self.__is_shut_down = threading.Event()
        self.__shutdown_request = False


def finish_request(self, request, client_address):  # 处理请求的方法
    """Finish one request by instantiating RequestHandlerClass."""
    self.RequestHandlerClass(request, client_address, self)  # RequestHandlerClass*i*

BaseRequestHandler类

它是和用户连接的用户请求处理类的基类,定义为BaseRequestHandler(request, client_address, server)
服务端Server实例接收用户请求后,最后会实例化这个类。
它被初始化时,送入3个构造参数:request, client_address, server自身
以后就可以在BaseRequestHandler类的实例上使用以下属性:
self.request是和客户端的连接的socket对象
self.server是TCPServer本身
self.client_address是客户端地址

这个类在初始化的时候,它会依次调用3个方法。子类可以覆盖这些方法。

# BaseRequestHandler要子类覆盖的方法
class BaseRequestHandler:
    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

    def setup(self):  # 每一个连接初始化
        pass

    def handle(self):  # 每一次请求处理
        pass

    def finish(self):  # 每一个连接清理
        pass

测试代码

import threading
import socketserver


class MyHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # super().handle()# 可以不调用,父类handle什么都没有做
        print('-' * 30)
        print(self.server)  # 服务
        print(self.request)  # 服务端负责客户端连接请求的socket对象
        print(self.client_address)  # 客户端地址
        print(self.__dict__)
        print(self.server.__dict__)  # 能看到负责accept的socket
        print(threading.enumerate())
        print(threading.current_thread())
        print('-' * 30)


addr = ('127.0.0.1', 9999)
server = socketserver.ThreadingTCPServer(addr, MyHandler)
server.serve_forever()  # 永久

测试结果说明,handle方法相当于socket的recv方法。每个不同的连接上的请求过来后,生成这个连接的socket对象即self.request,客户端地址是self.dlient_address。
问题

测试过程中,上面代码,连接后立即断开了,为什么?

怎样才能客户端和服务器端长时间连接?

import threading
import socketserver
import logging

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)


class MyHandler(socketserver.BaseRequestHandler):
    def handle(self):
        # super().handle()# 可以不调用,父类handle什么都没有做
        print('-' * 30)
        print(self.server)  # 服务
        print(self.request)  # 服务端负责客户端连接请求的socket对象
        print(self.Client_address)  # 客户端地址
        print(self.__dict__)
        print(self.server.__dict__)  # 能看到负责accept的
        print(threading.enumerate())
        print(threading.current_thread())

        print('-' * 30)
        for i in range(3):
            data = self.request.recv(1024)
            logging.info(data)

        logging.info('====end====')


addr = ('127.0.0.1', 9999)
server = socketserver.ThreadingTCPServer(addr, MyHandler)
server.serve_forever()  # 永久

将ThreadingTCPServer换成TCPServer,同时连接2个客户端观察效果。

ThreadingTCPServer是异步的,可以同时处理多个连接。

TCPServer是同步的,一个连接处理完了,即一个连接的handle方法执行完了,才能处理另一个连接,且只有主线程。

总结

创建服务器需要几个步骤:
1.从BaseRequestHandler类派生出子类,并覆盖其handle(方法来创建请求处理程序类,此方法将处理
传入请求
2.实例化一个服务器类,传参服务器的地址和请求处理类
3.调用服务器实例的handle_request()或serve_forever()方法
4.调用server_close()关闭套接字

实现EchoServer

顾名思义,Echo,来什么消息回显什么消息
客户端发来什么信息,返回什么信息

import threading
from socketserver import ThreadingTCPServer, BaseRequestHandler
import sys


class EchoHandler(BaseRequestHandler):
    def setup(self):
        super().setup()
        self.event = threading.Event()  # 初始工作

    def finish(self):
        super().finish()
        self.event.set()

    def handle(self):
        super().handle()
        while not self.event.is_set():
            data = self.request.recv(1024).decode()
            msg = "{} {}".format(self.client_address, data).encode()

            self.request.send(msg)

        print('End')


addr = ('127.0.0.1', 9999)
server = ThreadingTCPServer(addr, EchoHandler)

server_thread = threading.Thread(target=server.serve_forever, name='EchoServer', daemon=True)
server_thread.start()

try:
    while True:
        cmd = input('>>>')
        if cmd.strip() == 'quit':
            break
        print(threading.enumerate())
except Exception as e:
    print(e)
except KeyboardInterrupt:
    pass
finally:
    print('Exit')
    sys.exit(0)

练习一一改写ChatServer

使用ThreadingTCPServer改写ChatServer

import threading
from socketserver import ThreadingTCPServer, BaseRequestHandler
import sys
import logging

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"

logging.basicConfig(format=FORMAT, level=logging.INFO)


class ChatHandler(BaseRequestHandler):
    clients = {}

    def setup(self):
        super().setup()
        self.event = threading.Event()  # 初始工作
        self.clients[self.client_address] = self.request

    def finish(self):
        super().finish()  # 清理工作
        self.clients.pop(self.client_address)  # 能执行到吗?
        self.event.set()

    def handle(self):
        super().handle()

        while not self.event.is_set():
            data = self.request.recv(1024).decode()
            if data == 'quit':
                break
            msg = "{} {}".format(self.client_address, data).encode()
            logging.info(msg)
            for c in self.clients.values():
                self.request.send(msg)
        print('End')


addr = ('127.0.0.1', 9999)

server = ThreadingTCPServer(addr, ChatHandler)

server_thread = threading.Thread(target=server.serve_forever, name='ChatServer', daemon=True)

server_thread.start()

try:
    while True:
        cmd = input('>>>')
        if cmd.strip() == 'quit':
            break
        print(threading.enumerate())
except Exception as e:
    print(e)
except KeyboardInterrupt:
    pass
finally:
    print('Exit')
    sys.exit(0)

问题
上例 self.clients.pop(self.client_address) 能执行到吗?

如果连接的线程中handle方法中抛出异常,例如客户端主动断开导致的异常,线程崩溃,self.clients的pop方法还能执行吗?

当然能执行,基类源码保证了即使异常,也能执行finish方法。但不代表不应该不捕获客户端各种异常。

解决客户端主动连接断开问题

如果客户端主动断开,总是抛出一个异常。看看到底发生了什么,在handle方法中增加一些语句。

import threading
import logging
import threading
from socketserver import ThreadingTCPServer, BaseRequestHandler
import sys


class ChatHandler(BaseRequestHandler):
    clients = {}

    def setup(self):
        super().setup()
        self.event = threading.Event()  # 初始工作
        self.clients[self.client_address] = self.request


    def finish(self):
        super().finish()  # 清理工作
        self.clients.pop(self.client_address)  # 能执行到吗?
        self.event.set()


    def handle(self):
        super().handle()

        while not self.event.is_set():
            data = self.request.recv(1024).decode()
            print(data, '~' * 30)  # 增加
            if data == 'quit':
                break
            msg = "{} {}".format(self.client_address, data).encode()
            logging.info(msg)
            for c in self.clients.values:
                print('+++++++++++++')  # 增加
                self.request.send(msg)
        print('End')

通过打印可以看到,客户端主动断开,会导致recv方法立即返回一个空bytes,并没有同时抛出异常。当循环回到recv这一句的时候就会抛出异常。所以,可以通过判断data数据是否为空来客户端是否断开。

import threading
import logging
import threading
from socketserver import ThreadingTCPServer, BaseRequestHandler
import sys


class ChatHandler(BaseRequestHandler):
    clients = {}

    def setup(self):
        super().setup()
        self.event = threading.Event()  # 初始工作
        self.clients[self.client_address] = self.request

    def finish(self):
        super().finish()  # 清理工作
        self.Clients.pop(self.client_address)  # 能执行到吗?
        self.event.set()

    def handle(self):
        super().handle()
        while not self.event.is_set():
            data = self.request.recv(1024).decode()
            print(data, '~' * 30)
            if not data or data == 'quit':
                print('Broken pipe')
                break
            msg = "{} {}".format(self.client_address, data).encode()
            logging.info(msg)
            for c in self.clients.values:
                self.request.send(msg)
        print('End')

总结
为每一个连接提供RequestHandlerClass类实例,依次调用setup、handle、finish方法,且使用了try...finally结构保证finish方法一定能被调用。这些方法依次执行完成,如果想维持这个连接和客户端通信,就需要在handle函数中使用循环。socketserver模块提供的不同的类,但是编程接口是一样的,即使是多进程、多线程的类也是一样,大大减少了编程的难度。

异步编程

重要概念

同步、异步

函数或方法被调用的时候,调用者是否得到最终结果的。

直接得到最终结果结果的,就是同步调用;

不直接得到最终结果的,就是异步调用。

同步就是我让你打饭,你不打好给我不走开,直到你打饭给了我。
异步就是我让你打饭,你打着,我不等你,但是我会盯着你,你打完,我会过来拿走的。异步并不保证多长时间最终打完饭。

阻塞、非阻塞

函数或方法调用的时候,是否立刻返回。
立即返回就是非阻塞调用;
不立即返回就是阻塞调用。

区别

同步、异步,与阻塞、非阻塞不相关。
同步、异步强调的是,是否得到(最终的)结果;
阻塞、非阻塞强调是时间,是否等待。

同步与异步区别在于:调用者是否得到了想要的最终结果。

同步就是一直要执行到返回最终结果;

异步就是直接返回了,但是返回的不是最终结果。调用者不能通过这种调用得到结果,还要通过被调用者,使用其它方式通知调用者,来取回最终结果。

阻塞与非阻塞的区别在于,调用者是否还能干其他事。

阻塞,调用者就只能干等;

非阻塞,调用者可以先去忙会别的,不用一直等。

联系

同步阻塞,我啥事不干,就等你打饭打给我。打到饭是结果,而且我啥事不干一直等,同步加阻塞。
同步非阻塞,我等着你打饭给我,但我可以玩会手机、看看电视。打饭是结果,但是我不一直等
异步阻塞,我要打饭,你说等叫号,并没有返回饭给我,我啥事不干,就干等着饭好了你叫我。例如,叫号
异步非阻塞,我要打饭,你说等叫号,并没有返回饭给我,我在旁边看电视、玩手机,饭打好了叫我。

同步IO、异步IO、IO 多路复用

IO 两个阶段

IO过程分两阶段:
1.数据准备阶段
2.内核空间复制回用户进程缓冲区阶段
发生I0的时候: 人的高薪职业学
1、内核从输入设备读、写数据(淘米,把米放饭锅里煮饭)
2、进程从内核复制数据(盛饭,从内核这个饭锅里面把饭装到碗里来)
系统调用一—read函数

IO模型

同步IO
同步IO 模型包括 阻塞IO 、非阻塞IO 、IO 多路复用
阻塞IO

在这里插入图片描述

进程等待(阻塞),直到读写完成。(全程等待)
read/write函数
非阻塞1O

在这里插入图片描述
进程调用read操作,如果IO设备没有准备好,立即返回ERROR,进程不阻塞。用户可以再次发起系统调用,如果内核已经准备好,就阻塞,然后复制数据到用户空间。
第一阶段数据没有准备好,就先忙别的,等会再来看看。检 数据是否准备好了的过程是非阻塞的。
第二阶段是阻塞的,即内核空间和用户空间之间复制数据是阻塞的。
淘米、蒸饭我不等,我去玩会,盛饭过程我等着你装好饭,但是要等到盛好饭才算完事,这是同步的,结果就是盛好饭。
read/write

IO多路复用

所谓IO多路复用,就是同时监控多个IO,有一个准备好了,就不需要等了开始处理,提高了同时处理IO的能力。

select几乎所有操作系统平台都支持,poll是对的select的升级。

epoll, Linux系统内核2.5+开始支持,对select和poll的增强,在监视的基础上,增加回调机制。BSD、Mac平台有
kqueue , Windows有iocp.

在这里插入图片描述

以select为例,将关注的IO操作告诉select函数并调用,进程阻塞,内核"“监视"select关注的文件描述符fd,被关注的任何一个fd对应的IO准备好了数据,select返回。再使用read将数据复制到用户进程。

select举例,食堂供应很多菜(众多的I0),你需要吃某三菜一汤,大师傅(操作系统)说要现做,需要等,你只好等待。其中一样菜好了,大师傅叫你过来说你点的菜有好的了,你得自己找找看哪一样才好了,请服务员把做好的菜打给你。

epoll是有菜准备好了,大师傅喊你去几号窗口直接打菜,不用自己找菜了。

一般情况下,select最多能监听1024个fd(可以修改,但不建议改),但是由于select采用轮询的方式,当管理的IO多了,每次都要遍历全部fd,效率低下。

epoll没有管理的fd的上限,且是回调机制,不需遍历,效率很高。

异步IO

在这里插入图片描述

进程发起异步IO请求,立即返回。内核完成1O的两个阶段,内核给进程发一个信号。

举例,来打饭,跟大师傅说饭好了叫你,饭菜准备好了,窗口服务员把饭盛好了打电话叫你。两阶段都是异步的。

在整个过程中,进程都可以忙别的,等好了才过来。

举例,今天不想出去到饭店吃饭了,点外卖,饭菜在饭店做好了(第一阶段),快递员从饭店送到你家门口(第二阶段)。

Linux的aio的系统调用,内核从版本2.6开始支持

Python 中IO多路复用

  • IO多路复用
    1、 大多数操作系统都支持select和poll
    2、 Linux 2.5+ 支持epoll
    3、 BSD、Mac支持kqueue
    4、 Windows JOCP

Python的select库

实现了select、poll系统调用,这个基本上操作系统都支持。部分实现了epoll

底层的I0多路复用模块。

开发中的选择
1、完全跨平台,使用select、poll。但是性能较差
2、针对不同操作系统自行选择支持的技术,这样做会提高10处理的性能

selectors库

3.4 版本提供这个库,高级 IO 复用库。

类层次结构:
BaseSelector
+-- SelectSelector 实现select
+-- PollSelector 实现po11
+-- EpollSelector 实现epol1
+-- DevpollSelector 实现devpo11
+-- KqueueSelector 实现kqueue

selectors.DefaultSelector返回当前平台最有效、性能最高的实现。

但是,由于没有实现Windows下的IOCP,所以,只能退化为select。

# 在selects模块源码最下面有如下代码
# Choose the best implementation, roughly:
# epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
	DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
	DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
	DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
	DefaultSelector = PollSelector
else:
	DefaultSelector = SelectSelector 

abstractmethod register(fileobj, events, data=None)
为selector注册一个文件对象,监视它的IO事件。
fileobj被监视文件对象,例如socket对象
events 事件,该文件对象必须等待的事件
data 可选的与此文件对象相关联的不透明数据,例如,关联用来存储每个客户端的会话ID,关联方法。通过这个

参数在关注的事件产生后让selector干什么事。
在这里插入图片描述

# 使用举例
import selectors
import threading
import socket
import logging

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)


# 回调函数,自己定义形参


def accept(sock: socket.socket, mask):
    """mask:事件掩码的或值"""
    conn, raddr = sock.accept()
    conn.setblocking(False)  # 不阻塞
    # 监视conn这个文件对象
    key = selector.register(conn, selectors.EVENT_READ, read)
    logging.info(key)


# 回调函数
def read(conn: socket.socket, mask):
    data = conn.recv(1024)
    msg = "Your msg is {}.", format(data.decode())
    conn.send(msg.encode())


# 构造缺省性能最优selector
selector = selectors.DefaultSelector()
# 创建Tcp Server
sock = socket.socket()
sock.bind(('127.0.0.1', 9999))

sock.listen()
logging.info(sock)

sock.setblocking(False)  # 非阻塞

# 注册文件对象sock关注读事件,返回SelectorKey
# 将sock、关注事件、data都绑定到key实例属性上
key = selector.register(sock, selectors.EVENT_READ, accept)

logging.info(key)
e = threading.Event()


def select(e):
    while not e.is_set():
        # 开始监视,等到有文件对象监控事件产生,返回(key,mask)元组
        events = selector.select()
        print('-' * 30)
        for key, mask in events:
            logging.info(key)
            logging.info(mask)
            callback = key.data  # 回调函数
            callback(key.fileobj, mask)


threading.Thread(target=select, args=(e,), name='select').start()


def main():
    while not e.is_set():
        cmd = input('>>')
        if cmd.strip() == 'quit':
            e.set()
            fobjs = []
            logging.info('{}'.format(list(selector.get_map().items())))

            for fd, key in selector.get_map().items():  # 返回注册的项
                print(fd, key)
                print(key.fileobj)
                fobjs.append(key.fileobj)
            for fobj in fobjs:
                selector.unregister(fobj)
                fobj.close()  # 关闭socket
            selector.close()


if __name__ == '__main__':
    main()

练习

将ChatServer改写成IO多路复用的方式
不需要启动多线程来执行socket的accept、recv方法了

import socket
import threading
import datetime
import logging
import selectors

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"

logging.basicConfig(format=FORMAT, level=logging.INFO)


class ChatServer:

    def __init__(self, ip='127.0.0.1', port=9999):  # 启动服务
        self.sock = socket.socket()
        self.addr = (ip, port)

        self.event = threading.Event()
        self.selector = selectors.DefaultSelector()  # 创建selector

    def start(self):  # 启动监听
        self.sock.bind(self.addr)  # 绑定
        self.sock.listen()  # 监听
        self.sock.setblocking(False)  # 不阻塞
        # 注册
        self.selector.register(self.sock, selectors.EVENT_READ, self.accept)

        threading.Thread(target=self.select, name='selector', daemon=True).start()

    def select(self):  # 阻基
        while not self.event.is_set():
            # 开始监视,等到有文件对象监控事件产生,返回(key,mask)元组
            events = self.selector.select()

            print('-' * 30)
            for key, mask in events:
                logging.info(key)
                logging.info(mask)
                callback = key.data  # 回调函数
                callback(key.fileobj)

    def accept(self, sock: socket.socket):  # 多人连接
        conn, addr = sock.accept()  # BaZE
        conn.setblocking(False)  # 非阻塞
        # 注册,监视每一个连接的socket对象
        self.selector.register(conn, selectors.EVENT_READ, self.recv)

    def recv(self, sock: socket.socket):  # 接收客户端数据
        data = sock.recv(1024)  # 阻塞到数据到来
        if data == b'':  # 客户端主动断开,注销并关闭socket
            self.selector.unregister(sock)
            sock.close()
            return

        msg = "{:%Y/%m/%d %H:%M:%S} {}: {}\n{}\n".format(datetime.datetime.now(), *sock.getpeername(), data.decode())
        logging.info(msg)
        msg = msg.encode()
        # 群发
        for key in self.selector.get_map().values():
            if key.data == self.recv:  # #ißself.accept
                key.fileobj.send(msg)

    def stop(self):  # 停止服务
        self.event.set()
        fobjs = []
        for fd, key in self.selector.get_map().items():
            fobjs.append(key.fileobj)
        for fobj in fobjs:
            self.selector.unregister(fobj)
            fobj.close()
        self.selector.close()


def main():
    cs = ChatServer()
    cs.start()
    while True:
        cmd = input('>>').strip()
        if cmd == 'quit':
            cs.stop()
            threading.Event().wait(3)
            break
        logging.info(threading.enumerate())


if __name__ == '__main__':
    main()

基本完成功能,但是退出机制、异常处理没有加,这个和以前的处理方式一样,请自行完成。

进阶
send是写操作,也可以让selector监听,如何监听?
self.selector.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, self.recv)
注册语句,要监听selectors.EVENT_READ | selectors.EVENT_WRITE 读与写事件。

回调的时候,需要mask来判断究竟是读触发还是写触发了。所以,需要修改方法声明,增加mask。

def recv(self, sock, mask)但是由于recv 方法处理读和写事件,所以叫recv不太合适,改名为
def handle(self, sock, mask)
注意读和写是分离的,那么handle函数应该写成下面这样

def handle(self, sock:socket.socket,mask) #  接收客户端数据
    if mask & selectors. EVENT_READ:
        pass
    # 注意,这里是某一个socket的写操作
    if mask & selectors.EVENT_WRITE:# 写缓冲区准备好了,可以写入数据了
        pass

handle方法里面处理读、写,mask有可能是0b01、0b10、0b11。

问题是,假没读取到了客户端发来的数据后,如何写出去?

为每一个与客户端连接的socket对象增加对应的队列。

与每一个客户端连接的socket对象,自己维护一个队列,某一个客户端收到信息后,会遍历发给所有客户端的队

列。这里完成一对多,即一份数据放到了所有队列中。

与每一个客户端连接的socket对象,发现自己队列有数据,就发送给客户端。

import socket
import threading
import datetime
import logging
import selectors
from queue import Queue

FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message) s"

logging.basicConfig(format=FORMAT, level=logging.INFO)


class ChatServer:
    def __init__(self, ip='127.0.0.1', port=9999):
        self.sock = socket.socket()
        self.addr = (ip, port)
        self.clients = {}
        self.event = threading.Event()
        self.selector = selectors.DefaultSelector()  # 创建 selector

    def start(self):  # 启动监听
        self.sock.bind(self.addr)  # 绑定
        self.sock.listen()  # 监听
        self.sock.setblocking(False)  # 不阻塞

        # 注册
        self.selector.register(self.sock, selectors.EVENT_READ, self.accept)
        threading.Thread(target=self.select, name='selector', daemon=True).start()

    def select(self):  # 阻塞
        while not self.event.is_set():
            # 开始监视,等到某文件对象被监控的事件产生,返回(key,mask)元组
            events = self.selector.select()  # 阻塞,直到events
            for key, mask in events:
                if callable(key.data):
                    callback = key.data  # key对象的data属性,回调
                    callback(key.fileobj, mask)
                else:
                    callback = key.data[0]
                    callback(key, mask)

    def accept(self, sock: socket.socket, mask):  # 接收客户端连接
        conn, raddr = sock.accept()
        conn.setblocking(False)  # 非阻塞
        self.clients[raddr] = (self.handle, Queue())
        # 注册,监视每一个与客户端的连接的socket对象
        self.selector.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, self.clients[raddr])

    def handle(self, key: selectors.SelectorKey, mask):  # 接收客户端数据
        if mask & selectors.EVENT_READ:
            sock = key.fileobj
            raddr = sock.getpeername()
            data = sock.recv(1024)
            if not data or data == b'quit':
                self.selector.unregister(sock)
                sock.close()
                self.clients.pop(raddr)
                return

        msg = "{:%Y/%m/%d %H:%M:%S} {}: {}\n{}\n".format(datetime.datetime.now(), *raddr, data.decode())
        logging.info(msg)
        msg = msg.encode()
        for k in self.selector.get_map().values():
            logging.info(k)
            if isinstance(k.data, tuple):
                k.data[1].put(data)

        if mask & selectors.EVENT_WRITE:
            # 因为写一直就绪,mask为2,所以一直可以写,从而导致select()不断循环,如同不阻塞一样
            if not key.data[1].empty():
                key.fileobj.send(key.data[1].get())

    def stop(self):  # 停止服务
        self.event.set()
        fobjs = []
        for fd, key in self.selector.get_map().items():
            fobjs.append(key.fileobj)

        for fobj in fobjs:
            self.selector.unregister(fobj)
            fobj.close()
        self.selector.close()


def main():
    cs = ChatServer()
    cs.start()
    while True:
        cmd = input('>>').strip()
        if cmd == 'quit':
            cs.stop()
            threading.Event().wait(3)
            break
        logging.info(threading.enumerate())
        logging.info('-' * 30)
        logging.info("{} {}".format(len(cs.clients), cs.clients))
        logging.info(list(map(lambda x: (x.fileobj.fileno(), x.data), cs.selector.get_map().values())))
        logging.info('-' * 30)


if __name__ == '__main__':
    main()

这个程序最大的问题,在select0一直判断可写,几乎一直循环不停。所以对于写不频繁的情况下,就不要监听EVENT_WRITE。

对于Server来说,一般来说,更多的是等待对方发来数据后响应时才发出数据,而不是积极的等着发送数据。所以监听EVENT_READ,收到数据之后再发送就可以了。

本例只完成基本功能,其他功能如有需要,请自行完成。

asyncio

3.4版本加入标准库。
asyncio底层基于selectors实现,看似库,其实就是个框架,包含异步IO、事件循环、协程、任务等内容。

问题的引出

def a():
    for x in range(3):
        print(x)


def b():
    for x in "abc":
        print(x)


a()
b()

输出:
0
1
2
a
b
c

这是一个串行的程序,单线程中根本没有做任何能否并行?

import threading
import time


def a():
    for x in range(3):
        time.sleep(0.001)  #
   	 	print(x)


def b():
    for x in "abc":
        time.sleep(0.001)
        print(x)


threading.Thread(target=a, name='a').start()
threading.Thread(target=b, name='b').start()
# 运行结果
0
a
1
b
2
c

import multiprocessing
import time


def a():
    for x in range(3):
        time.sleep(0.001)
        print(x)


def b():
    for x in "abc":
        time.sleep(0.001)
        print(x)


if __name__ == "__main__":
    multiprocessing.Process(target=a, name='a').start()
    multiprocessing.Process(target=b, name='b').start()
输出:
a
0
b
1
c
2

生成器版本

def a():
    for x in range(3):
        print(x)
        yield


def b():
    for x in "abc":
        print(x)
        yield


x = a()
y = b()

for i in range(3):
    next(x)
    next(y)
输出:
0
a
1
b
2
c

上例在一个线程内通过生成器完成了调度,让两个函数都有机会执行,这样的调度不是操作系统的进程、线程完成的,而是用户自己设计的。

这个程序编写:
需要使用yield来让出控制权

需要循环帮助交替执行

事件循环

事件循环是asyncio提供的核心运行机制。

在这里插入图片描述

协程

  • 协程不是进程、也不是线程,它是用户空间调度的完成并发处理的方式。
  • 进程、线程由操作系统完成调度,而协程是线程内完成调度。它不需要更多的线程,自然也没有多线程切换
    带来的开销。
  • 协程是非抢占式调度,只有一个协程主动让出控制权,另一个协程才会被调度。
  • 协程也不需要使用锁机制,因为是在同一个线程中执行。
  • 多CPU下,可以使用多进程和协程配合,既能进程并发又能发挥协程在单线程中的优势。
  • Python中协程是基于生成器的。

协程的使用
3.4引|入asyncio,使用装饰器

import asyncio


@asyncio.coroutine
def sleep(x):  # 协程函数
    for i in range(3):
        print('sleep {}'.format(i))
        yield from asyncio.sleep(x)


loop = asyncio.get_event_loop()
loop.run_until_complete(sleep(3))
loop.close()

将生成器函数转换成协程函数,就可以在事件循环中执行了。

3.5版本开始,Python提供关键字async、await,在语言上原生支持协程。

import asyncio


async def sleep(x):
    for i in range(3):
        print('sleep (}'.format(i))
        await asyncio.sleep(x)


loop = asyncio.get_event_loop()
loop.run_until_complete(sleep(3))
loop.close()

async def用来定义协程函数,iscoroutinefunction() 返回True。协程函数中可以不包含await、 async关键字,但不能使用yield关键字。

如同生成器函数调用返回生成器对象一样,协程函数调用也会返回一个对象称为协程对象,iscoroutine()返回True.

再来做个例子

import asyncio
import threading


async def sleep(x):
    for i in range(3):
        print('sleep {}'.format(i))
        await asyncio.sleep(x)


async def showthread(x):
    for i in range(3):
        print(threading.enumerate())
        await asyncio.sleep(2)


loop = asyncio.get_event_loop()

tasks = [sleep(3), showthread(3)]

loop.run_until_complete(asyncio.wait(tasks))

loop.close()
# 协程版本
import asyncio
import threading


@asyncio.coroutine
def a():
    for x in range(3):
        print('a.x', x)
        yield


@asyncio.coroutine
def b():
    for x in 'abc':
        print('b.x', x)
        yield


print(asyncio.iscoroutinefunction(a))
print(asyncio.iscoroutinefunction(b))

# 大循环
loop = asyncio.get_event_loop()
tasks = [a(), b()]

loop.run_until_complete(asyncio.wait(tasks))

loop.close()

TCP Echo Server 举例

import asyncio


# TCP Echo Server 举例
async def handle(reader, writer):
    while True:
        data = await reader.read(1024)
        print(dir(reader))
        print(dir(writer))

        client = writer.get_extra_info('peername')

        message = "{} Your msg (}".format(client, data.decode()).encode()

        writer.write(message)

        await writer.drain()


loop = asyncio.get_event_loop()
ip = '127.0.0.1'
port = 9999
crt = asyncio.start_server(handle, ip, port, loop=loop)

server = loop.run_until_complete(crt)

print(server)  # server是监听的socket对象

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    server.close()
    loop.close()

aiohttp库

安装
$ pip install aiohttp
开发

HTTP Server

from aiohttp import web


async def indexhandle(request: web.Request):
    return web.Response(text=request.path, status=201)


async def handle(request: web.Request):
    print(request.match_info)
    print(request.query_string)  # http://127.0.0.1:8080/1?name=12301

    return web.Response(text=request.match_info.get('id', '0000'), status=200)


app = web.Application()

app.router.add_get("/", indexhandle)  # http://127.0.0.1:8080/

app.router.add_get("/{id}", handle)  # http://127.0.0.1:8080/12301

web.run_app(app, host='0.0.0.0', port=9977)

HTTP Client

import asyncio
from aiohttp import ClientSession


async def get_html(url: str):
    async with ClientSession() as session:
        async with session.get(url) as res:
            print(res.status)
            print(await res.text())


url = 'http://127.0.0.1/ziroom-web/'

loop = asyncio.get_event_loop()
loop.run_until_complete(get_html(url))

loop.close()


网站公告

今日签到

点亮在社区的每一天
去签到