【Python爬虫(31)】解锁Python多线程编程:从入门到实战

发布于:2025-02-22 ⋅ 阅读:(18) ⋅ 点赞:(0)

【Python爬虫】专栏简介:本专栏是 Python 爬虫领域的集大成之作,共 100 章节。从 Python 基础语法、爬虫入门知识讲起,深入探讨反爬虫、多线程、分布式等进阶技术。以大量实例为支撑,覆盖网页、图片、音频等各类数据爬取,还涉及数据处理与分析。无论是新手小白还是进阶开发者,都能从中汲取知识,助力掌握爬虫核心技能,开拓技术视野。


一、线程的概念与 Python 中的线程模块(threading)

1.1 线程的概念

线程是进程内部的一个执行流,是系统调度的最小单位。打个比方,进程像是一个工厂,而线程则是工厂里的工人。一个进程可以包含多个线程,就如同一个工厂里有多个工人同时工作。与进程相比,线程具有以下显著特点:

  • 共享进程资源:同一进程内的线程共享进程的地址空间、文件描述符、全局变量等资源 。这就好比工厂里的工人共享厂房、设备等资源,大大降低了资源的开销。例如,在一个网络爬虫程序中,多个线程可以共享同一个网络连接池,减少了连接创建和销毁的开销。
  • 上下文切换开销小:由于线程基本不拥有系统资源,所以线程之间的上下文切换比进程之间的上下文切换要快得多。这就像工厂里的工人切换工作任务时,不需要重新准备厂房和设备,只需要切换手头的工作即可。在多线程爬虫中,当一个线程在等待网络响应时,CPU 可以快速切换到其他线程继续执行,提高了 CPU 的利用率。
  • 能并发执行:多个线程可以并发执行,充分利用 CPU 和其他系统资源。在多核 CPU 的环境下,不同的线程可以同时在不同的核心上运行,实现真正的并行处理。例如,在一个视频处理程序中,一个线程可以负责视频解码,另一个线程负责音频解码,还有一个线程负责视频和音频的合成,这样可以大大提高视频处理的效率。

在现代编程中,线程扮演着至关重要的角色。它可以显著提高程序的执行效率,特别是在处理 I/O 密集型任务时,如网络请求、文件读写等。例如,在一个网络爬虫程序中,使用多线程可以同时发起多个网络请求,加快数据的抓取速度;在一个图形界面应用程序中,使用多线程可以将耗时的任务放在后台执行,避免界面卡顿,提高用户体验。

1.2 Python 中的 threading 模块

在 Python 中,threading模块是进行多线程编程的核心模块。它提供了丰富的类和方法,用于创建、管理和同步线程。以下是threading模块中一些重要的类和方法:

  • Thread类:用于创建线程对象。通过实例化Thread类,可以创建一个新的线程。例如:
import threading

def task():
    print("这是一个线程任务")

# 创建线程对象
thread = threading.Thread(target=task)
# 启动线程
thread.start()

在上述代码中,threading.Thread(target=task)创建了一个线程对象,target参数指定了线程要执行的任务函数task。然后调用start()方法启动线程,此时线程就会开始执行task函数中的代码。

  • Lock类:用于线程同步的锁。当多个线程需要访问共享资源时,为了避免数据竞争和不一致性问题,可以使用锁来确保同一时间只有一个线程能够访问共享资源。例如:
import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(10000):
        with lock:
            counter += 1

# 创建多个线程
threads = []
for _ in range(10):
    thread = threading.Thread(target=increment)
    threads.append(thread)
    thread.start()

# 等待所有线程执行完毕
for thread in threads:
    thread.join()

print(f"最终的计数器值为: {counter}")

在这段代码中,lock = threading.Lock()创建了一个锁对象。在increment函数中,使用with lock:语句来获取锁,确保在对counter进行加一操作时,其他线程无法访问counter,从而避免了数据竞争问题。

  • Semaphore类:信号量,用于控制同时访问某一资源的线程数量。例如,假设有一个数据库连接池,最多允许同时有 5 个线程使用连接,就可以使用信号量来实现:
import threading
import time

# 创建一个信号量,最多允许5个线程同时访问
semaphore = threading.Semaphore(5)

def access_database():
    with semaphore:
        print(f"{threading.current_thread().name} 正在访问数据库")
        time.sleep(2)
        print(f"{threading.current_thread().name} 完成对数据库的访问")

# 创建多个线程
threads = []
for i in range(10):
    thread = threading.Thread(target=access_database, name=f"Thread-{i}")
    threads.append(thread)
    thread.start()

# 等待所有线程执行完毕
for thread in threads:
    thread.join()

在上述代码中,semaphore = threading.Semaphore(5)创建了一个信号量,最多允许 5 个线程同时访问。每个线程在访问数据库前,先通过with semaphore:获取信号量,如果当前有 5 个线程正在访问,其他线程就会被阻塞,直到有线程释放信号量。

  • Condition类:条件变量,用于线程之间的复杂同步和通信。它允许线程在满足特定条件时等待,当条件满足时被唤醒。例如,在生产者 - 消费者模型中:
import threading

class ProducerConsumer:
    def __init__(self):
        self.condition = threading.Condition()
        self.items = []

    def produce(self):
        with self.condition:
            for i in range(5):
                self.items.append(i)
                print(f"生产了 item {i}")
                self.condition.notify()  # 通知消费者
                self.condition.wait()  # 等待消费者处理

    def consume(self):
        with self.condition:
            while True:
                self.condition.wait()  # 等待生产者生产
                if self.items:
                    item = self.items.pop(0)
                    print(f"消费了 item {item}")
                    self.condition.notify()  # 通知生产者
                else:
                    break

# 创建生产者和消费者线程
producer_consumer = ProducerConsumer()
producer_thread = threading.Thread(target=producer_consumer.produce)
consumer_thread = threading.Thread(target=producer_consumer.consume)

# 启动线程
producer_thread.start()
consumer_thread.start()

# 等待线程执行完毕
producer_thread.join()
consumer_thread.join()

在这个例子中,生产者线程在生产完一个物品后,通过condition.notify()通知消费者线程,然后通过condition.wait()等待消费者处理;消费者线程在消费完一个物品后,也通过condition.notify()通知生产者线程,然后通过condition.wait()等待生产者生产。这样就实现了生产者和消费者之间的同步和通信。

  • Event类:用于线程间的简单通知机制。一个线程可以设置事件,其他线程可以等待这个事件被设置。例如:
import threading
import time

event = threading.Event()

def worker():
    print("线程等待事件...")
    event.wait()
    print("线程接收到事件,开始执行任务")

# 创建线程
thread = threading.Thread(target=worker)
thread.start()

# 主线程等待一段时间后设置事件
time.sleep(3)
event.set()

在上述代码中,event = threading.Event()创建了一个事件对象。worker线程通过event.wait()等待事件被设置,主线程在等待 3 秒后,通过event.set()设置事件,此时worker线程就会接收到事件并继续执行后续的代码。

threading模块为 Python 多线程编程提供了强大的支持,通过合理使用这些类和方法,可以有效地实现多线程编程,提高程序的性能和效率。

二、创建与启动线程、线程的生命周期管理

2.1 创建与启动线程

2.1.1 使用 Thread 类的实例创建线程

在 Python 中,最常见的创建线程的方式之一是通过实例化threading.Thread类。以下是一个简单的示例代码,展示如何使用这种方式创建线程:

import threading

def print_numbers():
    for i in range(1, 6):
        print(f"线程 {threading.current_thread().name} 打印: {i}")


# 创建线程对象,将print_numbers函数作为目标传递
thread = threading.Thread(target=print_numbers, name="Thread-1")

# 启动线程
thread.start()

print("主线程继续执行其他任务")

在上述代码中:

  1. 首先定义了一个print_numbers函数,这个函数就是线程要执行的任务。
  2. 使用threading.Thread(target=print_numbers, name=“Thread-1”)创建了一个线程对象。target参数指定了线程要执行的目标函数,name参数为线程指定了一个名称,方便在调试和日志记录中识别线程。
  3. 调用thread.start()方法启动线程。一旦调用start()方法,Python 解释器会为该线程分配资源,并在一个新的执行线程中调用print_numbers函数。

需要注意的是:

  • 线程启动后,主线程和新线程会并发执行:在调用start()方法后,主线程会继续执行print(“主线程继续执行其他任务”)这行代码,而新线程会开始执行print_numbers函数中的代码。两个线程的执行顺序是不确定的,这取决于操作系统的调度策略。
  • 一个线程对象的start()方法只能调用一次:如果尝试多次调用start()方法,Python 会抛出RuntimeError异常,提示线程已经启动。

2.1.2 继承 Thread 类创建线程

另一种创建线程的方式是通过继承threading.Thread类,并重写其run方法。以下是示例代码:

import threading


class MyThread(threading.Thread):
    def __init__(self, name):
        super().__init__(name=name)

    def run(self):
        for i in range(1, 6):
            print(f"线程 {self.name} 打印: {i}")


# 创建自定义线程类的实例
my_thread = MyThread(name="Thread-2")

# 启动线程
my_thread.start()

print("主线程继续执行其他任务")

在这段代码中:

  1. 定义了一个MyThread类,它继承自threading.Thread类。
  2. 在MyThread类的__init__方法中,调用了父类的__init__方法,以确保正确初始化线程对象。
  3. 重写了run方法,在run方法中定义了线程要执行的具体任务。
  4. 创建MyThread类的实例my_thread,并调用start()方法启动线程。当调用start()方法时,会自动调用run方法。

两种创建线程方式的比较:

  • 使用Thread类的实例创建线程:这种方式更加简单直接,适用于简单的任务场景。它将线程的执行逻辑和线程的创建分离,代码结构更加清晰,易于理解和维护。例如,在一个简单的多线程爬虫程序中,每个线程负责抓取一个网页,使用这种方式可以很方便地创建多个线程,每个线程执行相同的抓取任务。
  • 继承Thread类创建线程:这种方式适用于需要对线程进行更多定制化的场景。通过继承Thread类,可以在类中添加更多的属性和方法,方便对线程进行管理和控制。例如,在一个复杂的多线程下载器中,可能需要为每个线程添加下载进度、下载速度等属性,以及暂停、恢复下载等方法,使用继承Thread类的方式可以很方便地实现这些功能。

2.1.3 启动线程

在创建线程对象后,需要调用start()方法来启动线程。start()方法的作用是通知 Python 解释器为该线程分配资源,并在一个新的执行线程中调用线程的run方法(如果是通过继承Thread类创建的线程)或目标函数(如果是通过实例化Thread类并传递目标函数创建的线程)。

当调用start()方法后,线程并不会立即开始执行,而是进入就绪状态,等待操作系统的调度。一旦操作系统为该线程分配了 CPU 时间片,线程就会进入运行状态,开始执行run方法或目标函数中的代码。

以下是一个简单的示例,展示start()方法的调用和线程的执行过程:

import threading
import time


def task():
    print(f"线程 {threading.current_thread().name} 开始执行")
    time.sleep(2)
    print(f"线程 {threading.current_thread().name} 执行结束")


# 创建线程对象
thread = threading.Thread(target=task, name="Thread-3")

# 启动线程
print("主线程调用start()方法启动线程")
thread.start()

print("主线程继续执行其他任务")

在上述代码中,当调用thread.start()方法后,主线程会继续执行print(“主线程继续执行其他任务”)这行代码,而线程Thread-3会进入就绪状态,等待 CPU 调度。大约 2 秒后,线程Thread-3执行task函数中的代码,打印出 “线程 Thread-3 开始执行” 和 “线程 Thread-3 执行结束”。

2.2 线程的生命周期管理

线程从创建到结束,会经历多个阶段,每个阶段都有其特定的状态和行为。了解线程的生命周期对于有效地管理线程和编写健壮的多线程程序至关重要。线程的生命周期通常包括以下几个状态:

  • 新建(New):当创建一个线程对象时,线程处于新建状态。此时,线程对象已经被分配了内存空间,但尚未开始执行。例如,thread = threading.Thread(target=task)创建了一个线程对象thread,此时thread处于新建状态。
  • 就绪(Runnable):当调用线程的start()方法后,线程进入就绪状态。在这个状态下,线程已经准备好执行,但还没有获得 CPU 时间片。它会等待操作系统的调度,一旦获得 CPU 时间片,就会进入运行状态。例如,thread.start()调用后,线程thread进入就绪状态。
  • 运行(Running):线程获得 CPU 时间片后,开始执行run方法或目标函数中的代码,此时线程处于运行状态。在运行状态下,线程可以执行各种任务,如计算、I/O 操作等。
  • 阻塞(Blocked):线程在执行过程中,由于某些原因(如等待 I/O 操作完成、等待获取锁、调用time.sleep等)暂时无法继续执行,会进入阻塞状态。在阻塞状态下,线程不会占用 CPU 时间片,直到导致阻塞的原因被解除,线程才会重新回到就绪状态,等待 CPU 调度。例如,当线程执行time.sleep(2)时,线程会进入阻塞状态,暂停执行 2 秒,2 秒后重新回到就绪状态。
  • 死亡(Dead):当线程的run方法或目标函数执行完毕,或者线程因异常而终止时,线程进入死亡状态。处于死亡状态的线程不再执行任何代码,其占用的资源也会被释放。

在 Python 中,可以使用is_alive()方法来判断线程是否处于活动状态(即是否处于运行或就绪状态)。例如:

import threading
import time


def task():
    print(f"线程 {threading.current_thread().name} 开始执行")
    time.sleep(2)
    print(f"线程 {threading.current_thread().name} 执行结束")


# 创建线程对象
thread = threading.Thread(target=task, name="Thread-4")

# 启动线程
thread.start()

# 检查线程状态
while thread.is_alive():
    print("线程仍在运行")
    time.sleep(1)

print("线程已结束")

在上述代码中,通过while thread.is_alive():循环不断检查线程的状态,只要线程处于活动状态,就会打印 “线程仍在运行”,直到线程执行完毕,进入死亡状态,循环结束,打印 “线程已结束”。

另外,join()方法在线程生命周期管理中也起着重要的作用。join()方法用于阻塞调用它的线程,直到被调用的线程完成其执行。例如:

import threading
import time


def task():
    print(f"线程 {threading.current_thread().name} 开始执行")
    time.sleep(2)
    print(f"线程 {threading.current_thread().name} 执行结束")


# 创建线程对象
thread = threading.Thread(target=task, name="Thread-5")

# 启动线程
thread.start()

# 等待线程执行完毕
print("主线程等待线程执行完毕")
thread.join()

print("主线程继续执行")

在这段代码中,thread.join()会使主线程阻塞,等待thread线程执行完毕。只有当thread线程的task函数执行结束,join()方法才会返回,主线程才会继续执行print(“主线程继续执行”)这行代码。join()方法常用于确保某些线程按顺序执行,或者在主线程中等待所有子线程完成任务后再进行后续操作。例如,在一个多线程数据处理程序中,可能需要等待所有线程完成数据处理后,再对处理结果进行汇总和分析。

三、线程之间的共享数据与同步问题

3.1 共享数据问题

在多线程编程中,当多个线程访问和修改共享数据时,可能会出现数据不一致和竞态条件(Race Condition)等问题。竞态条件是指多个线程同时访问和修改共享数据,导致最终结果依赖于线程的执行顺序,从而产生不可预测的结果。

以下通过一个简单的示例代码来展示共享数据时可能出现的问题:

import threading

# 共享数据
counter = 0


def increment():
    global counter
    for _ in range(10000):
        counter += 1


# 创建多个线程
threads = []
for _ in range(10):
    thread = threading.Thread(target=increment)
    threads.append(thread)
    thread.start()

# 等待所有线程执行完毕
for thread in threads:
    thread.join()

print(f"最终的计数器值为: {counter}")

在上述代码中,我们创建了 10 个线程,每个线程都会对共享变量counter进行 10000 次加一操作。理论上,最终的counter值应该是10 * 10000 = 100000。然而,由于多线程同时访问和修改counter,可能会出现竞态条件,导致最终的结果小于 100000。

这是因为counter += 1这一操作并非原子操作,它实际上包含了读取counter的值、加一、再将结果写回counter这三个步骤。当多个线程同时执行这三个步骤时,就可能出现以下情况:

  1. 线程 A 读取counter的值为 10。
  2. 线程 B 也读取counter的值为 10(此时线程 A 还未完成加一和写回操作)。
  3. 线程 A 将counter加一,得到 11,并写回counter。
  4. 线程 B 也将counter加一(它读取的值是 10),得到 11,并写回counter。

这样,虽然两个线程都执行了加一操作,但counter只增加了 1,而不是 2,从而导致数据不一致。

3.2 线程同步机制

为了解决多线程环境下共享数据的问题,需要使用线程同步机制来确保在同一时间只有一个线程能够访问和修改共享数据。Python 提供了多种线程同步机制,如锁(Lock)、信号量(Semaphore)、条件变量(Condition)等。

3.2.1 锁(Lock)

锁是一种最基本的线程同步机制,它用于保证在同一时间只有一个线程能够进入临界区(即访问共享资源的代码段)。当一个线程获取到锁后,其他线程如果试图获取该锁,就会被阻塞,直到持有锁的线程释放锁。

在 Python 中,可以使用threading.Lock类来实现锁机制。Lock类提供了两个主要方法:

  • acquire():获取锁。如果锁可用,线程将获取锁并继续执行;如果锁已被其他线程持有,线程将被阻塞,直到锁被释放。
  • release():释放锁。将锁释放,允许其他线程获取锁。

以下是使用锁来保护共享资源的示例代码:

import threading

# 共享数据
counter = 0
# 创建锁对象
lock = threading.Lock()


def increment():
    global counter
    for _ in range(10000):
        # 获取锁
        lock.acquire()
        try:
            counter += 1
        finally:
            # 释放锁
            lock.release()


# 创建多个线程
threads = []
for _ in range(10):
    thread = threading.Thread(target=increment)
    threads.append(thread)
    thread.start()

# 等待所有线程执行完毕
for thread in threads:
    thread.join()

print(f"最终的计数器值为: {counter}")

在上述代码中,我们创建了一个Lock对象lock。在increment函数中,每次对counter进行操作前,先调用lock.acquire()获取锁,操作完成后,使用finally块确保无论是否发生异常,都会调用lock.release()释放锁。这样就保证了在同一时间只有一个线程能够访问和修改counter,从而避免了竞态条件。

使用with语句可以更简洁地使用锁,改写后的代码如下:

import threading

# 共享数据
counter = 0
# 创建锁对象
lock = threading.Lock()


def increment():
    global counter
    for _ in range(10000):
        with lock:
            counter += 1


# 创建多个线程
threads = []
for _ in range(10):
    thread = threading.Thread(target=increment)
    threads.append(thread)
    thread.start()

# 等待所有线程执行完毕
for thread in threads:
    thread.join()

print(f"最终的计数器值为: {counter}")

在这个版本中,with lock:语句会自动在进入代码块时获取锁,在离开代码块时释放锁,代码更加简洁和安全。

3.2.2 信号量(Semaphore)

信号量是一种更高级的同步机制,它允许同时有多个线程访问共享资源,但限制了同时访问的线程数量。信号量维护了一个计数器,当一个线程获取信号量时,计数器减一;当一个线程释放信号量时,计数器加一。当计数器为 0 时,其他线程无法获取信号量,只能等待。

在 Python 中,可以使用threading.Semaphore类来创建信号量。Semaphore类的构造函数接受一个参数value,表示信号量的初始值,即允许同时访问的线程数量。

以下是使用信号量控制对共享资源访问数量的示例代码:

import threading
import time

# 创建一个信号量,最多允许3个线程同时访问
semaphore = threading.Semaphore(3)


def access_resource():
    with semaphore:
        print(f"{threading.current_thread().name} 正在访问资源")
        time.sleep(2)
        print(f"{threading.current_thread().name} 访问资源结束")


# 创建多个线程
threads = []
for i in range(10):
    thread = threading.Thread(target=access_resource, name=f"Thread-{i}")
    threads.append(thread)
    thread.start()

# 等待所有线程执行完毕
for thread in threads:
    thread.join()

在上述代码中,我们创建了一个信号量semaphore,初始值为 3,表示最多允许 3 个线程同时访问资源。每个线程在访问资源前,先通过with semaphore:获取信号量,如果当前有 3 个线程正在访问,其他线程就会被阻塞,直到有线程释放信号量。这样就有效地控制了对共享资源的并发访问数量,避免资源竞争过于激烈。

3.2.3 条件变量(Condition)

条件变量用于线程之间的复杂同步和通信。它允许线程在满足特定条件时等待,当条件满足时被唤醒。条件变量通常与锁一起使用,以确保在检查条件和等待条件满足之间的操作是原子性的。

在 Python 中,条件变量通过threading.Condition类来实现。Condition类提供了以下几个重要方法:

  • acquire():获取锁,进入临界区。
  • release():释放锁,离开临界区。
  • wait(timeout=None):释放锁,并进入等待状态,直到被其他线程通过notify()或notify_all()唤醒,或者等待超时。在等待期间,线程会被阻塞,并且会释放持有的锁,以便其他线程可以获取锁并修改共享数据。当被唤醒后,线程会重新获取锁。
  • notify(n=1):唤醒一个等待在条件变量上的线程。如果有多个线程在等待,会随机唤醒一个。
  • notify_all():唤醒所有等待在条件变量上的线程。

以下是使用条件变量实现生产者 - 消费者模型的示例代码:

import threading
import time

# 共享数据,模拟缓冲区
buffer = []
# 创建锁对象
lock = threading.Lock()
# 创建条件变量对象
condition = threading.Condition(lock)


# 生产者线程函数
def producer():
    global buffer
    while True:
        with condition:
            # 当缓冲区已满时,等待消费者消费
            while len(buffer) >= 5:
                print("缓冲区已满,生产者等待")
                condition.wait()
            # 生产数据
            item = f"Item {len(buffer) + 1}"
            buffer.append(item)
            print(f"生产者生产了: {item}")
            # 通知消费者
            condition.notify()
            time.sleep(1)


# 消费者线程函数
def consumer():
    global buffer
    while True:
        with condition:
            # 当缓冲区为空时,等待生产者生产
            while not buffer:
                print("缓冲区为空,消费者等待")
                condition.wait()
            # 消费数据
            item = buffer.pop(0)
            print(f"消费者消费了: {item}")
            # 通知生产者
            condition.notify()
            time.sleep(1)


# 创建生产者和消费者线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)

# 启动线程
producer_thread.start()
consumer_thread.start()

在上述代码中:

  • 生产者线程在生产数据前,先获取条件变量的锁,检查缓冲区是否已满。如果已满,调用condition.wait()等待,释放锁并进入等待状态。当被消费者线程唤醒后,重新获取锁,继续生产数据。生产完成后,调用condition.notify()通知消费者线程。
  • 消费者线程在消费数据前,同样获取条件变量的锁,检查缓冲区是否为空。如果为空,调用condition.wait()等待,释放锁并进入等待状态。当被生产者线程唤醒后,重新获取锁,继续消费数据。消费完成后,调用condition.notify()通知生产者线程。

通过条件变量,生产者和消费者线程可以在合适的时机进行同步和通信,避免了缓冲区溢出或空读的问题,实现了线程之间的复杂协作。


网站公告

今日签到

点亮在社区的每一天
去签到