進階
ZeroMQ
深入了解 Bitcoin Core 的 ZeroMQ 通知系統,實時接收區塊和交易事件。
10 分鐘
什麼是 ZeroMQ?
ZeroMQ(ZMQ)是 Bitcoin Core 提供的實時通知系統,允許外部應用程序 在區塊或交易事件發生時立即接收通知,無需輪詢 RPC 接口。
通知類型
| 主題 | 內容 | 觸發時機 |
|---|---|---|
| hashblock | 區塊雜湊(32 bytes) | 新區塊加入最佳鏈 |
| hashtx | 交易雜湊(32 bytes) | 交易加入 mempool 或區塊 |
| rawblock | 完整區塊(序列化) | 新區塊加入最佳鏈 |
| rawtx | 完整交易(序列化) | 交易加入 mempool 或區塊 |
| sequence | 序列通知 | mempool 變化、區塊連接/斷開 |
配置
# bitcoin.conf
# 區塊雜湊通知
zmqpubhashblock=tcp://127.0.0.1:28332
# 交易雜湊通知
zmqpubhashtx=tcp://127.0.0.1:28333
# 原始區塊通知
zmqpubrawblock=tcp://127.0.0.1:28334
# 原始交易通知
zmqpubrawtx=tcp://127.0.0.1:28335
# 序列通知
zmqpubsequence=tcp://127.0.0.1:28336
# 高水位標記(消息隊列大小)
zmqpubhashblockhwm=1000
zmqpubhashtxhwm=1000 注意:
ZMQ 需要在編譯時啟用。使用 bitcoin-cli getzmqnotifications 檢查是否可用。
訂閱示例
Python
import zmq
import binascii
# 創建 ZMQ 上下文
context = zmq.Context()
socket = context.socket(zmq.SUB)
# 連接到 Bitcoin Core
socket.connect("tcp://127.0.0.1:28332")
# 訂閱所有消息(空字串 = 訂閱所有)
socket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
print("等待區塊通知...")
while True:
# 接收多部分消息
topic = socket.recv()
body = socket.recv()
sequence = socket.recv()
# 解析區塊雜湊(小端序)
block_hash = binascii.hexlify(body[::-1]).decode()
seq_num = int.from_bytes(sequence, 'little')
print(f"新區塊: {block_hash}")
print(f"序列號: {seq_num}") Node.js
import * as zmq from 'zeromq';
async function subscribeToBlocks() {
const socket = new zmq.Subscriber();
socket.connect('tcp://127.0.0.1:28332');
socket.subscribe('hashblock');
console.log('等待區塊通知...');
for await (const [topic, body, sequence] of socket) {
// 反轉字節順序(小端 -> 大端)
const blockHash = Buffer.from(body)
.reverse()
.toString('hex');
const seqNum = sequence.readUInt32LE(0);
console.log(`新區塊: ${blockHash}`);
console.log(`序列號: ${seqNum}`);
}
}
subscribeToBlocks(); Rust
use zmq;
fn main() {
let context = zmq::Context::new();
let socket = context.socket(zmq::SUB).unwrap();
socket.connect("tcp://127.0.0.1:28332").unwrap();
socket.set_subscribe(b"hashblock").unwrap();
println!("等待區塊通知...");
loop {
let topic = socket.recv_bytes(0).unwrap();
let body = socket.recv_bytes(0).unwrap();
let sequence = socket.recv_bytes(0).unwrap();
// 反轉字節順序
let mut hash = body.clone();
hash.reverse();
let block_hash = hex::encode(&hash);
let seq_num = u32::from_le_bytes(
sequence[..4].try_into().unwrap()
);
println!("新區塊: {}", block_hash);
println!("序列號: {}", seq_num);
}
} 序列通知
sequence 主題提供更詳細的 mempool 和區塊鏈狀態變化通知。
序列消息格式:
| 字段 | 長度 | 描述 |
|------|------|------|
| hash | 32 | 區塊/交易雜湊 |
| label| 1 | 事件類型 |
| sequence | 4 | 序列號 |
事件類型(label):
'A' = 交易加入 mempool
'R' = 交易從 mempool 移除
'C' = 區塊連接
'D' = 區塊斷開(重組) # 訂閱序列通知
socket.setsockopt_string(zmq.SUBSCRIBE, "sequence")
while True:
topic = socket.recv()
body = socket.recv()
hash_bytes = body[:32]
label = chr(body[32])
sequence = int.from_bytes(body[33:37], 'little')
hash_hex = binascii.hexlify(hash_bytes[::-1]).decode()
events = {
'A': '交易加入 mempool',
'R': '交易移除 mempool',
'C': '區塊連接',
'D': '區塊斷開'
}
print(f"{events.get(label, '未知')}: {hash_hex[:16]}...") 應用場景
區塊瀏覽器
實時更新區塊和交易信息,無需輪詢。
交易監控
監控特定地址或交易的確認狀態。
閃電網路節點
監控通道相關交易,及時響應鏈上事件。
挖礦軟體
立即收到新區塊通知,更新工作模板。
可靠性考慮
注意事項
- ! ZMQ 使用 PUB/SUB 模式,消息可能丟失
- ! 重組時可能收到相同交易的多次通知
- ! 應用程序重啟時需要重新同步狀態
# 使用序列號檢測丟失的消息
last_sequence = None
while True:
topic = socket.recv()
body = socket.recv()
sequence = socket.recv()
seq_num = int.from_bytes(sequence, 'little')
if last_sequence is not None:
if seq_num != last_sequence + 1:
print(f"警告:可能丟失消息 {last_sequence + 1} - {seq_num - 1}")
# 需要通過 RPC 重新同步
last_sequence = seq_num
# 處理消息... 命令行工具
# 檢查 ZMQ 配置
bitcoin-cli getzmqnotifications
# 輸出示例
[
{
"type": "pubhashblock",
"address": "tcp://127.0.0.1:28332",
"hwm": 1000
},
{
"type": "pubhashtx",
"address": "tcp://127.0.0.1:28333",
"hwm": 1000
}
]
# 使用 Python 快速測試
python3 -c "
import zmq
ctx = zmq.Context()
s = ctx.socket(zmq.SUB)
s.connect('tcp://127.0.0.1:28332')
s.subscribe('')
print('收到:', s.recv(), s.recv())
" 總結
- ✓ 實時通知:無需輪詢,事件驅動
- ✓ 多種主題:區塊、交易、序列通知
- ✓ 跨語言:支持 Python、Node.js、Rust 等
- ⚠ 注意:消息可能丟失,需要處理重連
已複製連結