文章目录
Python 高频库及常用函数
一、python3.x 内置库
1. 内置库核心模块
1. 内置函数(Built-in Functions)
- 高频函数 :
print()
,len()
,range()
,input()
,open()
,type()
,dir()
,help()
def my_test2():
# range 生成 0 到 4 的整数序列
for i in range(5):
print(i, end=' ') # 输出: 0 1 2 3 4
# 指定起始和结束值
for i in range(2, 6):
print(i, end=' ') # 输出: 2 3 4 5
# input 输入
name = input("请输入你的名字: ")
print(f"你好, {name}")
# open 读取文件
with open("test.txt", "r", encoding='utf-8') as f:
content = f.read()
print(content)
# 写入文件
with open("output.txt", "w", encoding='utf-8') as f:
f.write("Hello Python!")
# type() get the type of object
print(type(123)) # 输出: <class 'int'>
print(type("hello")) # 输出: <class 'str'>
print(type([1, 2, 3])) # 输出: <class 'list'>
- 高级函数 :
map()
,filter()
,reduce()
,zip()
,sorted()
,reversed()
def my_test3():
# map():对可迭代对象的每个元素应用函数
nums = [1, 2, 3, 4]
squares = map(lambda x: x ** 2, nums)
print(list(squares)) # 输出: [1, 4, 9, 16]
# filter():过滤可迭代对象的元素
nums = [1, 2, 3, 4, 5, 6]
evens = filter(lambda x: x % 2 == 0, nums)
print(list(evens)) # 输出: [2, 4, 6]
# reduce():累积地对序列的元素应用函数(需导入)
nums = [1, 2, 3, 4]
product = reduce(lambda x, y: x * y, nums)
print(product) # 输出: 24 (1*2*3*4)
# zip():将多个可迭代对象的元素打包成元组
names = ["Alice", "Bob"]
ages = [25, 30]
zipped = zip(names, ages)
print(list(zipped)) # 输出: [('Alice', 25), ('Bob', 30)]
# sorted():返回新的已排序列表
nums = [3, 1, 4, 1, 5]
sorted_nums = sorted(nums)
print(sorted_nums) # 输出: [1, 1, 3, 4, 5]
# reversed():返回反向迭代器
lst = [1, 2, 3]
reversed_lst = list(reversed(lst))
print(reversed_lst) # 输出: [3, 2, 1]
2. 数据结构模块
基础数据结构
1. 列表(List)
- 特点 :可变、有序、允许重复元素。
- 语法 :用方括号
[]
定义。
2. 元组(Tuple)
- 特点 :不可变、有序、允许重复元素。
- 语法 :用圆括号
()
或逗号分隔定义。
3. 字典(Dict)
- 特点 :键值对存储、无序(Python 3.7+ 后有序)、键唯一。
- 语法 :用花括号
{}
和冒号key:value
定义。
4. 集合(Set)
- 特点 :无序、唯一、元素不可变(但集合本身可变)。
- 语法 :用花括号
{}
或set()
函数定义(空集合必须用set()
)。
6. 字符串(String)
- 特点 :不可变、有序的字符序列。
- 语法 :用单引号
'
、双引号"
或三引号'''
定义。
7. 字节(Bytes)
- 特点 :不可变的二进制数据,范围 0-255。
- 语法 :用
b'...'
定义。
8. 字节数组(Bytearray)
- 特点 :可变的二进制数据。
- 语法 :用
bytearray()
函数定义。
def my_test1():
# 用单引号 '、双引号 " 或三引号 ''' 定义。
variable = 'var1'
variable_ = 'variable2'
print(f'this is print function, {variable}') # print with f (f means format)
print(f'{0.987654321:.4f}')
print(len(variable_)) # len() function
print(variable_[0])
# 字节数组
data_byte = b'hello'
print(data_byte.decode('utf-8'))
print(data_byte[0])
print(data_byte)
# 字符-字节转换
data_byte_ = 'hello'.encode('utf-8')
print(data_byte_.decode('utf-8'))
# 字节数组修改
data = bytearray(b"hello3")
data[0] = 74 # 修改为 'J' (ASCII 74)
print(data.decode('utf-8')) # 输出:bytearray(b'Jello')
lst = [1, 2, 3, 4, 5, 7, "8"] # 列表 可变、有序、允许重复元素,元素类型可不同
tuple_ = (3, 4, '5') # 元组 不可变、有序、允许重复元素。
dict_ = {"name": "Alice", "age": 30} # 字典 键值对存储、无序(Python 3.7+ 后有序)、键唯一。
set_ = {1, 2, 3, 3} # 无序、唯一、元素不可变(但集合本身可变)。
lst.append("date") # 添加元素
lst[0] = "avocado" # 修改元素
idx = 0
for l in lst:
print(f'{l}', end='')
idx += 1
if idx < len(lst):
print(' -> ', end='')
print("")
for i in range(len(lst)):
print(f'index {i} : {lst[i]}')
print(lst)
# ========================
for value in tuple_:
print(f' {value} ', end='')
print("")
for i in range(len(tuple_)):
print(f'index {i} : {tuple_[i]}')
print(tuple_)
# =========================
for key, value in dict_.items():
print(f'key: {key} value: {value}')
for value in set_:
print(f" {value} ", end='')
print('')
高级数据结构collections
Counter
:统计元素频率DefaultDict
:带默认值的字典OrderedDict
:有序字典deque
:双端队列(高效的插入 / 删除)数组(Array)
:高效存储单一类型的数组(需导入array.array
)。堆队列(Heap Queue)
:实现最小堆的模块(需导入heapq
)。双端队列(Deque)
:高效的双向队列(需导入collections.deque
)。
在这里插入代码片
3. 字符串处理
re
(正则表达式)
re.search()
,re.match()
,re.findall()
,re.sub()
# string 连接
words = ["Python", "is", "awesome"]
result = " ".join(words) # 输出: "Python is awesome"
# 字符串格式化
name = "Bob"
age = 30
message = f"My name is {name} and I'm {age} years old."
# 输出: "My name is Bob and I'm 30 years old."
#字符串查找与替换
text = "Hello, World!"
pos = text.find("World") # 输出: 7
text = "banana"
count = text.count("a") # 统计,输出: 3
# 字符串查找与替换
text = "Hello, World!"
pos = text.find("World") # 输出: 7
text = "banana"
count = text.count("a") # 输出: 3
text = "Hello, World!"
new_text = text.replace("World", "Python") # 输出: "Hello, Python!"
# 按分隔符分割字符串为列表:
text = "apple,banana,orange"
fruits = text.split(",") # 输出: ["apple", "banana", "orange"]
# 字符串分割与合并
# 按分隔符分割字符串为列表:
text = "apple,banana,orange"
fruits = text.split(",") # 输出: ["apple", "banana", "orange"]
#按分隔符将字符串分为三部分:
text = "Hello,World"
result = text.partition(",") # 输出: ("Hello", ",", "World")
# 检查字符串开头或结尾:
text = "Hello, World!"
is_start = text.startswith("Hello") # 输出: True
is_end = text.endswith("!") # 输出: True
# 检查字符串是否全为字母、数字或字母数字:
"abc".isalpha() # 输出: True
"123".isdigit() # 输出: True
"abc123".isalnum() # 输出: True
# 去除两侧、左侧或右侧的空白字符:
text = " Hello "
clean_text = text.strip() # 输出: "Hello"
left_clean = text.lstrip() # 输出: "Hello "
# 处理字符编码(如 UTF-8):
text = "你好"
encoded = text.encode("utf-8") # 输出: b'\xe4\xbd\xa0\xe5\xa5\xbd'
decoded = encoded.decode("utf-8") # 输出: "你好"
# 返回字符串的转义表示:
text = 'Hello\nWorld'
print(repr(text)) # 输出: 'Hello\nWorld'
# 使用正则表达式处理复杂替换:
import re
text = "Hello 123 World 456"
clean_text = re.sub(r'\d+', '', text) # 移除所有数字
# 输出: "Hello World "
2. 文件与目录操作
os
与 pathlib
在 Python 中,os
和 pathlib
是处理文件路径和操作系统相关功能的两个核心库。os
是早期标准库,提供面向过程的函数;而pathlib
(Python 3.4+)则采用面向对象的方式,代码更简洁。以下是它们的常用功能对比和示例:
一、路径操作(核心功能)
1. 获取当前工作目录
# os 方式
import os
cwd = os.getcwd()
print(cwd) # 输出:/Users/username/project
# pathlib 方式
from pathlib import Path
cwd = Path.cwd()
print(cwd) # 输出:PosixPath('/Users/username/project')
2. 拼接路径
# os 方式(需使用 os.path.join)
file_path = os.path.join(cwd, "data", "test.txt")
print(file_path) # 输出:/Users/username/project/data/test.txt
# pathlib 方式(直接用斜杠 /)
file_path = Path(cwd) / "data" / "test.txt"
print(file_path) # 输出:PosixPath('/Users/username/project/data/test.txt')
3. 拆分路径组件
path = "/home/user/docs/report.pdf"
# os 方式
dir_name = os.path.dirname(path) # 输出:/home/user/docs
base_name = os.path.basename(path) # 输出:report.pdf
file_name, ext = os.path.splitext(path) # 输出:('report', '.pdf')
# pathlib 方式
path_obj = Path(path)
print(path_obj.parent) # 输出:PosixPath('/home/user/docs')
print(path_obj.name) # 输出:report.pdf
print(path_obj.stem) # 输出:report
print(path_obj.suffix) # 输出:.pdf
二、文件和目录操作
1. 检查路径是否存在
# os 方式
os.path.exists("/tmp") # 返回 True 或 False
# pathlib 方式
Path("/tmp").exists() # 返回 True 或 False
2. 创建目录
# 创建单层目录
new_dir = Path("new_folder")
new_dir.mkdir(exist_ok=True) # 等价于 os.makedirs("new_folder", exist_ok=True)
# 创建多层目录
Path("a/b/c").mkdir(parents=True, exist_ok=True) # 等价于 os.makedirs("a/b/c", exist_ok=True)
3. 重命名 / 移动文件
# os 方式
os.rename("old.txt", "new.txt") # 移动或重命名
# pathlib 方式
Path("old.txt").replace("new.txt") # 覆盖已存在文件
Path("old.txt").rename("new.txt") # 不允许覆盖
4. 删除文件 / 目录
# 删除文件
Path("temp.txt").unlink(missing_ok=True) # 等价于 os.remove("temp.txt")
# 删除空目录
Path("empty_dir").rmdir() # 等价于 os.rmdir("empty_dir")
# 删除非空目录(需使用 shutil)
import shutil
shutil.rmtree("full_dir")
三、文件属性和权限
1. 获取文件大小和修改时间
# os 方式
size = os.path.getsize("data.csv") # 单位:字节
mtime = os.path.getmtime("data.csv") # 修改时间戳
# pathlib 方式
file = Path("data.csv")
size = file.stat().st_size
mtime = file.stat().st_mtime
2. 修改文件权限
# os 方式
os.chmod("script.py", 0o755) # 755 权限
# pathlib 方式
Path("script.py").chmod(0o755)
四、文件遍历
1. 列出目录内容
# os 方式
for item in os.listdir("/tmp"):
print(item)
# pathlib 方式
for item in Path("/tmp").iterdir():
print(item) # 返回 Path 对象
2. 递归查找文件(通配符)
# 查找所有 .txt 文件
# os 方式(结合 os.walk)
for root, dirs, files in os.walk("."):
for file in files:
if file.endswith(".txt"):
print(os.path.join(root, file))
# pathlib 方式(更简洁)
for txt_path in Path(".").rglob("*.txt"):
print(txt_path)
五、环境变量和系统操作
1. 获取环境变量
# os 方式
home_dir = os.environ.get("HOME") # 等价于 os.getenv("HOME")
print(home_dir) # 输出:/Users/username
# pathlib 方式(结合 os)
home_dir = Path.home() # 等价于 Path(os.environ["HOME"])
2. 执行系统命令
# os 方式(推荐使用 subprocess)
os.system("ls -l") # 执行 shell 命令
# subprocess 方式(更安全)
import subprocess
result = subprocess.run(["ls", "-l"], capture_output=True, text=True)
print(result.stdout)
3. 日期与时间
3.1 time 库
一、基础时间模块:time
核心功能 :获取当前时间戳、休眠、CPU 时间测量
import time
# 1. 获取当前时间戳(秒)
timestamp = time.time() # 如:1696137123.456
print(f"时间戳: {timestamp}")
# 2. 程序休眠(暂停执行)
print("开始休眠...")
time.sleep(2) # 暂停2秒
print("休眠结束")
# 3. 测量代码执行时间
start_time = time.time()
# 执行一些操作
time.sleep(1)
end_time = time.time()
print(f"执行耗时: {end_time - start_time:.2f}秒")
# 4. 格式化时间(不常用,更推荐datetime)
struct_time = time.localtime(timestamp)
formatted_time = time.strftime("%Y-%m-%d %H:%M:%S", struct_time)
print(f"格式化时间: {formatted_time}")
3.2 datetime 库
在 Python 中,datetime
库是处理日期和时间的标准库,功能强大且易于使用。下面从基本概念到高级应用,详细介绍其核心组件和用法。
一、核心类
datetime
库包含 4 个主要类:
date
:处理年月日(如:2023-10-01)time
:处理时分秒(如:14:30:25)datetime
:同时处理日期和时间(继承自date
)timedelta
:表示时间间隔(如:3 天 5 小时)
二、基本用法
1. 获取当前日期和时间
from datetime import datetime, date, time
# 当前日期和时间
now = datetime.now() # 输出:2023-10-01 14:30:25.123456
print(now)
# 仅日期
today = date.today() # 输出:2023-10-01
print(today)
# 仅时间
current_time = now.time() # 输出:14:30:25.123456
print(current_time)
2. 创建指定日期和时间
# 创建datetime对象
dt = datetime(2023, 10, 1, 14, 30, 25) # 2023-10-01 14:30:25
print(dt)
# 创建date对象
d = date(2023, 10, 1) # 2023-10-01
print(d)
# 创建time对象
t = time(14, 30, 25) # 14:30:25
print(t)
3. 日期和时间的属性
dt = datetime.now()
print(dt.year) # 年:2023
print(dt.month) # 月:10
print(dt.day) # 日:1
print(dt.hour) # 时:14
print(dt.minute) # 分:30
print(dt.second) # 秒:25
print(dt.weekday()) # 星期几(0=周一,6=周日)
三、日期和时间的计算(timedelta
)
timedelta
用于计算日期 / 时间的差值或偏移。
1. 计算时间间隔
from datetime import timedelta
# 创建时间间隔
delta = timedelta(days=3, hours=2, minutes=15)
print(delta) # 3 days, 2:15:00
# 日期偏移
today = date.today()
future = today + delta # 3天后
print(future)
# 计算两个日期的差值
d1 = date(2023, 10, 1)
d2 = date(2023, 10, 10)
diff = d2 - d1
print(diff.days) # 9天
2. 时间加减示例
now = datetime.now()
# 3小时后
future = now + timedelta(hours=3)
print(future)
# 2天前
past = now - timedelta(days=2)
print(past)
四、日期和字符串的转换
1. 日期→字符串(格式化)
使用strftime()
方法,通过格式代码(如%Y-%m-%d
)转换。
now = datetime.now()
# 转为指定格式的字符串
formatted = now.strftime("%Y-%m-%d %H:%M:%S") # 2023-10-01 14:30:25
print(formatted)
# 常用格式代码:
# %Y:四位数年份(2023)
# %m:两位数月份(01-12)
# %d:两位数日期(01-31)
# %H:24小时制小时(00-23)
# %M:分钟(00-59)
# %S:秒(00-59)
# %A:星期全称(Monday)
# %B:月份全称(October)
2. 字符串→日期(解析)
使用strptime()
方法,需指定字符串格式。
date_str = "2023-10-01 14:30:25"
dt = datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S")
print(dt) # 2023-10-01 14:30:25
五、时区处理
datetime
默认是无时区的,可通过tzinfo
属性添加时区信息。
1. 使用pytz
库(需安装)
from datetime import datetime
import pytz
# 创建带时区的datetime
utc_now = datetime.now(pytz.UTC) # UTC时间
print(utc_now)
# 转换时区
shanghai_tz = pytz.timezone("Asia/Shanghai")
shanghai_now = utc_now.astimezone(shanghai_tz)
print(shanghai_now) # UTC+8时间
2. Python 3.9+ 的zoneinfo
模块(内置)
from datetime import datetime
from zoneinfo import ZoneInfo
# 创建带时区的datetime
utc_now = datetime.now(ZoneInfo("UTC"))
print(utc_now)
# 转换时区
shanghai_now = utc_now.astimezone(ZoneInfo("Asia/Shanghai"))
print(shanghai_now)
六、高级用法
1. 判断日期前后
d1 = date(2023, 10, 1)
d2 = date(2023, 10, 2)
print(d1 < d2) # True
print(d1 > d2) # False
2. 计算月份差
from dateutil.relativedelta import relativedelta
d1 = date(2023, 1, 1)
d2 = date(2023, 10, 1)
delta = relativedelta(d2, d1)
print(delta.months) # 9个月
3. 获取当月最后一天
from calendar import monthrange
year, month = 2023, 10
last_day = monthrange(year, month)[1] # 31
print(last_day)
七、总结
- 基础操作 :用
date
、time
、datetime
创建和操作日期时间。 - 时间间隔 :用
timedelta
进行日期加减。 - 格式转换 :
strftime()
格式化日期为字符串,strptime()
解析字符串为日期。 - 时区处理 :用
pytz
或zoneinfo
处理时区。
4. JSON 处理
一、选择建议
- 基础需求 :使用标准库
json
- 高性能 :使用
ujson
或orjson
- 特殊类型支持 :使用
simplejson
- 模式验证 :使用
jsonschema
- 大文件处理 :使用流式解析或 JSON Lines 格式
一、标准库:json
(核心)
特点 :Python 内置,提供基础 JSON 序列化 / 反序列化功能。
import json
# 1. json.dumps(obj) - 将 Python 对象转为 JSON 字符串
data = {
"name": "Alice",
"age": 30,
"hobbies": ["reading", "swimming"],
"is_student": False,
"address": {"city": "Beijing", "zip": "100000"}
}
json_str = json.dumps(data, indent=2) # 缩进2个空格,美化输出
print(json_str)
# 输出:
# {
# "name": "Alice",
# "age": 30,
# "hobbies": ["reading", "swimming"],
# "is_student": false,
# "address": {"city": "Beijing", "zip": "100000"}
# }
# 2. json.loads(json_str) - 将 JSON 字符串转为 Python 对象
parsed_data = json.loads(json_str)
print(parsed_data["age"]) # 30
# 3. json.dump(obj, file) - 将对象写入 JSON 文件
with open('data.json', 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2) # ensure_ascii=False 支持中文
# 4. json.load(file) - 从 JSON 文件读取对象
with open('data.json', 'r', encoding='utf-8') as f:
loaded_data = json.load(f)
2. 关键参数
indent
:指定缩进空格数(美化输出)ensure_ascii
:是否强制 ASCII 编码(处理中文需设为False
)sort_keys
:是否按键排序(True
/False
)default
:自定义序列化函数(处理特殊对象)
二、第三方库:ujson
(高性能)
特点 :速度快,占用内存少,API 与 json
兼容。
pip install ujson
import ujson
# 用法与 json 完全一致
data = {"key": "value"}
json_str = ujson.dumps(data)
parsed = ujson.loads(json_str)
三、第三方库:simplejson
(兼容性强)
特点 :支持更多 Python 数据类型(如 Decimal
),兼容 Python 2/3。
pip install simplejson
import simplejson
from decimal import Decimal
data = {"price": Decimal("9.99")}
# 使用 default 参数处理特殊类型
json_str = simplejson.dumps(data, default=str)
四、处理特殊对象
1. 自定义对象序列化
import json
class Person:
def __init__(self, name, age):
self.name = name
self.age = age
# 方法1:自定义序列化函数
def person_to_dict(obj):
if isinstance(obj, Person):
return {"name": obj.name, "age": obj.age}
raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")
p = Person("Bob", 25)
json_str = json.dumps(p, default=person_to_dict)
# 方法2:在类中实现 to_dict 方法
class Person:
def __init__(self, name, age):
self.name = name
self.age = age
def to_dict(self):
return {"name": self.name, "age": self.age}
json_str = json.dumps(p.to_dict())
2. 处理 datetime 类型
import json
from datetime import datetime
data = {"timestamp": datetime.now()}
# 自定义序列化函数
def datetime_to_str(obj):
if isinstance(obj, datetime):
return obj.strftime("%Y-%m-%d %H:%M:%S")
raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")
json_str = json.dumps(data, default=datetime_to_str)
五、流式处理大文件
场景 :处理超过内存大小的 JSON 文件。
1. 逐行解析 JSON Lines 格式
# 示例 JSON Lines 文件 (data.jsonl)
# {"name": "Alice", "age": 30}
# {"name": "Bob", "age": 25}
with open('data.jsonl', 'r') as f:
for line in f:
obj = json.loads(line)
print(obj["name"])
2. 使用 json.JSONDecoder
解析不完整 JSON
decoder = json.JSONDecoder()
buffer = ""
with open('large_data.json', 'r') as f:
for chunk in f:
buffer += chunk
while buffer:
try:
obj, pos = decoder.raw_decode(buffer)
process(obj) # 处理解析出的对象
buffer = buffer[pos:].lstrip()
except json.JSONDecodeError:
break # 等待更多数据
六、JSON 与 CSV 转换
import json
import csv
# JSON 转 CSV
with open('data.json', 'r') as f:
data = json.load(f)
with open('output.csv', 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=["name", "age", "city"])
writer.writeheader()
writer.writerows(data)
# CSV 转 JSON
with open('input.csv', 'r') as f:
reader = csv.DictReader(f)
data = list(reader)
with open('output.json', 'w') as f:
json.dump(data, f, indent=2)
1. 常用方法
json
json.dumps()
,json.loads()
,json.dump()
,json.load()
5. 内置装饰器
1.@staticmethod
将方法转换为静态方法,不需要实例或类作为第一个参数
2. @classmethod
: 定义一个类方法,第一个参数是类对象(通常命名为cls),允许类操作自身。
2.@abstractmethod
定义抽象方法,需要配合 abc
模块使用,表示该方法必须在子类中被实现,否则会抛出异常。
3. @functools.lru_cache
缓存函数结果,适用于计算密集型且参数相同的函数
4. @property
: 将一个方法变成属性,允许像访问属性一样访问这个方法,主要用于提供封装性、安全性和便利性
5. @functools.wraps
- 用于在自定义装饰器中保留原始函数的元数据
import functools
def my_decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
@timeit
- 自定义装饰器,用于测量函数执行时间
import time
def timeit(func):
def wrapper(*args, **kwargs):
start = time.time()
result = func(*args, **kwargs)
end = time.time()
print(f"函数 {func.__name__} 执行时间: {end - start} 秒")
return result
return wrapper
@retry
- 自定义装饰器,用于重试失败的函数
def retry(max_attempts=3, delay=1):
def decorator(func):
def wrapper(*args, **kwargs):
attempts = 0
while attempts < max_attempts:
try:
return func(*args, **kwargs)
except Exception as e:
attempts += 1
if attempts == max_attempts:
raise
time.sleep(delay)
return wrapper
return decorator
二、Web开发库
Flask
:轻量级 Web 框架
在这里插入代码片
Django
:全功能 Web 框架
在这里插入代码片
三、网络请求库
以下是 Python 中常用的网络请求库及其典型示例代码,按使用场景分类整理:
一、标准库:urllib
特点 :Python 内置,无需安装,适合简单请求。
# GET 请求
from urllib.request import urlopen
with urlopen('https://api.example.com/data') as response:
data = response.read().decode('utf-8')
print(data)
# POST 请求
from urllib.parse import urlencode
from urllib.request import Request, urlopen
data = {'key': 'value'}
params = urlencode(data).encode('utf-8')
request = Request('https://api.example.com/submit', data=params)
with urlopen(request) as response:
result = response.read()
二、第三方库:requests
(最流行)
特点 :API 简洁,支持会话、文件上传、超时设置等。
pip install requests
import requests
# GET 请求
response = requests.get('https://api.example.com/data')
print(response.status_code) # 状态码
print(response.json()) # 解析 JSON
# POST 请求
data = {'key': 'value'}
response = requests.post('https://api.example.com/submit', json=data)
# 带参数的请求
params = {'page': 1, 'size': 10}
response = requests.get('https://api.example.com/list', params=params)
# 会话保持(自动处理 cookies)
session = requests.Session()
session.get('https://example.com/login') # 登录
session.get('https://example.com/dashboard') # 访问需要登录的页面
三、异步请求:aiohttp
特点 :基于 asyncio
,支持异步请求,适合高并发场景。
pip install aiohttp
import asyncio
import aiohttp
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
async def main():
urls = ['https://api.example.com/1', 'https://api.example.com/2']
tasks = [fetch(url) for url in urls]
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
四、高性能:httpx
特点 :兼容 requests
API,支持同步 / 异步,HTTP/2。
pip install httpx
import httpx
# 同步请求
response = httpx.get('https://api.example.com/data')
print(response.json())
# 异步请求
async def fetch_async():
async with httpx.AsyncClient() as client:
response = await client.get('https://api.example.com/data')
return response.json()
# HTTP/2 请求
response = httpx.get('https://http2.akamai.com/demo', http2=True)
五、底层控制:socket
特点 :Python 内置,适合实现自定义协议。
import socket
# 创建 TCP 套接字
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(('www.example.com', 80))
s.sendall(b'GET / HTTP/1.1\r\nHost: www.example.com\r\n\r\n')
data = s.recv(1024)
print(data.decode('utf-8'))
六、文件上传示例
import requests
# 单文件上传
files = {'file': open('example.txt', 'rb')}
response = requests.post('https://api.example.com/upload', files=files)
# 多文件上传
files = {
'file1': open('example1.txt', 'rb'),
'file2': open('example2.txt', 'rb'),
}
response = requests.post('https://api.example.com/upload', files=files)
七、请求超时与重试
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
# 设置重试策略
session = requests.Session()
retries = Retry(total=3, backoff_factor=1)
session.mount('https://', HTTPAdapter(max_retries=retries))
# 带超时的请求
try:
response = session.get('https://api.example.com', timeout=5) # 5秒超时
except requests.exceptions.Timeout:
print('请求超时')
八、选择建议
- 简单请求 :优先使用
requests
- 异步高并发 :使用
aiohttp
或httpx
- 底层控制 :使用
socket
或asyncio
原生套接字 - 仅处理 JSON API :考虑
requests
或httpx
四、并发编程库
选择建议
- I/O 密集型任务:优先使用 asyncio,其次是 threading。
- CPU 密集型任务:使用 multiprocessing。
- 分布式计算:使用 ray 或 dask。
这些示例展示了 Python 中常见的并行编程方法,根据任务类型选择合适的方式可以显著提升程序性能。
多线程(threading)
适用于 I/O 密集型任务(如网络请求、文件读写),受 GIL(全局解释器锁)限制,不适合 CPU 密集型任务。
优点:轻量级,适合 I/O 等待时间长的场景。
缺点:受 GIL 限制,无法充分利用多核 CPU。
import threading
import requests
import time
def download_image(url, name):
"""下载图片并保存到本地"""
start_time = time.time()
response = requests.get(url)
with open(f"image_{name}.jpg", "wb") as f:
f.write(response.content)
print(f"图片 {name} 下载完成,耗时: {time.time() - start_time:.2f}秒")
def main_threading():
# 示例图片URL
urls = [
"https://picsum.photos/seed/1/800/600",
"https://picsum.photos/seed/2/800/600",
"https://picsum.photos/seed/3/800/600",
"https://picsum.photos/seed/4/800/600",
"https://picsum.photos/seed/5/800/600"
]
# 创建线程列表
threads = []
for i, url in enumerate(urls):
thread = threading.Thread(target=download_image, args=(url, i))
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
if __name__ == "__main__":
main_threading()
多进程(multiprocessing)
适用于 CPU 密集型任务(如科学计算、数据处理),每个进程有独立的 Python 解释器,不受 GIL 限制。
优点:充分利用多核 CPU,适合计算密集型任务。
缺点:进程创建开销大,进程间通信复杂。
import multiprocessing
import math
import time
def is_prime(n):
"""判断一个数是否为素数"""
if n <= 1:
return False
if n <= 3:
return True
if n % 2 == 0 or n % 3 == 0:
return False
for i in range(5, int(math.sqrt(n)) + 1, 6):
if n % i == 0 or n % (i + 2) == 0:
return False
return True
def count_primes(start, end, result_queue):
"""统计指定范围内的素数数量"""
count = 0
for num in range(start, end):
if is_prime(num):
count += 1
result_queue.put(count)
def main_multiprocessing():
# 计算范围
n = 1000000
num_processes = multiprocessing.cpu_count()
chunk_size = n // num_processes
# 创建进程和结果队列
processes = []
result_queue = multiprocessing.Queue()
# 启动多个进程
for i in range(num_processes):
start = i * chunk_size
end = (i + 1) * chunk_size if i < num_processes - 1 else n
p = multiprocessing.Process(target=count_primes, args=(start, end, result_queue))
processes.append(p)
p.start()
# 等待所有进程完成并收集结果
total_primes = 0
for p in processes:
p.join()
while not result_queue.empty():
total_primes += result_queue.get()
print(f"1到{n}之间共有{total_primes}个素数")
if __name__ == "__main__":
main_multiprocessing()
异步编程(asyncio)
适用于高并发 I/O 任务(如网络爬虫、服务器),通过协程(coroutine)实现非阻塞 I/O。
优点:极高的并发性能,资源消耗少。
缺点:调试复杂,需要使用异步库。
import asyncio
import aiohttp
import time
async def fetch(session, url, name):
"""异步获取网页内容"""
start_time = time.time()
async with session.get(url) as response:
html = await response.text()
print(f"网页 {name} 下载完成,耗时: {time.time() - start_time:.2f}秒")
return len(html)
async def main_asyncio():
# 示例URL
urls = [
"https://www.example.com",
"https://www.python.org",
"https://www.github.com",
"https://www.google.com",
"https://www.yahoo.com"
]
async with aiohttp.ClientSession() as session:
tasks = []
for i, url in enumerate(urls):
task = asyncio.create_task(fetch(session, url, i))
tasks.append(task)
# 等待所有任务完成
results = await asyncio.gather(*tasks)
print(f"总共下载了 {sum(results)} 个字符")
if __name__ == "__main__":
asyncio.run(main_asyncio())
并行计算库(Ray)
适用于分布式计算、机器学习任务,支持大规模并行处理。
优点:简单易用,支持跨节点分布式计算。
缺点:需要额外安装依赖,轻量级任务可能不划算。
import ray
import time
# 初始化Ray
ray.init()
@ray.remote
def process_data(data_id):
"""模拟数据处理任务"""
time.sleep(1) # 模拟计算时间
return f"处理完成的数据 {data_id}"
def main_ray():
# 模拟10个数据任务
data_ids = list(range(10))
# 并行处理数据
start_time = time.time()
results = ray.get([process_data.remote(i) for i in data_ids])
print(f"并行处理耗时: {time.time() - start_time:.2f}秒")
# 串行处理数据(仅作对比)
start_time = time.time()
for i in data_ids:
process_data(i)
print(f"串行处理耗时: {time.time() - start_time:.2f}秒")
print("结果:", results)
if __name__ == "__main__":
main_ray()
五、 测试框架库
pdb
timeit
Selenium
web 自动化测试库
unittest
单元测试
六、数据处理分析库
1. 数据科学与分析
pandas
:数据处理与分析
在这里插入代码片
numpy
:科学计算
在这里插入代码片
七、数据可视化库
matplotlib
:数据可视化
在这里插入代码片
八、文本处理库
configparser
配置文件自动解析
# config.ini
[Settings]
debug = true
port = 8080
# 读取配置
import configparser
config = configparser.ConfigParser()
config.read("config.ini")
debug = config.getboolean("Settings", "debug")
port = config.getint("Settings", "port")
九、数据库操作库
一、关系型数据库
1. SQLite(内置)
特点 :文件型数据库,无需服务器,适合小型应用。
import sqlite3
# 连接数据库(自动创建文件)
conn = sqlite3.connect('example.db')
cursor = conn.cursor()
# 创建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT,
age INTEGER
)
''')
# 插入数据
cursor.execute("INSERT INTO users (name, age) VALUES (?, ?)", ('Alice', 30))
conn.commit()
# 查询数据
cursor.execute("SELECT * FROM users")
rows = cursor.fetchall()
for row in rows:
print(row) # (1, 'Alice', 30)
# 参数化查询
age = 25
cursor.execute("SELECT * FROM users WHERE age > ?", (age,))
conn.close()
2. MySQL(第三方库)
库 :mysql-connector-python
或 pymysql
pip install mysql-connector-python
import mysql.connector
# 连接数据库
conn = mysql.connector.connect(
host="localhost",
user="your_username",
password="your_password",
database="your_database"
)
cursor = conn.cursor()
# 创建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS products (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(255),
price DECIMAL(10, 2)
)
''')
# 插入数据
cursor.execute("INSERT INTO products (name, price) VALUES (%s, %s)", ('Apple', 5.99))
conn.commit()
# 查询数据
cursor.execute("SELECT * FROM products")
for row in cursor:
print(row)
conn.close()
3. PostgreSQL(第三方库)
库 :psycopg2
或 asyncpg
(异步)
pip install psycopg2-binary
import psycopg2
# 连接数据库
conn = psycopg2.connect(
host="localhost",
user="your_username",
password="your_password",
database="your_database",
port="5432"
)
cursor = conn.cursor()
# 创建表
cursor.execute('''
CREATE TABLE IF NOT EXISTS books (
id SERIAL PRIMARY KEY,
title VARCHAR(255),
author VARCHAR(255)
)
''')
# 插入数据
cursor.execute("INSERT INTO books (title, author) VALUES (%s, %s)", ('Python Crash Course', 'Eric Matthes'))
conn.commit()
# 查询数据
cursor.execute("SELECT * FROM books")
rows = cursor.fetchall()
conn.close()
二、ORM(对象关系映射)
1. SQLAlchemy(通用 ORM)
特点 :支持多种数据库,提供高级抽象。
pip install sqlalchemy
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
# 创建引擎(以 SQLite 为例)
engine = create_engine('sqlite:///example_orm.db')
Base = declarative_base()
# 定义模型
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String)
age = Column(Integer)
# 创建表
Base.metadata.create_all(engine)
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
# 添加数据
user = User(name='Charlie', age=35)
session.add(user)
session.commit()
# 查询数据
users = session.query(User).filter(User.age > 30).all()
for u in users:
print(u.name, u.age)
session.close()
三、异步数据库操作
1. Asyncpg(PostgreSQL 异步驱动)
pip install asyncpg
import asyncpg
async def main():
conn = await asyncpg.connect(
user='user',
password='password',
database='mydb',
host='127.0.0.1'
)
# 创建表
await conn.execute('''
CREATE TABLE IF NOT EXISTS users (
id serial PRIMARY KEY,
name text,
email text UNIQUE
)
''')
# 插入数据
await conn.execute('INSERT INTO users(name, email) VALUES($1, $2)', 'Alice', 'alice@example.com')
# 查询数据
rows = await conn.fetch('SELECT * FROM users')
for row in rows:
print(row)
await conn.close()
# 运行异步函数
import asyncio
asyncio.run(main())