RapidMarket Latency Comparison Test Solution

Overview

This solution provides three market data receivers for latency comparison testing: a RapidMarket 1.0 receiver, a RapidMarket 2.0 receiver, and a Binance WebSocket receiver.

Comparison Principle

Data matching is performed using the seq_num field to compare the receive_ts (receive timestamp) of the same market data across different receiving methods, thereby evaluating latency differences.
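
As a minimal sketch of this principle, assume each receiver's records have already been reduced to a seq_num → receive_ts mapping. The helper name receive_ts_diffs and the sample values are invented for illustration only:

```python
def receive_ts_diffs(a, b):
    """Return {seq_num: b_receive_ts - a_receive_ts} for seq_nums seen by both receivers."""
    return {seq: b[seq] - a[seq] for seq in a.keys() & b.keys()}

# Hypothetical samples: seq_num -> receive_ts in nanoseconds
rm2 = {101: 1_700_000_000_000_100_000, 102: 1_700_000_000_000_300_000}
ws = {101: 1_700_000_000_000_150_000, 103: 1_700_000_000_000_400_000}

diffs = receive_ts_diffs(rm2, ws)
print(diffs)  # only seq_num 101 is present on both sides
```

Records that appear on one side only (102 and 103 here) are dropped, so the statistics describe the matched subset rather than the full feed.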

Important Note: seq_num Field Processing

The RapidMarket 1.0 receiver's seq_num is a composite value derived from the exchange sequence number:

  • Format: channel_id * 1e18 + exchange_seq_num
  • Where channel_id is the exchange channel id and exchange_seq_num is the original exchange sequence number

RapidMarket 2.0 and Binance WebSocket seq_num are original exchange sequence numbers:

  • No restoration needed, use directly

Only RapidMarket 1.0 requires exchange_seq_num restoration for latency comparison testing:

# Extract exchange_seq_num from RapidMarket 1.0's seq_num
def extract_exchange_seq_num(rapidmarket_1_0_seq_num):
    """Extract original exchange sequence number from RapidMarket 1.0 seq_num"""
    return rapidmarket_1_0_seq_num % int(1e18)

# Example
rapidmarket_1_0_seq_num = 2000000000000000123  # channel_id=2, exchange_seq_num=123
exchange_seq_num = extract_exchange_seq_num(rapidmarket_1_0_seq_num)  # Result: 123
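
If the channel_id is also of interest, both parts can be recovered in one step. The sketch below assumes the channel_id * 1e18 + exchange_seq_num format described above; split_rm1_seq_num is a hypothetical helper name. It also shows why the modulus should be an integer rather than the float 1e18:

```python
SEQ_BASE = 10**18  # integer form of 1e18

def split_rm1_seq_num(rm1_seq_num):
    """Split a RapidMarket 1.0 seq_num into (channel_id, exchange_seq_num)."""
    return divmod(rm1_seq_num, SEQ_BASE)

channel_id, exchange_seq_num = split_rm1_seq_num(2000000000000000123)
print(channel_id, exchange_seq_num)

# Pitfall: a float modulus converts the operand to float64 first,
# which cannot represent every integer of this magnitude.
lossy = 2000000000000000123 % 1e18
print(lossy == 123)  # False: the low digits were rounded away
```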

File Description

1. RapidMarket 1.0 Receiver (ltp_rm_1_0_receiver.cpp)

Function: Receives market data through RapidMarket 1.0 and writes it to a file asynchronously

Compilation:

protoc --cpp_out=. l2_book.proto
g++ -std=c++17 -Wall -O3 -I. -I./cppzmq ltp_rm_1_0_receiver.cpp l2_book.pb.cc -lzmq -lprotobuf -lpthread -o ltp_rm_1_0_receiver

Execution:

./ltp_rm_1_0_receiver <zmq_path> <main_thread_cpu_id> <print_latency_thread_cpu_id>

Example:

./ltp_rm_1_0_receiver "tcp://localhost:50000" 0 1

2. RapidMarket 2.0 Receiver (ltp_rm_2_0_receiver.cpp)

Function: Receives market data through RapidMarket 2.0 and writes it to a file asynchronously

Compilation:

g++ -o ltp_rm_2_0_receiver ltp_rm_2_0_receiver.cpp -lpthread -L/usr/local/lib -lucli_ffi

Execution:

./ltp_rm_2_0_receiver <topic> <main_thread_cpu_id> <print_latency_thread_cpu_id>

Example:

./ltp_rm_2_0_receiver "bf_pm_pub" 0 1

3. Binance WebSocket Receiver (binance_websocket_receiver.py)

Function: Directly connect to Binance exchange WebSocket API to receive bookTicker data and asynchronously write to file

Dependency Installation:

# Install Python dependencies (asyncio ships with the Python standard library)
pip install websockets

Execution:

python3 binance_websocket_receiver.py <symbol> <main_thread_cpu_id> <print_latency_thread_cpu_id>

Example:

python3 binance_websocket_receiver.py btcusdt 0 1

Supported Trading Pairs:

  • btcusdt, ethusdt, bnbusdt, adausdt, xrpusdt, etc.
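
The receiver's source is not reproduced here. As a rough sketch of the parsing step, the code below assumes the payload keys of Binance's bookTicker stream (u, s, b, B, a, A, plus an event time E on the futures stream) and assumes that the update ID u is what ends up in seq_num. parse_book_ticker is a hypothetical helper, and local_ts is left out because its source is not specified in this document:

```python
import json
import time

def parse_book_ticker(raw, receive_ts=None):
    """Map one bookTicker JSON message to the CSV field names used in this document."""
    msg = json.loads(raw)
    return {
        "symbol": msg["s"],
        "seq_num": msg["u"],             # assumption: update ID serves as seq_num
        "exchange_ts": msg.get("E", 0),  # event time in ms; absent on the spot stream
        "receive_ts": receive_ts if receive_ts is not None else time.time_ns(),
        "bid_price": msg["b"],
        "bid_qty": msg["B"],
        "ask_price": msg["a"],
        "ask_qty": msg["A"],
    }

# Invented sample message
sample = '{"u":400900217,"s":"BTCUSDT","b":"25.35","B":"31.21","a":"25.36","A":"40.66"}'
row = parse_book_ticker(sample, receive_ts=1_700_000_000_000_000_000)
print(row["symbol"], row["seq_num"], row["bid_price"])
```

In a real receiver the receive timestamp would normally be taken as soon as the message arrives and passed in, so that parsing time is not counted as latency.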

Output Data Format

All three receivers generate date-named log files:

  • RapidMarket 1.0: rm_1_0_2025-01-15.log
  • RapidMarket 2.0: rm_2_0_2025-01-15.log
  • Binance WebSocket: binance_2025-01-15.log

Each file is in CSV format with the following fields:

symbol,seq_num,exchange_ts,local_ts,receive_ts,bid_price,bid_qty,ask_price,ask_qty

Field Description

  • symbol: Trading pair symbol
  • seq_num: Sequence number (used for data matching)
  • exchange_ts: Exchange timestamp (milliseconds)
  • local_ts: Local timestamp
  • receive_ts: Receive timestamp in nanoseconds (key comparison field)
  • bid_price: Best bid price
  • bid_qty: Best bid quantity
  • ask_price: Best ask price
  • ask_qty: Best ask quantity
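
A log file in this layout can be read with the standard library alone. The sketch below assumes each file starts with the header row (the analysis script later in this document addresses columns by name, which implies one) and that local_ts is an integer; read_log and the sample row are invented for illustration:

```python
import csv
import io

INT_FIELDS = ("seq_num", "exchange_ts", "local_ts", "receive_ts")
FLOAT_FIELDS = ("bid_price", "bid_qty", "ask_price", "ask_qty")

def read_log(stream):
    """Yield one dict per CSV row with numeric fields converted."""
    for row in csv.DictReader(stream):
        for name in INT_FIELDS:
            row[name] = int(row[name])  # keep timestamps and seq_num as exact integers
        for name in FLOAT_FIELDS:
            row[name] = float(row[name])
        yield row

# Invented sample row for illustration
sample = io.StringIO(
    "symbol,seq_num,exchange_ts,local_ts,receive_ts,bid_price,bid_qty,ask_price,ask_qty\n"
    "BTCUSDT,123,1700000000000,1700000000000100000,1700000000000250000,42000.1,0.5,42000.2,0.7\n"
)
rows = list(read_log(sample))
print(rows[0]["seq_num"], rows[0]["receive_ts"])
```

For a real file, pass an open file object instead of the io.StringIO sample.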

Latency Comparison Test Steps

1. Environment Preparation

  1. Ensure RapidMarket service is running normally
  2. Prepare the client's own direct-connection WebSocket program
  3. Start all market data receiving services

2. Start Testing

Start all three receivers simultaneously:

# RapidMarket 1.0
./ltp_rm_1_0_receiver "tcp://localhost:50000" 0 1 &

# RapidMarket 2.0
./ltp_rm_2_0_receiver "bf_m_pub" 0 2 &

# Binance WebSocket Direct Connection
python3 binance_websocket_receiver.py btcusdt 0 3 &

3. Data Collection

After running for a period, stop all programs and collect the following data files:

  • RapidMarket 1.0: rm_1_0_2025-01-15.log
  • RapidMarket 2.0: rm_2_0_2025-01-15.log
  • Binance WebSocket: binance_2025-01-15.log

4. Latency Analysis

Use seq_num for data matching and compare receive_ts of the same market data. The following example shows latency comparison between RM 2.0 and WebSocket:

# Latency comparison analysis script
import pandas as pd
import argparse
import sys
import os

def main():
    parser = argparse.ArgumentParser(
        description='Compare latency data between RapidMarket 2.0 and Binance WebSocket',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Usage examples:
  python3 latency_compare.py rm_2_0_2025-01-15.log binance_2025-01-15.log
  python3 latency_compare.py -r rm_2_0.log -b binance.log
        """
    )

    parser.add_argument(
        'rm2_file',
        nargs='?',
        default=None,
        help='RapidMarket 2.0 log file path'
    )

    parser.add_argument(
        'binance_file',
        nargs='?',
        default=None,
        help='Binance WebSocket log file path'
    )

    parser.add_argument(
        '-r', '--rm2',
        dest='rm2_file_opt',
        help='RapidMarket 2.0 log file path (optional)'
    )

    parser.add_argument(
        '-b', '--binance',
        dest='binance_file_opt',
        help='Binance WebSocket log file path (optional)'
    )

    args = parser.parse_args()

    # Determine file paths (prefer option arguments)
    rm2_file = args.rm2_file_opt or args.rm2_file
    binance_file = args.binance_file_opt or args.binance_file

    # Check arguments
    if not rm2_file:
        parser.error('RapidMarket 2.0 log file path is required')
    if not binance_file:
        parser.error('Binance WebSocket log file path is required')

    # Check if files exist
    if not os.path.exists(rm2_file):
        print(f"Error: File not found: {rm2_file}", file=sys.stderr)
        sys.exit(1)

    if not os.path.exists(binance_file):
        print(f"Error: File not found: {binance_file}", file=sys.stderr)
        sys.exit(1)

    print(f"Reading RapidMarket 2.0 data: {rm2_file}")
    print(f"Reading Binance WebSocket data: {binance_file}\n")

    try:
        # Read data
        rm2_data = pd.read_csv(rm2_file)
        binance_data = pd.read_csv(binance_file)

        print(f"RapidMarket 2.0 data count: {len(rm2_data)}")
        print(f"Binance WebSocket data count: {len(binance_data)}\n")

        # Calculate individual latency (receive_ts - exchange_ts) for each receiver
        # Note: exchange_ts is in milliseconds, receive_ts is in nanoseconds
        # Convert exchange_ts to nanoseconds with an integer multiplier; multiplying by the
        # float 1e6 would round epoch-nanosecond values to float64 steps of a few hundred ns
        rm2_data['latency'] = rm2_data['receive_ts'] - (rm2_data['exchange_ts'] * 1_000_000)
        binance_data['latency'] = binance_data['receive_ts'] - (binance_data['exchange_ts'] * 1_000_000)

        # Print individual latency statistics
        print("=" * 70)
        print("Individual Latency Statistics (receive_ts - exchange_ts)")
        print("=" * 70)

        # RapidMarket 2.0 latency percentiles
        rm2_p50 = rm2_data['latency'].quantile(0.50)
        rm2_p90 = rm2_data['latency'].quantile(0.90)
        rm2_p95 = rm2_data['latency'].quantile(0.95)
        rm2_p99 = rm2_data['latency'].quantile(0.99)

        print(f"\n[RapidMarket 2.0 Latency Percentiles]")
        print(f"  P50 (Median):  {rm2_p50:>15.2f} ns ({rm2_p50 / 1e6:>8.2f} ms)")
        print(f"  P90:           {rm2_p90:>15.2f} ns ({rm2_p90 / 1e6:>8.2f} ms)")
        print(f"  P95:           {rm2_p95:>15.2f} ns ({rm2_p95 / 1e6:>8.2f} ms)")
        print(f"  P99:           {rm2_p99:>15.2f} ns ({rm2_p99 / 1e6:>8.2f} ms)")

        # Binance WebSocket latency percentiles
        binance_p50 = binance_data['latency'].quantile(0.50)
        binance_p90 = binance_data['latency'].quantile(0.90)
        binance_p95 = binance_data['latency'].quantile(0.95)
        binance_p99 = binance_data['latency'].quantile(0.99)

        print(f"\n[Binance WebSocket Latency Percentiles]")
        print(f"  P50 (Median):  {binance_p50:>15.2f} ns ({binance_p50 / 1e6:>8.2f} ms)")
        print(f"  P90:           {binance_p90:>15.2f} ns ({binance_p90 / 1e6:>8.2f} ms)")
        print(f"  P95:           {binance_p95:>15.2f} ns ({binance_p95 / 1e6:>8.2f} ms)")
        print(f"  P99:           {binance_p99:>15.2f} ns ({binance_p99 / 1e6:>8.2f} ms)")

        # Compare RapidMarket 2.0 vs Binance WebSocket (RM 2.0's seq_num is the original sequence number)
        rm2_vs_binance = rm2_data.merge(binance_data, on='seq_num', suffixes=('_rm2', '_binance'))
        # Calculate latency difference: positive value means RM2.0 is faster (WebSocket receive time - RM2.0 receive time)
        rm2_vs_binance['latency_diff'] = rm2_vs_binance['receive_ts_binance'] - rm2_vs_binance['receive_ts_rm2']
        # RM2.0 advantage (how much faster RM2.0 is compared to WebSocket)
        rm2_vs_binance['rm2_advantage'] = rm2_vs_binance['latency_diff']

        # Statistics on latency distribution
        print("=" * 70)
        print("RapidMarket 2.0 vs Binance WebSocket Latency Comparison Analysis")
        print("=" * 70)
        print(f"Matched data count: {len(rm2_vs_binance)}")

        if len(rm2_vs_binance) > 0:
            # Analyze RM2.0 advantage scenarios
            rm2_faster = rm2_vs_binance[rm2_vs_binance['rm2_advantage'] > 0]
            rm2_slower = rm2_vs_binance[rm2_vs_binance['rm2_advantage'] < 0]
            rm2_same = rm2_vs_binance[rm2_vs_binance['rm2_advantage'] == 0]

            print(f"\n[Latency Advantage Statistics]")
            print(f"  RM2.0 faster cases: {len(rm2_faster)} ({len(rm2_faster)/len(rm2_vs_binance)*100:.2f}%)")
            print(f"  WebSocket faster cases: {len(rm2_slower)} ({len(rm2_slower)/len(rm2_vs_binance)*100:.2f}%)")
            if len(rm2_same) > 0:
                print(f"  Received simultaneously: {len(rm2_same)} ({len(rm2_same)/len(rm2_vs_binance)*100:.2f}%)")

            # RM2.0 average advantage
            avg_advantage = rm2_vs_binance['rm2_advantage'].mean()
            median_advantage = rm2_vs_binance['rm2_advantage'].median()

            print(f"\n[RM2.0 Latency Advantage (how much faster than WebSocket)]")
            if avg_advantage > 0:
                print(f"  Average advantage: RM2.0 is {avg_advantage:.2f} ns ({avg_advantage / 1e6:.2f} ms) faster than WebSocket")
            else:
                print(f"  Average disadvantage: RM2.0 is {abs(avg_advantage):.2f} ns ({abs(avg_advantage) / 1e6:.2f} ms) slower than WebSocket")

            if median_advantage > 0:
                print(f"  Median advantage: RM2.0 is {median_advantage:.2f} ns ({median_advantage / 1e6:.2f} ms) faster than WebSocket")
            else:
                print(f"  Median disadvantage: RM2.0 is {abs(median_advantage):.2f} ns ({abs(median_advantage) / 1e6:.2f} ms) slower than WebSocket")

            # Basic statistics
            print(f"\n[Latency Difference Statistics (WebSocket - RM2.0)]")
            print(f"  Mean: {rm2_vs_binance['latency_diff'].mean():.2f} ns ({rm2_vs_binance['latency_diff'].mean() / 1e6:.2f} ms)")
            print(f"  Median: {rm2_vs_binance['latency_diff'].median():.2f} ns ({rm2_vs_binance['latency_diff'].median() / 1e6:.2f} ms)")
            print(f"  Min: {rm2_vs_binance['latency_diff'].min():.2f} ns ({rm2_vs_binance['latency_diff'].min() / 1e6:.2f} ms)")
            print(f"  Max: {rm2_vs_binance['latency_diff'].max():.2f} ns ({rm2_vs_binance['latency_diff'].max() / 1e6:.2f} ms)")
            print(f"  Std Dev: {rm2_vs_binance['latency_diff'].std():.2f} ns ({rm2_vs_binance['latency_diff'].std() / 1e6:.2f} ms)")

            # Calculate percentiles
            p50 = rm2_vs_binance['latency_diff'].quantile(0.50)
            p90 = rm2_vs_binance['latency_diff'].quantile(0.90)
            p95 = rm2_vs_binance['latency_diff'].quantile(0.95)
            p99 = rm2_vs_binance['latency_diff'].quantile(0.99)

            print(f"\n[Latency Difference Percentiles (positive=RM2.0 faster, negative=RM2.0 slower)]")
            # Same line layout for every percentile, followed by a faster/slower verdict
            for label, value in (("P50 (Median):", p50), ("P90:", p90), ("P95:", p95), ("P99:", p99)):
                print(f"  {label:<15}{value:>15.2f} ns ({value / 1e6:>8.2f} ms)", end='')
                if value > 0:
                    print(f"  -> RM2.0 {value / 1e6:.2f} ms faster")
                elif value < 0:
                    print(f"  -> RM2.0 {abs(value) / 1e6:.2f} ms slower")
                else:
                    print("  -> Same")
        else:
            print("Warning: No matched data, please check if seq_num is consistent")
    except pd.errors.EmptyDataError:
        print("Error: File is empty", file=sys.stderr)
        sys.exit(1)
    except pd.errors.ParserError as e:
        print(f"Error: Failed to parse CSV file: {e}", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)

if __name__ == '__main__':
    main()
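
The script above covers RapidMarket 2.0 only. For RapidMarket 1.0 the seq_num has to be restored before matching. The following is a minimal standard-library sketch of that variant; load_receive_ts is a hypothetical helper, and the sample rows are invented and trimmed to the two columns needed here:

```python
import csv
import io
import statistics

SEQ_BASE = 10**18

def load_receive_ts(stream, restore=False):
    """Return {seq_num: receive_ts}; restore=True strips the RM 1.0 channel prefix."""
    out = {}
    for row in csv.DictReader(stream):
        seq = int(row["seq_num"])
        out[seq % SEQ_BASE if restore else seq] = int(row["receive_ts"])
    return out

rm1_log = io.StringIO(
    "seq_num,receive_ts\n"
    "2000000000000000123,1700000000000100000\n"
    "2000000000000000124,1700000000000300000\n"
)
ws_log = io.StringIO(
    "seq_num,receive_ts\n"
    "123,1700000000000180000\n"
    "124,1700000000000350000\n"
)

rm1 = load_receive_ts(rm1_log, restore=True)
ws = load_receive_ts(ws_log)
# Positive difference: the WebSocket copy arrived later than the RM 1.0 copy
diffs = [ws[s] - rm1[s] for s in sorted(rm1.keys() & ws.keys())]
print(diffs, statistics.median(diffs))
```

Keeping seq_num and receive_ts as Python integers avoids any float rounding at this magnitude.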

Performance Optimization Recommendations

CPU Affinity Settings

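  • Main Thread: Bind to a dedicated CPU core to avoid context switching
  • Write Thread: Bind to another CPU core to avoid competition with the main thread
  • Multiple Receivers: The example commands in this document pass 0 as main_thread_cpu_id for every receiver; when running them side by side on one host, give each receiver its own cores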
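
For a custom Python receiver, core binding can be sketched with os.sched_setaffinity, which is available on Linux only. This is an illustration of the idea, not necessarily how the bundled receivers do it; pin_to_cpu is a hypothetical helper:

```python
import os

def pin_to_cpu(cpu_id):
    """Pin the caller to one CPU core (Linux only); return True on success."""
    if not hasattr(os, "sched_setaffinity"):
        return False  # not available on this platform
    os.sched_setaffinity(0, {cpu_id})  # 0 = the caller; on Linux this affects the calling thread
    return True

# Pick a core the process is currently allowed to run on
if hasattr(os, "sched_getaffinity"):
    target = min(os.sched_getaffinity(0))
    if pin_to_cpu(target):
        print(os.sched_getaffinity(0))
```

Calling pin_to_cpu at the start of each thread's entry function, with a different core ID per thread, mirrors the main-thread and write-thread split described above.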

Important Notes

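  1. Time Synchronization: Ensure all test machines are time-synchronized (for example via NTP or PTP); receive_ts values are only comparable with exchange_ts and across hosts when the clocks agree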
  2. Network Latency: Consider the impact of network latency on test results
  3. Data Integrity: Ensure all receivers can receive complete data
  4. System Load: Avoid running other high-load programs during testing
  5. Binance WebSocket Limitations:
    • Binance WebSocket has connection frequency limits, avoid frequent reconnections
    • Recommend using stable network connections
    • Binance data may arrive slightly later than RapidMarket data, which is normal
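
One way to check the data integrity point is to count how many seq_num values two receivers have in common before trusting the latency statistics. A minimal sketch with a hypothetical helper name and invented sample values:

```python
def coverage(a_seqs, b_seqs):
    """Return (matched, only_in_a, only_in_b) counts for two collections of seq_num."""
    a, b = set(a_seqs), set(b_seqs)
    return len(a & b), len(a - b), len(b - a)

matched, only_a, only_b = coverage([101, 102, 103, 104], [102, 103, 105])
print(matched, only_a, only_b)
```

A large unmatched count on either side suggests that one receiver started late, dropped data, or uses a different seq_num encoding.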

Troubleshooting

Common Issues

  1. Compilation Errors: Ensure necessary dependency libraries are installed
  2. Connection Failures: Check network connections and service status
  3. Data Loss: Check system resources and network stability

Debug Options

  • Use -v parameter to enable verbose output
  • Check system log files
  • Monitor system resource usage

Quick Start Guide

  1. Install Python Dependencies:
# Run automatic installation script
chmod +x install_python_deps.sh
./install_python_deps.sh

# Or install manually (asyncio is part of the Python standard library)
pip3 install websockets
  2. Run Binance WebSocket Receiver:
python3 binance_websocket_receiver.py btcusdt 0 1
  3. View Output Files:
ls -la binance_*.log

Quick Test Script

Create a test script to start all three receivers simultaneously:

#!/bin/bash
# test_latency.sh

echo "Starting latency comparison test..."

# Start RapidMarket 1.0
echo "Starting RapidMarket 1.0..."
./ltp_rm_1_0_receiver "tcp://localhost:50000" 0 1 &
RM1_PID=$!

# Start RapidMarket 2.0
echo "Starting RapidMarket 2.0..."
./ltp_rm_2_0_receiver "bf_m_pub" 0 2 &
RM2_PID=$!

# Start Binance WebSocket
echo "Starting Binance WebSocket..."
python3 binance_websocket_receiver.py btcusdt 0 3 &
BINANCE_PID=$!

echo "All receivers started. Press Ctrl+C to stop all processes."

# Wait for user interrupt
trap "echo 'Stopping all processes...'; kill $RM1_PID $RM2_PID $BINANCE_PID; exit" INT
wait

Usage:

chmod +x test_latency.sh
./test_latency.sh

Client Custom WebSocket Implementation Reference

If clients need to implement their own WebSocket receiver (in any programming language), they can use binance_websocket_receiver.py as a reference:

Key Implementation Points:

  1. Unified Data Format: Ensure output CSV format is consistent with RapidMarket receivers
  2. Timestamp Precision: Use nanosecond-level timestamps (time.time_ns() in Python; time.time() * 1_000_000_000 goes through a float and loses sub-microsecond precision)
  3. seq_num Processing: Use original exchange sequence numbers for easy matching with RapidMarket data
  4. Asynchronous Writing: Use queues to avoid blocking the main receiving thread
  5. Error Handling: Implement reconnection mechanisms and exception recovery
  6. Signal Handling: Support graceful exit
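
To illustrate the timestamp precision point, the sketch below estimates the smallest step a float64 epoch-seconds value can represent and contrasts it with the integer clock call. It needs Python 3.9 or later for math.ulp; float_resolution_ns is a hypothetical helper:

```python
import math
import time

def float_resolution_ns(epoch_seconds):
    """Smallest step, in ns, that a float64 epoch-seconds value can represent."""
    return math.ulp(epoch_seconds) * 1e9

print(float_resolution_ns(1_700_000_000.0))  # step size near a 2023 timestamp
receive_ts = time.time_ns()                  # integer nanoseconds, no float rounding
print(receive_ts)
```

The first value comes out at a little under 240 ns, so float-based timestamps cannot resolve anything finer than that. The actual resolution of the underlying clock still depends on the platform.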

Output Format Requirements:

symbol,seq_num,exchange_ts,local_ts,receive_ts,bid_price,bid_qty,ask_price,ask_qty

seq_num Field Description:

  • RapidMarket 1.0 Receiver: seq_num = channel_id * 1e18 + exchange_seq_num
    • Example: 2000000000000000123 (channel_id=2, exchange_seq_num=123)
    • Requires restoration: Use seq_num % 10**18 (integer arithmetic; a float 1e18 operand rounds away the low digits) to extract original sequence number
  • RapidMarket 2.0 Receiver: seq_num = exchange_seq_num (original exchange sequence number)
    • Example: 123 (original exchange sequence number)
    • No restoration needed: Use directly
  • Binance WebSocket: seq_num = exchange_seq_num (original exchange sequence number)
    • Example: 123 (original exchange sequence number)
    • No restoration needed: Use directly
  • Client Custom Receiver: Recommend using original exchange sequence numbers for easy data matching

Example Code Structure (Python Reference Implementation):

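import queue
import threading
import time

# 1. Asynchronous file writer
class AsyncFileWriter:
    def __init__(self, path):
        self.path = path
        self.queue = queue.Queue()

    def enqueue_event(self, event):
        # Add event to queue (returns immediately, no file I/O here)
        self.queue.put(event)

    def run(self):
        # Asynchronous write thread main loop; enqueue None to stop it
        with open(self.path, 'a') as f:
            while (event := self.queue.get()) is not None:
                f.write(','.join(str(v) for v in event.values()) + '\n')

# 2. WebSocket receiver
class CustomWebSocketReceiver:
    def __init__(self, async_writer):
        self.async_writer = async_writer
        # Run the writer loop on its own thread
        threading.Thread(target=async_writer.run, daemon=True).start()

    def parse_and_write(self, data):
        # Parse data and create event (fill in the placeholders from the parsed message)
        event = {
            'symbol': ...,
            'seq_num': ...,
            'receive_ts': time.time_ns(),
            # ... other fields
        }
        self.async_writer.enqueue_event(event)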

Other Programming Language Implementation Points:

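  • Java: Use Instant.now() and combine getEpochSecond() * 1_000_000_000L + getNano(); System.nanoTime() is a monotonic counter with an arbitrary origin, so its values cannot be compared with exchange_ts or with other receivers
  • C++: Use std::chrono::system_clock::now().time_since_epoch() cast to std::chrono::nanoseconds; high_resolution_clock is not guaranteed to be epoch-based
  • Go: Use time.Now().UnixNano() to get nanosecond timestamps
  • Node.js: Use performance.timeOrigin + performance.now() (epoch milliseconds with sub-millisecond resolution) and convert to nanoseconds; process.hrtime.bigint() is monotonic, not epoch-based
  • C#: Use (DateTime.UtcNow - DateTime.UnixEpoch).Ticks * 100 (ticks are 100 ns); ToUnixTimeMilliseconds() * 1000000 only has millisecond precision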

Technical Support

For any issues, please contact LiquidityTech technical support team.


LiquidityTech - The Leading Digital Asset Prime Brokerage for Institutions