API Reference

ltp_rm_2_0_receiver

/**
 * @file ltp_rm_2_0_receiver.cpp
 * Market data receiver test program that demonstrates how to receive and process
 * market data through the topic-based messaging system.
 *
 * @copyright Copyright (c) 2025 LiquidityTech. All rights reserved.
 *
 * This software is proprietary and confidential. Unauthorized copying,
 * distribution, or modification of this file is strictly prohibited.
 *
 * For more information, visit: https://www.liquiditytech.com/
 *
 * LiquidityTech - The Leading Digital Asset Prime Brokerage for Institutions
 * Trusted execution, custody, clearing, and financing solutions for digital assets.
 */

#include "ltp_ucli_ffi.h"

#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/syscall.h>
#include <time.h>
#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <exception>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>

#include "concurrentqueue.h"

// Global variables for graceful shutdown
volatile int running = 1;
struct UltraMarketReceiver* receiver = NULL;

// Event structure for market data
struct Event {
    uint64_t local_ts;
    uint64_t exchange_ts;
    uint64_t seq_num;
    uint64_t receive_ts;
    double bid_price;
    double bid_qty;
    double ask_price;
    double ask_qty;
    std::string symbol;
};

// Async file writer class
class AsyncFileWriter {
public:
    AsyncFileWriter() : latency_queue_(1000000UL) {}

    void enqueueEvent(const Event& event) {
        latency_queue_.enqueue(event);
    }

    void run() {
        Event event;
        std::string currentDate = getCurrentDateString();
        std::ofstream outFile = openLogFile(currentDate);

        if (!outFile.is_open()) {
            std::cerr << "Failed to open log file: " << currentDate << ".log" << std::endl;
            return;
        }

        outFile << "symbol,seq_num,exchange_ts,local_ts,receive_ts,bid_price,bid_qty,ask_price,ask_qty" << std::endl;

        while (running) {
            if (latency_queue_.try_dequeue(event)) {
                std::string newDate = getCurrentDateString();
                if (newDate != currentDate) {
                    outFile.close();
                    currentDate = newDate;
                    outFile = openLogFile(currentDate);
                }

                outFile << event.symbol << ","
                        << event.seq_num << ","
                        << event.exchange_ts << ","
                        << event.local_ts << ","
                        << event.receive_ts << ","
                        << event.bid_price << ","
                        << event.bid_qty << ","
                        << event.ask_price << ","
                        << event.ask_qty << std::endl;
            }
        }

        outFile.close();
    }

private:
    std::string getCurrentDateString() {
        auto t = std::time(nullptr);
        auto tm = *std::localtime(&t);
        std::ostringstream oss;
        oss << std::put_time(&tm, "%Y-%m-%d");
        return oss.str();
    }

    std::ofstream openLogFile(const std::string& date) {
        std::string fileName = "rm_2_0_" + date + ".log";
        return std::ofstream(fileName, std::ios::app);
    }

    moodycamel::ConcurrentQueue<Event> latency_queue_;
};

// Global async writer instance
AsyncFileWriter async_writer;

// Data type enumeration
enum DataType { bookTicker, Trade };

// Time utility functions
uint64_t getCurrentTime() {
    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    return ((ts.tv_sec * 1000000000) + ts.tv_nsec);
}

// String conversion functions
int64_t string_to_int64(const std::string& str) {
    try {
        return std::stoll(str);
    } catch (const std::exception& e) {
        return 0;
    }
}

double string_to_double(const std::string& str) {
    try {
        return std::stod(str);
    } catch (const std::exception& e) {
        return 0.0;
    }
}

// Signal handler for graceful shutdown
void signal_handler(int sig) {
    printf("\nReceived signal %d, shutting down market data receiver...\n", sig);
    running = 0;
}

// Function to get current timestamp
void get_timestamp(char* buffer, size_t size) {
    time_t now = time(NULL);
    struct tm* tm_info = localtime(&now);
    strftime(buffer, size, "%Y-%m-%d %H:%M:%S", tm_info);
}

// Parse WebSocket data and create Event for async writing
// {"e":"bookTicker","u":7645390170525,"s":"BTCUSDT","b":"108946.90","B":"11.598","a":"108947.00","A":"7.239","T":1748411287461,"E":1748411287462}
void parse_bookTicker_and_write(const std::string& data) {
    size_t pos = 0;
    Event event;
    event.receive_ts = getCurrentTime();

    while (pos < data.size()) {
        size_t key_start = data.find("\"", pos);
        if (key_start == std::string::npos) break;
        size_t key_end = data.find("\"", key_start + 1);
        if (key_end == std::string::npos) break;
        std::string key = data.substr(key_start + 1, key_end - key_start - 1);

        size_t value_start = data.find(":", key_end) + 1;
        if (value_start == std::string::npos) break;
        size_t value_end = data.find(",", value_start);
        if (value_end == std::string::npos) value_end = data.find("}", value_start);
        if (value_end == std::string::npos) break;
        std::string value = data.substr(value_start, value_end - value_start);
        value.erase(remove(value.begin(), value.end(), '\"'), value.end());

        if (key == "b") {
            event.bid_price = string_to_double(value);
        } else if (key == "a") {
            event.ask_price = string_to_double(value);
        } else if (key == "B") {
            event.bid_qty = string_to_double(value);
        } else if (key == "A") {
            event.ask_qty = string_to_double(value);
        } else if (key == "T") {
            event.exchange_ts = string_to_int64(value);
        } else if (key == "s") {
            event.symbol = value;
        } else if (key == "t" || key == "u") {
            event.seq_num = string_to_int64(value);
        }

        pos = value_end + 1;
    }

    // Set local timestamp (same as receive_ts for now)
    event.local_ts = getCurrentTime();

    // Enqueue event for async writing
    async_writer.enqueueEvent(event);
}

static uint64_t getCurrentTime(){
    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    return ((ts.tv_sec * 1000000000) + ts.tv_nsec );
}

int main(int argc, char* argv[]) {
    if (argc < 3) {
        std::cerr << "Usage: ltp_md_latency <topic> <main_thread_cpu_id> <print_latency_thread_cpu_id>" << std::endl;
        std::cerr << "Example: ltp_md_latency \"bf_m_pub\" 0 1" << std::endl;
        return -1;
    }

    const char* topic = argv[1];
    int main_thread_cpu_id = std::stoi(argv[2]);
    int print_latency_thread_cpu_id = std::stoi(argv[3]);

    char timestamp[32];
    unsigned char buf[8192];
    unsigned long long total_messages = 0;
    unsigned long long total_bytes = 0;
    time_t start_time = time(NULL);
    int stats_interval = 10; // Display stats every 10 messages


    std::cerr << "main thread_cpu_id: " << main_thread_cpu_id << std::endl;
    std::cerr << "print_latency_thread_cpu_id: " << print_latency_thread_cpu_id << std::endl;

    // Set CPU affinity for main thread
    if (main_thread_cpu_id != -1) {
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET(main_thread_cpu_id, &cpuset);
        if (pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset) != 0) {
            std::cerr << "Error setting CPU affinity for main thread" << std::endl;
            return -1;
        }
    }

    // Set up signal handlers
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);

    // Create market data receiver
    receiver = init_market_receiver(topic, strlen(topic));
    if (!receiver) {
        fprintf(stderr, "Failed to create market receiver for topic: %s\n", topic);
        return 1;
    }

    printf("Market data receiver initialized successfully\n");
    printf("Topic: %s\n", topic);
    printf("Press Ctrl+C to stop\n");

    // Start async file writer thread
    std::thread async_writer_thread(&AsyncFileWriter::run, &async_writer);

    // Set CPU affinity for async writer thread
    if (print_latency_thread_cpu_id != -1) {
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET(print_latency_thread_cpu_id, &cpuset);
        if (pthread_setaffinity_np(async_writer_thread.native_handle(), sizeof(cpu_set_t), &cpuset) != 0) {
            std::cerr << "Error setting CPU affinity for async writer thread" << std::endl;
            return -1;
        }
    }

    // Main market data receiving loop
    while (running) {
        unsigned int size = receive_market(receiver, (char*)buf);
        if (size > 0) {
            // Parse and enqueue for async writing
            std::string data_str(reinterpret_cast<char*>(buf), size);
            parse_bookTicker_and_write(data_str);
        }
    }

    // Wait for async writer thread to finish
    async_writer_thread.join();

    printf("Market data receiver stopped\n");
    return 0;
}