API Reference

ltp_rm_1_0_receiver

/**
 * @file ltp_rm_1_0_receiver.cpp
 * Market data receiver latency test program that demonstrates how to receive and process
 * market data through the ZMQ messaging system.
 *
 * @copyright Copyright (c) 2025 LiquidityTech. All rights reserved.
 *
 * This software is proprietary and confidential. Unauthorized copying,
 * distribution, or modification of this file is strictly prohibited.
 *
 * For more information, visit: https://www.liquiditytech.com/
 *
 * LiquidityTech - The Leading Digital Asset Prime Brokerage for Institutions
 * Trusted execution, custody, clearing, and financing solutions for digital assets.
 */

#include <pthread.h>
#include <sys/syscall.h>
#include <unistd.h>

#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <ctime>
#include <fstream>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>

#include <zmq.hpp>
#include <zmq_addon.hpp>

#include "concurrentqueue.h"
#include "l2_book.pb.h"

using namespace liquidityTech;
using namespace zmq;

/**
 * Receives L2 book updates from a ZMQ publisher and logs the top of book of
 * each update, together with its timestamps, to a daily CSV file.
 *
 * run() is the producer: it receives and parses messages and pushes one Event
 * per update into a queue. printLatency() is the consumer: it drains the queue
 * and writes the CSV rows. Both loop forever and are meant to run on separate
 * threads.
 */
class RapidMarketClient {
public:
    /**
     * Creates a SUB socket, connects it to the endpoint and subscribes to
     * every message (empty subscription filter).
     *
     * @param ipcpath ZMQ endpoint to connect to, e.g. "tcp://localhost:50000".
     * @throws zmq::error_t if cppzmq fails to create, connect or configure the socket.
     */
    RapidMarketClient(const std::string& ipcpath)
        : ipcpath_(ipcpath)
        , context_()
        , subscriber_(context_, zmq::socket_type::sub) {
        subscriber_.connect(ipcpath_);
        subscriber_.set(zmq::sockopt::subscribe, "");
    }

    /**
     * Receive loop. Never returns normally.
     *
     * Each message is expected to start with a kHeaderSize-byte prefix that is
     * skipped; the remainder is parsed as an L2BookProto. Updates with an empty
     * bid or ask side are dropped. For every other update the best bid/ask and
     * the message timestamps are queued for printLatency().
     *
     * @throws std::runtime_error if the best bid price is below 1000.
     */
    void run() {
        zmq::message_t msg;
        liquidityTech::md::L2BookProto book;
        std::cout << "Receiving market data..." << std::setprecision(5) << std::fixed << std::endl;

        while (true) {
            auto result = subscriber_.recv(msg, zmq::recv_flags::none);
            if (!result.has_value()) {
                std::cerr << "Failed to receive message" << std::endl;
                continue;
            }

            // A frame shorter than the prefix has no payload. Reject it here
            // rather than computing an offset past the end of the buffer.
            if (msg.size() < kHeaderSize) {
                std::cerr << "Failed to parse message" << std::endl;
                continue;
            }

            // Parse straight out of the ZMQ buffer; no intermediate copies.
            const char* payload = static_cast<const char*>(msg.data()) + kHeaderSize;
            const std::size_t payload_size = msg.size() - kHeaderSize;
            if (!book.ParseFromArray(payload, static_cast<int>(payload_size))) {
                std::cerr << "Failed to parse message" << std::endl;
                continue;
            }

            Event event;
            event.local_ts = book.local_ts();
            event.exchange_ts = book.exchange_ts();
            event.seq_num = book.seq_num();
            // Stamp the receive time right after parsing, before any other work.
            event.receive_ts = getCurrentTime();

            if (book.levels_asks_size() == 0 || book.levels_bids_size() == 0)
                continue;

            // Index 0 is used as the top of book on each side.
            const auto& ask = book.levels_asks(0);
            const auto& bid = book.levels_bids(0);
            event.bid_price = bid.price();
            event.bid_qty = bid.qty();
            event.ask_price = ask.price();
            event.ask_qty = ask.qty();

            // NOTE(review): the 1000 threshold looks instrument-specific;
            // confirm it is valid for every subscribed symbol.
            if (bid.price() < 1000)
            {
                throw std::runtime_error("Invalid bid price");
            }

            event.symbol = book.symbol();
            if (!latency_queue_.enqueue(std::move(event))) {
                // enqueue() reports failure through its return value; do not
                // lose the sample silently.
                std::cerr << "Failed to enqueue event" << std::endl;
            }

            msg.rebuild();
            book.Clear();
        }
    }

    /**
     * Log-writer loop. Polls the queue and appends one CSV row per event to
     * "rm_1_0_<YYYY-MM-DD>.log", switching to a new file when the local date
     * changes.
     *
     * Returns only if a log file cannot be opened; otherwise it loops forever.
     *
     * NOTE(review): doubles are written with the stream's default formatting
     * (6 significant digits); confirm that is enough for price/qty columns.
     */
    void printLatency() {
        std::string currentDate = getCurrentDateString();
        std::ofstream outFile = openLogFile(currentDate);

        if (!outFile.is_open()) {
            std::cerr << "Failed to open log file: " << logFileName(currentDate) << std::endl;
            return;
        }

        writeCsvHeader(outFile);

        Event event;
        while (true) {
            // Busy-poll: there is no blocking wait on the queue here.
            if (!latency_queue_.try_dequeue(event))
                continue;

            std::string newDate = getCurrentDateString();
            if (newDate != currentDate) {
                outFile.close();
                currentDate = newDate;
                outFile = openLogFile(currentDate);
                if (!outFile.is_open()) {
                    std::cerr << "Failed to open log file: " << logFileName(currentDate) << std::endl;
                    return;
                }
                // Every daily file starts with the same column header.
                writeCsvHeader(outFile);
            }

            // std::endl flushes each row so the file is current even if the
            // process is stopped abruptly.
            outFile << event.symbol << ","
                    << event.seq_num << ","
                    << event.exchange_ts << ","
                    << event.local_ts << ","
                    << event.receive_ts << ","
                    << event.bid_price << ","
                    << event.bid_qty << ","
                    << event.ask_price << ","
                    << event.ask_qty << std::endl;
        }
    }

    /**
     * @return CLOCK_REALTIME as nanoseconds since the epoch.
     */
    static uint64_t getCurrentTime(){
        struct timespec ts;
        clock_gettime(CLOCK_REALTIME, &ts);
        // Widen before multiplying so the product cannot overflow a narrower time_t.
        return static_cast<uint64_t>(ts.tv_sec) * 1000000000ULL
             + static_cast<uint64_t>(ts.tv_nsec);
      }

private:
    /// One queued sample: message timestamps plus the best bid/ask.
    struct Event {
        uint64_t local_ts = 0;
        uint64_t exchange_ts = 0;
        uint64_t seq_num = 0;
        uint64_t receive_ts = 0;   ///< Set from getCurrentTime() after parsing.
        double bid_price = 0.0;
        double bid_qty = 0.0;
        double ask_price = 0.0;
        double ask_qty = 0.0;
        std::string symbol;
    };

    /// Number of leading bytes skipped in every message before the protobuf payload.
    static constexpr std::size_t kHeaderSize = 20;

    /// @return Today's local date formatted as "YYYY-MM-DD".
    std::string getCurrentDateString() {
        const std::time_t now = std::time(nullptr);
        std::tm local_tm{};
        // localtime_r writes into caller-owned storage, unlike std::localtime,
        // whose shared static buffer is unsafe to use from a worker thread.
        localtime_r(&now, &local_tm);
        std::ostringstream oss;
        oss << std::put_time(&local_tm, "%Y-%m-%d");
        return oss.str();
    }

    /// @return The log file name used for the given date string.
    static std::string logFileName(const std::string& date) {
        return "rm_1_0_" + date + ".log";
    }

    /// Writes the CSV column header line and flushes it.
    static void writeCsvHeader(std::ofstream& out) {
        out << "symbol,seq_num,exchange_ts,local_ts,receive_ts,bid_price,bid_qty,ask_price,ask_qty" << std::endl;
    }

    /// Opens the log file for the given date in append mode.
    std::ofstream openLogFile(const std::string& date) {
        return std::ofstream(logFileName(date), std::ios::app);
    }

    std::string m_name = "rapidmarket_latency";
    std::string ipcpath_;
    zmq::context_t context_;
    zmq::socket_t subscriber_;
    // Constructed with 1000000 as the queue's initial capacity argument.
    moodycamel::ConcurrentQueue<Event> latency_queue_{1000000UL};
};

namespace {

/**
 * Restricts the given thread to a single CPU.
 *
 * @param thread Native handle of the thread to pin.
 * @param cpu_id CPU index to pin the thread to.
 * @return true on success, false if pthread_setaffinity_np reports an error.
 */
bool pinThreadToCpu(pthread_t thread, int cpu_id) {
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(cpu_id, &cpuset);
    return pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset) == 0;
}

/// Prints the command-line synopsis to stderr.
void printUsage() {
    std::cerr << "Usage: rapidmarket_client <path> <main_thread_cpu_id> <print_latency_thread_cpu_id>" << std::endl;
    std::cerr << "Example: rapidmarket_client \"tcp://localhost:50000\" 2 3" << std::endl;
    std::cerr << "Pass -1 as a CPU id to leave that thread unpinned." << std::endl;
}

}  // namespace

/**
 * Entry point.
 *
 * Arguments: <path> <main_thread_cpu_id> <print_latency_thread_cpu_id>.
 * A CPU id of -1 skips pinning for that thread. The receive loop runs on the
 * main thread and the log writer on a second thread.
 *
 * @return -1 on bad arguments or when thread affinity cannot be set.
 */
int main(int argc, char** argv) {
    if (argc != 4) {
        printUsage();
        return -1;
    }

    // Validate the numeric arguments before opening any socket or thread, so a
    // typo produces a usage message instead of an uncaught exception.
    int main_thread_cpu_id = -1;
    int print_latency_thread_cpu_id = -1;
    try {
        main_thread_cpu_id = std::stoi(argv[2]);
        print_latency_thread_cpu_id = std::stoi(argv[3]);
    } catch (const std::exception& e) {
        std::cerr << "Invalid CPU id argument: " << e.what() << std::endl;
        printUsage();
        return -1;
    }

    RapidMarketClient client(argv[1]);

    std::cerr << "main thread_cpu_id: " << main_thread_cpu_id << std::endl;
    std::cerr << "print_latency_thread_cpu_id: " << print_latency_thread_cpu_id << std::endl;

    if (main_thread_cpu_id != -1 && !pinThreadToCpu(pthread_self(), main_thread_cpu_id))
    {
        std::cerr << "Error setting CPU affinity" << std::endl;
        return -1;
    }

    std::thread p_thread(&RapidMarketClient::printLatency, &client);
    if (print_latency_thread_cpu_id != -1 &&
        !pinThreadToCpu(p_thread.native_handle(), print_latency_thread_cpu_id))
    {
        std::cerr << "Error setting CPU affinity" << std::endl;
        // The writer thread is already running and has no stop signal, so it
        // cannot be joined. Returning would destroy a joinable std::thread,
        // which calls std::terminate. Leave the process directly with the
        // same status instead.
        std::_Exit(-1);
    }

    try {
        client.run();
    } catch (const std::exception& e) {
        // Report the reason, then exit without unwinding past the still-running
        // writer thread (see above).
        std::cerr << "Receiver stopped: " << e.what() << std::endl;
        std::_Exit(-1);
    }

    p_thread.join();

    return 0;
}