:2026-05-14 22:42 点击:1
在区块链数据分析、智能合约审计或DeFi研究等场景中,将以太坊链上数据导出为结构化的CSV格式,是提升数据处理效率的关键一步,CSV(逗号分隔值)文件因其通用性强、兼容Excel等工具,成为数据分析师和开发者的首选格式,本文将详细介绍以太坊数据导出的核心方法、具体步骤及实用技巧,助你轻松实现链上数据的结构化存储与后续分析。
在开始导出前,首先需明确数据类型和数据范围,这直接影响工具选择和操作复杂度,常见的以太坊数据包括:
根据数据量和需求复杂度,可选择以下三种主流方法,覆盖从小白到开发者的不同场景。
区块链浏览器(如Etherscan、Ethtx、BscScan)是最直观的数据导出工具,适合获取少量特定地址或交易的数据。
选择数据类型:
筛选数据范围:
在导出前,可通过时间筛选、交易类型(如成功/失败)、事件参数等条件缩小数据范围,避免导出无用数据。
通过以太坊节点服务商的API(如Infura、Alchemy、Moralis)获取数据,再通过代码处理为CSV格式,适合批量导出和自定义字段。
import requests
import csv
from datetime import datetime
# 配置API参数(以Infura为例)
API_KEY = "YOUR_INFURA_API_KEY"
address = "0x742d35Cc6634C0532925a3b844Bc9e7595f8e5a8" # 示例地址
url = f"https://mainnet.infura.io/v3/{API_KEY}"
# 构造请求参数(获取地址交易记录)
params = {
"jsonrpc": "2.0",
"method": "eth_getLogs",
"params": [
{
"fromBlock": "0x0", # 起始区块(可设为"latest"或具体高度)
"toBlock": "latest", # 结束区块
"address": address, # 目标地址(空则查询所有交易)
"topics": [] # 主题(用于筛选事件日志,空则查询所有交易)
}
],
"id": 1
}
# 发送请求并解析数据
response = requests.post(url, json=params)
transactions = response.json().get("result", [])
# 提取交易关键信息
data_to_export = []
for tx in transactions:
tx_hash = tx.get("transactionHash")
block_num = int(tx.get("blockNumber"), 16)
timestamp = datetime.fromtimestamp(int(tx.get("timeStamp"), 16)).strftime("%Y-%m-%d %H:%M:%S")
from_addr = tx.get("from")
to_addr = tx.get("to")
value = int(tx.get("value"), 16) / 1e18 # wei转ETH
data_to_export.append([tx_hash, block_num, timestamp, from_addr, to_addr, value])
# 写入CSV文件
with open("ethereum_transactions.csv", "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(["Transaction Hash", "Block Number", "Timestamp", "From", "To", "Value (ETH)"]) # 表头
writer.writerows(data_to_export)
print("数据导出成功!文件名:ethereum_transactions.csv")
pip install requests。对于需要全量数据或高频查询的场景,可通过本地同步以太坊全节点(如Geth、Nethermind),将数据存入数据库(如PostgreSQL、MongoDB),再通过SQL查询导出CSV。
同步以太坊节点:
安装Geth,执行同步命令(首次同步需较长时间,建议SSD硬盘):
geth --syncmode full --http --http.addr 0.0.0.0 --http.port 8545
同步完成后,通过HTTP API访问节点数据。
监听数据并写入数据库:
使用Python脚本监听新区块,解析交易和事件数据,存入PostgreSQL:
import requests
import psycopg2
import time
# 连接PostgreSQL
conn = psycopg2.connect(
dbname="ethereum",
user="postgres",
password="your_password",
host="localhost"
)
cur = conn.cursor()
# 创建交易表
cur.execute("""
CREATE TABLE IF NOT EXISTS transactions (
tx_hash VARCHAR(66) PRIMARY KEY,
block_num BIGINT,
timestamp TIMESTAMP,
from_addr VARCHAR(42),
to_addr VARCHAR(42),
value NUMERIC(36, 18)
)
""")
conn.commit()
# 监听新区块(通过轮询最新区块号)
last_block = 0
while True:
url = "h
ttps://mainnet.infura.io/v3/YOUR_INFURA_API_KEY"
params = {"jsonrpc": "2.0", "method": "eth_blockNumber", "params": [], "id": 1}
response = requests.post(url, json=params)
current_block = int(response.json().get("result"), 16)
if current_block > last_block:
for block_num in range(last_block + 1, current_block + 1):
block_params = {
"jsonrpc": "2
本文由用户投稿上传,若侵权请提供版权资料并联系删除!