1.线程和线程池
# Thread demo: two named worker threads plus a pooled map over four inputs.
# args must be a tuple: ("a") is just the string "a" — it only works by the
# accident that a 1-char string is a 1-element sequence. Use ("a",).
t1 = threading.Thread(target=task, args=("a",))
t2 = threading.Thread(target=task, args=("b",))
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # ["A,B,C,D"] was a single 7-character string (one task, wrong argument);
    # the intent is four separate tasks.
    executor.map(task, ["A", "B", "C", "D"])
# Shared counter guarded by a lock: four threads each add 1 a hundred times.
# Definitions are placed before use (the original script referenced
# `increment`, `count` and `lock` before they existed).
count = 0
lock = threading.Lock()

def increment():
    """Add 1 to the global `count` 100 times, taking `lock` for each update."""
    global count
    for _ in range(100):
        with lock:
            count += 1

# Pass the function object itself. `target=increment()` ran increment()
# once in the main thread and gave Thread a target of None, so the four
# threads did nothing.
threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
2.进程和进程池
def send(conn):
    """Write a fixed greeting onto one end of a multiprocessing pipe."""
    message = "helloworld"
    conn.send(message)
def receive(conn):
    """Read a single message from the pipe connection and echo it to stdout."""
    message = conn.recv()
    print(message)
# Process demo: a shared value, a queue feeding two worker processes,
# a process pool, and four readers of the shared value.
# NOTE: `//` is not a comment in Python (it was a SyntaxError); use `#`.
val = multiprocessing.Value("i", 1)  # shared int, visible across processes
queue = multiprocessing.Queue()

# `worker` and `printVal` are assumed to be defined elsewhere in the file.
process = multiprocessing.Process(target=worker, args=(queue,))
process2 = multiprocessing.Process(target=worker, args=(queue,))
process.start()
process2.start()
process.join()
process2.join()

with concurrent.futures.ProcessPoolExecutor(5) as executor:
    executor.map(worker, ["a", "b", "c", "d"])

processes = [multiprocessing.Process(target=printVal, args=(val,)) for _ in range(4)]
for process in processes:
    process.start()
for process in processes:
    process.join()
asyncio
async def cpu_bound_task(n):
    """Accumulate i*n over a large range, print the total, pause, return it.

    Deliberately CPU-heavy: the loop dominates; the single await only
    yields control once at the end.
    """
    total = 0
    for step in range(10**8):
        total += step * n
    print(f"total is {total}")
    await asyncio.sleep(10)
    return total
def event_loop(n):
    """Run cpu_bound_task(n) to completion on a fresh event loop.

    Returns the task's result. The original discarded asyncio.run()'s
    return value, so callers (e.g. via run_in_executor) could never
    retrieve the computed total.
    """
    return asyncio.run(cpu_bound_task(n))
async def main():
    """Run the CPU-bound work in a process pool and print its result.

    run_in_executor needs a plain (sync) callable: handing it the async
    function `cpu_bound_task` directly only builds a coroutine object in
    the worker process, which is never awaited — so no work runs and the
    "result" is a coroutine repr. `event_loop` is the sync wrapper that
    drives the coroutine to completion inside the worker.
    """
    # get_event_loop() inside a coroutine is deprecated; use get_running_loop().
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        result = await loop.run_in_executor(pool, event_loop, 10)
        print(f"result is {result}")
# Driver: four processes each running an event loop, then the asyncio main().
pList = []  # was never initialized — appending raised NameError
for i in range(4):
    p = multiprocessing.Process(target=event_loop, args=(10,))
    pList.append(p)
    p.start()
for p in pList:
    p.join()
asyncio.run(main())
print("主线程执行结束")
五、总结
| 方案 | 适用场景 | 主要特性 |
| --- | --- | --- |
| 多线程(threading) | I/O 密集型(爬虫、文件 I/O) | 共享内存,受 GIL 限制 |
| 线程池(ThreadPoolExecutor) | 并发 I/O 任务 | 自动管理线程 |
| 多进程(multiprocessing) | CPU 密集型任务 | 多核计算,不受 GIL 限制 |
| 进程池(ProcessPoolExecutor) | 大规模计算任务 | 自动管理进程 |
| 协程(asyncio) | I/O 高并发(API、爬虫) | 非阻塞执行 |
如何选择?
- CPU 密集型任务(科学计算、数据分析) → 多进程
- I/O 密集型任务(爬虫、数据库查询) → 多线程 或 asyncio
- 高并发任务(网络请求、日志处理) → asyncio