API Reference

ltp_rm_2_0_receiver

/**
 * @file ltp_rm_2_0_receiver.cpp
 * Market data receiver test program that demonstrates how to receive and process
 * market data through the topic-based messaging system.
 *
 * @copyright Copyright (c) 2025 LiquidityTech. All rights reserved.
 *
 * This software is proprietary and confidential. Unauthorized copying,
 * distribution, or modification of this file is strictly prohibited.
 *
 * For more information, visit: https://www.liquiditytech.com/
 *
 * LiquidityTech - The Leading Digital Asset Prime Brokerage for Institutions
 * Trusted execution, custody, clearing, and financing solutions for digital assets.
 */

 #include "ltp_ucli_ffi.h"
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <unistd.h>
 #include <signal.h>
 #include <time.h>
 #include <string>
 #include <iostream>
 #include <fstream>
 #include <iomanip>
 #include <sstream>
 #include <thread>
 #include <chrono>
 #include <atomic>
 #include <pthread.h>
 #include <sys/syscall.h>
 #include "concurrentqueue.h"

 // Global variables for graceful shutdown
 volatile int running = 1;
 struct UltraMarketReceiver* receiver = NULL;

 // Event structure for market data
 struct Event {
     uint64_t local_ts;
     uint64_t exchange_ts;
     uint64_t seq_num;
     uint64_t receive_ts;
     double bid_price;
     double bid_qty;
     double ask_price;
     double ask_qty;
     std::string symbol;
 };

 // Async file writer class
 class AsyncFileWriter {
 public:
     AsyncFileWriter() : latency_queue_(1000000UL) {}

     void enqueueEvent(const Event& event) {
         latency_queue_.enqueue(event);
     }

     void run() {
         Event event;
         std::string currentDate = getCurrentDateString();
         std::ofstream outFile = openLogFile(currentDate);

         if (!outFile.is_open()) {
             std::cerr << "Failed to open log file: " << currentDate << ".log" << std::endl;
             return;
         }

         outFile << "symbol,seq_num,exchange_ts,local_ts,receive_ts,bid_price,bid_qty,ask_price,ask_qty" << std::endl;

         while (running) {
             if (latency_queue_.try_dequeue(event)) {
                 std::string newDate = getCurrentDateString();
                 if (newDate != currentDate) {
                     outFile.close();
                     currentDate = newDate;
                     outFile = openLogFile(currentDate);
                 }

                 outFile << event.symbol << ","
                         << event.seq_num << ","
                         << event.exchange_ts << ","
                         << event.local_ts << ","
                         << event.receive_ts << ","
                         << event.bid_price << ","
                         << event.bid_qty << ","
                         << event.ask_price << ","
                         << event.ask_qty << std::endl;
             }
         }

         outFile.close();
     }

 private:
     std::string getCurrentDateString() {
         auto t = std::time(nullptr);
         auto tm = *std::localtime(&t);
         std::ostringstream oss;
         oss << std::put_time(&tm, "%Y-%m-%d");
         return oss.str();
     }

     std::ofstream openLogFile(const std::string& date) {
         std::string fileName = "rm_2_0_" + date + ".log";
         return std::ofstream(fileName, std::ios::app);
     }

     moodycamel::ConcurrentQueue<Event> latency_queue_;
 };

 // Global async writer instance
 AsyncFileWriter async_writer;

 // Data type enumeration
 enum DataType { bookTicker, Trade };

 // Time utility functions
 uint64_t getCurrentTime() {
     struct timespec ts;
     clock_gettime(CLOCK_REALTIME, &ts);
     return ((ts.tv_sec * 1000000000) + ts.tv_nsec);
 }

 // String conversion functions
 int64_t string_to_int64(const std::string& str) {
     try {
         return std::stoll(str);
     } catch (const std::exception& e) {
         return 0;
     }
 }

 double string_to_double(const std::string& str) {
     try {
         return std::stod(str);
     } catch (const std::exception& e) {
         return 0.0;
     }
 }

 // Signal handler for graceful shutdown
 void signal_handler(int sig) {
     printf("\nReceived signal %d, shutting down market data receiver...\n", sig);
     running = 0;
 }

 // Function to get current timestamp
 void get_timestamp(char* buffer, size_t size) {
     time_t now = time(NULL);
     struct tm* tm_info = localtime(&now);
     strftime(buffer, size, "%Y-%m-%d %H:%M:%S", tm_info);
 }

 // Parse WebSocket data and create Event for async writing
 // {"e":"bookTicker","u":7645390170525,"s":"BTCUSDT","b":"108946.90","B":"11.598","a":"108947.00","A":"7.239","T":1748411287461,"E":1748411287462}
 void parse_bookTicker_and_write(const std::string& data) {
     size_t pos = 0;
     Event event;
     event.receive_ts = getCurrentTime();

     while (pos < data.size()) {
         size_t key_start = data.find("\"", pos);
         if (key_start == std::string::npos) break;
         size_t key_end = data.find("\"", key_start + 1);
         if (key_end == std::string::npos) break;
         std::string key = data.substr(key_start + 1, key_end - key_start - 1);

         size_t value_start = data.find(":", key_end) + 1;
         if (value_start == std::string::npos) break;
         size_t value_end = data.find(",", value_start);
         if (value_end == std::string::npos) value_end = data.find("}", value_start);
         if (value_end == std::string::npos) break;
         std::string value = data.substr(value_start, value_end - value_start);
         value.erase(remove(value.begin(), value.end(), '\"'), value.end());

         if (key == "b") {
             event.bid_price = string_to_double(value);
         } else if (key == "a") {
             event.ask_price = string_to_double(value);
         } else if (key == "B") {
             event.bid_qty = string_to_double(value);
         } else if (key == "A") {
             event.ask_qty = string_to_double(value);
         } else if (key == "T") {
             event.exchange_ts = string_to_int64(value);
         } else if (key == "s") {
             event.symbol = value;
         } else if (key == "t" || key == "u") {
             event.seq_num = string_to_int64(value);
         }

         pos = value_end + 1;
     }

     // Set local timestamp (same as receive_ts for now)
     event.local_ts = getCurrentTime();

     // Enqueue event for async writing
     async_writer.enqueueEvent(event);
 }

 int main(int argc, char* argv[]) {
     if (argc < 3) {
         std::cerr << "Usage: ./ltp_rm_2_0_receiver <topic> <main_thread_cpu_id> <print_latency_thread_cpu_id>" << std::endl;
         std::cerr << "Example: ./ltp_rm_2_0_receiver \"bf_pm_pub\" 0 1" << std::endl;
         return -1;
     }

     const char* topic = argv[1];
     int main_thread_cpu_id = std::stoi(argv[2]);
     int print_latency_thread_cpu_id = std::stoi(argv[3]);

     char timestamp[32];
     unsigned char buf[8192];
     unsigned long long total_messages = 0;
     unsigned long long total_bytes = 0;
     time_t start_time = time(NULL);
     int stats_interval = 10; // Display stats every 10 messages


     std::cerr << "main thread_cpu_id: " << main_thread_cpu_id << std::endl;
     std::cerr << "print_latency_thread_cpu_id: " << print_latency_thread_cpu_id << std::endl;

     // Set CPU affinity for main thread
     if (main_thread_cpu_id != -1) {
         cpu_set_t cpuset;
         CPU_ZERO(&cpuset);
         CPU_SET(main_thread_cpu_id, &cpuset);
         if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset) != 0) {
             std::cerr << "Error setting CPU affinity for main thread" << std::endl;
             return -1;
         }
     }

     // Set up signal handlers
     signal(SIGINT, signal_handler);
     signal(SIGTERM, signal_handler);

     // Create market data receiver
     receiver = init_market_receiver(topic, strlen(topic));
     if (!receiver) {
         fprintf(stderr, "Failed to create market receiver for topic: %s\n", topic);
         return 1;
     }

     printf("Market data receiver initialized successfully\n");
     printf("Topic: %s\n", topic);
     printf("Press Ctrl+C to stop\n");

     // Start async file writer thread
     std::thread async_writer_thread(&AsyncFileWriter::run, &async_writer);

     // Set CPU affinity for async writer thread
     if (print_latency_thread_cpu_id != -1) {
         cpu_set_t cpuset;
         CPU_ZERO(&cpuset);
         CPU_SET(print_latency_thread_cpu_id, &cpuset);
         if (pthread_setaffinity_np(async_writer_thread.native_handle(), sizeof(cpu_set_t), &cpuset) != 0) {
             std::cerr << "Error setting CPU affinity for async writer thread" << std::endl;
             return -1;
         }
     }

     // Main market data receiving loop
     while (running) {
         unsigned int size = receive_market(receiver, (char*)buf);
         if (size > 0) {
             // Parse and enqueue for async writing
             std::string data_str(reinterpret_cast<char*>(buf), size);
             parse_bookTicker_and_write(data_str);
         }
     }

     // Wait for async writer thread to finish
     async_writer_thread.join();

     printf("Market data receiver stopped\n");
     return 0;
 }