目录
thread.join() 是一个线程同步方法,用于主线程等待子线程完成。当你调用 thread.join() 时,主线程会阻塞,直到调用 join() 的子线程完成其执行。
threading.Lock() 是一个互斥锁(Mutex Lock),用于确保同一时间只有一个线程可以执行特定的代码块。它主要用于保护共享资源,避免多线程环境下的竞态条件和数据不一致问题
开发一个多任务版的TCP服务端程序能够服务于多个客户端。
如何实现:
- TCP服务端程序,循环等待接受客户端的连接请求
- 当客户端和服务端建立连接成功,创建子线程专门处理客户端的请求,防止主线程阻塞
- 把创建的子线程设置成为守护主线程,防止主线程无法退出,避免主线程结束时,子线程还在运行
客户端1.0版本:
import socket
import threading
def link1():
tcp_c1 = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
tcp_c1.connect(('127.0.0.1',8888))
tcp_c1.send('客户端1连接成功'.encode('utf-8'))
recv_data = tcp_c1.recv(1024)
print("接收到数据:",recv_data.decode('utf-8'))
tcp_c1.close()
def link2():
tcp_c2 = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
tcp_c2.connect(('127.0.0.1',8888))
tcp_c2.send('客户端2连接成功'.encode('utf-8'))
recv_data = tcp_c2.recv(1024)
print("接收到数据:",recv_data.decode('utf-8'))
tcp_c2.close()
def link3():
tcp_c3 = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
tcp_c3.connect(('127.0.0.1',8888))
tcp_c3.send('客户端3连接成功'.encode('utf-8'))
recv_data = tcp_c3.recv(1024)
print("接收到数据:",recv_data.decode('utf-8'))
tcp_c3.close()
if __name__ == '__main__':
c1 = threading.Thread(target=link1)
c2 = threading.Thread(target=link2)
c3 = threading.Thread(target=link3)
c1.start()
c2.start()
c3.start()
服务端:
import socket
import threading
def handle(service_client_socket,ip_port):
while True:
recv_data = service_client_socket.recv(1024)
if recv_data:
print(recv_data.decode('utf-8'),ip_port)
service_client_socket.send("收到客户端信息".encode('utf-8'))
else:
print("客户端下线",ip_port)
break
service_client_socket.close()
if __name__ == '__main__':
tcp_server_socket = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# 端口复用让程序退出端口号立即释放
tcp_server_socket.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,True)
tcp_server_socket.bind(('',8888))
tcp_server_socket.listen(128)
# tcp服务端循环等待客户端连接请求
while True:
# 给客户端创建子线程处理相关内容
service_client_socket,ip_port = tcp_server_socket.accept()
print('\n客户端正在连接',ip_port)
new_client = threading.Thread(target=handle,args=(service_client_socket,ip_port))
# 将子线程设置为守护线程
new_client.daemon = True
new_client.start()
setsockopt
:Python socket
模块中的一个方法,用于设置套接字的选项
level
:指定选项所在的协议级别。常见的值包括:socket.SOL_SOCKET
:套接字级别的选项。socket.IPPROTO_TCP
:TCP 协议级别的选项。socket.IPPROTO_IP
:IP 协议级别的选项。
option
:指定要设置的选项名称。常见的选项包括:socket.SO_REUSEADDR
:允许套接字绑定到一个处于TIME_WAIT
状态的地址和端口socket.SO_KEEPALIVE
:启用 TCP 保活机制,保活机制会定期发送探测包,以检测连接是否仍然有效。如果对方主机在一定时间内没有响应,连接将被关闭socket.TCP_NODELAY
:禁用 Nagle 算法,立即发送数据,Nagle 算法会将多个小数据包合并成一个大数据包发送,以提高网络效率socket.SO_BROADCAST:
允许套接字发送广播消息。这在实现广播功能时非常有用,例如在局域网内广播服务器地址
value
:指定选项的值。通常是一个整数,表示选项的状态(如1
表示启用,0
表示禁用)。
运行结果:
不难发现我们手动创建客户端会出现大量重复代码,可扩展可维护性较低,于是我们可以优化客户端生成代码,动态生成客户端
客户端2.0版本:
def client_thread(client_id):
tcp_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_client.connect(('127.0.0.1', 8888))
tcp_client.send(f'客户端{client_id}正在连接'.encode('utf-8'))
recv_data = tcp_client.recv(1024)
print(f"客户端{client_id}接收到数据:", recv_data.decode('utf-8'))
tcp_client.close()
if __name__ == '__main__':
threads = []
for i in range(1, 4): # 创建3个客户端
thread = threading.Thread(target=client_thread, args=(i,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
thread.join()
是一个线程同步方法,用于主线程等待子线程完成。当你调用 thread.join()
时,主线程会阻塞,直到调用 join()
的子线程完成其执行。
同步机制
观察上图,我们可以看见客户端和服务端的连接和断开行为并不是完全同步的,这主要是由于多线程和网络通信的特性导致的。
假设客户端1、客户端2和客户端3按顺序启动,但实际执行顺序可能是这样的:
客户端1启动并发送消息,服务端接收并处理。
客户端2启动并发送消息,服务端接收并处理。
客户端1关闭连接,服务端检测到客户端1关闭并打印“客户端1下线”。
客户端3启动并发送消息,服务端接收并处理。
客户端2关闭连接,服务端检测到客户端2关闭并打印“客户端2下线”。
客户端3关闭连接,服务端检测到客户端3关闭并打印“客户端3下线”。
如何实现同步机制:
①在客户端和服务端之间引入同步机制,例如使用线程锁(threading.Lock
)或信号量(threading.Semaphore
)来控制线程的执行顺序
②在客户端和服务端的线程中添加同步点,确保每个客户端的连接和断开操作都完成后再进行下一个客户端的操作
③使用队列(queue.Queue
)来管理客户端的连接和断开操作,确保按顺序处理每个客户端的请求
服务端:
import socket
import threading
import time
def handle(service_client_socket, ip_port):
with lock:
while True:
recv_data = service_client_socket.recv(1024)
if recv_data:
print(recv_data.decode('utf-8'), ip_port)
service_client_socket.send("收到客户端信息".encode('utf-8'))
else:
print("客户端下线", ip_port)
break
service_client_socket.close()
time.sleep(1) # 确保按顺序处理
if __name__ == '__main__':
tcp_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
tcp_server_socket.bind(('', 8888))
tcp_server_socket.listen(128)
lock = threading.Lock()
while True:
service_client_socket, ip_port = tcp_server_socket.accept()
print('\n客户端正在连接', ip_port)
new_client = threading.Thread(target=handle, args=(service_client_socket, ip_port))
new_client.daemon = True
new_client.start()
客户端:
import socket
import threading
import time
lock = threading.Lock()
def client_thread(client_id):
with lock:
tcp_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_client.connect(('127.0.0.1', 8888))
tcp_client.send(f'客户端{client_id}连接成功'.encode('utf-8'))
recv_data = tcp_client.recv(1024)
print(f"客户端{client_id}接收到数据:", recv_data.decode('utf-8'))
tcp_client.close()
time.sleep(1) # 确保按顺序关闭连接
if __name__ == '__main__':
threads = []
for i in range(1, 4):
thread = threading.Thread(target=client_thread, args=(i,))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
threading.Lock()
是一个互斥锁(Mutex Lock),用于确保同一时间只有一个线程可以执行特定的代码块。它主要用于保护共享资源,避免多线程环境下的竞态条件和数据不一致问题
with lock
的作用:with lock
是一个上下文管理器,用于自动管理锁的获取和释放。当进入
with
块时,自动调用lock.acquire()
,获取锁。当退出
with
块时,自动调用lock.release()
,释放锁。使用
with
语句可以简化锁的管理,避免因异常导致锁未释放的情况。
accept的作用:
accept
函数的作用是接受一个已经建立的连接。当服务器端套接字调用 listen
函数进入监听状态后,它会等待客户端的连接请求。当一个客户端请求连接时,服务器端套接字可以调用 accept
函数来接受这个连接。
accept
是否阻塞,这取决于它的使用方式:
阻塞模式:在默认情况下,
accept
函数是阻塞的。这意味着如果当前没有客户端连接请求,accept
函数会暂停执行,直到一个连接请求到达。这种模式适用于服务器端希望立即处理客户端连接的情况。非阻塞模式:如果服务器端套接字被设置为非阻塞模式,那么
accept
函数将不会等待连接请求。如果当前没有客户端连接请求,accept
函数会立即返回一个错误(通常是EWOULDBLOCK或EAGAIN)。这种模式适用于服务器端需要同时处理其他任务,不希望因为等待连接请求而被阻塞的情况。