Overview
This solution provides three market data receivers for latency comparison testing:
-
RapidMarket 1.0: Market data receiver based on Rapidmarket1.0 (ltp_rm_1_0_receiver.cpp)
-
RapidMarket 2.0: Market data receiver based on Rapidmarket2.0 (ltp_rm_2_0_receiver.cpp )
-
Binance WebSocket: Direct WebSocket receiver connecting to Binance exchange (binance_websocket_receiver.py )
Comparison Principle
Data matching is performed using the seq_num field to compare the receive_ts (receive timestamp) of the same market data across different receiving methods, thereby evaluating latency differences.
Important Note: seq_num Field Processing
RapidMarket 1.0 receiver's seq_num is processed twice:
- Format:
channel_id * 1e18 + exchange_seq_num - Where
channel_idis the exchangechannel idandexchange_seq_numis the original exchange sequence number
RapidMarket 2.0 and Binance WebSocket seq_num are original exchange sequence numbers:
- No restoration needed, use directly
Only RapidMarket 1.0 requires exchange_seq_num restoration for latency comparison testing:
# Extract exchange_seq_num from RapidMarket 1.0's seq_num
def extract_exchange_seq_num(rapidmarket_1_0_seq_num):
"""Extract original exchange sequence number from RapidMarket 1.0 seq_num"""
return rapidmarket_1_0_seq_num % int(1e18)
# Example
rapidmarket_1_0_seq_num = 2000000000000000123 # channel_id=2, exchange_seq_num=123
exchange_seq_num = extract_exchange_seq_num(rapidmarket_1_0_seq_num) # Result: 123
File Description
1. RapidMarket 1.0 Receiver (ltp_rm_1_0_receiver.cpp)
ltp_rm_1_0_receiver.cpp)Function: Receive market data through Rapidmarket1.0 and asynchronously write to file
Compilation:
protoc --cpp_out=. l2_book.proto
g++ -std=c++17 -Wall -O3 -I. -I./cppzmq ltp_rm_1_0_receiver.cpp l2_book.pb.cc -lzmq -lprotobuf -lpthread -o ltp_rm_1_0_receiver
Execution:
./ltp_rm_1_0_receiver <zmq_path> <main_thread_cpu_id> <print_latency_thread_cpu_id>
Example:
./ltp_rm_1_0_receiver "tcp://localhost:50000" 0 1
2. RapidMarket 2.0 Receiver (ltp_rm_2_0_receiver.cpp)
ltp_rm_2_0_receiver.cpp)Function: Receive market data through Rapidmarket2.0 and asynchronously write to file
Compilation:
g++ -o ltp_rm_2_0_receiver ltp_rm_2_0_receiver.cpp -lpthread -L/usr/local/lib -lucli_ffi
Execution:
./ltp_rm_2_0_receiver <topic> <main_thread_cpu_id> <print_latency_thread_cpu_id>
Example:
./ltp_rm_2_0_receiver "bf_pm_pub" 0 1
3. Binance WebSocket Receiver (binance_websocket_receiver.py)
binance_websocket_receiver.py)Function: Directly connect to Binance exchange WebSocket API to receive bookTicker data and asynchronously write to file
Dependency Installation:
# Install Python dependencies
pip install websockets asyncio
Execution:
python3 binance_websocket_receiver.py <symbol> <main_thread_cpu_id> <print_latency_thread_cpu_id>
Example:
python3 binance_websocket_receiver.py btcusdt 0 1
Supported Trading Pairs:
btcusdt,ethusdt,bnbusdt,adausdt,xrpusdt, etc.
Output Data Format
All three receivers generate date-named log files containing the following fields:
- RapidMarket 1.0:
rm_1_0_2025-01-15.log - RapidMarket 2.0:
rm_2_0_2025-01-15.log - Binance WebSocket:
binance_2025-01-15.log
symbol,seq_num,exchange_ts,local_ts,receive_ts,bid_price,bid_qty,ask_price,ask_qty
Field Description
- symbol: Trading pair symbol
- seq_num: Sequence number (used for data matching)
- exchange_ts: Exchange timestamp
- local_ts: Local timestamp
- receive_ts: Receive timestamp (key comparison field)
- bid_price: Best bid price
- bid_qty: Best bid quantity
- ask_price: Best ask price
- ask_qty: Best ask quantity
Latency Comparison Test Steps
1. Environment Preparation
- Ensure RapidMarket service is running normally
- Prepare client's own WebSocket direct connection program
- Start all market data receiving services
2. Start Testing
Start all three receivers simultaneously:
# RapidMarket 1.0
./ltp_rm_1_0_receiver "tcp://localhost:50000" 0 1 &
# RapidMarket 2.0
./ltp_rm_2_0_receiver "bf_m_pub" 0 2 &
# Binance WebSocket Direct Connection
python3 binance_websocket_receiver.py btcusdt 0 3 &
3. Data Collection
After running for a period, stop all programs and collect the following data files:
- RapidMarket 1.0:
rm_1_0_2025-01-15.log - RapidMarket 2.0:
rm_2_0_2025-01-15.log - Binance WebSocket:
binance_2025-01-15.log
4. Latency Analysis
Use seq_num for data matching and compare receive_ts of the same market data. The following example shows latency comparison between RM 2.0 and WebSocket:
# Latency comparison analysis script
import pandas as pd
import argparse
import sys
import os
def main():
parser = argparse.ArgumentParser(
description='Compare latency data between RapidMarket 2.0 and Binance WebSocket',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Usage examples:
python3 latency_compare.py rm_2_0_2025-01-15.log binance_2025-01-15.log
python3 latency_compare.py -r rm_2_0.log -b binance.log
"""
)
parser.add_argument(
'rm2_file',
nargs='?',
default=None,
help='RapidMarket 2.0 log file path'
)
parser.add_argument(
'binance_file',
nargs='?',
default=None,
help='Binance WebSocket log file path'
)
parser.add_argument(
'-r', '--rm2',
dest='rm2_file_opt',
help='RapidMarket 2.0 log file path (optional)'
)
parser.add_argument(
'-b', '--binance',
dest='binance_file_opt',
help='Binance WebSocket log file path (optional)'
)
args = parser.parse_args()
# Determine file paths (prefer option arguments)
rm2_file = args.rm2_file_opt or args.rm2_file
binance_file = args.binance_file_opt or args.binance_file
# Check arguments
if not rm2_file:
parser.error('RapidMarket 2.0 log file path is required')
if not binance_file:
parser.error('Binance WebSocket log file path is required')
# Check if files exist
if not os.path.exists(rm2_file):
print(f"Error: File not found: {rm2_file}", file=sys.stderr)
sys.exit(1)
if not os.path.exists(binance_file):
print(f"Error: File not found: {binance_file}", file=sys.stderr)
sys.exit(1)
print(f"Reading RapidMarket 2.0 data: {rm2_file}")
print(f"Reading Binance WebSocket data: {binance_file}\n")
try:
# Read data
rm2_data = pd.read_csv(rm2_file)
binance_data = pd.read_csv(binance_file)
print(f"RapidMarket 2.0 data count: {len(rm2_data)}")
print(f"Binance WebSocket data count: {len(binance_data)}\n")
# Calculate individual latency (receive_ts - exchange_ts) for each receiver
# Note: exchange_ts is in milliseconds, receive_ts is in nanoseconds
# Convert exchange_ts to nanoseconds by multiplying by 1e6
rm2_data['latency'] = rm2_data['receive_ts'] - (rm2_data['exchange_ts'] * 1e6)
binance_data['latency'] = binance_data['receive_ts'] - (binance_data['exchange_ts'] * 1e6)
# Print individual latency statistics
print("=" * 70)
print("Individual Latency Statistics (receive_ts - exchange_ts)")
print("=" * 70)
# RapidMarket 2.0 latency percentiles
rm2_p50 = rm2_data['latency'].quantile(0.50)
rm2_p90 = rm2_data['latency'].quantile(0.90)
rm2_p95 = rm2_data['latency'].quantile(0.95)
rm2_p99 = rm2_data['latency'].quantile(0.99)
print(f"\n[RapidMarket 2.0 Latency Percentiles]")
print(f" P50 (Median): {rm2_p50:>15.2f} ns ({rm2_p50 / 1e6:>8.2f} ms)")
print(f" P90: {rm2_p90:>15.2f} ns ({rm2_p90 / 1e6:>8.2f} ms)")
print(f" P95: {rm2_p95:>15.2f} ns ({rm2_p95 / 1e6:>8.2f} ms)")
print(f" P99: {rm2_p99:>15.2f} ns ({rm2_p99 / 1e6:>8.2f} ms)")
# Binance WebSocket latency percentiles
binance_p50 = binance_data['latency'].quantile(0.50)
binance_p90 = binance_data['latency'].quantile(0.90)
binance_p95 = binance_data['latency'].quantile(0.95)
binance_p99 = binance_data['latency'].quantile(0.99)
print(f"\n[Binance WebSocket Latency Percentiles]")
print(f" P50 (Median): {binance_p50:>15.2f} ns ({binance_p50 / 1e6:>8.2f} ms)")
print(f" P90: {binance_p90:>15.2f} ns ({binance_p90 / 1e6:>8.2f} ms)")
print(f" P95: {binance_p95:>15.2f} ns ({binance_p95 / 1e6:>8.2f} ms)")
print(f" P99: {binance_p99:>15.2f} ns ({binance_p99 / 1e6:>8.2f} ms)")
# Compare RapidMarket 2.0 vs Binance WebSocket (RM 2.0's seq_num is the original sequence number)
rm2_vs_binance = rm2_data.merge(binance_data, on='seq_num', suffixes=('_rm2', '_binance'))
# Calculate latency difference: positive value means RM2.0 is faster (WebSocket receive time - RM2.0 receive time)
rm2_vs_binance['latency_diff'] = rm2_vs_binance['receive_ts_binance'] - rm2_vs_binance['receive_ts_rm2']
# RM2.0 advantage (how much faster RM2.0 is compared to WebSocket)
rm2_vs_binance['rm2_advantage'] = rm2_vs_binance['latency_diff']
# Statistics on latency distribution
print("=" * 70)
print("RapidMarket 2.0 vs Binance WebSocket Latency Comparison Analysis")
print("=" * 70)
print(f"Matched data count: {len(rm2_vs_binance)}")
if len(rm2_vs_binance) > 0:
# Analyze RM2.0 advantage scenarios
rm2_faster = rm2_vs_binance[rm2_vs_binance['rm2_advantage'] > 0]
rm2_slower = rm2_vs_binance[rm2_vs_binance['rm2_advantage'] < 0]
rm2_same = rm2_vs_binance[rm2_vs_binance['rm2_advantage'] == 0]
print(f"\n[Latency Advantage Statistics]")
print(f" RM2.0 faster cases: {len(rm2_faster)} ({len(rm2_faster)/len(rm2_vs_binance)*100:.2f}%)")
print(f" WebSocket faster cases: {len(rm2_slower)} ({len(rm2_slower)/len(rm2_vs_binance)*100:.2f}%)")
if len(rm2_same) > 0:
print(f" Received simultaneously: {len(rm2_same)} ({len(rm2_same)/len(rm2_vs_binance)*100:.2f}%)")
# RM2.0 average advantage
avg_advantage = rm2_vs_binance['rm2_advantage'].mean()
median_advantage = rm2_vs_binance['rm2_advantage'].median()
print(f"\n[RM2.0 Latency Advantage (how much faster than WebSocket)]")
if avg_advantage > 0:
print(f" Average advantage: RM2.0 is {avg_advantage:.2f} ns ({avg_advantage / 1e6:.2f} ms) faster than WebSocket")
else:
print(f" Average disadvantage: RM2.0 is {abs(avg_advantage):.2f} ns ({abs(avg_advantage) / 1e6:.2f} ms) slower than WebSocket")
if median_advantage > 0:
print(f" Median advantage: RM2.0 is {median_advantage:.2f} ns ({median_advantage / 1e6:.2f} ms) faster than WebSocket")
else:
print(f" Median disadvantage: RM2.0 is {abs(median_advantage):.2f} ns ({abs(median_advantage) / 1e6:.2f} ms) slower than WebSocket")
# Basic statistics
print(f"\n[Latency Difference Statistics (WebSocket - RM2.0)]")
print(f" Mean: {rm2_vs_binance['latency_diff'].mean():.2f} ns ({rm2_vs_binance['latency_diff'].mean() / 1e6:.2f} ms)")
print(f" Median: {rm2_vs_binance['latency_diff'].median():.2f} ns ({rm2_vs_binance['latency_diff'].median() / 1e6:.2f} ms)")
print(f" Min: {rm2_vs_binance['latency_diff'].min():.2f} ns ({rm2_vs_binance['latency_diff'].min() / 1e6:.2f} ms)")
print(f" Max: {rm2_vs_binance['latency_diff'].max():.2f} ns ({rm2_vs_binance['latency_diff'].max() / 1e6:.2f} ms)")
print(f" Std Dev: {rm2_vs_binance['latency_diff'].std():.2f} ns ({rm2_vs_binance['latency_diff'].std() / 1e6:.2f} ms)")
# Calculate percentiles
p50 = rm2_vs_binance['latency_diff'].quantile(0.50)
p90 = rm2_vs_binance['latency_diff'].quantile(0.90)
p95 = rm2_vs_binance['latency_diff'].quantile(0.95)
p99 = rm2_vs_binance['latency_diff'].quantile(0.99)
print(f"\n[Latency Difference Percentiles (positive=RM2.0 faster, negative=RM2.0 slower)]")
print(f" P50 (Median): {p50:>15.2f} ns ({p50 / 1e6:>8.2f} ms)", end='')
if p50 > 0:
print(f" -> RM2.0 {p50 / 1e6:.2f} ms faster")
elif p50 < 0:
print(f" -> RM2.0 {abs(p50) / 1e6:.2f} ms slower")
else:
print(f" -> Same")
print(f" P90: {p90:>15.2f} ns ({p90 / 1e6:>8.2f} ms)", end='')
if p90 > 0:
print(f" -> RM2.0 {p90 / 1e6:.2f} ms faster")
elif p90 < 0:
print(f" -> RM2.0 {abs(p90) / 1e6:.2f} ms slower")
else:
print(f" -> Same")
print(f" P95: {p95:>15.2f} ns ({p95 / 1e6:>8.2f} ms)", end='')
if p95 > 0:
print(f" -> RM2.0 {p95 / 1e6:.2f} ms faster")
elif p95 < 0:
print(f" -> RM2.0 {abs(p95) / 1e6:.2f} ms slower")
else:
print(f" -> Same")
print(f" P99: {p99:>15.2f} ns ({p99 / 1e6:>8.2f} ms)", end='')
if p99 > 0:
print(f" -> RM2.0 {p99 / 1e6:.2f} ms faster")
elif p99 < 0:
print(f" -> RM2.0 {abs(p99) / 1e6:.2f} ms slower")
else:
print(f" -> Same")
else:
print("Warning: No matched data, please check if seq_num is consistent")
except pd.errors.EmptyDataError:
print("Error: File is empty", file=sys.stderr)
sys.exit(1)
except pd.errors.ParserError as e:
print(f"Error: Failed to parse CSV file: {e}", file=sys.stderr)
sys.exit(1)
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()
Performance Optimization Recommendations
CPU Affinity Settings
- Main Thread: Bind to dedicated CPU core to avoid context switching
- Write Thread: Bind to another CPU core to avoid competition with main thread
Important Notes
- Time Synchronization: Ensure all test machines are time-synchronized
- Network Latency: Consider the impact of network latency on test results
- Data Integrity: Ensure all receivers can receive complete data
- System Load: Avoid running other high-load programs during testing
- Binance WebSocket Limitations:
- Binance WebSocket has connection frequency limits, avoid frequent reconnections
- Recommend using stable network connections
- Binance data may arrive slightly later than RapidMarket data, which is normal
Troubleshooting
Common Issues
- Compilation Errors: Ensure necessary dependency libraries are installed
- Connection Failures: Check network connections and service status
- Data Loss: Check system resources and network stability
Debug Options
- Use
-vparameter to enable verbose output - Check system log files
- Monitor system resource usage
Quick Start Guide
- Install Python Dependencies:
# Run automatic installation script
chmod +x install_python_deps.sh
./install_python_deps.sh
# Or install manually
pip3 install websockets asyncio
- Run Binance WebSocket Receiver:
python3 binance_websocket_receiver.py btcusdt 0 1
- View Output Files:
ls -la binance_*.log
Quick Test Script
Create a test script to start all three receivers simultaneously:
#!/bin/bash
# test_latency.sh
echo "Starting latency comparison test..."
# Start RapidMarket 1.0
echo "Starting RapidMarket 1.0..."
./ltp_rm_1_0_receiver "tcp://localhost:50000" 0 1 &
RM1_PID=$!
# Start RapidMarket 2.0
echo "Starting RapidMarket 2.0..."
./ltp_rm_2_0_receiver "bf_m_pub" 0 2 &
RM2_PID=$!
# Start Binance WebSocket
echo "Starting Binance WebSocket..."
./binance_websocket_receiver "btcusdt" 0 3 &
BINANCE_PID=$!
echo "All receivers started. Press Ctrl+C to stop all processes."
# Wait for user interrupt
trap "echo 'Stopping all processes...'; kill $RM1_PID $RM2_PID $BINANCE_PID; exit" INT
wait
Usage:
chmod +x test_latency.sh
./test_latency.sh
Client Custom WebSocket Implementation Reference
If clients need to implement their own WebSocket receiver (supporting any programming language), they can refer to the implementation of binance_websocket_receiver.py:
Key Implementation Points:
- Unified Data Format: Ensure output CSV format is consistent with RapidMarket receivers
- Timestamp Precision: Use nanosecond-level timestamps (
time.time() * 1_000_000_000) - seq_num Processing: Use original exchange sequence numbers for easy matching with RapidMarket data
- Asynchronous Writing: Use queues to avoid blocking the main receiving thread
- Error Handling: Implement reconnection mechanisms and exception recovery
- Signal Handling: Support graceful exit
Output Format Requirements:
symbol,seq_num,exchange_ts,local_ts,receive_ts,bid_price,bid_qty,ask_price,ask_qty
seq_num Field Description:
- RapidMarket 1.0 Receiver:
seq_num = channel_id * 1e18 + exchange_seq_num- Example:
2000000000000000123(channel_id=2, exchange_seq_num=123) - Requires restoration: Use
seq_num % 1e18to extract original sequence number
- Example:
- RapidMarket 2.0 Receiver:
seq_num = exchange_seq_num(original exchange sequence number)- Example:
123(original exchange sequence number) - No restoration needed: Use directly
- Example:
- Binance WebSocket:
seq_num = exchange_seq_num(original exchange sequence number)- Example:
123(original exchange sequence number) - No restoration needed: Use directly
- Example:
- Client Custom Receiver: Recommend using original exchange sequence numbers for easy data matching
Example Code Structure (Python Reference Implementation):
# 1. Asynchronous file writer
class AsyncFileWriter:
def enqueue_event(self, event):
# Add event to queue
def run(self):
# Asynchronous write thread main loop
# 2. WebSocket receiver
class CustomWebSocketReceiver:
def parse_and_write(self, data):
# Parse data and create event
event = {
'symbol': ...,
'seq_num': ...,
'receive_ts': int(time.time() * 1_000_000_000),
# ... other fields
}
self.async_writer.enqueue_event(event)
Other Programming Language Implementation Points:
- Java: Use
System.nanoTime()to get nanosecond timestamps - C++: Use
std::chrono::high_resolution_clockto get nanosecond timestamps - Go: Use
time.Now().UnixNano()to get nanosecond timestamps - Node.js: Use
process.hrtime.bigint()to get nanosecond timestamps - C#: Use
DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1000000to get nanosecond timestamps
Technical Support
For any issues, please contact LiquidityTech technical support team.
LiquidityTech - The Leading Digital Asset Prime Brokerage for Institutions
