#!/usr/bin/env python3
"""
Binance WebSocket market data receiver for latency comparison testing.
Directly connects to Binance WebSocket API to receive bookTicker data.
@copyright Copyright (c) 2025 LiquidityTech. All rights reserved.
This software is proprietary and confidential. Unauthorized copying,
distribution, or modification of this file is strictly prohibited.
For more information, visit: https://www.liquiditytech.com/
LiquidityTech - The Leading Digital Asset Prime Brokerage for Institutions
Trusted execution, custody, clearing, and financing solutions for digital assets.
"""
import asyncio
import websockets
import json
import time
import signal
import sys
import threading
import queue
import csv
import os
from datetime import datetime
from typing import Dict, Any
import argparse
class AsyncFileWriter:
    """Queue-backed CSV writer for market data events.

    Producers call :meth:`enqueue_event`; :meth:`run` (intended to be the
    target of a dedicated thread) pops events and appends them as CSV rows to
    a per-day file named ``binance_<YYYY-MM-DD>.log`` in the current working
    directory.
    """

    # Column order of every CSV row; these are also the keys read from each
    # event dict.
    _CSV_COLUMNS = (
        'symbol', 'seq_num', 'exchange_ts', 'local_ts', 'receive_ts',
        'bid_price', 'bid_qty', 'ask_price', 'ask_qty',
    )

    def __init__(self, queue_size: int = 1000000):
        """Create the bounded event queue.

        Args:
            queue_size: Maximum number of pending events before new ones are
                dropped by :meth:`enqueue_event`.
        """
        self.latency_queue = queue.Queue(maxsize=queue_size)
        # Cleared by the owner to ask run() to drain the queue and return.
        self.running = True

    def enqueue_event(self, event: Dict[str, Any]):
        """Enqueue an event without blocking; drop it (with a warning) if full."""
        try:
            self.latency_queue.put_nowait(event)
        except queue.Full:
            print("Warning: Queue is full, dropping event")

    def get_current_date_string(self) -> str:
        """Return the current local date formatted as YYYY-MM-DD."""
        return datetime.now().strftime("%Y-%m-%d")

    def open_log_file(self, date: str):
        """Open ``binance_<date>.log`` in append mode and return the file."""
        filename = f"binance_{date}.log"
        return open(filename, 'a', newline='', encoding='utf-8')

    def _open_csv(self, date: str):
        """Open the log file for *date* and return ``(file, csv_writer)``.

        The header row is written only when the file is empty, so restarting
        on the same day does not inject a second header into existing data.
        """
        out_file = self.open_log_file(date)
        try:
            writer = csv.writer(out_file)
            if os.fstat(out_file.fileno()).st_size == 0:
                writer.writerow(self._CSV_COLUMNS)
                out_file.flush()
        except Exception:
            out_file.close()
            raise
        return out_file, writer

    def run(self):
        """Write queued events to the daily CSV file until stopped.

        While ``running`` is true the loop waits up to one second for each
        event. Once ``running`` is cleared, the events already queued are
        written before the method returns, so a shutdown does not lose data.
        The output file is always closed on exit.
        """
        current_date = self.get_current_date_string()
        out_file, writer = self._open_csv(current_date)
        try:
            while True:
                try:
                    if self.running:
                        # Blocking get with a timeout so the running flag is
                        # re-checked at least once per second.
                        event = self.latency_queue.get(timeout=1.0)
                    else:
                        # Shutting down: drain what is left without waiting.
                        event = self.latency_queue.get_nowait()
                except queue.Empty:
                    if not self.running:
                        break
                    continue

                try:
                    # Rotate to a new file when the local date has changed.
                    new_date = self.get_current_date_string()
                    if new_date != current_date:
                        # Open the new file first: if that fails we keep
                        # writing to the still-open old file and retry later.
                        new_file, new_writer = self._open_csv(new_date)
                        out_file.close()
                        out_file, writer = new_file, new_writer
                        current_date = new_date

                    writer.writerow(
                        [event[column] for column in self._CSV_COLUMNS]
                    )
                    out_file.flush()
                except Exception as e:
                    # Best effort: report and keep consuming the queue.
                    print(f"Error in file writer: {e}")
        finally:
            out_file.close()
class BinanceWebSocketReceiver:
    """Receive Binance bookTicker updates over WebSocket and log them.

    Each message from the ``<symbol>@bookTicker`` stream is parsed, stamped
    with local timestamps and handed to an :class:`AsyncFileWriter`, whose
    ``run`` loop executes on a separate thread started by :meth:`run`.

    NOTE(review): the two CPU IDs are stored and printed, but no CPU affinity
    is applied anywhere in this class.
    """

    def __init__(self, symbol: str, main_thread_cpu_id: int = -1,
                 print_latency_thread_cpu_id: int = -1):
        """Store configuration and create the writer.

        Args:
            symbol: Trading pair symbol; lower-cased for use in the stream URL.
            main_thread_cpu_id: CPU ID for the main thread (only printed).
            print_latency_thread_cpu_id: CPU ID for the writer thread (only
                printed).
        """
        self.symbol = symbol.lower()
        self.main_thread_cpu_id = main_thread_cpu_id
        self.print_latency_thread_cpu_id = print_latency_thread_cpu_id
        # Cleared by signal_handler to end the receive loop.
        self.running = True
        self.async_writer = AsyncFileWriter()
        self.writer_thread = None

    def get_current_time_ns(self) -> int:
        """Return the current wall-clock time in integer nanoseconds.

        Uses ``time.time_ns()`` rather than ``int(time.time() * 1e9)``: the
        float round-trip cannot represent present-day epoch values to
        nanosecond precision.
        """
        return time.time_ns()

    def parse_book_ticker_and_write(self, data: str):
        """Parse one bookTicker JSON message and enqueue it for writing.

        ``receive_ts`` is stamped on entry, before JSON decoding, so parsing
        time is not counted as network latency; ``local_ts`` is stamped after
        the fields have been extracted. Parse errors are printed and the
        message is dropped.

        Args:
            data: Raw JSON text of a single stream message.
        """
        receive_ts = self.get_current_time_ns()
        try:
            ticker_data = json.loads(data)
            event = {
                'symbol': ticker_data.get('s', ''),
                # The 'u' field is used as the sequence number.
                'seq_num': ticker_data.get('u', 0),
                # NOTE(review): the spot bookTicker payload may not carry a
                # 'T' field, in which case this is always 0 - confirm against
                # the Binance stream documentation.
                'exchange_ts': ticker_data.get('T', 0),
                'bid_price': float(ticker_data.get('b', 0)),
                'bid_qty': float(ticker_data.get('B', 0)),
                'ask_price': float(ticker_data.get('a', 0)),
                'ask_qty': float(ticker_data.get('A', 0)),
                'receive_ts': receive_ts,
                'local_ts': self.get_current_time_ns(),
            }
            self.async_writer.enqueue_event(event)
        except json.JSONDecodeError as e:
            print(f"JSON decode error: {e}")
        except Exception as e:
            print(f"Error parsing bookTicker data: {e}")

    async def connect_and_receive(self):
        """Connect to the bookTicker stream and process messages until stopped.

        The loop ends when ``running`` is cleared, the connection closes, or
        a receive error occurs. There is no automatic reconnect.
        """
        uri = f"wss://stream.binance.com:9443/ws/{self.symbol}@bookTicker"
        try:
            print(f"Connecting to Binance WebSocket: {uri}")
            async with websockets.connect(uri) as websocket:
                print("Connected to Binance WebSocket successfully")
                print("Press Ctrl+C to stop")
                while self.running:
                    try:
                        # The 1 s timeout bounds how long a shutdown request
                        # can go unnoticed while the stream is idle.
                        message = await asyncio.wait_for(
                            websocket.recv(), timeout=1.0
                        )
                        self.parse_book_ticker_and_write(message)
                    except asyncio.TimeoutError:
                        # No message within the timeout; re-check running.
                        continue
                    except websockets.exceptions.ConnectionClosed:
                        print("WebSocket connection closed")
                        break
                    except Exception as e:
                        print(f"Error receiving data: {e}")
                        break
        except Exception as e:
            print(f"Connection error: {e}")

    def signal_handler(self, signum, frame):
        """Handle SIGINT/SIGTERM by asking both loops to stop."""
        print(f"\nReceived signal {signum}, shutting down...")
        self.running = False
        self.async_writer.running = False

    def run(self):
        """Install signal handlers, start the writer thread and receive data.

        Blocks until the receive loop ends, then stops the writer thread and
        waits up to five seconds for it to finish.
        """
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
        print(f"Symbol: {self.symbol}")
        print(f"Main thread CPU ID: {self.main_thread_cpu_id}")
        print(f"Writer thread CPU ID: {self.print_latency_thread_cpu_id}")

        # Daemon thread so a stuck writer cannot keep the process alive.
        self.writer_thread = threading.Thread(target=self.async_writer.run)
        self.writer_thread.daemon = True
        self.writer_thread.start()
        try:
            asyncio.run(self.connect_and_receive())
        except KeyboardInterrupt:
            print("Interrupted by user")
        finally:
            self.async_writer.running = False
            if self.writer_thread and self.writer_thread.is_alive():
                self.writer_thread.join(timeout=5.0)
            print("Binance WebSocket receiver stopped")
def main():
    """Parse command-line arguments and run the receiver.

    Expects three positional arguments: the trading pair symbol, the main
    thread CPU ID and the writer thread CPU ID (both integers).
    """
    cli = argparse.ArgumentParser(
        description="Binance WebSocket market data receiver for latency comparison"
    )
    cli.add_argument(
        "symbol", help="Trading pair symbol (e.g., btcusdt, ethusdt)"
    )
    # Both CPU ID positionals share the same integer type.
    cpu_arguments = (
        ("main_thread_cpu_id", "Main thread CPU ID (-1 for no affinity)"),
        ("print_latency_thread_cpu_id", "Writer thread CPU ID (-1 for no affinity)"),
    )
    for arg_name, arg_help in cpu_arguments:
        cli.add_argument(arg_name, type=int, help=arg_help)

    options = cli.parse_args()

    BinanceWebSocketReceiver(
        symbol=options.symbol,
        main_thread_cpu_id=options.main_thread_cpu_id,
        print_latency_thread_cpu_id=options.print_latency_thread_cpu_id,
    ).run()
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()