/**
* @file ltp_rm_2_0_receiver.cpp
* Market data receiver test program that demonstrates how to receive and process
* market data through the topic-based messaging system.
*
* @copyright Copyright (c) 2025 LiquidityTech. All rights reserved.
*
* This software is proprietary and confidential. Unauthorized copying,
* distribution, or modification of this file is strictly prohibited.
*
* For more information, visit: https://www.liquiditytech.com/
*
* LiquidityTech - The Leading Digital Asset Prime Brokerage for Institutions
* Trusted execution, custody, clearing, and financing solutions for digital assets.
*/
#include "ltp_ucli_ffi.h"
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/syscall.h>
#include <time.h>
#include <unistd.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <ctime>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
#include "concurrentqueue.h"
// Global variables for graceful shutdown.
// `running` is cleared by the signal handler and polled by both the receive
// loop and the writer thread. It is an atomic because `volatile` provides no
// inter-thread synchronization; reads and writes keep the same syntax
// (`while (running)`, `running = 0`).
std::atomic<int> running{1};
// Handle assigned from init_market_receiver() in main(); null until then.
struct UltraMarketReceiver* receiver = nullptr;
// Event structure for market data.
// One parsed book-ticker update, passed from the parser to the file writer.
// Every field has a default so that a message lacking some key is logged with
// zeros instead of indeterminate values.
struct Event {
    uint64_t local_ts = 0;     // clock sample taken after parsing finished
    uint64_t exchange_ts = 0;  // value of the "T" field of the message
    uint64_t seq_num = 0;      // value of the "u" (or "t") field of the message
    uint64_t receive_ts = 0;   // clock sample taken before parsing started
    double bid_price = 0.0;    // "b"
    double bid_qty = 0.0;      // "B"
    double ask_price = 0.0;    // "a"
    double ask_qty = 0.0;      // "A"
    std::string symbol;        // "s"
};
// Async file writer class
class AsyncFileWriter {
public:
AsyncFileWriter() : latency_queue_(1000000UL) {}
void enqueueEvent(const Event& event) {
latency_queue_.enqueue(event);
}
void run() {
Event event;
std::string currentDate = getCurrentDateString();
std::ofstream outFile = openLogFile(currentDate);
if (!outFile.is_open()) {
std::cerr << "Failed to open log file: " << currentDate << ".log" << std::endl;
return;
}
outFile << "symbol,seq_num,exchange_ts,local_ts,receive_ts,bid_price,bid_qty,ask_price,ask_qty" << std::endl;
while (running) {
if (latency_queue_.try_dequeue(event)) {
std::string newDate = getCurrentDateString();
if (newDate != currentDate) {
outFile.close();
currentDate = newDate;
outFile = openLogFile(currentDate);
}
outFile << event.symbol << ","
<< event.seq_num << ","
<< event.exchange_ts << ","
<< event.local_ts << ","
<< event.receive_ts << ","
<< event.bid_price << ","
<< event.bid_qty << ","
<< event.ask_price << ","
<< event.ask_qty << std::endl;
}
}
outFile.close();
}
private:
std::string getCurrentDateString() {
auto t = std::time(nullptr);
auto tm = *std::localtime(&t);
std::ostringstream oss;
oss << std::put_time(&tm, "%Y-%m-%d");
return oss.str();
}
std::ofstream openLogFile(const std::string& date) {
std::string fileName = "rm_2_0_" + date + ".log";
return std::ofstream(fileName, std::ios::app);
}
moodycamel::ConcurrentQueue<Event> latency_queue_;
};
// The single file-writer object of this program: the parser enqueues events
// into it and the writer thread started in main() runs its run() loop.
AsyncFileWriter async_writer{};
// Data type enumeration. The enumerators are spelled out with the values they
// already had implicitly.
enum DataType {
    bookTicker = 0,
    Trade = 1
};
// Time utility functions
// Returns the current CLOCK_REALTIME reading in nanoseconds, or 0 if the
// clock cannot be read.
uint64_t getCurrentTime() {
    struct timespec ts = {};
    if (clock_gettime(CLOCK_REALTIME, &ts) != 0) {
        return 0;
    }
    // Widen to 64-bit unsigned before multiplying so the product cannot
    // overflow on platforms where time_t is a 32-bit signed type.
    return static_cast<uint64_t>(ts.tv_sec) * 1000000000ULL
         + static_cast<uint64_t>(ts.tv_nsec);
}
// String conversion functions
// Converts `str` with std::stoll. Any conversion failure (no digits, value out
// of range) yields 0 instead of an exception.
int64_t string_to_int64(const std::string& str) {
    int64_t parsed = 0;
    try {
        parsed = std::stoll(str);
    } catch (const std::exception&) {
        parsed = 0;
    }
    return parsed;
}
// Converts `str` with std::stod. Any conversion failure (no number, value out
// of range) yields 0.0 instead of an exception.
double string_to_double(const std::string& str) {
    double parsed = 0.0;
    try {
        parsed = std::stod(str);
    } catch (const std::exception&) {
        parsed = 0.0;
    }
    return parsed;
}
// Signal handler for graceful shutdown.
// Prints the same notice as before and clears `running`. The message is
// assembled by hand and emitted with write() because printf() is not
// async-signal-safe and must not be called from a signal handler.
void signal_handler(int sig) {
    static const char kPrefix[] = "\nReceived signal ";
    static const char kSuffix[] = ", shutting down market data receiver...\n";

    // Room for both texts plus sign and decimal digits of an int.
    char msg[sizeof(kPrefix) + sizeof(kSuffix) + 12];
    size_t len = 0;
    for (size_t i = 0; i + 1 < sizeof(kPrefix); ++i) {
        msg[len++] = kPrefix[i];
    }

    unsigned int magnitude = static_cast<unsigned int>(sig);
    if (sig < 0) {
        msg[len++] = '-';
        magnitude = 0u - magnitude;
    }
    char digits[12];
    size_t digit_count = 0;
    do {
        digits[digit_count++] = static_cast<char>('0' + magnitude % 10u);
        magnitude /= 10u;
    } while (magnitude != 0u);
    while (digit_count > 0) {
        msg[len++] = digits[--digit_count];
    }

    for (size_t i = 0; i + 1 < sizeof(kSuffix); ++i) {
        msg[len++] = kSuffix[i];
    }

    // Best effort: nothing useful can be done here if the write fails.
    const ssize_t ignored = write(STDOUT_FILENO, msg, len);
    (void)ignored;

    running = 0;
}
// Function to get current timestamp.
// Writes the local time as "YYYY-MM-DD HH:MM:SS" into `buffer` (capacity
// `size` bytes, including the terminator). If the time cannot be converted or
// does not fit, `buffer` is set to the empty string. A null buffer or a zero
// size is a no-op.
void get_timestamp(char* buffer, size_t size) {
    if (buffer == nullptr || size == 0) {
        return;
    }
    buffer[0] = '\0';

    const time_t now = time(nullptr);
    struct tm tm_info;
    // localtime_r: re-entrant, unlike localtime() which returns shared static storage.
    if (localtime_r(&now, &tm_info) == nullptr) {
        return;
    }
    // strftime leaves the buffer contents unspecified when it returns 0.
    if (strftime(buffer, size, "%Y-%m-%d %H:%M:%S", &tm_info) == 0) {
        buffer[0] = '\0';
    }
}
// Parse WebSocket data and create Event for async writing.
// Example input:
// {"e":"bookTicker","u":7645390170525,"s":"BTCUSDT","b":"108946.90","B":"11.598","a":"108947.00","A":"7.239","T":1748411287461,"E":1748411287462}
//
// This is a minimal scanner for flat `"key":value` pairs, not a JSON parser:
// a value ends at the next ',' (or '}' for the last pair), so values that
// contain commas or nested objects are not supported. Keys other than
// b/a/B/A/T/s/t/u are ignored; fields whose key is absent stay zero/empty.
void parse_bookTicker_and_write(const std::string& data) {
    Event event{};  // value-initialized so absent keys are logged as 0 / ""
    event.receive_ts = getCurrentTime();

    size_t pos = 0;
    while (pos < data.size()) {
        const size_t key_start = data.find('"', pos);
        if (key_start == std::string::npos) break;
        const size_t key_end = data.find('"', key_start + 1);
        if (key_end == std::string::npos) break;
        const std::string key = data.substr(key_start + 1, key_end - key_start - 1);

        // Test for npos BEFORE adding 1: npos + 1 wraps to 0, which used to
        // restart the scan from the beginning and could loop forever on input
        // such as {"a":"1","b"}.
        const size_t colon = data.find(':', key_end);
        if (colon == std::string::npos) break;
        const size_t value_start = colon + 1;

        size_t value_end = data.find(',', value_start);
        if (value_end == std::string::npos) value_end = data.find('}', value_start);
        if (value_end == std::string::npos) break;

        // Strip the quotes that surround string-typed values.
        std::string value = data.substr(value_start, value_end - value_start);
        value.erase(std::remove(value.begin(), value.end(), '"'), value.end());

        if (key == "b") {
            event.bid_price = string_to_double(value);
        } else if (key == "a") {
            event.ask_price = string_to_double(value);
        } else if (key == "B") {
            event.bid_qty = string_to_double(value);
        } else if (key == "A") {
            event.ask_qty = string_to_double(value);
        } else if (key == "T") {
            event.exchange_ts = string_to_int64(value);
        } else if (key == "s") {
            event.symbol = value;
        } else if (key == "t" || key == "u") {
            event.seq_num = string_to_int64(value);
        }
        pos = value_end + 1;
    }

    // Second clock sample, taken after parsing (receive_ts was taken before).
    event.local_ts = getCurrentTime();
    // Enqueue event for async writing
    async_writer.enqueueEvent(event);
}
// A second, `static` definition of getCurrentTime() was removed from this
// spot: it redefined the function already defined earlier in this file (and
// changed its linkage), which is ill-formed and stops the file from compiling.
// All callers use the earlier definition.
// Parses a CPU id given on the command line. Returns false when `text` does
// not start with an integer or the value does not fit in an int.
static bool parse_cpu_id(const char* text, int& cpu_id) {
    try {
        cpu_id = std::stoi(text);
        return true;
    } catch (const std::exception&) {
        return false;
    }
}

// Pins `thread` to `cpu_id`. A cpu_id of -1 means "do not pin" and succeeds
// without touching the affinity. Returns false if the affinity call fails.
static bool pin_thread_to_cpu(pthread_t thread, int cpu_id) {
    if (cpu_id == -1) {
        return true;
    }
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(cpu_id, &cpuset);
    return pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset) == 0;
}

// Entry point.
// Usage: <topic> <main_thread_cpu_id> <print_latency_thread_cpu_id>
// Receives messages for <topic>, parses each one and queues it for the writer
// thread until SIGINT/SIGTERM clears `running`.
// Returns 0 on normal shutdown, 1 if the receiver cannot be created and -1 on
// argument or CPU-affinity errors.
int main(int argc, char* argv[]) {
    // argv[3] is read below, so three arguments (argc >= 4) are required.
    if (argc < 4) {
        std::cerr << "Usage: ltp_md_latency <topic> <main_thread_cpu_id> <print_latency_thread_cpu_id>" << std::endl;
        std::cerr << "Example: ltp_md_latency \"bf_m_pub\" 0 1" << std::endl;
        return -1;
    }
    const char* topic = argv[1];
    int main_thread_cpu_id = -1;
    int print_latency_thread_cpu_id = -1;
    if (!parse_cpu_id(argv[2], main_thread_cpu_id) ||
        !parse_cpu_id(argv[3], print_latency_thread_cpu_id)) {
        std::cerr << "CPU ids must be integers (use -1 to skip pinning)" << std::endl;
        return -1;
    }
    std::cerr << "main thread_cpu_id: " << main_thread_cpu_id << std::endl;
    std::cerr << "print_latency_thread_cpu_id: " << print_latency_thread_cpu_id << std::endl;

    // Set CPU affinity for main thread
    if (!pin_thread_to_cpu(pthread_self(), main_thread_cpu_id)) {
        std::cerr << "Error setting CPU affinity for main thread" << std::endl;
        return -1;
    }

    // Set up signal handlers
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);

    // Create market data receiver
    // NOTE(review): no release function for the receiver is visible in this
    // file; if ltp_ucli_ffi.h declares one it should be called before exit.
    receiver = init_market_receiver(topic, strlen(topic));
    if (!receiver) {
        fprintf(stderr, "Failed to create market receiver for topic: %s\n", topic);
        return 1;
    }
    printf("Market data receiver initialized successfully\n");
    printf("Topic: %s\n", topic);
    printf("Press Ctrl+C to stop\n");

    // Start async file writer thread
    std::thread async_writer_thread(&AsyncFileWriter::run, &async_writer);

    // Set CPU affinity for async writer thread
    if (!pin_thread_to_cpu(async_writer_thread.native_handle(), print_latency_thread_cpu_id)) {
        std::cerr << "Error setting CPU affinity for async writer thread" << std::endl;
        // The thread is already running: stop and join it before returning,
        // because destroying a joinable std::thread calls std::terminate().
        running = 0;
        async_writer_thread.join();
        return -1;
    }

    // Main market data receiving loop
    unsigned char buf[8192];
    while (running) {
        unsigned int size = receive_market(receiver, reinterpret_cast<char*>(buf));
        // NOTE(review): receive_market's contract is not visible here. A size
        // larger than the buffer is skipped rather than read past its end.
        if (size > 0 && size <= sizeof(buf)) {
            // Parse and enqueue for async writing
            std::string data_str(reinterpret_cast<char*>(buf), size);
            parse_bookTicker_and_write(data_str);
        }
    }

    // Wait for async writer thread to finish
    async_writer_thread.join();
    printf("Market data receiver stopped\n");
    return 0;
}