/**
* @file ltp_rm_1_0_receiver.cpp
* Market data receiver latency test program that demonstrates how to receive and process
* market data through the ZMQ messaging system.
*
* @copyright Copyright (c) 2025 LiquidityTech. All rights reserved.
*
* This software is proprietary and confidential. Unauthorized copying,
* distribution, or modification of this file is strictly prohibited.
*
* For more information, visit: https://www.liquiditytech.com/
*
* LiquidityTech - The Leading Digital Asset Prime Brokerage for Institutions
* Trusted execution, custody, clearing, and financing solutions for digital assets.
*/
#include <zmq.hpp>
#include <zmq_addon.hpp>
#include <iomanip>
#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>
#include <fstream>
#include <ctime>
#include "l2_book.pb.h"
#include "concurrentqueue.h"
#include <unistd.h>
#include <sys/syscall.h>
#include <pthread.h>
using namespace liquidityTech;
using namespace zmq;
/**
 * @brief Latency-test subscriber for L2 book updates published over ZMQ.
 *
 * run() receives messages on a SUB socket, decodes them as
 * liquidityTech::md::L2BookProto and pushes a top-of-book snapshot onto a
 * concurrent queue. printLatency() drains that queue and appends one CSV row
 * per snapshot to a per-day log file. The two methods are meant to be driven
 * from two different threads.
 */
class RapidMarketClient {
public:
    /**
     * @brief Connects a SUB socket to @p ipcpath with an empty subscription prefix.
     * @param ipcpath ZMQ endpoint string handed to connect().
     *
     * Errors raised by connect()/set() propagate to the caller.
     */
    explicit RapidMarketClient(const std::string& ipcpath)
        : ipcpath_(ipcpath)
        , context_()
        , subscriber_(context_, zmq::socket_type::sub) {
        subscriber_.connect(ipcpath_);
        subscriber_.set(zmq::sockopt::subscribe, "");
    }

    /**
     * @brief Receive loop: decodes each message and enqueues its best bid/ask.
     *
     * Loops forever. Messages that cannot be received, are shorter than the
     * fixed-size prefix, fail to parse, or carry an empty bid or ask side are
     * skipped.
     *
     * @throws std::runtime_error if the best bid price is below 1000.
     */
    void run() {
        zmq::message_t msg;
        liquidityTech::md::L2BookProto book;
        std::cout << "Receiving market data..." << std::setprecision(5) << std::fixed << std::endl;
        while (true) {
            const auto result = subscriber_.recv(msg, zmq::recv_flags::none);
            if (!result.has_value()) {
                std::cerr << "Failed to receive message" << std::endl;
                continue;
            }
            // A frame shorter than the prefix cannot contain a payload; skip it
            // instead of letting an out-of-range access take the receiver down.
            if (msg.size() < kHeaderSize) {
                std::cerr << "Message too short: " << msg.size() << " bytes" << std::endl;
                continue;
            }
            // Parse straight out of the ZMQ buffer; no intermediate string copies.
            const char* payload = static_cast<const char*>(msg.data()) + kHeaderSize;
            const int payload_size = static_cast<int>(msg.size() - kHeaderSize);
            if (!book.ParseFromArray(payload, payload_size)) {
                std::cerr << "Failed to parse message" << std::endl;
                continue;
            }

            Event event;
            event.local_ts = book.local_ts();
            event.exchange_ts = book.exchange_ts();
            event.seq_num = book.seq_num();
            event.receive_ts = getCurrentTime();

            // Only books with both sides populated are recorded.
            if (book.levels_asks_size() == 0 || book.levels_bids_size() == 0) {
                continue;
            }
            const auto& ask = book.levels_asks(0);
            const auto& bid = book.levels_bids(0);
            event.bid_price = bid.price();
            event.bid_qty = bid.qty();
            event.ask_price = ask.price();
            event.ask_qty = ask.qty();

            // NOTE(review): hard-coded sanity floor kept from the original; the
            // exception leaves run() and ends the receive loop. Confirm that the
            // threshold suits every instrument this test is pointed at.
            if (bid.price() < 1000) {
                throw std::runtime_error("Invalid bid price");
            }

            event.symbol = book.symbol();
            if (!latency_queue_.enqueue(std::move(event))) {
                std::cerr << "Failed to enqueue event" << std::endl;
            }
            msg.rebuild();
            book.Clear();
        }
    }

    /**
     * @brief Logging loop: writes every queued event as a CSV row.
     *
     * Rows go to "rm_1_0_<YYYY-MM-DD>.log" (append mode). When the local date
     * changes, the current file is closed and the file for the new date is
     * opened. Every opened file starts with the CSV header line.
     *
     * The loop polls the queue without sleeping and only returns when a log
     * file cannot be opened.
     */
    void printLatency() {
        std::string currentDate = getCurrentDateString();
        std::ofstream outFile = openLogFile(currentDate);
        if (!outFile.is_open()) {
            std::cerr << "Failed to open log file: " << logFileName(currentDate) << std::endl;
            return;
        }

        Event event;
        while (true) {
            if (!latency_queue_.try_dequeue(event)) {
                continue;
            }
            const std::string newDate = getCurrentDateString();
            if (newDate != currentDate) {
                outFile.close();
                currentDate = newDate;
                outFile = openLogFile(currentDate);
                if (!outFile.is_open()) {
                    std::cerr << "Failed to open log file: " << logFileName(currentDate) << std::endl;
                    return;
                }
            }
            // std::endl flushes each row: this loop has no clean shutdown path,
            // so anything still buffered would be lost when the process is killed.
            outFile << event.symbol << ","
                    << event.seq_num << ","
                    << event.exchange_ts << ","
                    << event.local_ts << ","
                    << event.receive_ts << ","
                    << event.bid_price << ","
                    << event.bid_qty << ","
                    << event.ask_price << ","
                    << event.ask_qty << std::endl;
        }
    }

    /**
     * @brief Current CLOCK_REALTIME reading.
     * @return Seconds and nanoseconds of the clock combined into one nanosecond count.
     */
    static uint64_t getCurrentTime() {
        struct timespec ts;
        clock_gettime(CLOCK_REALTIME, &ts);
        // Widen before multiplying so the product is computed in 64 bits even
        // where time_t is narrower.
        return static_cast<uint64_t>(ts.tv_sec) * 1000000000ULL + static_cast<uint64_t>(ts.tv_nsec);
    }

private:
    /** Top-of-book snapshot plus timestamps, passed from run() to printLatency(). */
    struct Event {
        uint64_t local_ts = 0;
        uint64_t exchange_ts = 0;
        uint64_t seq_num = 0;
        uint64_t receive_ts = 0;
        double bid_price = 0.0;
        double bid_qty = 0.0;
        double ask_price = 0.0;
        double ask_qty = 0.0;
        std::string symbol;
    };

    // Number of leading bytes skipped before the protobuf payload.
    // NOTE(review): the meaning of this prefix is not visible in this file;
    // presumably an envelope added by the publisher. Confirm against the sender.
    static constexpr std::size_t kHeaderSize = 20;

    // Initial capacity requested for the event queue.
    static constexpr std::size_t kQueueCapacity = 1000000UL;

    // Significant digits for doubles in the CSV log. The stream default (6)
    // would truncate prices such as 104523.45; 15 is what a double can hold
    // without introducing representation noise.
    static constexpr int kLogPrecision = 15;

    /**
     * @brief Local calendar date formatted as "YYYY-MM-DD".
     * @return The formatted date, or an empty string if the conversion fails.
     */
    std::string getCurrentDateString() {
        const std::time_t now = std::time(nullptr);
        struct tm local_tm;
        // localtime_r writes into a caller-owned buffer, unlike std::localtime
        // which shares static storage across threads.
        if (localtime_r(&now, &local_tm) == nullptr) {
            return std::string();
        }
        char buffer[16];
        const std::size_t written = std::strftime(buffer, sizeof(buffer), "%Y-%m-%d", &local_tm);
        return std::string(buffer, written);
    }

    /** @brief Log file name used for the given date string. */
    std::string logFileName(const std::string& date) {
        return "rm_1_0_" + date + ".log";
    }

    /**
     * @brief Opens the log file for @p date in append mode.
     *
     * On success the stream is configured with the log precision and the CSV
     * header line is written. On failure the returned stream is not open.
     */
    std::ofstream openLogFile(const std::string& date) {
        std::ofstream file(logFileName(date), std::ios::app);
        if (file.is_open()) {
            file << std::setprecision(kLogPrecision);
            file << "symbol,seq_num,exchange_ts,local_ts,receive_ts,bid_price,bid_qty,ask_price,ask_qty" << std::endl;
        }
        return file;
    }

    std::string ipcpath_;
    zmq::context_t context_;
    zmq::socket_t subscriber_;
    moodycamel::ConcurrentQueue<Event> latency_queue_{kQueueCapacity};
};
/**
 * @brief Entry point of the latency test receiver.
 *
 * Arguments: <path> <main_thread_cpu_id> <print_latency_thread_cpu_id>.
 * A CPU id of -1 leaves the corresponding thread unpinned. The calling thread
 * runs RapidMarketClient::run(); a second thread runs
 * RapidMarketClient::printLatency().
 *
 * @return -1 on invalid arguments or when pinning the main thread fails;
 *         0 if run() and the logging thread ever finish.
 */
int main(int argc, char** argv) {
    if (argc != 4) {
        std::cerr << "Usage: rapidmarket_client <path> <main_thread_cpu_id> <print_latency_thread_cpu_id>" << std::endl;
        std::cerr << "Example: rapidmarket_client \"tcp://localhost:50000\" 2 3" << std::endl;
        std::cerr << "Pass -1 as a CPU id to leave that thread unpinned" << std::endl;
        return -1;
    }

    // Validate the numeric arguments before touching the network.
    int main_thread_cpu_id = -1;
    int print_latency_thread_cpu_id = -1;
    try {
        main_thread_cpu_id = std::stoi(argv[2]);
        print_latency_thread_cpu_id = std::stoi(argv[3]);
    } catch (const std::exception& e) {
        std::cerr << "Invalid CPU id argument: " << e.what() << std::endl;
        return -1;
    }

    // -1 means "do not pin"; anything else must fit into a cpu_set_t.
    const auto is_valid_cpu_id = [](int cpu_id) {
        return cpu_id >= -1 && cpu_id < CPU_SETSIZE;
    };
    if (!is_valid_cpu_id(main_thread_cpu_id) || !is_valid_cpu_id(print_latency_thread_cpu_id)) {
        std::cerr << "CPU id must be -1 or in [0, " << CPU_SETSIZE << ")" << std::endl;
        return -1;
    }

    // Restricts the given thread to a single CPU; returns false on failure.
    const auto pin_thread = [](pthread_t thread, int cpu_id) {
        cpu_set_t cpuset;
        CPU_ZERO(&cpuset);
        CPU_SET(cpu_id, &cpuset);
        return pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset) == 0;
    };

    RapidMarketClient client(argv[1]);
    std::cerr << "main thread_cpu_id: " << main_thread_cpu_id << std::endl;
    std::cerr << "print_latency_thread_cpu_id: " << print_latency_thread_cpu_id << std::endl;

    if (main_thread_cpu_id != -1 && !pin_thread(pthread_self(), main_thread_cpu_id)) {
        std::cerr << "Error setting CPU affinity" << std::endl;
        return -1;
    }

    std::thread p_thread(&RapidMarketClient::printLatency, &client);
    if (print_latency_thread_cpu_id != -1 &&
        !pin_thread(p_thread.native_handle(), print_latency_thread_cpu_id)) {
        std::cerr << "Error setting CPU affinity" << std::endl;
        // The logging thread is already running and has no stop mechanism, so
        // it cannot be joined. Returning here would destroy a joinable
        // std::thread (std::terminate) and tear down `client` underneath it;
        // leave the process immediately instead.
        _exit(-1);
    }

    client.run();
    p_thread.join();
    return 0;
}