python-并发编程
前言
进程
(Process):正在运行的程序,是系统进行资源分配的最小单位。每个进程都有自己独立的内存空间和系统资源线程
(Thread):运行在进程之上,系统进行调度的最小单位。一个进程可以包含多个线程,它们共享进程的内存空间和资源协程
(Coroutine):协程是一种用户态的轻量级线程,又称微线程。协程的调度完全由用户控制,不需要上下文切换的开销,因此执行效率极高并行
(Parallelism):指多个任务在同一时刻同时执行,需要多核 CPU 的支持并发
(Concurrency):指多个任务在同一时间段内交替执行,通过时间片轮转或协作式调度实现
进程和线程
- 一个进程可以有一个以上的线程,进程之间都是独立的,一个进程内的线程共享这个进程空间
- 同一个进程内的线程是可以直接通信的,进程要想通信,必须通过内核空间实现
- 创建新的线程很简单,创建新的进程需要对父进程进行克隆,所有的进程都是由另外一个进程创建的
- 一个线程可以控制和操作同一个进程内的其他线程,而进程只能操作子进程
- 一个主线程的改变可能会影响其他进程,而父进程不会影响子进程
多进程 安全性高 开销大 占用空间大 上下文切换开销大 分布式支持
多线程 安全性低 开销小 占用空间小 上下文切换开销小 不支持
一、多线程(threading)
1.1 多线程的使用
以从网页中下载图片为例
下载一张图片
def download_image(url, path):
response = requests.get(url)
with open(path, 'wb') as f:
f.write(response.content)
url = 'https://pic35.photophoto.cn/20150511/0034034892281415_b.jpg'
path = '1.jpg'
download_image(url, path)
下载5张图片
import requests
import time
def runtime(func):
def runtime_inner(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
end = time.time()
print(f"执行函数{func.__name__}花费了{end - start}s")
return result
return runtime_inner
def download_image(url, path):
response = requests.get(url)
with open(path, 'wb') as f:
f.write(response.content)
@runtime
def main():
for i in range(5):
download_image(url, str(i) + ".jpg")
url = 'https://pic35.photophoto.cn/20150511/0034034892281415_b.jpg'
main()
# 执行函数main花费了1.8841361999511719s
使用多线程下载5张图片
import threading
@runtime
def main():
print("start...")
t_list = []
for i in range(5):
# 创建线程对象
# target --> 指定传入一个callable对象 做什么
# args --> 指定方法需要传入的参数 元组类型 (1,)
t = threading.Thread(target=download_image, args=(url, str(i) + ".jpg"))
# 启动线程 start --> run <--target
# 默认情况为前台线程 主线程要等待子线程结束才退出
# 设置后台线程 主线程执行结束,子线程也要退出
t.daemon = True
# t.setDaemon(True) # 设置为后台线程,在start之前设置
t.start()
t_list.append(t)
for t in t_list:
t.join() # 阻塞当前环境上下文,直到t的线程执行完成
# 去掉t.jon() 执行函数main花费了0.0039768218994140625s
print("start...")
main()
print("end...")
# start...
# start...
# 执行函数main花费了0.6438114643096924s
# end...
1.2 自定义线程类
class MyThread(threading.Thread):
def __init__(self, num):
super().__init__()
self.num = num
def run(self):
print(f"running...{self.num}")
t1 = MyThread(1)
t2 = MyThread(2)
t1.start()
t2.start()
使用自定义线程类,创建多线程下载图片
class MyThread(threading.Thread):
def __init__(self, url, path):
super().__init__()
self.url = url
self.path = path
def run(self):
response = requests.get(self.url)
with open(self.path, 'wb') as f:
f.write(response.content)
url = 'https://pic35.photophoto.cn/20150511/0034034892281415_b.jpg'
for i in range(5):
t = MyThread(url, str(i) + ".jpg")
t.start()
1.3 线程锁
为什么需要线程锁?
公共资源进行访问修改,存在资源争抢,造成脏数据
解决公共资源竞争
限制同一时刻只有一个线程可以访问公共资源
1.3.1 互斥锁
import threading
import time
from threading import Lock, RLock
num = 0
def sum_num(i):
# lock.acquire() # 获取锁
with lock:
global num
time.sleep(1)
num += i
print(num)
# lock.release() # 释放锁
# 创建锁对象
lock = Lock()
# lock = RLock()
t_list = []
for i in range(10):
t = threading.Thread(target=sum_num, args=(1,))
t.start()
t_list.append(t)
[t.join() for t in t_list]
print("end...")
# Lock 原始锁 获取锁之前不做判断,直到获取到锁为止
# RLock 重入锁 获取锁之前先判断,如果有这把锁了就 立即返回
r1 = Lock()
r2 = RLock()
r1.acquire()
# print("lock1 acquired 1")
# r1.acquire() # 死锁
# print("lock1 acquired 2")
r2.acquire()
print("lock1 acquired 1")
r2.acquire()
print("lock1 acquired 2")
死锁
程序设计不到位
尽量避免产生死锁
- 尽量避免同一个线程对多lock进行锁定
- 多个线程需要对多个lock进行锁定,尽量保证他们以相同的顺序获取锁
- 设置超时
# 当两个线程以相反顺序调用 transfer时,会发生死锁
class Account:
def __init__(self, id, balance, lock):
self.id = id
self.balance = balance
self.lock = lock
# 取钱
def withdraw(self, amount):
self.balance -= amount
# 存钱
def deposit(self, amount):
self.balance += amount
# 查看余额
def get_balance(self):
return self.balance
def transfer(from_id, to_id, amount):
if from_id.lock.acquire():
from_id.withdraw(amount)
time.sleep(1)
print("wait...end")
if to_id.lock.acquire():
to_id.deposit(amount)
to_id.lock.release()
from_id.lock.release()
print(f"{from_id.id}向{to_id.id}转了{amount}元")
huang = Account("huang", 10000, RLock())
zhang = Account("zhang", 20000, RLock())
t1 = threading.Thread(target=transfer, args=(huang, zhang, 5000))
t2 = threading.Thread(target=transfer, args=(zhang, huang, 2000))
t1.start()
t2.start()
t1.join()
t2.join()
print(huang.get_balance())
print(zhang.get_balance())
1.3.2 信号量
信号量允许指定数量的线程同时执行
from threading import BoundedSemaphore
num = 0
def sum_num(i):
# lock.acquire() # 获取锁
with lock:
global num
time.sleep(1)
num += i
print(num)
# lock.release() # 释放锁
# 信号量锁对象,最多允许2和线程同时执行
lock = BoundedSemaphore(2)
t_list = []
for i in range(10):
t = threading.Thread(target=sum_num, args=(1,))
t.start()
t_list.append(t)
[t.join() for t in t_list]
print("end...")
1.4 全局解释器锁(GIL)
GIL
全称Global Interpreter Lock
GIL 和 Python 语言没有任何关系,只是因为历史原因导致在官方推荐的解释器Cpython
中遗留的问题(Jpython无此类问题)
每个线程在执行的过程中都需要先获取GIL,保证同一时刻同一个进程内只有一个线程可以执行代码
GIL最基本的行为只有两个:
- 当前执行的线程持有GIL
- 当线程遇到io阻塞时,会释放GIL
计算密集型(cpu) 使用多进程
io密集型(频繁阻塞等待) 使用多线程
二、多进程(multiprocessing)
2.1 使用os库创建多进程
import os, time
# linux系统中
result = os.fork()
# 父进程运行时result为子进程的pid
# 子进程运行时这个result就是0
print("outerside pid is:", result)
if result == 0:
print("child process")
#time.sleep(60)
print("child pid is:", os.getpid())
print("child-parent pid is:", os.getppid())
else:
print("parent process")
#time.sleep(60)
print("parent pid is:", os.getpid())
僵尸进程
: 子进程退出,父进程没有调用wait或者waitpid取获取子进程的状态 --> 无time.sleep()
那么这个子进程的进程描述符就依然存在系统中,这种进程称之为僵尸进程
孤儿进程
: 父进程退出,子进程还在运行,那么这个子进程就会称为孤儿进程 --> 取消父进程的sleep(60)注释,保留子进程的sleep(60)
孤儿进程会被pid为1的进程所收养
2.2 多进程的使用
import multiprocessing
from multiprocessing import Process, current_process
import time
lst = []
def task(i):
print(current_process().name, i, 'start...')
time.sleep(2)
lst.append(i)
print(lst)
print(current_process().name, i, 'end...')
if __name__ == '__main__':
for i in range(10):
p = Process(target=task, args=(i,))
p.start()
# 进程之间资源隔离
# Process-4 3 start...
# Process-5 4 start...
# Process-2 1 start...
# Process-1 0 start...
# Process-6 5 start...
# Process-8 7 start...
# Process-10 9 start...
# Process-3 2 start...
# Process-9 8 start...
# Process-7 6 start...
# [3]
# Process-4 3 end...
# [4]
# Process-5 4 end...
# [1]
# Process-2 1 end...
# [0]
# Process-1 0 end...
# [5]
# Process-6 5 end...
# [7]
# Process-8 7 end...
# [2][9]
#
# Process-10 9 end...
# Process-3 2 end...
# [8]
# Process-9 8 end...
# [6]
# Process-7 6 end...
2.3 自定义进程类
import multiprocessing
class MyProcess(multiprocessing.Process):
def __init__(self, i):
super().__init__()
self.i = i
def run(self):
print(self.name, self.i, 'start...')
print(self.name, self.i, 'end...')
if __name__ == '__main__':
for i in range(10):
p = MyProcess(i)
p.start()
2.4 进程通信方式
2.4.1 管道(pipe)
传递二进制数据流,消息之间没有明确界限
半双工的通信方式,本质上就是内核空间中固定大小的缓冲区 (只能从一边到另一边)
匿名管道
适用有亲缘关系的进程
命名管道
无亲缘关系也可以进行访问
2.4.2 消息队列(message queues)
消息队列是保存在内核中的消息链表,有明确的界限,支持多种数据类型传入
发送方和接收方不需要同时存在,消息可持久化
2.4.3 信号量(semaphores)
信号量就是一个计数器,用于控制最多n个进程对共享资源访问
p
操作(申请资源)-> 将信号量的值减 1
v
操作(释放资源) -> 将信号量的值加 1
若信号量 ≥ 0,表示资源可用,进程继续执行
若信号量 < 0,表示资源已被耗尽,进程被阻塞并放入等待队列
2.4.4 共享内存(shared memory)
多个进程通过映射共享同一片物理内存区域,这是最快的进程通信(IPC)方式
直接读写速度最快
配合信号量或者互斥锁来使用
2.4.5 信号(signal)
信号是最古老的进程通信方式,是一种异步通知机制,用来通知进程,控制进程的一些行为
ctrl + c 停止信号
ctrl + z 终止信号
2.4.5 socket通信
套接字,通常用于不同主机之间的通信
支持全双工通信,数据按字节流传输
分布式系统,跨网络通信
2.5 进程池
有效的降低频繁创建销毁线程多带来的额外开销
from multiprocessing import Pool, current_process
import time
lst = []
def task(i):
print(current_process().name, i, 'start...')
time.sleep(1)
lst.append(i)
print(lst)
print(current_process().name, i, 'end...')
# 每个进程独立空间,互相隔离
if __name__ == "__main__":
# 创建进程池,建议进程数和cpu核数一致
# maxtaskperchild 指定每个子进程最多可以处理多少任务,防止过多的内存占用
p = Pool(processes=4, maxtasksperchild=3)
for i in range(20):
# 进程池接受任务
p.apply_async(func=task, args=(i,))
# 关闭进程池,不接受任务
p.close()
# 阻塞当前环境,直到p子进程执行完成。如果没有join,父进程退出,子进程也会退出
p.join()
print("end...")
三、协程(coroutine)
协程是一种用户态的轻量级线程,协程的调度完全由用户控制。协程拥有自己的寄存器上下文和栈。又称为微线程,纤程
import asyncio
async def func1():
print(1)
await asyncio.sleep(1)
print(2)
async def func2():
print(3)
await asyncio.sleep(1)
print(4)
# 创建任务列表
tasks = [asyncio.ensure_future(func1()), asyncio.ensure_future(func2())]
# 生成事件循环 -- 监听
loop = asyncio.get_event_loop()
# 运行
loop.run_until_complete(asyncio.wait(tasks))
# 依次输出:1 3 2 4
总结
并发编程是提升 Python 程序效率的核心手段
选择方式需遵循 “任务类型优先” 原则:
- CPU 密集型→多进程
- I/O 密集型→协程(高并发)或多线程(简单场景)
同时,需注意同步机制(避免资源竞争)、GIL 限制(多线程的局限性)、进程 / 线程开销(控制数量)等问题,才能写出高效、可靠的并发程序