Python与Web3.py库交互实践

发布于:2025-06-26 ⋅ 阅读:(21) ⋅ 点赞:(0)

目录

  • Python与Web3.py库交互实践
    • 引言:连接Python与区块链的桥梁
    • 1. 环境配置与基础连接
      • 1.1 安装Web3.py
      • 1.2 连接以太坊节点
    • 2. 基础区块链交互
      • 2.1 账户与余额查询
      • 2.2 创建并发送交易
    • 3. 智能合约交互
      • 3.1 加载和部署合约
      • 3.2 与已部署合约交互
    • 4. 高级功能实践
      • 4.1 事件监听
      • 4.2 与ERC-20代币交互
    • 5. 完整DApp示例:链上记事本
      • 5.1 智能合约
      • 5.2 Python后端
      • 5.3 前端界面
    • 6. 安全最佳实践
      • 6.1 私钥管理
      • 6.2 交易安全
    • 7. 高级技巧与优化
      • 7.1 批量查询
      • 7.2 Gas优化策略
    • 结论:成为区块链开发者
      • 下一步:

Python与Web3.py库交互实践

引言:连接Python与区块链的桥梁

Web3.py是以太坊官方推荐的Python接口库,它使开发者能够通过Python与以太坊区块链进行交互。本指南将带你从基础到高级应用,全面掌握使用Web3.py进行区块链交互的核心技能,包括智能合约部署、交易签名、事件监听等关键功能。

1. 环境配置与基础连接

1.1 安装Web3.py

pip install web3

1.2 连接以太坊节点

from web3 import Web3

# 使用Infura连接主网
infura_url = "https://mainnet.infura.io/v3/YOUR_INFURA_PROJECT_ID"
w3 = Web3(Web3.HTTPProvider(infura_url))

# 检查连接
if w3.is_connected():
    print("Connected to Ethereum Mainnet")
    print(f"Latest block: {w3.eth.block_number}")
else:
    print("Connection failed")

# 连接本地开发节点(如Ganache)
ganache_url = "http://127.0.0.1:7545"
w3_ganache = Web3(Web3.HTTPProvider(ganache_url))

2. 基础区块链交互

2.1 账户与余额查询

# 查询账户余额(单位:wei)
balance_wei = w3.eth.get_balance("0x742d35Cc6634C0532925a3b844Bc454e4438f44e")

# 转换单位
balance_eth = w3.from_wei(balance_wei, 'ether')
print(f"Balance: {balance_eth} ETH")

# 获取账户列表(Ganache)
ganache_accounts = w3_ganache.eth.accounts
print(f"Ganache accounts: {ganache_accounts}")

2.2 创建并发送交易

def send_eth_transaction(w3, sender, receiver, amount_eth, private_key):
    """发送ETH交易"""
    # 转换金额单位
    amount_wei = w3.to_wei(amount_eth, 'ether')
    
    # 获取nonce
    nonce = w3.eth.get_transaction_count(sender)
    
    # 构建交易字典
    tx = {
        'nonce': nonce,
        'to': receiver,
        'value': amount_wei,
        'gas': 21000,  # 标准ETH转账的gas limit
        'gasPrice': w3.to_wei('50', 'gwei'),
        'chainId': w3.eth.chain_id
    }
    
    # 签名交易
    signed_tx = w3.eth.account.sign_transaction(tx, private_key)
    
    # 发送交易
    tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction)
    
    # 等待交易确认
    receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
    return receipt

# 在Ganache上测试
sender = ganache_accounts[0]
receiver = ganache_accounts[1]
private_key = "0x..."  # 替换为Ganache中第一个账户的私钥

receipt = send_eth_transaction(w3_ganache, sender, receiver, 1.0, private_key)
print(f"Transaction receipt: {receipt}")

3. 智能合约交互

3.1 加载和部署合约

from solcx import compile_source

# 编译合约
contract_source_code = """
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;

contract SimpleStorage {
    uint256 storedData;
    
    function set(uint256 x) public {
        storedData = x;
    }
    
    function get() public view returns (uint256) {
        return storedData;
    }
}
"""

# 编译合约
compiled_sol = compile_source(contract_source_code)
contract_interface = compiled_sol['<stdin>:SimpleStorage']

# 部署合约
def deploy_contract(w3, contract_interface, deployer_private_key):
    # 获取部署者账户
    deployer = w3.eth.account.from_key(deployer_private_key).address
    
    # 创建合约对象
    Contract = w3.eth.contract(
        abi=contract_interface['abi'],
        bytecode=contract_interface['bin']
    )
    
    # 构建部署交易
    tx = Contract.constructor().build_transaction({
        'from': deployer,
        'nonce': w3.eth.get_transaction_count(deployer),
        'gas': 2000000,
        'gasPrice': w3.to_wei('50', 'gwei')
    })
    
    # 签名并发送
    signed_tx = w3.eth.account.sign_transaction(tx, deployer_private_key)
    tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction)
    
    # 等待部署完成
    receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
    return receipt.contractAddress

# 在Ganache上部署
contract_address = deploy_contract(w3_ganache, contract_interface, private_key)
print(f"Contract deployed at: {contract_address}")

3.2 与已部署合约交互

# 创建合约实例
contract = w3_ganache.eth.contract(
    address=contract_address,
    abi=contract_interface['abi']
)

# 调用只读函数
current_value = contract.functions.get().call()
print(f"Current stored value: {current_value}")

# 调用状态变更函数
def set_contract_value(w3, contract, new_value, caller_private_key):
    caller = w3.eth.account.from_key(caller_private_key).address
    
    # 构建交易
    tx = contract.functions.set(new_value).build_transaction({
        'from': caller,
        'nonce': w3.eth.get_transaction_count(caller),
        'gas': 2000000,
        'gasPrice': w3.to_wei('50', 'gwei')
    })
    
    # 签名并发送
    signed_tx = w3.eth.account.sign_transaction(tx, caller_private_key)
    tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction)
    
    # 等待交易确认
    receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
    return receipt

# 更新合约状态
receipt = set_contract_value(w3_ganache, contract, 42, private_key)
print(f"Set value transaction receipt: {receipt}")

# 验证更新
updated_value = contract.functions.get().call()
print(f"Updated stored value: {updated_value}")

4. 高级功能实践

4.1 事件监听

# 增强合约以包含事件
contract_with_event = """
// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;

contract EventfulStorage {
    uint256 storedData;
    event ValueChanged(address indexed author, uint256 newValue);
    
    function set(uint256 x) public {
        storedData = x;
        emit ValueChanged(msg.sender, x);
    }
    
    function get() public view returns (uint256) {
        return storedData;
    }
}
"""

# 部署增强版合约
compiled_event = compile_source(contract_with_event)
event_contract_interface = compiled_event['<stdin>:EventfulStorage']
event_contract_address = deploy_contract(w3_ganache, event_contract_interface, private_key)
event_contract = w3_ganache.eth.contract(
    address=event_contract_address,
    abi=event_contract_interface['abi']
)

# 创建事件过滤器
value_changed_filter = event_contract.events.ValueChanged.create_filter(
    fromBlock='latest'
)

# 监听事件
def event_listener():
    print("Listening for ValueChanged events...")
    while True:
        for event in value_changed_filter.get_new_entries():
            print(f"New event: Author={event.args.author}, Value={event.args.newValue}")
        time.sleep(2)

# 在后台线程运行监听器
import threading
thread = threading.Thread(target=event_listener, daemon=True)
thread.start()

# 触发事件(在另一个线程)
def trigger_events():
    for i in range(5):
        set_contract_value(w3_ganache, event_contract, i, private_key)
        time.sleep(3)

trigger_thread = threading.Thread(target=trigger_events)
trigger_thread.start()

4.2 与ERC-20代币交互

# ERC-20代币ABI(简化版)
erc20_abi = [
    {
        "name": "balanceOf",
        "outputs": [{"type": "uint256", "name": ""}],
        "inputs": [{"type": "address", "name": "account"}],
        "stateMutability": "view",
        "type": "function"
    },
    {
        "name": "transfer",
        "outputs": [{"type": "bool", "name": ""}],
        "inputs": [
            {"type": "address", "name": "recipient"},
            {"type": "uint256", "name": "amount"}
        ],
        "stateMutability": "nonpayable",
        "type": "function"
    }
]

# 连接DAI合约
dai_address = "0x6B175474E89094C44Da98b954EedeAC495271d0F"  # 主网DAI合约
dai_contract = w3.eth.contract(address=dai_address, abi=erc20_abi)

# 查询余额
wallet_address = "0x742d35Cc6634C0532925a3b844Bc454e4438f44e"
balance = dai_contract.functions.balanceOf(wallet_address).call()
print(f"DAI balance: {balance / (10 ** 18)} DAI")

# 转账函数(需要私钥)
def transfer_erc20(w3, contract, recipient, amount, sender_private_key):
    sender = w3.eth.account.from_key(sender_private_key).address
    
    # 构建交易
    tx = contract.functions.transfer(
        recipient,
        amount
    ).build_transaction({
        'from': sender,
        'nonce': w3.eth.get_transaction_count(sender),
        'gas': 100000,
        'gasPrice': w3.to_wei('50', 'gwei'),
        'chainId': w3.eth.chain_id
    })
    
    # 签名并发送
    signed_tx = w3.eth.account.sign_transaction(tx, sender_private_key)
    tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction)
    
    # 等待确认
    receipt = w3.eth.wait_for_transaction_receipt(tx_hash)
    return receipt

# 注意:实际主网操作需要真实资金和谨慎处理私钥

5. 完整DApp示例:链上记事本

5.1 智能合约

// SPDX-License-Identifier: MIT
pragma solidity ^0.8.0;

contract OnchainNotepad {
    struct Note {
        string content;
        uint256 timestamp;
    }
    
    mapping(address => Note[]) private userNotes;
    
    event NoteAdded(address indexed user, uint256 index, string content);
    event NoteUpdated(address indexed user, uint256 index, string content);
    
    function addNote(string memory content) public {
        userNotes[msg.sender].push(Note(content, block.timestamp));
        emit NoteAdded(msg.sender, userNotes[msg.sender].length - 1, content);
    }
    
    function updateNote(uint256 index, string memory content) public {
        require(index < userNotes[msg.sender].length, "Note does not exist");
        userNotes[msg.sender][index].content = content;
        userNotes[msg.sender][index].timestamp = block.timestamp;
        emit NoteUpdated(msg.sender, index, content);
    }
    
    function getNoteCount() public view returns (uint256) {
        return userNotes[msg.sender].length;
    }
    
    function getNote(uint256 index) public view returns (string memory content, uint256 timestamp) {
        require(index < userNotes[msg.sender].length, "Note does not exist");
        Note storage note = userNotes[msg.sender][index];
        return (note.content, note.timestamp);
    }
}

5.2 Python后端

from flask import Flask, jsonify, request
from web3 import Web3
import json

app = Flask(__name__)
w3 = Web3(Web3.HTTPProvider("http://127.0.0.1:7545"))

# 加载合约
with open('OnchainNotepad.json') as f:
    contract_data = json.load(f)

contract_address = "0xYourDeployedContractAddress"
contract = w3.eth.contract(address=contract_address, abi=contract_data['abi'])

@app.route('/notes', methods=['GET'])
def get_notes():
    address = request.args.get('address')
    if not address or not w3.is_address(address):
        return jsonify({"error": "Invalid address"}), 400
    
    note_count = contract.functions.getNoteCount().call({'from': address})
    notes = []
    
    for i in range(note_count):
        content, timestamp = contract.functions.getNote(i).call({'from': address})
        notes.append({
            "index": i,
            "content": content,
            "timestamp": timestamp
        })
    
    return jsonify(notes)

@app.route('/add', methods=['POST'])
def add_note():
    data = request.json
    address = data.get('address')
    content = data.get('content')
    private_key = data.get('private_key')
    
    if not all([address, content, private_key]):
        return jsonify({"error": "Missing parameters"}), 400
    
    # 构建交易
    tx = contract.functions.addNote(content).build_transaction({
        'from': address,
        'nonce': w3.eth.get_transaction_count(address),
        'gas': 300000,
        'gasPrice': w3.to_wei('50', 'gwei')
    })
    
    # 签名并发送
    signed_tx = w3.eth.account.sign_transaction(tx, private_key)
    tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction)
    
    return jsonify({"tx_hash": tx_hash.hex()})

if __name__ == '__main__':
    app.run(port=5000)

5.3 前端界面

import tkinter as tk
from web3 import Web3
import requests

class NotepadApp:
    def __init__(self, root):
        self.root = root
        self.root.title("链上记事本")
        
        # 配置
        self.w3 = Web3(Web3.HTTPProvider("http://127.0.0.1:7545"))
        self.api_url = "http://localhost:5000"
        
        # UI组件
        self.address_label = tk.Label(root, text="你的地址:")
        self.address_entry = tk.Entry(root, width=50)
        
        self.private_key_label = tk.Label(root, text="私钥(仅本地):")
        self.private_key_entry = tk.Entry(root, width=50, show="*")
        
        self.note_label = tk.Label(root, text="笔记内容:")
        self.note_text = tk.Text(root, height=10, width=50)
        
        self.add_button = tk.Button(root, text="添加笔记", command=self.add_note)
        self.load_button = tk.Button(root, text="加载笔记", command=self.load_notes)
        
        self.notes_listbox = tk.Listbox(root, width=70, height=15)
        self.notes_listbox.bind('<<ListboxSelect>>', self.on_note_select)
        
        # 布局
        self.address_label.grid(row=0, column=0, sticky="w")
        self.address_entry.grid(row=0, column=1)
        
        self.private_key_label.grid(row=1, column=0, sticky="w")
        self.private_key_entry.grid(row=1, column=1)
        
        self.note_label.grid(row=2, column=0, sticky="nw")
        self.note_text.grid(row=2, column=1)
        
        self.add_button.grid(row=3, column=1, sticky="e", pady=5)
        self.load_button.grid(row=4, column=1, sticky="e", pady=5)
        
        self.notes_listbox.grid(row=5, column=0, columnspan=2, padx=10, pady=10)
        
        # 状态变量
        self.current_notes = []
        self.selected_index = None
    
    def add_note(self):
        address = self.address_entry.get()
        private_key = self.private_key_entry.get()
        content = self.note_text.get("1.0", "end-1c")
        
        if not address or not private_key or not content:
            tk.messagebox.showerror("错误", "请填写所有字段")
            return
        
        payload = {
            "address": address,
            "content": content,
            "private_key": private_key
        }
        
        try:
            response = requests.post(f"{self.api_url}/add", json=payload)
            if response.status_code == 200:
                tk.messagebox.showinfo("成功", f"笔记已添加!\n交易哈希: {response.json()['tx_hash']}")
                self.note_text.delete("1.0", tk.END)
            else:
                tk.messagebox.showerror("错误", f"添加失败: {response.text}")
        except Exception as e:
            tk.messagebox.showerror("错误", f"连接失败: {str(e)}")
    
    def load_notes(self):
        address = self.address_entry.get()
        if not address:
            tk.messagebox.showerror("错误", "请输入地址")
            return
        
        try:
            response = requests.get(f"{self.api_url}/notes?address={address}")
            if response.status_code == 200:
                self.current_notes = response.json()
                self.notes_listbox.delete(0, tk.END)
                
                for i, note in enumerate(self.current_notes):
                    preview = note['content'][:50] + "..." if len(note['content']) > 50 else note['content']
                    timestamp = note['timestamp']
                    self.notes_listbox.insert(tk.END, f"[{i}] {preview} (时间戳: {timestamp})")
            else:
                tk.messagebox.showerror("错误", f"加载失败: {response.text}")
        except Exception as e:
            tk.messagebox.showerror("错误", f"连接失败: {str(e)}")
    
    def on_note_select(self, event):
        selection = event.widget.curselection()
        if selection:
            self.selected_index = selection[0]
            note = self.current_notes[self.selected_index]
            self.note_text.delete("1.0", tk.END)
            self.note_text.insert(tk.END, note['content'])

if __name__ == "__main__":
    root = tk.Tk()
    app = NotepadApp(root)
    root.mainloop()

6. 安全最佳实践

6.1 私钥管理

import os
from dotenv import load_dotenv
from web3 import Web3

# 从环境变量加载私钥
load_dotenv()
private_key = os.getenv("PRIVATE_KEY")

# 加密存储方案
from cryptography.fernet import Fernet

def encrypt_private_key(key, private_key):
    cipher_suite = Fernet(key)
    return cipher_suite.encrypt(private_key.encode())

def decrypt_private_key(key, encrypted_key):
    cipher_suite = Fernet(key)
    return cipher_suite.decrypt(encrypted_key).decode()

# 生成加密密钥
encryption_key = Fernet.generate_key()

# 加密私钥
encrypted_pk = encrypt_private_key(encryption_key, "0xYourPrivateKey")

# 安全存储encryption_key和encrypted_pk

6.2 交易安全

# 安全发送交易
def safe_send_transaction(w3, tx, private_key, retries=3):
    for attempt in range(retries):
        try:
            # 更新nonce(每次尝试都需要更新)
            tx['nonce'] = w3.eth.get_transaction_count(
                w3.eth.account.from_key(private_key).address
            )
            
            signed_tx = w3.eth.account.sign_transaction(tx, private_key)
            tx_hash = w3.eth.send_raw_transaction(signed_tx.rawTransaction)
            return w3.eth.wait_for_transaction_receipt(tx_hash)
        
        except ValueError as e:
            if 'nonce too low' in str(e) and attempt < retries - 1:
                continue  # 重试
            elif 'insufficient funds' in str(e):
                raise InsufficientFundsError("账户余额不足")
            else:
                raise TransactionError(f"交易失败: {str(e)}")

class TransactionError(Exception):
    pass

class InsufficientFundsError(TransactionError):
    pass

7. 高级技巧与优化

7.1 批量查询

# 批量查询余额
def batch_get_balances(w3, addresses):
    """使用批处理RPC请求优化多个余额查询"""
    from web3 import Web3
    from web3.middleware import geth_poa_middleware
    
    # 创建本地提供者避免阻塞主连接
    local_w3 = Web3(Web3.HTTPProvider(w3.provider.endpoint_uri))
    if 'poa' in w3.provider.endpoint_uri:
        local_w3.middleware_onion.inject(geth_poa_middleware, layer=0)
    
    # 使用批处理请求
    batch = []
    for address in addresses:
        batch.append(("eth_getBalance", [address, "latest"]))
    
    responses = local_w3.provider.make_batch_request(batch)
    
    # 处理响应
    balances = {}
    for i, response in enumerate(responses):
        if response['error'] is not None:
            balances[addresses[i]] = None
        else:
            balances[addresses[i]] = int(response['result'], 16)
    
    return balances

7.2 Gas优化策略

# 动态Gas定价
def get_optimal_gas_price(w3):
    """获取推荐的Gas价格"""
    # 方法1:使用历史数据
    history = w3.eth.fee_history(10, 'latest', [10, 30, 50])
    base_fee = history['baseFeePerGas'][-1]
    
    # 计算优先级费用(小费)的加权平均
    rewards = [tx[0] for block in history['reward'] for tx in block]
    if rewards:
        avg_priority = sum(rewards) / len(rewards)
    else:
        avg_priority = w3.to_wei('2', 'gwei')
    
    # 方法2:使用预言机(如Etherchain)
    try:
        response = requests.get("https://www.etherchain.org/api/gasPriceOracle")
        data = response.json()
        fast_gas = w3.to_wei(data['fast'], 'gwei')
        return min(fast_gas, base_fee * 2 + avg_priority)
    except:
        # 回退到基本方法
        return base_fee * 2 + avg_priority

# 在交易中使用
tx = {
    'to': recipient,
    'value': amount,
    'gas': 21000,
    'maxFeePerGas': get_optimal_gas_price(w3),
    'maxPriorityFeePerGas': w3.to_wei('2', 'gwei'),
    'nonce': w3.eth.get_transaction_count(sender),
    'chainId': w3.eth.chain_id,
    'type': '0x2'  # EIP-1559类型交易
}

结论:成为区块链开发者

通过本指南,您已掌握Web3.py的核心功能:

  1. 基础操作:连接节点、查询余额、发送交易
  2. 智能合约:编译、部署和交互
  3. 事件处理:监听和响应链上事件
  4. DApp开发:完整的前后端集成
  5. 安全实践:私钥管理和交易安全
  6. 高级技巧:批量查询和Gas优化

Web3.py的强大功能为开发者打开了区块链世界的大门:

  • DeFi协议交互:集成Uniswap、Compound等协议
  • NFT开发:创建和交易数字藏品
  • DAO治理:参与去中心化组织决策
  • 跨链应用:桥接不同区块链生态系统

在区块链的世界里,每一行代码都是构建去中心化未来的基石。掌握Web3.py,你就拥有了塑造这个未来的工具。

下一步:

  1. 探索Layer2解决方案(Optimism、Arbitrum)
  2. 学习智能合约安全审计
  3. 研究零知识证明应用
  4. 参与开源Web3项目
  5. 构建自己的去中心化应用

网站公告

今日签到

点亮在社区的每一天
去签到