跳至主要內容
進階

ZeroMQ

深入了解 Bitcoin Core 的 ZeroMQ 通知系統,實時接收區塊和交易事件。

10 分鐘

什麼是 ZeroMQ?

ZeroMQ(ZMQ)是 Bitcoin Core 提供的實時通知系統,允許外部應用程序 在區塊或交易事件發生時立即接收通知,無需輪詢 RPC 接口。

通知類型

主題 內容 觸發時機
hashblock 區塊雜湊(32 bytes) 新區塊加入最佳鏈
hashtx 交易雜湊(32 bytes) 交易加入 mempool 或區塊
rawblock 完整區塊(序列化) 新區塊加入最佳鏈
rawtx 完整交易(序列化) 交易加入 mempool 或區塊
sequence 序列通知 mempool 變化、區塊連接/斷開

配置

# bitcoin.conf

# 區塊雜湊通知
zmqpubhashblock=tcp://127.0.0.1:28332

# 交易雜湊通知
zmqpubhashtx=tcp://127.0.0.1:28333

# 原始區塊通知
zmqpubrawblock=tcp://127.0.0.1:28334

# 原始交易通知
zmqpubrawtx=tcp://127.0.0.1:28335

# 序列通知
zmqpubsequence=tcp://127.0.0.1:28336

# 高水位標記(消息隊列大小)
zmqpubhashblockhwm=1000
zmqpubhashtxhwm=1000

注意: ZMQ 需要在編譯時啟用。使用 bitcoin-cli getzmqnotifications 檢查是否可用。

訂閱示例

Python

import zmq
import binascii

# 創建 ZMQ 上下文
context = zmq.Context()
socket = context.socket(zmq.SUB)

# 連接到 Bitcoin Core
socket.connect("tcp://127.0.0.1:28332")

# 訂閱所有消息(空字串 = 訂閱所有)
socket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")

print("等待區塊通知...")

while True:
    # 接收多部分消息
    topic = socket.recv()
    body = socket.recv()
    sequence = socket.recv()

    # 解析區塊雜湊(小端序)
    block_hash = binascii.hexlify(body[::-1]).decode()
    seq_num = int.from_bytes(sequence, 'little')

    print(f"新區塊: {block_hash}")
    print(f"序列號: {seq_num}")

Node.js

import * as zmq from 'zeromq';

async function subscribeToBlocks() {
  const socket = new zmq.Subscriber();

  socket.connect('tcp://127.0.0.1:28332');
  socket.subscribe('hashblock');

  console.log('等待區塊通知...');

  for await (const [topic, body, sequence] of socket) {
    // 反轉字節順序(小端 -> 大端)
    const blockHash = Buffer.from(body)
      .reverse()
      .toString('hex');

    const seqNum = sequence.readUInt32LE(0);

    console.log(`新區塊: ${blockHash}`);
    console.log(`序列號: ${seqNum}`);
  }
}

subscribeToBlocks();

Rust

use zmq;

fn main() {
    let context = zmq::Context::new();
    let socket = context.socket(zmq::SUB).unwrap();

    socket.connect("tcp://127.0.0.1:28332").unwrap();
    socket.set_subscribe(b"hashblock").unwrap();

    println!("等待區塊通知...");

    loop {
        let topic = socket.recv_bytes(0).unwrap();
        let body = socket.recv_bytes(0).unwrap();
        let sequence = socket.recv_bytes(0).unwrap();

        // 反轉字節順序
        let mut hash = body.clone();
        hash.reverse();
        let block_hash = hex::encode(&hash);

        let seq_num = u32::from_le_bytes(
            sequence[..4].try_into().unwrap()
        );

        println!("新區塊: {}", block_hash);
        println!("序列號: {}", seq_num);
    }
}

序列通知

sequence 主題提供更詳細的 mempool 和區塊鏈狀態變化通知。

序列消息格式:

| 字段 | 長度 | 描述 |
|------|------|------|
| hash | 32   | 區塊/交易雜湊 |
| label| 1    | 事件類型 |
| sequence | 4 | 序列號 |

事件類型(label):
'A' = 交易加入 mempool
'R' = 交易從 mempool 移除
'C' = 區塊連接
'D' = 區塊斷開(重組)
# 訂閱序列通知
socket.setsockopt_string(zmq.SUBSCRIBE, "sequence")

while True:
    topic = socket.recv()
    body = socket.recv()

    hash_bytes = body[:32]
    label = chr(body[32])
    sequence = int.from_bytes(body[33:37], 'little')

    hash_hex = binascii.hexlify(hash_bytes[::-1]).decode()

    events = {
        'A': '交易加入 mempool',
        'R': '交易移除 mempool',
        'C': '區塊連接',
        'D': '區塊斷開'
    }

    print(f"{events.get(label, '未知')}: {hash_hex[:16]}...")

應用場景

區塊瀏覽器

實時更新區塊和交易信息,無需輪詢。

交易監控

監控特定地址或交易的確認狀態。

閃電網路節點

監控通道相關交易,及時響應鏈上事件。

挖礦軟體

立即收到新區塊通知,更新工作模板。

可靠性考慮

注意事項

  • ! ZMQ 使用 PUB/SUB 模式,消息可能丟失
  • ! 重組時可能收到相同交易的多次通知
  • ! 應用程序重啟時需要重新同步狀態
# 使用序列號檢測丟失的消息
last_sequence = None

while True:
    topic = socket.recv()
    body = socket.recv()
    sequence = socket.recv()

    seq_num = int.from_bytes(sequence, 'little')

    if last_sequence is not None:
        if seq_num != last_sequence + 1:
            print(f"警告:可能丟失消息 {last_sequence + 1} - {seq_num - 1}")
            # 需要通過 RPC 重新同步

    last_sequence = seq_num

    # 處理消息...

命令行工具

# 檢查 ZMQ 配置
bitcoin-cli getzmqnotifications

# 輸出示例
[
  {
    "type": "pubhashblock",
    "address": "tcp://127.0.0.1:28332",
    "hwm": 1000
  },
  {
    "type": "pubhashtx",
    "address": "tcp://127.0.0.1:28333",
    "hwm": 1000
  }
]

# 使用 Python 快速測試
python3 -c "
import zmq
ctx = zmq.Context()
s = ctx.socket(zmq.SUB)
s.connect('tcp://127.0.0.1:28332')
s.subscribe('')
print('收到:', s.recv(), s.recv())
"

總結

  • 實時通知:無需輪詢,事件驅動
  • 多種主題:區塊、交易、序列通知
  • 跨語言:支持 Python、Node.js、Rust 等
  • 注意:消息可能丟失,需要處理重連
已複製連結
已複製到剪貼簿