#include #include #include #include #include #include #include //#include #include #include #include //#include using namespace std; namespace StarQuant { extern std::atomic gShutdown; atomic MICRO_SERVICE_NUMBER(0); void DataBoardService() { try { // std::unique_ptr msgq_sub_; // msgq_sub_ = std::make_unique(MSGQ_PROTOCOL::SUB, CConfig::instance().SERVERPUB_URL, false); // while (!gShutdown) { // string msg = msgq_sub_->recmsg(0); // if (!msg.empty()) { // vector vs = stringsplit(msg, SERIALIZATION_SEPARATOR); // //cout<<"Databoad rec msg size:"< msgq_sub_; msgq_sub_ = std::make_unique(MSGQ_PROTOCOL::SUB, CConfig::instance().SERVERPUB_URL, false); string ymdstr = ymd(); string fname = CConfig::instance().dataDir() + "/marketdata-" + ymdstr + ".txt"; FILE* fp = fopen(fname.c_str(), "a+"); TickWriter fwriter; fwriter.fp = fp; char *buf = nullptr; if (fp) { while (!gShutdown) { string msg = msgq_sub_->recmsg(0); if (!msg.empty()) fwriter.put(msg); //fwriter.insertdb(msg); } } //PRINT_TO_FILE("INFO:[%s,%d][%s]recording service stopped: %s\n", __FILE__, __LINE__, __FUNCTION__); } void TickReplayService(const std::string& filetoreplay,int32_t tickinterval) { std::unique_ptr msgq_pub_; msgq_pub_ = std::make_unique(MSGQ_PROTOCOL::PUB, CConfig::instance().SERVERPUB_URL); uint64_t curt = 0; uint64_t logt = 0; //vector lines = readreplayfile(filetoreplay); vector lines = readreplayfile(filetoreplay); int32_t i = 0, sz = lines.size(); while (!gShutdown && i++ < sz) { // logt = lines[i].t; // curt = getMicroTime(); // static uint64_t diff = curt - logt; //89041208806 // while (!gShutdown && (diff + logt > curt)) { // curt = getMicroTime(); // } // string& msg = lines[i].msg; string& msg =lines[i]; //cout<< "REPLAY data:"<sendmsg(msg); msleep(tickinterval); } msleep(2000); PRINT_TO_FILE("INFO:[%s,%d][%s]replay service stopped: %s\n", __FILE__, __LINE__, __FUNCTION__); } }