序
欠4前年的一份笔记 ,献给今后的自己。
网络编程
Socket介绍
Socket套接字
Python中提供socket.py标准库,非常底层的接口库。
Socket是一种通用的网络编程接口,和网络层次没有一一对应的关系。
协议族
AF表示Address Family,用于socket()第一个参数
TCP编程
Socket编程,需要两端,一般来说需要一个服务端、一个客户端,服务端称为Server,客户端称为Client
TCP服务端编程
服务器端编程步骤
- 创建Socket对象
- 绑定IP地址Address和端口Port。bind()方法
IPv4地址为一个二元组(IP地址字符串,Port) - 开始监听,将在指定的IP的端口上监听。listen()方法
- 获取用于传送数据的Socket对象
socket.accept() -> (socket object, address info)
accept方法阻塞等待客户端建立连接,返回一个新的Socket对象和客户端地址的二元组
地址是远程客户端的地址,IPv4中它是一个二元组(clientaddr, port)
1、接收数据 : recv(bufsize[, flags]) 使用缓冲区接收数据
2、发送数据 : send(bytes) 发送数据
问题
两次绑定同一个监听端口会怎么样?
import socket
s = socket.socket() # 创建socket对象
s.bind(('127.0.0.1', 9999)) # 一个二元组
s.listen() # 开始监听
# 开启一个连接
s1, info = s.accept() # 阻塞直到和客户端成功建立连接,返回一个socket对象和客户端地址
# 使用缓冲区获取数据
data = s1.recv(1024)
print(data, info)
s1.send(b'magedu.com')
# 开启另外一个连接
s2, _ = s.accept()
data = s2.recv(1024)
s2.send(b'hello python')
s.close()
上例accept和recv是阻塞的,主线程经常被阻塞住而不能工作。怎么办?
练习一一写一个群聊程序
需求分析
聊天工具是CS程序,C是每一个客户端,S是服务器端。
服务器应该具有的功能:
启动服务,包括绑定地址和端口,监听
建立连接,能和多个客户端建立连接
接收不同用户的信息
分发,将接收的某个用户的信息转发到已连接的所有客户端
停止服务
记录连接的客户端
代码实现
服务端应该对应一个类
class ChatServer:
def __init__(self, ip, port): # 启动服务
self.sock = socket.socket()
self.addr = (ip, port)
def start(self): # 启动监听
pass
def accept(self): # 多人连接
pass
def recv(self): # 接收客户端数据
pass
def stop(self): # 停止服务
pass
在此基础上,扩展完成
import logging
import socket
import threading
import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")
class ChatServer:
def __init__(self, ip='127.0.0.1', port=9999): # 启动服务
self.sock = socket.socket()
self.addr = (ip, port)
self.clients = {} # 客户端
def start(self): # 启动监听
self.sock.bind(self.addr) # $B
self.sock.listen() # 监听
# accept会阻塞主线程,所以开一个新线程
threading.Thread(target=self.accept).start()
def accept(self): # 多人连接
while True:
sock, client = self.sock.accept() # 阻塞
self.clients[client] = sock # 添加到客户端字典
# 准备接收数据,recv是阻塞的,开启新的线程
threading.Thread(target=self.recv, args=(sock, client)).start()
def recv(self, sock: socket.socket, client): # 接收客户端数据
while True:
data = sock.recv(1024) # 阻塞到数据到来
msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client, data.decode())
logging.info(msg)
msg = msg.encode()
for s in self.clients.values():
s.send(msg)
def stop(self): # 停止服务
for s in self.clients.values():
s.close()
self.sock.close()
cs = ChatServer()
cs.start
在此基础上,扩展完成
import threading
import datetime
import logging
import socket
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")
class ChatServer:
def __init__(self, ip='127.0.0.1', port=9999): # 启动服务
logging.info('Connecting to Chat Server')
self.sock = socket.socket()
self.addr = (ip, port)
self.clients = {} # 客户端
self.event = threading.Event()
def start(self): # 启动监听
self.sock.bind(self.addr) # 378
self.sock.listen() # 监听
# accept会阻塞主线程,所以开一个新线程
threading.Thread(target=self.accept).start()
def accept(self): # 多人连接
while not self.event.is_set():
sock, client = self.sock.accept() # BE%
self.clients[client] = sock # 添加到客户端字典
# 准备接收数据,recv是阻塞的,开启新的线程
threading.Thread(target=self.recv, args=(sock, client)).start()
def recv(self, sock: socket.socket, client): # 接收客户端数据
while not self.event.is_set():
data = sock.recv(1024) # 阻塞到数据到来
msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client, data.decode())
logging.info(msg)
msg = msg.encode()
for s in self.clients.values():
s.send(msg)
def stop(self): # 停止服务
for s in self.clients.values():
s.close()
self.sock.close()
self.event.set()
cs = ChatServer()
cs.start()
while True:
cmd = input('>>').strip()
if cmd == 'quit':
cs.stop()
threading.Event().wait(3)
break
基本功能完成,但是有问题。使用Event改进。
import logging
import socket
import threading
import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")
class ChatServer:
def __init__(self, ip='127.0.0.1', port=9999): # 启动服务
self.sock = socket.socket()
self.addr = (ip, port)
self.clients = {} # 客户端
self.event = threading.Event()
def start(self): # 启动监听
self.sock.bind(self.addr) #绑定
self.sock.listen() # 监听
# accept会阻塞主线程,所以开一个新线程
threading.Thread(target=self.accept).start()
def accept(self): # 多人连接
while not self.event.is_set():
sock, client = self.sock.accept() # BE
self.clients[client] = sock # 添加到客户端字典
# 准备接收数据,recv是阻塞的,开启新的线程
threading.Thread(target=self.recv, args=(sock, client)).start()
def recv(self, sock: socket.socket, client): # 接收客户端数据
while not self.event.is_set():
data = sock.recv(1024) # 阻塞到数据到来
msg = "{:%Y/%m/%d %H:%M:%S} {}: {}:{}\n{}\n".format(datetime.datetime.now(), *client, data.decode())
logging.info(msg)
msg = msg.encode()
for s in self.clients.values():
s.send(msg)
def stop(self): # 停止服务
for s in self.clients.values():
s.close()
self.sock.close()
self.event.set()
cs = ChatServer()
cs.start()
while True:
cmd = input('>>').strip()
if cmd == 'quit':
cs.stop()
threading.Event().wait(3)
break
这一版基本能用了,测试通过。但是还有要完善的地方。
例如各种异常的判断,客户端断开连接后字典中的移除客户端数据等。
客户端主动断开带来的问题
服务端知道自己何时断开,如果客户端断开,服务器不知道。
所以,好的做法是,客户端断开发出特殊消息通知服务器端断开连接。但是,如果客户端主动断开,服务端主动发
送一个空消息,超时返回异常,捕获异常并清理连接。
即使 客户端提供了断开命令,也不能保证客户端会使用它断开连接。但是还是要增加这个退出功能。
增加客户端退出命令
import logging
import socket
import threading
import datetime
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")
class ChatServer:
def __init__(self, ip='127.0.0.1', port=9999): # 启动服务
self.sock = socket.socket()
self.addr = (ip, port)
self.clients = {} # 客户端
self.event = threading.Event()
def start(self): # 启动监听
self.sock.bind(self.addr) # 绑定
self.sock.listen() # 监听
# accept会阻塞主线程,所以开一个新线程
threading.Thread(target=self.accept).start()
def accept(self): # 多人连接
while not self.event.is_set():
sock, client = self.sock.accept() # BE
self.clients[client] = sock # 添加到客户端字典
# 准备接收数据,recv是阻塞的,开启新的线程
threading.Thread(target=self.recv, args=(sock, client)).start()
def recv(self, sock: socket.socket, client): # 接收客户端数据
while not self.event.is_set():
data = sock.recv(1024) # 阻塞到数据到来
msg = data.decode().strip()
# 客户端退出命令
if msg == 'quit':
self.clients.pop(client)
sock.close()
logging.info('{} quits'.format(client))
break
msg = "{:%Y/%m/%d %H:%M:%S} {}: {}:{}\n{}\n".format(datetime.datetime.now(), *client, data.decode())
logging.info(msg)
msg = msg.encode()
for s in self.clients.values():
s.send(msg)
def stop(self): # 停止服务
for s in self.clients.values():
s.close()
self.sock.close()
self.event.set()
cs = ChatServer()
cs.start()
while True:
cmd = input('>>').strip()
if cmd == 'quit':
cs.stop()
threading.Event().wait(3)
break
logging.info(threading.enumerate()) # 用来观察断开后线程的变化
程序还有瑕疵,但是业务功能基本完成了
socket常用方法
MakeFile
socket-makefile(mode=‘r’, buffering=None, *, encoding=None, errors=None, newline=None)
创建一个与该套接字相关连的文件对象,将recv方法看做读方法,将send方法看做写方法。
# 使用makefile简单例子
import socket
sockserver = socket.socket()
ip = '127.0.0.1'
port = 9999
addr = (ip, port)
sockserver.bind(addr)
sockserver.listen()
print(' - ' * 30)
s, _ = sockserver.accept()
print(' - ' * 30)
f = s.makefile(mode='rw')
line = f.read(10) # 阻塞等
print(' - ' * 30)
print(line)
f.write('Return your msg: {}'.format(line))
f.flush()
上例不能循环接收消息,修改一下
# 使用makefile简单例子
import socket
import threading
sockserver = socket.socket()
ip = '127.0.0.1'
port = 9999
addr = (ip, port)
sockserver.bind(addr)
sockserver.listen()
print(' - ' * 30)
event = threading.Event()
def accept(sock: socket.socket, e: threading.Event):
s, _ = sock.accept()
f = s.makefile(mode='rw')
while True:
line = f.readline()
print(line)
if line.strip() == "quit": # 注意要发quit\n
break
f.write('Return your msg: (}'.format(line))
f.flush()
f.close()
sock.close()
e.wait(3)
t = threading.Thread(target=accept, args=(sockserver, event))
t.start()
t.join()
print(sockserver)
makefile练习
使用makefile改写群聊类
import logging
import socket
import threading
import datetime
import socket
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")
class ChatServer:
def __init__(self, ip='127.0.0.1', port=9999): # 启动服务
self.sock = socket.socket()
self.addr = (ip, port)
self.clients = {} # 客户端
self.event = threading.Event()
def start(self): # 启动监听
self.sock.bind(self.addr) # 绑定
self.sock.listen() # 监听
# accept会阻塞主线程,所以开一个新线程
threading.Thread(target=self.accept).start()
def accept(self): # 多人连接
while not self.event.is_set():
sock, client = self.sock.accept() # 阻塞
# 准备接收数据,recv是阻塞的,开启新的线程
f = sock.makefile('rw') # 支持读写
self.clients[client] = f # 添加到客户端字典
threading.Thread(target=self.recv, args=(f, client), name='recv').start()
def recv(self, f, client): # 接收客户端数据
while not self.event.is_set():
data = f.readline() # 阻塞到换行符
msg = data.strip()
# 客户端退出命令
if msg == 'quit':
self.clients.pop(client)
f.close()
logging.info('{} quits'.format(client))
break
msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client, data)
logging.info(msg)
for s in self.clients.values():
s.write(msg)
s.flush()
def stop(self): # 停止服务
for s in self.clients.values():
s.close()
self.sock.close()
self.event.set()
cs = ChatServer()
cs.start()
while True:
cmd = input('>>').strip()
if cmd == 'quit':
cs.stop()
threading.Event().wait(3)
break
logging.info(threading.enumerate()) # 用来观察断开后线程的变化
上例完成了基本功能,但是,如果客户端主动断开,或者readline出现异常,就不会从clients中移除作废的
socket。可以使用异常处理解决这个问题。
ChatServer实验用完整代码
注意,这个代码为实验用,代码中瑕疵还有很多。Socket太底层了,实际开发中很少使用这么底层的接口。
增加一些异常处理。
import logging
import socket
import threading
import datetime
import socket
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")
class ChatServer:
def __init__(self, ip='127.0.0.1', port=9999): # 启动服务
self.sock = socket.socket()
self.addr = (ip, port)
self.clients = {} # 客户端
self.event = threading.Event()
def start(self): # 启动监听
self.sock.bind(self.addr) # 绑定
self.sock.listen() # 监听
# accept会阻塞主线程,所以开一个新线程
threading.Thread(target=self.accept).start()
def accept(self): # 多人连接
while not self.event.is_set():
sock, client = self.sock.accept() # 阻塞
# 准备接收数据,recv是阻塞的,开启新的线程
f = sock.makefile('rw') # 支持读写
self.clients[client] = f # 添加到客户端字典
threading.Thread(target=self.recv, args=(f, client), name='recv').start()
def recv(self, f, client): # 接收客户端数据
while not self.event.is_set():
try:
data = f.readline() # 阻塞到换行符
except Exception as e:
logging.error(e)
data = 'quit'
msg = data.strip()
# 客户端退出命令
if msg == 'quit':
self.clients.pop(client)
f.close()
logging.info('{} quits'.format(client))
break
msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client, data)
logging.info(msg)
for s in self.clients.values():
s.write(msg)
s.flush()
def stop(self): # 停止服务
for s in self.clients.values():
s.close()
self.sock.close()
self.event.set()
def main():
cs = ChatServer()
cs.start()
while True:
cmd = input('>>').strip()
if cmd == 'quit':
cs.stop()
threading.Event().wait(3)
break
logging.info(threading.enumerate()) # 用来观察断开后线程的变化
if __name__ == '__main__':
main()
TCP客户端编程
客户端编程步骤
- 创建Socket对象
- 连接到远端服务端的ip和port,connect()方法
- 传输数据
使用send、recv方法发送、接收数据 - 关闭连接,释放资源
import socket
client = socket.socket()
ipaddr = ('127.0.0.1', 9999)
client.connect(ipaddr) # 直接连接服务器
client.send(b'abcd\n')
data = client.recv(1024) # 阻塞等待
print(data)
client.close()
开始编写客户端类
import logging
import socket
import threading
import datetime
import socket
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(thread)d %(message)s")
class ChatServer:
def __init__(self, ip='127.0.0.1', port=9999): # 启动服务
self.sock = socket.socket()
self.addr = (ip, port)
self.clients = {} # 客户端
self.event = threading.Event()
def start(self): # 启动监听
self.sock.bind(self.addr) # 绑定
self.sock.listen() # 监听
# accept会阻塞主线程,所以开一个新线程
threading.Thread(target=self.accept).start()
def accept(self): # 多人连接
while not self.event.is_set():
sock, client = self.sock.accept() # 阻塞
# 准备接收数据,recv是阻塞的,开启新的线程
f = sock.makefile('rw') # 支持读写
self.clients[client] = f # 添加到客户端字典
threading.Thread(target=self.recv, args=(f, client), name='recv').start()
def recv(self, f, client): # 接收客户端数据
while not self.event.is_set():
try:
data = f.readline() # 阻塞到换行符
except Exception as e:
logging.error(e)
data = 'quit'
msg = data.strip()
# 客户端退出命令
if msg == 'quit':
self.clients.pop(client)
f.close()
logging.info('{} quits'.format(client))
break
msg = "{:%Y/%m/%d %H:%M:%S} {}:{}\n{}\n".format(datetime.datetime.now(), *client, data)
logging.info(msg)
for s in self.clients.values():
s.write(msg)
s.flush()
def stop(self): # 停止服务
for s in self.clients.values():
s.close()
self.sock.close()
self.event.set()
def main():
cs = ChatServer()
cs.start()
while True:
cmd = input('>>').strip()
if cmd == 'quit':
cs.stop()
threading.Event().wait(3)
break
logging.info(threading.enumerate()) # 用来观察断开后线程的变化
if __name__ == '__main__':
main()
同样,这样的客户端还是有些问题的,仅用于测试。
UDP编程
测试命令
> netstat -anp udp | find “9988” # windows查找udp是否启动端口
$ echo “123abc” | nc -u 127.0.0.1 9988 # linux下发给服务端数据\
UDP服务端编程
UDP服务端编程流程
- 创建socket对象。socket.SOCK_DGRAM
- 绑定IP和Port,bind0方法
- 传输数据
接收数据,socket.recvfrom(bufsize[, flagsl),获得一个二元组(string, address)
发送数据,socket.sendto(string, address)发给某地址某信息 - 释放资源
import socket
server = socket.socket(type=socket.SOCK_DGRAM)
server.bind(('0.0.0.0', 9999)) # 立即绑定一个udp端口
data = server.recv(1024) # 阻塞等待数据
data = server.recvfrom(1024) # 阻塞等待数据(value,(ip,port))
server.sendto(b'7', ('192.168.142.1', 10000))
server.close()
UDP客户端编程流程
- 创建socket对象。 socket.SOCK_DGRAM
- 发送数据,socket. sendto(string, address)发给某地址某信息
- 接收数据,socket.recvfrom(bufsize[, flags]),获得一个二元组(string, address)
- 释放资源
import socket
client = socket.socket(type=socket.SOCK_DGRAM)
raddr = ('192.168.142.1', 10000)
client.connect(raddr)
client.sendto(b'8', raddr)
client.send(b'9')
data = client.recvfrom(1024) # 阻塞等待数据(value,(ip,port))
data = client.recv(1024) # 阻塞等待数据
client.close()
注意:UDP是无连接协议,所以可以只有任何一端,例如客户端数据发往服务端,服务端存在与否无所谓。
UDP编程中bind、connect、send、sendto、recv、recvfrom方法使用
UDP的socket对象创建后,是没有占用本地地址和端口的。
练习——UDP版群聊
UDP版群聊服务端代码
import socket
# 服务端类的基本架构
class ChatUDPServer:
def __init__(self, ip='127.0.0.1', port=9999):
self.addr = (ip, port)
self.sock = socket.socket(type=socket.SOCK_DGRAM)
def start(self):
self.sock.bind(self.addr) # 立即绑定
self.sock.recvfrom(1024) # 阻塞接收数据
def stop(self):
self.sock.close()
在上面代码的基础之上扩充
import socket
import threading
import datetime
import logging
from tkinter.font import names
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatUDPServer:
def __init__(self, ip='127.0.0.1', port=9999):
self.addr = (ip, port)
self.sock = socket.socket(type=socket.SOCK_DGRAM)
self.clients = set() # 记录客户端
self.event = threading.Event()
def start(self):
self.sock.bind(self.addr) # 立即绑定
# 启动线程
threading.Thread(target=self.recv, name='recv').start()
def recv(self):
while not self.event.is_set():
data, raddr = self.sock.recvfrom(1024) # 阻塞接收数据
if data.strip() == b'quit':
# 有可能发来数据的不在clients中
if raddr in self.clients:
self.clients.remove(raddr)
logging.info('{} leaving'.format(raddr))
continue
self.clients.add(raddr)
msg = '{}. from {}: {}'.format(data.decode(), *raddr)
logging.info(msg)
msg = msg.encode()
for c in self.clients:
self.sock.sendto(msg, c) # 不保证对方能够收到
def stop(self):
for c in self.clients:
self.sock.sendto(b'bye', c)
self.sock.close()
self.event.set()
def main():
cs = ChatUDPServer()
cs.start()
while True:
cmd = input(">>>")
if cmd.strip() == 'quit':
cs.stop()
break
logging.info(threading.enumerate())
logging.info(cs.clients)
if __name__ == '__main__':
main()
UDP群聊客户端代码
import threading
import socket
import logging
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message) s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatUdpClient:
def __init__(self, rip='127.0.0.1', rport=9999):
self.sock = socket.socket(type=socket.SOCK_DGRAM)
self.raddr = (rip, rport)
self.event = threading.Event()
def start(self):
self.sock.connect(self.raddr) # 占用本地地址和端口,设置远端地址和端口
threading.Thread(target=self.recv, name='recv').start()
def recv(self):
while not self.event.is_set():
data, raddr = self.sock.recvfrom(1024)
msg = '{}. from {}:{}'.format(data.decode(), *raddr)
logging.info(msg)
def send(self, msg: str):
self.sock.sendto(msg.encode(), self.raddr)
def stop(self):
self.sock.close()
self.event.set()
def main():
cc1 = ChatUdpClient()
cc2 = ChatUdpClient()
cc1.start()
cc2.start()
print(cc1.sock)
print(cc2.sock)
while True:
cmd = input('Input your words >>')
if cmd.strip() == 'quit':
cc1.stop()
cc2.stop()
break
cc1.send(cmd)
cc2.send(cmd)
if __name__ == '__main__':
main()
上面的例子并不完善,如果客户端断开了,服务端不知道。每一个服务端还需要对所有客户端发送数据,包括已经断开的客户端。
代码改进
服务端代码改进
加一个ack机制和心跳heartbeat。心跳,就是一端定时发往另一端的信息,一般每次数据越少越好。心跳时间间
隔约定好就行。ack即响应,一端收到另一端的消息后返回的信息。
心跳机制
1、一般来说是客户端定时发往服务端的,服务端并不需要ack回复客户端,只需要记录该客户端还活着就行了。
2、如果是服务端定时发往客户端的,一般需要客户端ack响应来表示活着,如果没有收到ack的客户端,服务端
移除其信息。这种实现较为复杂,用的较少。
3、也可以双向都发心跳的,用的更少。
在服务器端代码中使用第一种机制改进
import socket
import threading
import datetime
import logging
from tkinter.font import names
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatUDPServer:
def __init__(self, ip='127.0.0.1', port=9999,interval=10):
self.addr = (ip, port)
self.sock = socket.socket(type=socket.SOCK_DGRAM)
self.clients = set() # 记录客户端
self.event = threading.Event()
self.interval = interval # 默认10秒,超时就要移除对应的客户端
def start(self):
self.sock.bind(self.addr) # 立即绑定
# 启动线程
threading.Thread(target=self.recv, name='recv').start()
def recv(self):
while not self.event.is_set():
localset = set() # 清理超时
data, raddr = self.sock.recvfrom(1024) # 阻塞接收数据
current = datetime.datetime.now().timestamp() # float
if data.strip() == b'^hb^': # 心跳信息 if
self.clients[raddr] = current
continue
elif data.strip() == b'quit':
# 有可能发来数据的不在clients中
self.clients.pop(raddr, None)
logging.info('{} leaving'.format(raddr))
continue
# 有信息来就更新时间
# 什么时候比较心跳时间呢?发送信息的时候,反正要遍历一遍
self.clients[raddr] = current
msg = '{}. from {}:{}'.format(data.decode(), *raddr)
logging.info(msg)
msg = msg.encode()
for c, stamp in self.clients.items():
if current - stamp > self.interval:
localset.add(c)
else:
self.sock.sendto(msg, c) # 不保证对方能够收到
for c in localset:
self.clients.pop(c)
def stop(self):
for c in self.clients:
self.sock.sendto(b'bye', c)
self.sock.close()
self.event.set()
def main():
cs = ChatUDPServer()
cs.start()
while True:
cmd = input(">>>")
if cmd.strip() == 'quit':
cs.stop()
break
logging.info(threading.enumerate())
logging.info(cs.clients)
if __name__ == '__main__':
main()
客户端代码改进
增加定时发送心跳代码
import threading
import socket
import logging
from pydantic.v1 import parse_file_as
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatUdpClient:
def __init__(self, rip='127.0.0.1', rport=9999):
self.sock = socket.socket(type=socket.SOCK_DGRAM)
self.raddr = (rip, rport)
self.event = threading.Event()
def start(self):
self.sock.connect(self.raddr) # 占用本地地址和端口,设置远端地址和端口
threading.Thread(target=self._sendhb, name='heartbeat', daemon=True).start()
threading.Thread(target=self.recv, name='recv').start()
def _sendhb(self):# 心跳
while not self.event.wait(5):
self.send(' ^hb^')
def recv(self):
while not self.event.is_set():
data, raddr = self.sock.recvfrom(1024)
msg = '{}. from {}:{}'.format(data.decode(), *raddr)
logging.info(msg)
def send(self, msg: str):
print(msg)
self.sock.sendto(msg.encode(), self.raddr)
def stop(self):
self.send('quit') # 通知服务端退出
self.sock.close()
self.event.set()
def main():
cc1 = ChatUdpClient()
cc2 = ChatUdpClient()
cc1.start()
cc2.start()
print(cc1.sock)
print(cc2.sock)
while True:
cmd = input('Input your words >>')
if cmd.strip() == 'quit':
cc1.stop()
cc2.stop()
break
cc1.send(cmd)
cc2.send(cmd)
if __name__ == '__main__':
main()
UDP协议应用
UDP是无连接协议,它基于以下假设:网络足够好 消息不会丢包 包不会乱序
但是,即使是在局域网,也不能保证不丢包,而且包的到达不一定有序。
应用场景 视频、音频传输,一般来说,丢些包,问题不大,最多丢些图像、听不清话语,可以重新发话语来解决。
海量采集数据,例如传感器发来的数据,丢几十、几百条数据也没有关系。DNS协议,数据内容小,一个包就能
查询到结果,不存在乱序,丟包,重新请求解析。
一般来说,UDP性能优于TCP,但是可靠性要求高的场合的还是要选择TCP协议。
SocketServer
socket编程过于底层,编程虽然有套路,但是想要写出健壮的代码还是比较困难的,所以很多语言都对socket底层
AP进行封装,Python的封装就是——socketserver模块。它是网络服务编程框架,便于企业级快速开发。
类的继承关系
SocketServer简化了网络服务器的编写。
它有4个同步类:TCPServer ,UDPServer , UnixStreamServer,UnixDatagramServer。
2个Mixin类:ForkingMixIn 和 ThreadingMixIn类,用来支持异步。
class ForkingUDPServer(ForkingMixin, UDPServer): pass
class ForkingTCPServer(ForkingMixIn, TCPServer): pass
class ThreadingUDPServerThreadingMixIn, UDPServer): pass
class ThreadingTCPServerThreadingMixIn, TCPServer): pass
fork是创建多进程,thread是创建多线程
编程接口
socketserver.BaseServer(server_address, RequestHandlerClass)
需要提供服务器绑定的地址信息,和用于处理请求的RequestHandlerClass类。
RequestHandlerClass类必须是BaseRequestHandler类的子类,在BaseServer中代码如下:
import threading
# BaseServer代码
class BaseServer:
def __init__(self, server_address, RequestHandlerClass):
"""Constructor. May be extended, do not override."""
self.server_address = server_address
self.RequestHandlerClass = RequestHandlerClass
self.__is_shut_down = threading.Event()
self.__shutdown_request = False
def finish_request(self, request, client_address): # 处理请求的方法
"""Finish one request by instantiating RequestHandlerClass."""
self.RequestHandlerClass(request, client_address, self) # RequestHandlerClass*i*
BaseRequestHandler类
它是和用户连接的用户请求处理类的基类,定义为BaseRequestHandler(request, client_address, server)
服务端Server实例接收用户请求后,最后会实例化这个类。
它被初始化时,送入3个构造参数:request, client_address, server自身
以后就可以在BaseRequestHandler类的实例上使用以下属性:
self.request是和客户端的连接的socket对象
self.server是TCPServer本身
self.client_address是客户端地址
这个类在初始化的时候,它会依次调用3个方法。子类可以覆盖这些方法。
# BaseRequestHandler要子类覆盖的方法
class BaseRequestHandler:
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
self.handle()
finally:
self.finish()
def setup(self): # 每一个连接初始化
pass
def handle(self): # 每一次请求处理
pass
def finish(self): # 每一个连接清理
pass
测试代码
import threading
import socketserver
class MyHandler(socketserver.BaseRequestHandler):
def handle(self):
# super().handle()# 可以不调用,父类handle什么都没有做
print('-' * 30)
print(self.server) # 服务
print(self.request) # 服务端负责客户端连接请求的socket对象
print(self.client_address) # 客户端地址
print(self.__dict__)
print(self.server.__dict__) # 能看到负责accept的socket
print(threading.enumerate())
print(threading.current_thread())
print('-' * 30)
addr = ('127.0.0.1', 9999)
server = socketserver.ThreadingTCPServer(addr, MyHandler)
server.serve_forever() # 永久
测试结果说明,handle方法相当于socket的recv方法。每个不同的连接上的请求过来后,生成这个连接的socket对象即self.request,客户端地址是self.dlient_address。
问题
测试过程中,上面代码,连接后立即断开了,为什么?
怎样才能客户端和服务器端长时间连接?
import threading
import socketserver
import logging
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
class MyHandler(socketserver.BaseRequestHandler):
def handle(self):
# super().handle()# 可以不调用,父类handle什么都没有做
print('-' * 30)
print(self.server) # 服务
print(self.request) # 服务端负责客户端连接请求的socket对象
print(self.Client_address) # 客户端地址
print(self.__dict__)
print(self.server.__dict__) # 能看到负责accept的
print(threading.enumerate())
print(threading.current_thread())
print('-' * 30)
for i in range(3):
data = self.request.recv(1024)
logging.info(data)
logging.info('====end====')
addr = ('127.0.0.1', 9999)
server = socketserver.ThreadingTCPServer(addr, MyHandler)
server.serve_forever() # 永久
将ThreadingTCPServer换成TCPServer,同时连接2个客户端观察效果。
ThreadingTCPServer是异步的,可以同时处理多个连接。
TCPServer是同步的,一个连接处理完了,即一个连接的handle方法执行完了,才能处理另一个连接,且只有主线程。
总结
创建服务器需要几个步骤:
1.从BaseRequestHandler类派生出子类,并覆盖其handle(方法来创建请求处理程序类,此方法将处理
传入请求
2.实例化一个服务器类,传参服务器的地址和请求处理类
3.调用服务器实例的handle_request()或serve_forever()方法
4.调用server_close()关闭套接字
实现EchoServer
顾名思义,Echo,来什么消息回显什么消息
客户端发来什么信息,返回什么信息
import threading
from socketserver import ThreadingTCPServer, BaseRequestHandler
import sys
class EchoHandler(BaseRequestHandler):
def setup(self):
super().setup()
self.event = threading.Event() # 初始工作
def finish(self):
super().finish()
self.event.set()
def handle(self):
super().handle()
while not self.event.is_set():
data = self.request.recv(1024).decode()
msg = "{} {}".format(self.client_address, data).encode()
self.request.send(msg)
print('End')
addr = ('127.0.0.1', 9999)
server = ThreadingTCPServer(addr, EchoHandler)
server_thread = threading.Thread(target=server.serve_forever, name='EchoServer', daemon=True)
server_thread.start()
try:
while True:
cmd = input('>>>')
if cmd.strip() == 'quit':
break
print(threading.enumerate())
except Exception as e:
print(e)
except KeyboardInterrupt:
pass
finally:
print('Exit')
sys.exit(0)
练习一一改写ChatServer
使用ThreadingTCPServer改写ChatServer
import threading
from socketserver import ThreadingTCPServer, BaseRequestHandler
import sys
import logging
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatHandler(BaseRequestHandler):
clients = {}
def setup(self):
super().setup()
self.event = threading.Event() # 初始工作
self.clients[self.client_address] = self.request
def finish(self):
super().finish() # 清理工作
self.clients.pop(self.client_address) # 能执行到吗?
self.event.set()
def handle(self):
super().handle()
while not self.event.is_set():
data = self.request.recv(1024).decode()
if data == 'quit':
break
msg = "{} {}".format(self.client_address, data).encode()
logging.info(msg)
for c in self.clients.values():
self.request.send(msg)
print('End')
addr = ('127.0.0.1', 9999)
server = ThreadingTCPServer(addr, ChatHandler)
server_thread = threading.Thread(target=server.serve_forever, name='ChatServer', daemon=True)
server_thread.start()
try:
while True:
cmd = input('>>>')
if cmd.strip() == 'quit':
break
print(threading.enumerate())
except Exception as e:
print(e)
except KeyboardInterrupt:
pass
finally:
print('Exit')
sys.exit(0)
问题
上例 self.clients.pop(self.client_address) 能执行到吗?
如果连接的线程中handle方法中抛出异常,例如客户端主动断开导致的异常,线程崩溃,self.clients的pop方法还能执行吗?
当然能执行,基类源码保证了即使异常,也能执行finish方法。但不代表不应该不捕获客户端各种异常。
解决客户端主动连接断开问题
如果客户端主动断开,总是抛出一个异常。看看到底发生了什么,在handle方法中增加一些语句。
import threading
import logging
import threading
from socketserver import ThreadingTCPServer, BaseRequestHandler
import sys
class ChatHandler(BaseRequestHandler):
clients = {}
def setup(self):
super().setup()
self.event = threading.Event() # 初始工作
self.clients[self.client_address] = self.request
def finish(self):
super().finish() # 清理工作
self.clients.pop(self.client_address) # 能执行到吗?
self.event.set()
def handle(self):
super().handle()
while not self.event.is_set():
data = self.request.recv(1024).decode()
print(data, '~' * 30) # 增加
if data == 'quit':
break
msg = "{} {}".format(self.client_address, data).encode()
logging.info(msg)
for c in self.clients.values:
print('+++++++++++++') # 增加
self.request.send(msg)
print('End')
通过打印可以看到,客户端主动断开,会导致recv方法立即返回一个空bytes,并没有同时抛出异常。当循环回到recv这一句的时候就会抛出异常。所以,可以通过判断data数据是否为空来客户端是否断开。
import threading
import logging
import threading
from socketserver import ThreadingTCPServer, BaseRequestHandler
import sys
class ChatHandler(BaseRequestHandler):
clients = {}
def setup(self):
super().setup()
self.event = threading.Event() # 初始工作
self.clients[self.client_address] = self.request
def finish(self):
super().finish() # 清理工作
self.Clients.pop(self.client_address) # 能执行到吗?
self.event.set()
def handle(self):
super().handle()
while not self.event.is_set():
data = self.request.recv(1024).decode()
print(data, '~' * 30)
if not data or data == 'quit':
print('Broken pipe')
break
msg = "{} {}".format(self.client_address, data).encode()
logging.info(msg)
for c in self.clients.values:
self.request.send(msg)
print('End')
总结
为每一个连接提供RequestHandlerClass类实例,依次调用setup、handle、finish方法,且使用了try...finally结构保证finish方法一定能被调用。这些方法依次执行完成,如果想维持这个连接和客户端通信,就需要在handle函数中使用循环。socketserver模块提供的不同的类,但是编程接口是一样的,即使是多进程、多线程的类也是一样,大大减少了编程的难度。
异步编程
重要概念
同步、异步
函数或方法被调用的时候,调用者是否得到最终结果的。
直接得到最终结果结果的,就是同步调用;
不直接得到最终结果的,就是异步调用。
同步就是我让你打饭,你不打好给我不走开,直到你打饭给了我。
异步就是我让你打饭,你打着,我不等你,但是我会盯着你,你打完,我会过来拿走的。异步并不保证多长时间最终打完饭。
阻塞、非阻塞
函数或方法调用的时候,是否立刻返回。
立即返回就是非阻塞调用;
不立即返回就是阻塞调用。
区别
同步、异步,与阻塞、非阻塞不相关。
同步、异步强调的是,是否得到(最终的)结果;
阻塞、非阻塞强调是时间,是否等待。
同步与异步区别在于:调用者是否得到了想要的最终结果。
同步就是一直要执行到返回最终结果;
异步就是直接返回了,但是返回的不是最终结果。调用者不能通过这种调用得到结果,还要通过被调用者,使用其它方式通知调用者,来取回最终结果。
阻塞与非阻塞的区别在于,调用者是否还能干其他事。
阻塞,调用者就只能干等;
非阻塞,调用者可以先去忙会别的,不用一直等。
联系
同步阻塞,我啥事不干,就等你打饭打给我。打到饭是结果,而且我啥事不干一直等,同步加阻塞。
同步非阻塞,我等着你打饭给我,但我可以玩会手机、看看电视。打饭是结果,但是我不一直等
异步阻塞,我要打饭,你说等叫号,并没有返回饭给我,我啥事不干,就干等着饭好了你叫我。例如,叫号
异步非阻塞,我要打饭,你说等叫号,并没有返回饭给我,我在旁边看电视、玩手机,饭打好了叫我。
同步IO、异步IO、IO 多路复用
IO 两个阶段
IO过程分两阶段:
1.数据准备阶段
2.内核空间复制回用户进程缓冲区阶段
发生I0的时候: 人的高薪职业学
1、内核从输入设备读、写数据(淘米,把米放饭锅里煮饭)
2、进程从内核复制数据(盛饭,从内核这个饭锅里面把饭装到碗里来)
系统调用一—read函数
IO模型
同步IO
同步IO 模型包括 阻塞IO 、非阻塞IO 、IO 多路复用
阻塞IO
进程等待(阻塞),直到读写完成。(全程等待)
read/write函数
非阻塞1O
进程调用read操作,如果IO设备没有准备好,立即返回ERROR,进程不阻塞。用户可以再次发起系统调用,如果内核已经准备好,就阻塞,然后复制数据到用户空间。
第一阶段数据没有准备好,就先忙别的,等会再来看看。检 数据是否准备好了的过程是非阻塞的。
第二阶段是阻塞的,即内核空间和用户空间之间复制数据是阻塞的。
淘米、蒸饭我不等,我去玩会,盛饭过程我等着你装好饭,但是要等到盛好饭才算完事,这是同步的,结果就是盛好饭。
read/write
IO多路复用
所谓IO多路复用,就是同时监控多个IO,有一个准备好了,就不需要等了开始处理,提高了同时处理IO的能力。
select几乎所有操作系统平台都支持,poll是对的select的升级。
epoll, Linux系统内核2.5+开始支持,对select和poll的增强,在监视的基础上,增加回调机制。BSD、Mac平台有
kqueue , Windows有iocp.
以select为例,将关注的IO操作告诉select函数并调用,进程阻塞,内核"“监视"select关注的文件描述符fd,被关注的任何一个fd对应的IO准备好了数据,select返回。再使用read将数据复制到用户进程。
select举例,食堂供应很多菜(众多的I0),你需要吃某三菜一汤,大师傅(操作系统)说要现做,需要等,你只好等待。其中一样菜好了,大师傅叫你过来说你点的菜有好的了,你得自己找找看哪一样才好了,请服务员把做好的菜打给你。
epoll是有菜准备好了,大师傅喊你去几号窗口直接打菜,不用自己找菜了。
一般情况下,select最多能监听1024个fd(可以修改,但不建议改),但是由于select采用轮询的方式,当管理的IO多了,每次都要遍历全部fd,效率低下。
epoll没有管理的fd的上限,且是回调机制,不需遍历,效率很高。
异步IO
进程发起异步IO请求,立即返回。内核完成1O的两个阶段,内核给进程发一个信号。
举例,来打饭,跟大师傅说饭好了叫你,饭菜准备好了,窗口服务员把饭盛好了打电话叫你。两阶段都是异步的。
在整个过程中,进程都可以忙别的,等好了才过来。
举例,今天不想出去到饭店吃饭了,点外卖,饭菜在饭店做好了(第一阶段),快递员从饭店送到你家门口(第二阶段)。
Linux的aio的系统调用,内核从版本2.6开始支持
Python 中IO多路复用
- IO多路复用
1、 大多数操作系统都支持select和poll
2、 Linux 2.5+ 支持epoll
3、 BSD、Mac支持kqueue
4、 Windows JOCP
Python的select库
实现了select、poll系统调用,这个基本上操作系统都支持。部分实现了epoll
底层的I0多路复用模块。
开发中的选择
1、完全跨平台,使用select、poll。但是性能较差
2、针对不同操作系统自行选择支持的技术,这样做会提高10处理的性能
selectors库
3.4 版本提供这个库,高级 IO 复用库。
类层次结构:
BaseSelector
+-- SelectSelector 实现select
+-- PollSelector 实现po11
+-- EpollSelector 实现epol1
+-- DevpollSelector 实现devpo11
+-- KqueueSelector 实现kqueue
selectors.DefaultSelector返回当前平台最有效、性能最高的实现。
但是,由于没有实现Windows下的IOCP,所以,只能退化为select。
# 在selects模块源码最下面有如下代码
# Choose the best implementation, roughly:
# epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
DefaultSelector = PollSelector
else:
DefaultSelector = SelectSelector
abstractmethod register(fileobj, events, data=None)
为selector注册一个文件对象,监视它的IO事件。
fileobj被监视文件对象,例如socket对象
events 事件,该文件对象必须等待的事件
data 可选的与此文件对象相关联的不透明数据,例如,关联用来存储每个客户端的会话ID,关联方法。通过这个
参数在关注的事件产生后让selector干什么事。
# 使用举例
import selectors
import threading
import socket
import logging
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
# 回调函数,自己定义形参
def accept(sock: socket.socket, mask):
"""mask:事件掩码的或值"""
conn, raddr = sock.accept()
conn.setblocking(False) # 不阻塞
# 监视conn这个文件对象
key = selector.register(conn, selectors.EVENT_READ, read)
logging.info(key)
# 回调函数
def read(conn: socket.socket, mask):
data = conn.recv(1024)
msg = "Your msg is {}.", format(data.decode())
conn.send(msg.encode())
# 构造缺省性能最优selector
selector = selectors.DefaultSelector()
# 创建Tcp Server
sock = socket.socket()
sock.bind(('127.0.0.1', 9999))
sock.listen()
logging.info(sock)
sock.setblocking(False) # 非阻塞
# 注册文件对象sock关注读事件,返回SelectorKey
# 将sock、关注事件、data都绑定到key实例属性上
key = selector.register(sock, selectors.EVENT_READ, accept)
logging.info(key)
e = threading.Event()
def select(e):
while not e.is_set():
# 开始监视,等到有文件对象监控事件产生,返回(key,mask)元组
events = selector.select()
print('-' * 30)
for key, mask in events:
logging.info(key)
logging.info(mask)
callback = key.data # 回调函数
callback(key.fileobj, mask)
threading.Thread(target=select, args=(e,), name='select').start()
def main():
while not e.is_set():
cmd = input('>>')
if cmd.strip() == 'quit':
e.set()
fobjs = []
logging.info('{}'.format(list(selector.get_map().items())))
for fd, key in selector.get_map().items(): # 返回注册的项
print(fd, key)
print(key.fileobj)
fobjs.append(key.fileobj)
for fobj in fobjs:
selector.unregister(fobj)
fobj.close() # 关闭socket
selector.close()
if __name__ == '__main__':
main()
练习
将ChatServer改写成IO多路复用的方式
不需要启动多线程来执行socket的accept、recv方法了
import socket
import threading
import datetime
import logging
import selectors
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatServer:
def __init__(self, ip='127.0.0.1', port=9999): # 启动服务
self.sock = socket.socket()
self.addr = (ip, port)
self.event = threading.Event()
self.selector = selectors.DefaultSelector() # 创建selector
def start(self): # 启动监听
self.sock.bind(self.addr) # 绑定
self.sock.listen() # 监听
self.sock.setblocking(False) # 不阻塞
# 注册
self.selector.register(self.sock, selectors.EVENT_READ, self.accept)
threading.Thread(target=self.select, name='selector', daemon=True).start()
def select(self): # 阻基
while not self.event.is_set():
# 开始监视,等到有文件对象监控事件产生,返回(key,mask)元组
events = self.selector.select()
print('-' * 30)
for key, mask in events:
logging.info(key)
logging.info(mask)
callback = key.data # 回调函数
callback(key.fileobj)
def accept(self, sock: socket.socket): # 多人连接
conn, addr = sock.accept() # BaZE
conn.setblocking(False) # 非阻塞
# 注册,监视每一个连接的socket对象
self.selector.register(conn, selectors.EVENT_READ, self.recv)
def recv(self, sock: socket.socket): # 接收客户端数据
data = sock.recv(1024) # 阻塞到数据到来
if data == b'': # 客户端主动断开,注销并关闭socket
self.selector.unregister(sock)
sock.close()
return
msg = "{:%Y/%m/%d %H:%M:%S} {}: {}\n{}\n".format(datetime.datetime.now(), *sock.getpeername(), data.decode())
logging.info(msg)
msg = msg.encode()
# 群发
for key in self.selector.get_map().values():
if key.data == self.recv: # #ißself.accept
key.fileobj.send(msg)
def stop(self): # 停止服务
self.event.set()
fobjs = []
for fd, key in self.selector.get_map().items():
fobjs.append(key.fileobj)
for fobj in fobjs:
self.selector.unregister(fobj)
fobj.close()
self.selector.close()
def main():
cs = ChatServer()
cs.start()
while True:
cmd = input('>>').strip()
if cmd == 'quit':
cs.stop()
threading.Event().wait(3)
break
logging.info(threading.enumerate())
if __name__ == '__main__':
main()
基本完成功能,但是退出机制、异常处理没有加,这个和以前的处理方式一样,请自行完成。
进阶
send是写操作,也可以让selector监听,如何监听?
self.selector.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, self.recv)
注册语句,要监听selectors.EVENT_READ | selectors.EVENT_WRITE 读与写事件。
回调的时候,需要mask来判断究竟是读触发还是写触发了。所以,需要修改方法声明,增加mask。
def recv(self, sock, mask)但是由于recv 方法处理读和写事件,所以叫recv不太合适,改名为
def handle(self, sock, mask)
注意读和写是分离的,那么handle函数应该写成下面这样
def handle(self, sock:socket.socket,mask) # 接收客户端数据
if mask & selectors. EVENT_READ:
pass
# 注意,这里是某一个socket的写操作
if mask & selectors.EVENT_WRITE:# 写缓冲区准备好了,可以写入数据了
pass
handle方法里面处理读、写,mask有可能是0b01、0b10、0b11。
问题是,假没读取到了客户端发来的数据后,如何写出去?
为每一个与客户端连接的socket对象增加对应的队列。
与每一个客户端连接的socket对象,自己维护一个队列,某一个客户端收到信息后,会遍历发给所有客户端的队
列。这里完成一对多,即一份数据放到了所有队列中。
与每一个客户端连接的socket对象,发现自己队列有数据,就发送给客户端。
import socket
import threading
import datetime
import logging
import selectors
from queue import Queue
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message) s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
class ChatServer:
def __init__(self, ip='127.0.0.1', port=9999):
self.sock = socket.socket()
self.addr = (ip, port)
self.clients = {}
self.event = threading.Event()
self.selector = selectors.DefaultSelector() # 创建 selector
def start(self): # 启动监听
self.sock.bind(self.addr) # 绑定
self.sock.listen() # 监听
self.sock.setblocking(False) # 不阻塞
# 注册
self.selector.register(self.sock, selectors.EVENT_READ, self.accept)
threading.Thread(target=self.select, name='selector', daemon=True).start()
def select(self): # 阻塞
while not self.event.is_set():
# 开始监视,等到某文件对象被监控的事件产生,返回(key,mask)元组
events = self.selector.select() # 阻塞,直到events
for key, mask in events:
if callable(key.data):
callback = key.data # key对象的data属性,回调
callback(key.fileobj, mask)
else:
callback = key.data[0]
callback(key, mask)
def accept(self, sock: socket.socket, mask): # 接收客户端连接
conn, raddr = sock.accept()
conn.setblocking(False) # 非阻塞
self.clients[raddr] = (self.handle, Queue())
# 注册,监视每一个与客户端的连接的socket对象
self.selector.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, self.clients[raddr])
def handle(self, key: selectors.SelectorKey, mask): # 接收客户端数据
if mask & selectors.EVENT_READ:
sock = key.fileobj
raddr = sock.getpeername()
data = sock.recv(1024)
if not data or data == b'quit':
self.selector.unregister(sock)
sock.close()
self.clients.pop(raddr)
return
msg = "{:%Y/%m/%d %H:%M:%S} {}: {}\n{}\n".format(datetime.datetime.now(), *raddr, data.decode())
logging.info(msg)
msg = msg.encode()
for k in self.selector.get_map().values():
logging.info(k)
if isinstance(k.data, tuple):
k.data[1].put(data)
if mask & selectors.EVENT_WRITE:
# 因为写一直就绪,mask为2,所以一直可以写,从而导致select()不断循环,如同不阻塞一样
if not key.data[1].empty():
key.fileobj.send(key.data[1].get())
def stop(self): # 停止服务
self.event.set()
fobjs = []
for fd, key in self.selector.get_map().items():
fobjs.append(key.fileobj)
for fobj in fobjs:
self.selector.unregister(fobj)
fobj.close()
self.selector.close()
def main():
cs = ChatServer()
cs.start()
while True:
cmd = input('>>').strip()
if cmd == 'quit':
cs.stop()
threading.Event().wait(3)
break
logging.info(threading.enumerate())
logging.info('-' * 30)
logging.info("{} {}".format(len(cs.clients), cs.clients))
logging.info(list(map(lambda x: (x.fileobj.fileno(), x.data), cs.selector.get_map().values())))
logging.info('-' * 30)
if __name__ == '__main__':
main()
这个程序最大的问题,在select0一直判断可写,几乎一直循环不停。所以对于写不频繁的情况下,就不要监听EVENT_WRITE。
对于Server来说,一般来说,更多的是等待对方发来数据后响应时才发出数据,而不是积极的等着发送数据。所以监听EVENT_READ,收到数据之后再发送就可以了。
本例只完成基本功能,其他功能如有需要,请自行完成。
asyncio
3.4版本加入标准库。
asyncio底层基于selectors实现,看似库,其实就是个框架,包含异步IO、事件循环、协程、任务等内容。
问题的引出
def a():
for x in range(3):
print(x)
def b():
for x in "abc":
print(x)
a()
b()
输出:
0
1
2
a
b
c
这是一个串行的程序,单线程中根本没有做任何能否并行?
import threading
import time
def a():
for x in range(3):
time.sleep(0.001) #
print(x)
def b():
for x in "abc":
time.sleep(0.001)
print(x)
threading.Thread(target=a, name='a').start()
threading.Thread(target=b, name='b').start()
# 运行结果
0
a
1
b
2
c
import multiprocessing
import time
def a():
for x in range(3):
time.sleep(0.001)
print(x)
def b():
for x in "abc":
time.sleep(0.001)
print(x)
if __name__ == "__main__":
multiprocessing.Process(target=a, name='a').start()
multiprocessing.Process(target=b, name='b').start()
输出:
a
0
b
1
c
2
生成器版本
def a():
for x in range(3):
print(x)
yield
def b():
for x in "abc":
print(x)
yield
x = a()
y = b()
for i in range(3):
next(x)
next(y)
输出:
0
a
1
b
2
c
上例在一个线程内通过生成器完成了调度,让两个函数都有机会执行,这样的调度不是操作系统的进程、线程完成的,而是用户自己设计的。
这个程序编写:
需要使用yield来让出控制权
需要循环帮助交替执行
事件循环
事件循环是asyncio提供的核心运行机制。
协程
- 协程不是进程、也不是线程,它是用户空间调度的完成并发处理的方式。
- 进程、线程由操作系统完成调度,而协程是线程内完成调度。它不需要更多的线程,自然也没有多线程切换
带来的开销。 - 协程是非抢占式调度,只有一个协程主动让出控制权,另一个协程才会被调度。
- 协程也不需要使用锁机制,因为是在同一个线程中执行。
- 多CPU下,可以使用多进程和协程配合,既能进程并发又能发挥协程在单线程中的优势。
- Python中协程是基于生成器的。
协程的使用
3.4引|入asyncio,使用装饰器
import asyncio
@asyncio.coroutine
def sleep(x): # 协程函数
for i in range(3):
print('sleep {}'.format(i))
yield from asyncio.sleep(x)
loop = asyncio.get_event_loop()
loop.run_until_complete(sleep(3))
loop.close()
将生成器函数转换成协程函数,就可以在事件循环中执行了。
3.5版本开始,Python提供关键字async、await,在语言上原生支持协程。
import asyncio
async def sleep(x):
for i in range(3):
print('sleep (}'.format(i))
await asyncio.sleep(x)
loop = asyncio.get_event_loop()
loop.run_until_complete(sleep(3))
loop.close()
async def用来定义协程函数,iscoroutinefunction() 返回True。协程函数中可以不包含await、 async关键字,但不能使用yield关键字。
如同生成器函数调用返回生成器对象一样,协程函数调用也会返回一个对象称为协程对象,iscoroutine()返回True.
再来做个例子
import asyncio
import threading
async def sleep(x):
for i in range(3):
print('sleep {}'.format(i))
await asyncio.sleep(x)
async def showthread(x):
for i in range(3):
print(threading.enumerate())
await asyncio.sleep(2)
loop = asyncio.get_event_loop()
tasks = [sleep(3), showthread(3)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
# 协程版本
import asyncio
import threading
@asyncio.coroutine
def a():
for x in range(3):
print('a.x', x)
yield
@asyncio.coroutine
def b():
for x in 'abc':
print('b.x', x)
yield
print(asyncio.iscoroutinefunction(a))
print(asyncio.iscoroutinefunction(b))
# 大循环
loop = asyncio.get_event_loop()
tasks = [a(), b()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
TCP Echo Server 举例
import asyncio
# TCP Echo Server 举例
async def handle(reader, writer):
while True:
data = await reader.read(1024)
print(dir(reader))
print(dir(writer))
client = writer.get_extra_info('peername')
message = "{} Your msg (}".format(client, data.decode()).encode()
writer.write(message)
await writer.drain()
loop = asyncio.get_event_loop()
ip = '127.0.0.1'
port = 9999
crt = asyncio.start_server(handle, ip, port, loop=loop)
server = loop.run_until_complete(crt)
print(server) # server是监听的socket对象
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
server.close()
loop.close()
aiohttp库
安装
$ pip install aiohttp
开发
HTTP Server
from aiohttp import web
async def indexhandle(request: web.Request):
return web.Response(text=request.path, status=201)
async def handle(request: web.Request):
print(request.match_info)
print(request.query_string) # http://127.0.0.1:8080/1?name=12301
return web.Response(text=request.match_info.get('id', '0000'), status=200)
app = web.Application()
app.router.add_get("/", indexhandle) # http://127.0.0.1:8080/
app.router.add_get("/{id}", handle) # http://127.0.0.1:8080/12301
web.run_app(app, host='0.0.0.0', port=9977)
HTTP Client
import asyncio
from aiohttp import ClientSession
async def get_html(url: str):
async with ClientSession() as session:
async with session.get(url) as res:
print(res.status)
print(await res.text())
url = 'http://127.0.0.1/ziroom-web/'
loop = asyncio.get_event_loop()
loop.run_until_complete(get_html(url))
loop.close()