python3 基础库及常用第三方库解析

发布于:2025-06-16 ⋅ 阅读:(22) ⋅ 点赞:(0)

文章目录


Python 高频库及常用函数

一、python3.x 内置库

1. 内置库核心模块

1. 内置函数(Built-in Functions)

  • 高频函数print(), len(), range(), input(), open(), type(), dir(), help()
def my_test2():
    # range 生成 0 到 4 的整数序列
    for i in range(5):
        print(i, end=' ')  # 输出: 0 1 2 3 4
    # 指定起始和结束值
    for i in range(2, 6):
        print(i, end=' ')  # 输出: 2 3 4 5

    # input 输入
    name = input("请输入你的名字: ")
    print(f"你好, {name}")

    # open 读取文件
    with open("test.txt", "r", encoding='utf-8') as f:
        content = f.read()
        print(content)
    # 写入文件
    with open("output.txt", "w", encoding='utf-8') as f:
        f.write("Hello Python!")
    # type() get the type of object
    print(type(123))  # 输出: <class 'int'>
    print(type("hello"))  # 输出: <class 'str'>
    print(type([1, 2, 3]))  # 输出: <class 'list'>
  • 高级函数map(), filter(), reduce(), zip(), sorted(), reversed()
def my_test3():

    # map():对可迭代对象的每个元素应用函数
    nums = [1, 2, 3, 4]
    squares = map(lambda x: x ** 2, nums)
    print(list(squares))  # 输出: [1, 4, 9, 16]

    # filter():过滤可迭代对象的元素
    nums = [1, 2, 3, 4, 5, 6]
    evens = filter(lambda x: x % 2 == 0, nums)
    print(list(evens))  # 输出: [2, 4, 6]

    # reduce():累积地对序列的元素应用函数(需导入)
    nums = [1, 2, 3, 4]
    product = reduce(lambda x, y: x * y, nums)
    print(product)  # 输出: 24 (1*2*3*4)

    # zip():将多个可迭代对象的元素打包成元组
    names = ["Alice", "Bob"]
    ages = [25, 30]
    zipped = zip(names, ages)
    print(list(zipped))  # 输出: [('Alice', 25), ('Bob', 30)]

    # sorted():返回新的已排序列表
    nums = [3, 1, 4, 1, 5]
    sorted_nums = sorted(nums)
    print(sorted_nums)  # 输出: [1, 1, 3, 4, 5]

    # reversed():返回反向迭代器
    lst = [1, 2, 3]
    reversed_lst = list(reversed(lst))
    print(reversed_lst)  # 输出: [3, 2, 1]

2. 数据结构模块

基础数据结构

1. 列表(List)
  • 特点 :可变、有序、允许重复元素。
  • 语法 :用方括号 [] 定义。
2. 元组(Tuple)
  • 特点 :不可变、有序、允许重复元素。
  • 语法 :用圆括号 () 或逗号分隔定义。
3. 字典(Dict)
  • 特点 :键值对存储、无序(Python 3.7+ 后有序)、键唯一。
  • 语法 :用花括号 {} 和冒号 key:value 定义。
4. 集合(Set)
  • 特点 :无序、唯一、元素不可变(但集合本身可变)。
  • 语法 :用花括号 {}set() 函数定义(空集合必须用 set())。
6. 字符串(String)
  • 特点 :不可变、有序的字符序列。
  • 语法 :用单引号 '、双引号 " 或三引号 ''' 定义。
7. 字节(Bytes)
  • 特点 :不可变的二进制数据,范围 0-255。
  • 语法 :用 b'...' 定义。
8. 字节数组(Bytearray)
  • 特点 :可变的二进制数据。
  • 语法 :用 bytearray() 函数定义。

    def my_test1():  
        # 用单引号 '、双引号 " 或三引号 ''' 定义。  
        variable = 'var1'  
        variable_ = 'variable2'  
        print(f'this is print function, {variable}')  # print with f (f means format)  
        print(f'{0.987654321:.4f}')  
        print(len(variable_))  # len() function  
        print(variable_[0])  
      
        # 字节数组  
        data_byte = b'hello'  
        print(data_byte.decode('utf-8'))  
        print(data_byte[0])  
        print(data_byte)  
      
        # 字符-字节转换  
        data_byte_ = 'hello'.encode('utf-8')  
        print(data_byte_.decode('utf-8'))  
      
        # 字节数组修改  
        data = bytearray(b"hello3")  
        data[0] = 74  # 修改为 'J' (ASCII 74)  
        print(data.decode('utf-8'))  # 输出:bytearray(b'Jello')  
      
      
      
        lst = [1, 2, 3, 4, 5, 7, "8"]  # 列表 可变、有序、允许重复元素,元素类型可不同  
        tuple_ = (3, 4, '5')  # 元组 不可变、有序、允许重复元素。  
        dict_ = {"name": "Alice", "age": 30}  # 字典 键值对存储、无序(Python 3.7+ 后有序)、键唯一。  
        set_ = {1, 2, 3, 3}  # 无序、唯一、元素不可变(但集合本身可变)。  
      
        lst.append("date")  # 添加元素  
        lst[0] = "avocado"  # 修改元素  
        idx = 0  
        for l in lst:  
            print(f'{l}', end='')  
            idx += 1  
            if idx < len(lst):  
                print(' -> ', end='')  
        print("")  
      
        for i in range(len(lst)):  
            print(f'index {i} : {lst[i]}')  
        print(lst)  
    # ========================  
        for value in tuple_:  
            print(f' {value} ', end='')  
        print("")  
      
        for i in range(len(tuple_)):  
            print(f'index {i} : {tuple_[i]}')  
      
        print(tuple_)  
    # =========================  
        for key, value in dict_.items():  
            print(f'key: {key} value: {value}')  
      
        for value in set_:  
            print(f" {value} ", end='')  
        print('')

高级数据结构collections

  • Counter:统计元素频率
  • DefaultDict :带默认值的字典
  • OrderedDict:有序字典
  • deque:双端队列(高效的插入 / 删除)
  • 数组(Array) :高效存储单一类型的数组(需导入 array.array)。
  • 堆队列(Heap Queue):实现最小堆的模块(需导入 heapq)。
  • 双端队列(Deque) :高效的双向队列(需导入 collections.deque)。
在这里插入代码片
3. 字符串处理

re(正则表达式)

  • re.search(), re.match(), re.findall(), re.sub()
# string 连接
words = ["Python", "is", "awesome"]
result = " ".join(words)  # 输出: "Python is awesome"

# 字符串格式化
name = "Bob"
age = 30
message = f"My name is {name} and I'm {age} years old."
# 输出: "My name is Bob and I'm 30 years old."

#字符串查找与替换
text = "Hello, World!"
pos = text.find("World")  # 输出: 7

text = "banana"
count = text.count("a")  # 统计,输出: 3

# 字符串查找与替换
text = "Hello, World!"
pos = text.find("World")  # 输出: 7
text = "banana"
count = text.count("a")  # 输出: 3
text = "Hello, World!"
new_text = text.replace("World", "Python") # 输出: "Hello, Python!"
# 按分隔符分割字符串为列表:
text = "apple,banana,orange"
fruits = text.split(",")  # 输出: ["apple", "banana", "orange"]

# 字符串分割与合并
# 按分隔符分割字符串为列表:
text = "apple,banana,orange"
fruits = text.split(",")  # 输出: ["apple", "banana", "orange"]
#按分隔符将字符串分为三部分:
text = "Hello,World"
result = text.partition(",")  # 输出: ("Hello", ",", "World")
# 检查字符串开头或结尾:
text = "Hello, World!"
is_start = text.startswith("Hello")  # 输出: True
is_end = text.endswith("!")  # 输出: True
# 检查字符串是否全为字母、数字或字母数字:
"abc".isalpha()  # 输出: True
"123".isdigit()  # 输出: True
"abc123".isalnum()  # 输出: True
# 去除两侧、左侧或右侧的空白字符:
text = "   Hello   "
clean_text = text.strip()  # 输出: "Hello"
left_clean = text.lstrip()  # 输出: "Hello   "

# 处理字符编码(如 UTF-8):
text = "你好"
encoded = text.encode("utf-8")  # 输出: b'\xe4\xbd\xa0\xe5\xa5\xbd'
decoded = encoded.decode("utf-8")  # 输出: "你好"
# 返回字符串的转义表示:
text = 'Hello\nWorld'
print(repr(text))  # 输出: 'Hello\nWorld'
# 使用正则表达式处理复杂替换:
import re
text = "Hello 123 World 456"
clean_text = re.sub(r'\d+', '', text)  # 移除所有数字
# 输出: "Hello  World "

2. 文件与目录操作

ospathlib
在 Python 中,ospathlib 是处理文件路径和操作系统相关功能的两个核心库。os 是早期标准库,提供面向过程的函数;而pathlib(Python 3.4+)则采用面向对象的方式,代码更简洁。以下是它们的常用功能对比和示例:

一、路径操作(核心功能)

1. 获取当前工作目录
# os 方式
import os
cwd = os.getcwd()
print(cwd)  # 输出:/Users/username/project

# pathlib 方式
from pathlib import Path
cwd = Path.cwd()
print(cwd)  # 输出:PosixPath('/Users/username/project')
2. 拼接路径
# os 方式(需使用 os.path.join)
file_path = os.path.join(cwd, "data", "test.txt")
print(file_path)  # 输出:/Users/username/project/data/test.txt

# pathlib 方式(直接用斜杠 /)
file_path = Path(cwd) / "data" / "test.txt"
print(file_path)  # 输出:PosixPath('/Users/username/project/data/test.txt')
3. 拆分路径组件
path = "/home/user/docs/report.pdf"

# os 方式
dir_name = os.path.dirname(path)    # 输出:/home/user/docs
base_name = os.path.basename(path)  # 输出:report.pdf
file_name, ext = os.path.splitext(path)  # 输出:('report', '.pdf')

# pathlib 方式
path_obj = Path(path)
print(path_obj.parent)   # 输出:PosixPath('/home/user/docs')
print(path_obj.name)     # 输出:report.pdf
print(path_obj.stem)     # 输出:report
print(path_obj.suffix)   # 输出:.pdf

二、文件和目录操作

1. 检查路径是否存在
# os 方式
os.path.exists("/tmp")  # 返回 True 或 False

# pathlib 方式
Path("/tmp").exists()   # 返回 True 或 False
2. 创建目录
# 创建单层目录
new_dir = Path("new_folder")
new_dir.mkdir(exist_ok=True)  # 等价于 os.makedirs("new_folder", exist_ok=True)

# 创建多层目录
Path("a/b/c").mkdir(parents=True, exist_ok=True)  # 等价于 os.makedirs("a/b/c", exist_ok=True)
3. 重命名 / 移动文件
# os 方式
os.rename("old.txt", "new.txt")  # 移动或重命名

# pathlib 方式
Path("old.txt").replace("new.txt")  # 覆盖已存在文件
Path("old.txt").rename("new.txt")   # 不允许覆盖
4. 删除文件 / 目录
# 删除文件
Path("temp.txt").unlink(missing_ok=True)  # 等价于 os.remove("temp.txt")

# 删除空目录
Path("empty_dir").rmdir()  # 等价于 os.rmdir("empty_dir")

# 删除非空目录(需使用 shutil)
import shutil
shutil.rmtree("full_dir")

三、文件属性和权限

1. 获取文件大小和修改时间
# os 方式
size = os.path.getsize("data.csv")  # 单位:字节
mtime = os.path.getmtime("data.csv")  # 修改时间戳

# pathlib 方式
file = Path("data.csv")
size = file.stat().st_size
mtime = file.stat().st_mtime
2. 修改文件权限
# os 方式
os.chmod("script.py", 0o755)  # 755 权限

# pathlib 方式
Path("script.py").chmod(0o755)

四、文件遍历

1. 列出目录内容
# os 方式
for item in os.listdir("/tmp"):
    print(item)

# pathlib 方式
for item in Path("/tmp").iterdir():
    print(item)  # 返回 Path 对象
2. 递归查找文件(通配符)
# 查找所有 .txt 文件
# os 方式(结合 os.walk)
for root, dirs, files in os.walk("."):
    for file in files:
        if file.endswith(".txt"):
            print(os.path.join(root, file))

# pathlib 方式(更简洁)
for txt_path in Path(".").rglob("*.txt"):
    print(txt_path)

五、环境变量和系统操作

1. 获取环境变量
# os 方式
home_dir = os.environ.get("HOME")  # 等价于 os.getenv("HOME")
print(home_dir)  # 输出:/Users/username

# pathlib 方式(结合 os)
home_dir = Path.home()  # 等价于 Path(os.environ["HOME"])
2. 执行系统命令
# os 方式(推荐使用 subprocess)
os.system("ls -l")  # 执行 shell 命令

# subprocess 方式(更安全)
import subprocess
result = subprocess.run(["ls", "-l"], capture_output=True, text=True)
print(result.stdout)

3. 日期与时间

3.1 time 库

一、基础时间模块:time

核心功能 :获取当前时间戳、休眠、CPU 时间测量

import time

# 1. 获取当前时间戳(秒)
timestamp = time.time()  # 如:1696137123.456
print(f"时间戳: {timestamp}")

# 2. 程序休眠(暂停执行)
print("开始休眠...")
time.sleep(2)  # 暂停2秒
print("休眠结束")

# 3. 测量代码执行时间
start_time = time.time()
# 执行一些操作
time.sleep(1)
end_time = time.time()
print(f"执行耗时: {end_time - start_time:.2f}秒")

# 4. 格式化时间(不常用,更推荐datetime)
struct_time = time.localtime(timestamp)
formatted_time = time.strftime("%Y-%m-%d %H:%M:%S", struct_time)
print(f"格式化时间: {formatted_time}")

3.2 datetime 库

在 Python 中,datetime库是处理日期和时间的标准库,功能强大且易于使用。下面从基本概念到高级应用,详细介绍其核心组件和用法。

一、核心类

datetime库包含 4 个主要类:

  1. date :处理年月日(如:2023-10-01)
  2. time :处理时分秒(如:14:30:25)
  3. datetime :同时处理日期和时间(继承自date
  4. timedelta :表示时间间隔(如:3 天 5 小时)

二、基本用法

1. 获取当前日期和时间
from datetime import datetime, date, time

# 当前日期和时间
now = datetime.now()  # 输出:2023-10-01 14:30:25.123456
print(now)

# 仅日期
today = date.today()  # 输出:2023-10-01
print(today)

# 仅时间
current_time = now.time()  # 输出:14:30:25.123456
print(current_time)
2. 创建指定日期和时间
# 创建datetime对象
dt = datetime(2023, 10, 1, 14, 30, 25)  # 2023-10-01 14:30:25
print(dt)

# 创建date对象
d = date(2023, 10, 1)  # 2023-10-01
print(d)

# 创建time对象
t = time(14, 30, 25)  # 14:30:25
print(t)
3. 日期和时间的属性
dt = datetime.now()

print(dt.year)    # 年:2023
print(dt.month)   # 月:10
print(dt.day)     # 日:1
print(dt.hour)    # 时:14
print(dt.minute)  # 分:30
print(dt.second)  # 秒:25
print(dt.weekday())  # 星期几(0=周一,6=周日)

三、日期和时间的计算(timedelta

timedelta用于计算日期 / 时间的差值或偏移。

1. 计算时间间隔
from datetime import timedelta

# 创建时间间隔
delta = timedelta(days=3, hours=2, minutes=15)
print(delta)  # 3 days, 2:15:00

# 日期偏移
today = date.today()
future = today + delta  # 3天后
print(future)

# 计算两个日期的差值
d1 = date(2023, 10, 1)
d2 = date(2023, 10, 10)
diff = d2 - d1
print(diff.days)  # 9天
2. 时间加减示例
now = datetime.now()

# 3小时后
future = now + timedelta(hours=3)
print(future)

# 2天前
past = now - timedelta(days=2)
print(past)

四、日期和字符串的转换

1. 日期→字符串(格式化)

使用strftime()方法,通过格式代码(如%Y-%m-%d)转换。

now = datetime.now()

# 转为指定格式的字符串
formatted = now.strftime("%Y-%m-%d %H:%M:%S")  # 2023-10-01 14:30:25
print(formatted)

# 常用格式代码:
# %Y:四位数年份(2023)
# %m:两位数月份(01-12)
# %d:两位数日期(01-31)
# %H:24小时制小时(00-23)
# %M:分钟(00-59)
# %S:秒(00-59)
# %A:星期全称(Monday)
# %B:月份全称(October)
2. 字符串→日期(解析)

使用strptime()方法,需指定字符串格式。

date_str = "2023-10-01 14:30:25"
dt = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
print(dt)  # 2023-10-01 14:30:25

五、时区处理

datetime默认是无时区的,可通过tzinfo属性添加时区信息。

1. 使用pytz库(需安装)
from datetime import datetime
import pytz

# 创建带时区的datetime
utc_now = datetime.now(pytz.UTC)  # UTC时间
print(utc_now)

# 转换时区
shanghai_tz = pytz.timezone("Asia/Shanghai")
shanghai_now = utc_now.astimezone(shanghai_tz)
print(shanghai_now)  # UTC+8时间
2. Python 3.9+ 的zoneinfo模块(内置)
from datetime import datetime
from zoneinfo import ZoneInfo

# 创建带时区的datetime
utc_now = datetime.now(ZoneInfo("UTC"))
print(utc_now)

# 转换时区
shanghai_now = utc_now.astimezone(ZoneInfo("Asia/Shanghai"))
print(shanghai_now)

六、高级用法

1. 判断日期前后
d1 = date(2023, 10, 1)
d2 = date(2023, 10, 2)

print(d1 < d2)  # True
print(d1 > d2)  # False
2. 计算月份差
from dateutil.relativedelta import relativedelta

d1 = date(2023, 1, 1)
d2 = date(2023, 10, 1)

delta = relativedelta(d2, d1)
print(delta.months)  # 9个月
3. 获取当月最后一天
from calendar import monthrange

year, month = 2023, 10
last_day = monthrange(year, month)[1]  # 31
print(last_day)

七、总结

  • 基础操作 :用datetimedatetime创建和操作日期时间。
  • 时间间隔 :用timedelta进行日期加减。
  • 格式转换strftime()格式化日期为字符串,strptime()解析字符串为日期。
  • 时区处理 :用pytzzoneinfo处理时区。

4. JSON 处理

一、选择建议

  • 基础需求 :使用标准库 json
  • 高性能 :使用 ujsonorjson
  • 特殊类型支持 :使用 simplejson
  • 模式验证 :使用 jsonschema
  • 大文件处理 :使用流式解析或 JSON Lines 格式

一、标准库:json(核心)

特点 :Python 内置,提供基础 JSON 序列化 / 反序列化功能。

import json

# 1. json.dumps(obj) - 将 Python 对象转为 JSON 字符串
data = {
    "name": "Alice",
    "age": 30,
    "hobbies": ["reading", "swimming"],
    "is_student": False,
    "address": {"city": "Beijing", "zip": "100000"}
}

json_str = json.dumps(data, indent=2)  # 缩进2个空格,美化输出
print(json_str)
# 输出:
# {
#   "name": "Alice",
#   "age": 30,
#   "hobbies": ["reading", "swimming"],
#   "is_student": false,
#   "address": {"city": "Beijing", "zip": "100000"}
# }

# 2. json.loads(json_str) - 将 JSON 字符串转为 Python 对象
parsed_data = json.loads(json_str)
print(parsed_data["age"])  # 30

# 3. json.dump(obj, file) - 将对象写入 JSON 文件
with open('data.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, ensure_ascii=False, indent=2)  # ensure_ascii=False 支持中文

# 4. json.load(file) - 从 JSON 文件读取对象
with open('data.json', 'r', encoding='utf-8') as f:
    loaded_data = json.load(f)
2. 关键参数
  • indent:指定缩进空格数(美化输出)
  • ensure_ascii:是否强制 ASCII 编码(处理中文需设为 False
  • sort_keys:是否按键排序(True/False
  • default:自定义序列化函数(处理特殊对象)

二、第三方库:ujson(高性能)

特点 :速度快,占用内存少,API 与 json 兼容。

pip install ujson
import ujson

# 用法与 json 完全一致
data = {"key": "value"}
json_str = ujson.dumps(data)
parsed = ujson.loads(json_str)

三、第三方库:simplejson(兼容性强)

特点 :支持更多 Python 数据类型(如 Decimal),兼容 Python 2/3。

pip install simplejson
import simplejson
from decimal import Decimal

data = {"price": Decimal("9.99")}

# 使用 default 参数处理特殊类型
json_str = simplejson.dumps(data, default=str)

四、处理特殊对象

1. 自定义对象序列化
import json

class Person:
    def __init__(self, name, age):
        self.name = name
        self.age = age

# 方法1:自定义序列化函数
def person_to_dict(obj):
    if isinstance(obj, Person):
        return {"name": obj.name, "age": obj.age}
    raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")

p = Person("Bob", 25)
json_str = json.dumps(p, default=person_to_dict)

# 方法2:在类中实现 to_dict 方法
class Person:
    def __init__(self, name, age):
        self.name = name
        self.age = age
    def to_dict(self):
        return {"name": self.name, "age": self.age}

json_str = json.dumps(p.to_dict())
2. 处理 datetime 类型
import json
from datetime import datetime

data = {"timestamp": datetime.now()}

# 自定义序列化函数
def datetime_to_str(obj):
    if isinstance(obj, datetime):
        return obj.strftime("%Y-%m-%d %H:%M:%S")
    raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")

json_str = json.dumps(data, default=datetime_to_str)

五、流式处理大文件

场景 :处理超过内存大小的 JSON 文件。

1. 逐行解析 JSON Lines 格式
# 示例 JSON Lines 文件 (data.jsonl)
# {"name": "Alice", "age": 30}
# {"name": "Bob", "age": 25}

with open('data.jsonl', 'r') as f:
    for line in f:
        obj = json.loads(line)
        print(obj["name"])
2. 使用 json.JSONDecoder 解析不完整 JSON
decoder = json.JSONDecoder()
buffer = ""

with open('large_data.json', 'r') as f:
    for chunk in f:
        buffer += chunk
        while buffer:
            try:
                obj, pos = decoder.raw_decode(buffer)
                process(obj)  # 处理解析出的对象
                buffer = buffer[pos:].lstrip()
            except json.JSONDecodeError:
                break  # 等待更多数据

六、JSON 与 CSV 转换

import json
import csv

# JSON 转 CSV
with open('data.json', 'r') as f:
    data = json.load(f)

with open('output.csv', 'w', newline='') as f:
    writer = csv.DictWriter(f, fieldnames=["name", "age", "city"])
    writer.writeheader()
    writer.writerows(data)

# CSV 转 JSON
with open('input.csv', 'r') as f:
    reader = csv.DictReader(f)
    data = list(reader)

with open('output.json', 'w') as f:
    json.dump(data, f, indent=2)
1. 常用方法

json

  • json.dumps(), json.loads(), json.dump(), json.load()

5. 内置装饰器

1.@staticmethod 将方法转换为静态方法,不需要实例或类作为第一个参数
2. @classmethod: 定义一个类方法,第一个参数是类对象(通常命名为cls),允许类操作自身。
2.@abstractmethod 定义抽象方法,需要配合 abc 模块使用,表示该方法必须在子类中被实现,否则会抛出异常。
3. @functools.lru_cache 缓存函数结果,适用于计算密集型且参数相同的函数
4. @property: 将一个方法变成属性,允许像访问属性一样访问这个方法,主要用于提供封装性、安全性和便利性
5. @functools.wraps - 用于在自定义装饰器中保留原始函数的元数据

import functools

def my_decorator(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper
  1. @timeit - 自定义装饰器,用于测量函数执行时间
import time

def timeit(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        result = func(*args, **kwargs)
        end = time.time()
        print(f"函数 {func.__name__} 执行时间: {end - start} 秒")
        return result
    return wrapper
  1. @retry - 自定义装饰器,用于重试失败的函数
def retry(max_attempts=3, delay=1):
    def decorator(func):
        def wrapper(*args, **kwargs):
            attempts = 0
            while attempts < max_attempts:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    attempts += 1
                    if attempts == max_attempts:
                        raise
                    time.sleep(delay)
        return wrapper
    return decorator

二、Web开发库

Flask :轻量级 Web 框架

在这里插入代码片

Django :全功能 Web 框架

在这里插入代码片

三、网络请求库

以下是 Python 中常用的网络请求库及其典型示例代码,按使用场景分类整理:

一、标准库:urllib

特点 :Python 内置,无需安装,适合简单请求。

# GET 请求
from urllib.request import urlopen

with urlopen('https://api.example.com/data') as response:
    data = response.read().decode('utf-8')
    print(data)

# POST 请求
from urllib.parse import urlencode
from urllib.request import Request, urlopen

data = {'key': 'value'}
params = urlencode(data).encode('utf-8')
request = Request('https://api.example.com/submit', data=params)
with urlopen(request) as response:
    result = response.read()

二、第三方库:requests(最流行)

特点 :API 简洁,支持会话、文件上传、超时设置等。

pip install requests
import requests

# GET 请求
response = requests.get('https://api.example.com/data')
print(response.status_code)  # 状态码
print(response.json())       # 解析 JSON

# POST 请求
data = {'key': 'value'}
response = requests.post('https://api.example.com/submit', json=data)

# 带参数的请求
params = {'page': 1, 'size': 10}
response = requests.get('https://api.example.com/list', params=params)

# 会话保持(自动处理 cookies)
session = requests.Session()
session.get('https://example.com/login')  # 登录
session.get('https://example.com/dashboard')  # 访问需要登录的页面

三、异步请求:aiohttp

特点 :基于 asyncio,支持异步请求,适合高并发场景。

pip install aiohttp
import asyncio
import aiohttp

async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    urls = ['https://api.example.com/1', 'https://api.example.com/2']
    tasks = [fetch(url) for url in urls]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())

四、高性能:httpx

特点 :兼容 requests API,支持同步 / 异步,HTTP/2。

pip install httpx
import httpx

# 同步请求
response = httpx.get('https://api.example.com/data')
print(response.json())

# 异步请求
async def fetch_async():
    async with httpx.AsyncClient() as client:
        response = await client.get('https://api.example.com/data')
        return response.json()

# HTTP/2 请求
response = httpx.get('https://http2.akamai.com/demo', http2=True)

五、底层控制:socket

特点 :Python 内置,适合实现自定义协议。

import socket

# 创建 TCP 套接字
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect(('www.example.com', 80))
    s.sendall(b'GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n')
    data = s.recv(1024)
    print(data.decode('utf-8'))

六、文件上传示例

import requests

# 单文件上传
files = {'file': open('example.txt', 'rb')}
response = requests.post('https://api.example.com/upload', files=files)

# 多文件上传
files = {
    'file1': open('example1.txt', 'rb'),
    'file2': open('example2.txt', 'rb'),
}
response = requests.post('https://api.example.com/upload', files=files)

七、请求超时与重试

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

# 设置重试策略
session = requests.Session()
retries = Retry(total=3, backoff_factor=1)
session.mount('https://', HTTPAdapter(max_retries=retries))

# 带超时的请求
try:
    response = session.get('https://api.example.com', timeout=5)  # 5秒超时
except requests.exceptions.Timeout:
    print('请求超时')

八、选择建议

  • 简单请求 :优先使用 requests
  • 异步高并发 :使用 aiohttphttpx
  • 底层控制 :使用 socketasyncio 原生套接字
  • 仅处理 JSON API :考虑 requestshttpx

四、并发编程库

选择建议

  • I/O 密集型任务:优先使用 asyncio,其次是 threading。
  • CPU 密集型任务:使用 multiprocessing。
  • 分布式计算:使用 ray 或 dask。

这些示例展示了 Python 中常见的并行编程方法,根据任务类型选择合适的方式可以显著提升程序性能。

多线程(threading)

适用于 I/O 密集型任务(如网络请求、文件读写),受 GIL(全局解释器锁)限制,不适合 CPU 密集型任务。
优点:轻量级,适合 I/O 等待时间长的场景。
缺点:受 GIL 限制,无法充分利用多核 CPU。

import threading
import requests
import time

def download_image(url, name):
    """下载图片并保存到本地"""
    start_time = time.time()
    response = requests.get(url)
    with open(f"image_{name}.jpg", "wb") as f:
        f.write(response.content)
    print(f"图片 {name} 下载完成,耗时: {time.time() - start_time:.2f}秒")

def main_threading():
    # 示例图片URL
    urls = [
        "https://picsum.photos/seed/1/800/600",
        "https://picsum.photos/seed/2/800/600",
        "https://picsum.photos/seed/3/800/600",
        "https://picsum.photos/seed/4/800/600",
        "https://picsum.photos/seed/5/800/600"
    ]
    
    # 创建线程列表
    threads = []
    for i, url in enumerate(urls):
        thread = threading.Thread(target=download_image, args=(url, i))
        threads.append(thread)
        thread.start()
    
    # 等待所有线程完成
    for thread in threads:
        thread.join()

if __name__ == "__main__":
    main_threading()

多进程(multiprocessing)

适用于 CPU 密集型任务(如科学计算、数据处理),每个进程有独立的 Python 解释器,不受 GIL 限制。
优点:充分利用多核 CPU,适合计算密集型任务。
缺点:进程创建开销大,进程间通信复杂。

import multiprocessing
import math
import time

def is_prime(n):
    """判断一个数是否为素数"""
    if n <= 1:
        return False
    if n <= 3:
        return True
    if n % 2 == 0 or n % 3 == 0:
        return False
    for i in range(5, int(math.sqrt(n)) + 1, 6):
        if n % i == 0 or n % (i + 2) == 0:
            return False
    return True

def count_primes(start, end, result_queue):
    """统计指定范围内的素数数量"""
    count = 0
    for num in range(start, end):
        if is_prime(num):
            count += 1
    result_queue.put(count)

def main_multiprocessing():
    # 计算范围
    n = 1000000
    num_processes = multiprocessing.cpu_count()
    chunk_size = n // num_processes
    
    # 创建进程和结果队列
    processes = []
    result_queue = multiprocessing.Queue()
    
    # 启动多个进程
    for i in range(num_processes):
        start = i * chunk_size
        end = (i + 1) * chunk_size if i < num_processes - 1 else n
        p = multiprocessing.Process(target=count_primes, args=(start, end, result_queue))
        processes.append(p)
        p.start()
    
    # 等待所有进程完成并收集结果
    total_primes = 0
    for p in processes:
        p.join()
    
    while not result_queue.empty():
        total_primes += result_queue.get()
    
    print(f"1到{n}之间共有{total_primes}个素数")

if __name__ == "__main__":
    main_multiprocessing()


异步编程(asyncio)

适用于高并发 I/O 任务(如网络爬虫、服务器),通过协程(coroutine)实现非阻塞 I/O。
优点:极高的并发性能,资源消耗少。
缺点:调试复杂,需要使用异步库。

import asyncio
import aiohttp
import time

async def fetch(session, url, name):
    """异步获取网页内容"""
    start_time = time.time()
    async with session.get(url) as response:
        html = await response.text()
        print(f"网页 {name} 下载完成,耗时: {time.time() - start_time:.2f}秒")
        return len(html)

async def main_asyncio():
    # 示例URL
    urls = [
        "https://www.example.com",
        "https://www.python.org",
        "https://www.github.com",
        "https://www.google.com",
        "https://www.yahoo.com"
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = []
        for i, url in enumerate(urls):
            task = asyncio.create_task(fetch(session, url, i))
            tasks.append(task)
        
        # 等待所有任务完成
        results = await asyncio.gather(*tasks)
        print(f"总共下载了 {sum(results)} 个字符")

if __name__ == "__main__":
    asyncio.run(main_asyncio())

并行计算库(Ray)

适用于分布式计算、机器学习任务,支持大规模并行处理。
优点:简单易用,支持跨节点分布式计算。
缺点:需要额外安装依赖,轻量级任务可能不划算。

import ray
import time

# 初始化Ray
ray.init()

@ray.remote
def process_data(data_id):
    """模拟数据处理任务"""
    time.sleep(1)  # 模拟计算时间
    return f"处理完成的数据 {data_id}"

def main_ray():
    # 模拟10个数据任务
    data_ids = list(range(10))
    
    # 并行处理数据
    start_time = time.time()
    results = ray.get([process_data.remote(i) for i in data_ids])
    print(f"并行处理耗时: {time.time() - start_time:.2f}秒")
    
    # 串行处理数据(仅作对比)
    start_time = time.time()
    for i in data_ids:
        process_data(i)
    print(f"串行处理耗时: {time.time() - start_time:.2f}秒")
    
    print("结果:", results)

if __name__ == "__main__":
    main_ray()

五、 测试框架库

pdb

timeit

Selenium

web 自动化测试库

unittest

单元测试

六、数据处理分析库

1. 数据科学与分析

pandas :数据处理与分析

在这里插入代码片

numpy :科学计算

在这里插入代码片

七、数据可视化库

matplotlib :数据可视化

在这里插入代码片

八、文本处理库

configparser

配置文件自动解析

# config.ini
[Settings]
debug = true
port = 8080

# 读取配置
import configparser

config = configparser.ConfigParser()
config.read("config.ini")
debug = config.getboolean("Settings", "debug")
port = config.getint("Settings", "port")

九、数据库操作库

一、关系型数据库

1. SQLite(内置)

特点 :文件型数据库,无需服务器,适合小型应用。

import sqlite3

# 连接数据库(自动创建文件)
conn = sqlite3.connect('example.db')
cursor = conn.cursor()

# 创建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
    id INTEGER PRIMARY KEY,
    name TEXT,
    age INTEGER
)
''')

# 插入数据
cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", ('Alice', 30))
conn.commit()

# 查询数据
cursor.execute("SELECT * FROM users")
rows = cursor.fetchall()
for row in rows:
    print(row)  # (1, 'Alice', 30)

# 参数化查询
age = 25
cursor.execute("SELECT * FROM users WHERE age > ?", (age,))

conn.close()
2. MySQL(第三方库)

mysql-connector-pythonpymysql

pip install mysql-connector-python
import mysql.connector

# 连接数据库
conn = mysql.connector.connect(
    host="localhost",
    user="your_username",
    password="your_password",
    database="your_database"
)
cursor = conn.cursor()

# 创建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
    id INT PRIMARY KEY AUTO_INCREMENT,
    name VARCHAR(255),
    price DECIMAL(10, 2)
)
''')

# 插入数据
cursor.execute("INSERT INTO products (name, price) VALUES (%s, %s)", ('Apple', 5.99))
conn.commit()

# 查询数据
cursor.execute("SELECT * FROM products")
for row in cursor:
    print(row)

conn.close()
3. PostgreSQL(第三方库)

psycopg2asyncpg(异步)

pip install psycopg2-binary
import psycopg2

# 连接数据库
conn = psycopg2.connect(
    host="localhost",
    user="your_username",
    password="your_password",
    database="your_database",
    port="5432"
)
cursor = conn.cursor()

# 创建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS books (
    id SERIAL PRIMARY KEY,
    title VARCHAR(255),
    author VARCHAR(255)
)
''')

# 插入数据
cursor.execute("INSERT INTO books (title, author) VALUES (%s, %s)", ('Python Crash Course', 'Eric Matthes'))
conn.commit()

# 查询数据
cursor.execute("SELECT * FROM books")
rows = cursor.fetchall()

conn.close()

二、ORM(对象关系映射)

1. SQLAlchemy(通用 ORM)

特点 :支持多种数据库,提供高级抽象。

pip install sqlalchemy
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# 创建引擎(以 SQLite 为例)
engine = create_engine('sqlite:///example_orm.db')
Base = declarative_base()

# 定义模型
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String)
    age = Column(Integer)

# 创建表
Base.metadata.create_all(engine)

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

# 添加数据
user = User(name='Charlie', age=35)
session.add(user)
session.commit()

# 查询数据
users = session.query(User).filter(User.age > 30).all()
for u in users:
    print(u.name, u.age)

session.close()

三、异步数据库操作

1. Asyncpg(PostgreSQL 异步驱动)
pip install asyncpg 
import asyncpg

async def main():
    conn = await asyncpg.connect(
        user='user',
        password='password',
        database='mydb',
        host='127.0.0.1'
    )
    
    # 创建表
    await conn.execute('''
        CREATE TABLE IF NOT EXISTS users (
            id serial PRIMARY KEY,
            name text,
            email text UNIQUE
        )
    ''')
    
    # 插入数据
    await conn.execute('INSERT INTO users(name, email) VALUES($1, $2)', 'Alice', 'alice@example.com')
    
    # 查询数据
    rows = await conn.fetch('SELECT * FROM users')
    for row in rows:
        print(row)
    
    await conn.close()

# 运行异步函数
import asyncio
asyncio.run(main())

网站公告

今日签到

点亮在社区的每一天
去签到