python接口自动化的脚本

发布于:2024-06-28 ⋅ 阅读:(17) ⋅ 点赞:(0)

使用Requests库进行GET请求

Requests是Python中最常用的HTTP库,用于发送HTTP请求。下面是一个简单的GET请求示例,用于从API获取数据。

import requests
url = "https://api.example.com/data"
response = requests.get(url)
if response.status_code == 200:
    print("Data received:", response.json())
else:
    print("Request failed with status code:", response.status_code)

POST请求提交表单数据

向服务器提交数据,如登录表单,通常使用POST请求。


import requests
url = "https://api.example.com/login"
payload = {"username": "user@example.com", "password": "securepass"}
response = requests.post(url, data=payload)
if response.status_code == 200:
    print("Login successful!")
else:
    print("Login failed with status code:", response.status_code)

使用Session保持会话

对于需要保持登录状态的接口,可以使用Session对象。

import requests
s = requests.Session()
login_url = "https://api.example.com/login"
login_payload = {"username": "user@example.com", "password": "securepass"}
  # 登录
s.post(login_url, data=login_payload)
 # 现在使用同一个session访问需要登录的页面
protected_url = "https://api.example.com/profile"
response = s.get(protected_url)
if response.status_code == 200:
    print("Profile data received:", response.json())

异常处理与重试机制

添加异常处理和重试逻辑,提高脚本的健壮性。

import requests
from requests.exceptions import RequestException
import time
def fetch_data_with_retry(url, retries=3):
    for attempt in range(retries):
        try:
            response = requests.get(url)
            response.raise_for_status()  # 如果响应状态不是200,将抛出异常
            return response.json()
        except RequestException as e:
            print(f"Attempt {attempt + 1} failed. Retrying...")
            time.sleep(2 ** attempt)  # 重试间隔时间指数增长
    print("Failed to fetch data after all attempts.")
    return None
data = fetch_data_with_retry("https://api.example.com/data")

使用Params进行查询参数处理

在URL中添加查询参数时,可以使用params参数。

import requests
url = "https://api.example.com/search"
params = {"query": "python", "limit": 10}
response = requests.get(url, params=params)
results = response.json()
print("Search results:", results)

自动化API测试报告

结合unittest或pytest等测试框架,可以自动生成测试报告。

import requests
import unittest
class TestAPI(unittest.TestCase):
    def test_get_data(self):
        url = "https://api.example.com/data"
        response = requests.get(url)
        self.assertEqual(response.status_code, 200)
if __name__ == '__main__':
    unittest.main(testRunner=unittest.TextTestRunner())

使用JSON Schema验证响应

确保API响应的数据结构符合预期,可以使用jsonschema库进行验证。

import requests
import jsonschema
from jsonschema import validate
from jsonschema.exceptions import ValidationError
schema = {
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "name": {"type": "string"},
    },
    "required": ["id", "name"],
}
url = "https://api.example.com/user/1"
response = requests.get(url)
try:
    validate(instance=response.json(), schema=schema)
    print("Response data is valid according to schema.")
except ValidationError as e:
    print("Validation error:", e.message)

文件上传

向API上传文件,如图片或文档。

import requests
url = "https://api.example.com/upload"
files = {'file': open('example.txt', 'rb')}
response = requests.post(url, files=files)
if response.status_code == 200:
    print("File uploaded successfully.")
else:
    print("Upload failed with status code:", response.status_code)

多线程/异步请求提高效率

使用多线程或多进程并发执行请求,提高处理速度。这里使用concurrent.futures模块实现多线程。

im

port requests
from concurrent.futures import ThreadPoolExecutor
urls = ["https://api.example.com/data1", "https://api.example.com/data2"]
def fetch_url(url):
    response = requests.get(url)
    return url, response.status_code
with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(fetch_url, url) for url in urls}
    for future in concurrent.futures.as_completed(futures):
        url, status = future.result()
        print(f"{url}: Status Code {status}")

使用requests-toolbelt处理大文件下载

处理大文件下载时,可以使用requests-toolbelt库的StreamingIterator,避免一次性加载整个文件到内存。

from requests_toolbelt.downloadutils import StreamingIterator
import requests
url = "https://example.com/largefile.zip"
response = requests.get(url, stream=True)
stream = StreamingIterator(response.iter_content(chunk_size=1024*1024),  # 每1MB读取一次
                          headers={'Accept-Encoding': None})  # 禁止自动解压,以便流式处理
with open('largefile.zip', 'wb') as f:
    for chunk in stream:
        f.write(chunk)
print("Download complete.")