API Reference

binance_websocket_receiver

#!/usr/bin/env python3
"""
Binance WebSocket market data receiver for latency comparison testing.
Directly connects to Binance WebSocket API to receive bookTicker data.

@copyright Copyright (c) 2025 LiquidityTech. All rights reserved.

This software is proprietary and confidential. Unauthorized copying,
distribution, or modification of this file is strictly prohibited.

For more information, visit: https://www.liquiditytech.com/

LiquidityTech - The Leading Digital Asset Prime Brokerage for Institutions
Trusted execution, custody, clearing, and financing solutions for digital assets.
"""

import asyncio
import websockets
import json
import time
import signal
import sys
import threading
import queue
import csv
import os
from datetime import datetime
from typing import Dict, Any
import argparse


class AsyncFileWriter:
    """Background CSV writer for market data events.

    Producers call :meth:`enqueue_event`; a dedicated thread executes
    :meth:`run`, which appends every event as one CSV row to a per-day
    ``binance_<YYYY-MM-DD>.log`` file in the current working directory.
    Setting ``running`` to ``False`` asks :meth:`run` to write out whatever
    is still queued and then return.
    """

    # Column order of every row; each name is also the key read from the
    # event dict.
    CSV_COLUMNS = (
        'symbol', 'seq_num', 'exchange_ts', 'local_ts', 'receive_ts',
        'bid_price', 'bid_qty', 'ask_price', 'ask_qty',
    )

    def __init__(self, queue_size: int = 1000000):
        """Create the bounded hand-off queue.

        Args:
            queue_size: Maximum number of pending events; once the queue is
                full, further events are dropped by :meth:`enqueue_event`.
        """
        self.latency_queue = queue.Queue(maxsize=queue_size)
        self.running = True

    def enqueue_event(self, event: Dict[str, Any]):
        """Queue ``event`` for writing without ever blocking the caller.

        Args:
            event: Mapping that provides every key in ``CSV_COLUMNS``.
        """
        try:
            self.latency_queue.put_nowait(event)
        except queue.Full:
            print("Warning: Queue is full, dropping event")

    def get_current_date_string(self) -> str:
        """Return the current local date formatted as YYYY-MM-DD."""
        return datetime.now().strftime("%Y-%m-%d")

    def open_log_file(self, date: str):
        """Open ``binance_<date>.log`` for appending and return the file."""
        filename = f"binance_{date}.log"
        return open(filename, 'a', newline='', encoding='utf-8')

    def _open_csv(self, date: str):
        """Open the log for ``date`` and return ``(file, csv_writer)``.

        The header row is written only when the file is empty, so a restart
        on the same day does not insert a second header in the middle of
        the data.
        """
        out_file = self.open_log_file(date)
        writer = csv.writer(out_file)
        if os.fstat(out_file.fileno()).st_size == 0:
            writer.writerow(self.CSV_COLUMNS)
            out_file.flush()
        return out_file, writer

    def run(self):
        """Write queued events until stopped, then drain the queue and exit.

        Intended to be the target of the writer thread. While ``running`` is
        true the queue is polled with a one second timeout so that a stop
        request is noticed promptly. Once ``running`` is false, the events
        that are still queued are written before the file is closed.
        """
        current_date = self.get_current_date_string()
        out_file, writer = self._open_csv(current_date)

        try:
            while True:
                try:
                    if self.running:
                        # Blocks for at most one second.
                        event = self.latency_queue.get(timeout=1.0)
                    else:
                        # Shutting down: take what is left without waiting.
                        event = self.latency_queue.get_nowait()
                except queue.Empty:
                    if self.running:
                        continue
                    break

                try:
                    # Rotate to a new file when the local date has changed.
                    today = self.get_current_date_string()
                    if today != current_date:
                        # Open the new file first so that a failure leaves
                        # the current file usable.
                        next_file, next_writer = self._open_csv(today)
                        out_file.close()
                        out_file, writer = next_file, next_writer
                        current_date = today

                    writer.writerow(
                        [event[column] for column in self.CSV_COLUMNS]
                    )

                    # Flush once the backlog is cleared rather than after
                    # every row: rows reach disk promptly when traffic is
                    # light and are batched when it is heavy.
                    if self.latency_queue.empty():
                        out_file.flush()
                except Exception as e:
                    # Best effort: one bad event must not stop the writer.
                    print(f"Error in file writer: {e}")
        finally:
            out_file.close()


class BinanceWebSocketReceiver:
    """Binance WebSocket receiver for market data.

    Subscribes to the ``<symbol>@bookTicker`` stream, timestamps every
    update locally and hands it to an ``AsyncFileWriter`` that runs on a
    background thread.
    """

    def __init__(self, symbol: str, main_thread_cpu_id: int = -1,
                 print_latency_thread_cpu_id: int = -1):
        """Store the configuration and create the file writer.

        Args:
            symbol: Trading pair; lower-cased before it is used in the
                stream name.
            main_thread_cpu_id: CPU to pin the receiving (main) thread to,
                or a negative value for no affinity.
            print_latency_thread_cpu_id: CPU to pin the writer thread to,
                or a negative value for no affinity.
        """
        self.symbol = symbol.lower()
        self.main_thread_cpu_id = main_thread_cpu_id
        self.print_latency_thread_cpu_id = print_latency_thread_cpu_id
        self.running = True
        self.async_writer = AsyncFileWriter()
        self.writer_thread = None

    def get_current_time_ns(self) -> int:
        """Return the current wall-clock time in integer nanoseconds.

        ``time.time_ns()`` is used instead of ``int(time.time() * 1e9)``:
        a float cannot represent present-day epoch nanoseconds exactly, so
        the multiplication rounds to a granularity of a few hundred ns.
        """
        return time.time_ns()

    def _pin_current_thread(self, cpu_id: int, label: str) -> bool:
        """Pin the calling thread to ``cpu_id`` when one was requested.

        Args:
            cpu_id: Target CPU index; negative means "leave unpinned".
            label: Thread name used in the console messages.

        Returns:
            True when the affinity was applied, False otherwise. Failures
            are reported on stdout and never raised.
        """
        if cpu_id < 0:
            return False
        if not hasattr(os, "sched_setaffinity"):
            print(f"Warning: CPU affinity is not supported on this platform, "
                  f"{label} thread not pinned")
            return False
        try:
            # On Linux a pid of 0 addresses the calling thread.
            os.sched_setaffinity(0, {cpu_id})
        except (OSError, ValueError) as e:
            print(f"Warning: failed to pin {label} thread to CPU {cpu_id}: {e}")
            return False
        print(f"{label} thread pinned to CPU {cpu_id}")
        return True

    def _writer_thread_main(self):
        """Entry point of the writer thread: apply affinity, then write."""
        self._pin_current_thread(self.print_latency_thread_cpu_id, "Writer")
        self.async_writer.run()

    def parse_book_ticker_and_write(self, data: str):
        """Parse one bookTicker JSON message and enqueue it for writing.

        Malformed messages are reported on stdout and skipped; nothing is
        raised to the caller.

        Args:
            data: Raw JSON text of a single WebSocket message.
        """
        try:
            ticker_data = json.loads(data)

            # NOTE(review): both local timestamps are taken after JSON
            # decoding, so they include parse time - confirm this matches
            # how the tool being compared against takes its timestamps.
            event = {
                'symbol': ticker_data.get('s', ''),
                # The 'u' field is used as the sequence number.
                'seq_num': ticker_data.get('u', 0),
                # Exchange-side timestamp; 0 when the payload has no 'T'.
                # NOTE(review): the spot bookTicker payload documented by
                # Binance appears to carry no 'T' field, so this may always
                # be 0 on stream.binance.com - confirm the intended endpoint.
                'exchange_ts': ticker_data.get('T', 0),
                'bid_price': float(ticker_data.get('b', 0)),
                'bid_qty': float(ticker_data.get('B', 0)),
                'ask_price': float(ticker_data.get('a', 0)),
                'ask_qty': float(ticker_data.get('A', 0)),
                'receive_ts': self.get_current_time_ns(),
                'local_ts': self.get_current_time_ns()
            }

            # Hand over to the writer thread; never blocks the receive loop.
            self.async_writer.enqueue_event(event)

        except json.JSONDecodeError as e:
            print(f"JSON decode error: {e}")
        except Exception as e:
            # Kept broad on purpose: a single bad message (wrong shape,
            # non-numeric price) must not terminate the receive loop.
            print(f"Error parsing bookTicker data: {e}")

    async def connect_and_receive(self):
        """Connect to the bookTicker stream and process messages until stopped.

        The loop ends when ``running`` becomes false, when the connection is
        closed, or on any other receive error. Connection failures are
        printed rather than raised. No reconnection is attempted.
        """
        uri = f"wss://stream.binance.com:9443/ws/{self.symbol}@bookTicker"

        try:
            print(f"Connecting to Binance WebSocket: {uri}")

            async with websockets.connect(uri) as websocket:
                print("Connected to Binance WebSocket successfully")
                print("Press Ctrl+C to stop")

                while self.running:
                    try:
                        # The one second timeout bounds how long a stop
                        # request can go unnoticed on a quiet stream.
                        message = await asyncio.wait_for(websocket.recv(), timeout=1.0)

                        self.parse_book_ticker_and_write(message)

                    except asyncio.TimeoutError:
                        # No message within the timeout; re-check `running`.
                        continue
                    except websockets.exceptions.ConnectionClosed:
                        print("WebSocket connection closed")
                        break
                    except Exception as e:
                        print(f"Error receiving data: {e}")
                        break

        except Exception as e:
            print(f"Connection error: {e}")

    def signal_handler(self, signum, frame):
        """Request a graceful shutdown of the receive loop and the writer."""
        print(f"\nReceived signal {signum}, shutting down...")
        self.running = False
        self.async_writer.running = False

    def run(self):
        """Install signal handlers, start the writer thread and receive.

        Blocks until the receive loop ends, then stops the writer thread
        and waits up to five seconds for it to finish.
        """
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)

        print(f"Symbol: {self.symbol}")
        print(f"Main thread CPU ID: {self.main_thread_cpu_id}")
        print(f"Writer thread CPU ID: {self.print_latency_thread_cpu_id}")

        # Start the asynchronous writer thread.
        self.writer_thread = threading.Thread(target=self._writer_thread_main)
        self.writer_thread.daemon = True
        self.writer_thread.start()

        # Pin the main thread only after the writer thread exists, so the
        # writer does not inherit the main thread's CPU mask.
        self._pin_current_thread(self.main_thread_cpu_id, "Main")

        try:
            asyncio.run(self.connect_and_receive())
        except KeyboardInterrupt:
            print("Interrupted by user")
        finally:
            # Stop the writer thread and give it a bounded time to finish.
            self.async_writer.running = False
            if self.writer_thread and self.writer_thread.is_alive():
                self.writer_thread.join(timeout=5.0)

            print("Binance WebSocket receiver stopped")


def main():
    """Parse the command line and run a receiver for the requested symbol.

    Expects three positional arguments: the trading pair symbol, the main
    thread CPU ID and the writer thread CPU ID (both integers).
    """
    cli = argparse.ArgumentParser(
        description="Binance WebSocket market data receiver for latency comparison"
    )
    cli.add_argument("symbol", help="Trading pair symbol (e.g., btcusdt, ethusdt)")

    # Both CPU ID arguments share the same shape: a required integer.
    cpu_arguments = (
        ("main_thread_cpu_id", "Main thread CPU ID (-1 for no affinity)"),
        ("print_latency_thread_cpu_id", "Writer thread CPU ID (-1 for no affinity)"),
    )
    for arg_name, help_text in cpu_arguments:
        cli.add_argument(arg_name, type=int, help=help_text)

    options = cli.parse_args()

    # Build the receiver from the parsed options and block until it stops.
    BinanceWebSocketReceiver(
        symbol=options.symbol,
        main_thread_cpu_id=options.main_thread_cpu_id,
        print_latency_thread_cpu_id=options.print_latency_thread_cpu_id,
    ).run()


# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()