From 1dad12c19d1468e9ae39bb8fa8000e25172e94c0 Mon Sep 17 00:00:00 2001 From: Kun Feng Date: Mon, 27 Apr 2026 17:45:11 -0500 Subject: [PATCH 01/12] Add KEEPER_CONF.INGESTION_THREAD_COUNT to control the ingestion thread count in Keeper --- ChronoKeeper/include/ChronoKeeperConfiguration.h | 2 ++ ChronoKeeper/src/ChronoKeeperConfiguration.cpp | 10 ++++++++++ ChronoKeeper/src/ChronoKeeperInstance.cpp | 3 ++- chrono_common/src/deprecated_ConfigurationManager.cpp | 6 ++++++ default_conf.json.in | 1 + 5 files changed, 21 insertions(+), 1 deletion(-) diff --git a/ChronoKeeper/include/ChronoKeeperConfiguration.h b/ChronoKeeper/include/ChronoKeeperConfiguration.h index 64a799833..ef1aece5d 100644 --- a/ChronoKeeper/include/ChronoKeeperConfiguration.h +++ b/ChronoKeeper/include/ChronoKeeperConfiguration.h @@ -17,6 +17,7 @@ namespace chronolog struct KeeperConfiguration { uint32_t RECORDING_GROUP; + uint32_t INGESTION_THREAD_COUNT; RPCProviderConf KEEPER_RECORDING_SERVICE_CONF; RPCProviderConf DATA_STORE_ADMIN_SERVICE_CONF; RPCProviderConf VISOR_REGISTRY_SERVICE_CONF; @@ -28,6 +29,7 @@ struct KeeperConfiguration KeeperConfiguration() { RECORDING_GROUP = 0; + INGESTION_THREAD_COUNT = 1; KEEPER_GRAPHER_DRAIN_SERVICE_CONF.PROTO_CONF = "ofi+sockets"; KEEPER_GRAPHER_DRAIN_SERVICE_CONF.IP = "127.0.0.1"; KEEPER_GRAPHER_DRAIN_SERVICE_CONF.BASE_PORT = 9999; diff --git a/ChronoKeeper/src/ChronoKeeperConfiguration.cpp b/ChronoKeeper/src/ChronoKeeperConfiguration.cpp index aa99fb353..99866cdad 100644 --- a/ChronoKeeper/src/ChronoKeeperConfiguration.cpp +++ b/ChronoKeeper/src/ChronoKeeperConfiguration.cpp @@ -23,6 +23,16 @@ int chronolog::KeeperConfiguration::parseJsonConf(json_object* json_conf) int value = json_object_get_int(val); RECORDING_GROUP = (value >= 0 ? value : 0); } + else if(strcmp(key, "IngestionThreadCount") == 0) + { + if(!json_object_is_type(val, json_type_int)) + { + std::cerr << "[KeeperConfiguration] Invalid 'IngestionThreadCount': expected integer" << std::endl; + return chl::CL_ERR_INVALID_CONF; + } + int value = json_object_get_int(val); + INGESTION_THREAD_COUNT = (value >= 1 ? value : 1); + } else if(strcmp(key, "KeeperRecordingService") == 0) { if(!json_object_is_type(val, json_type_object)) diff --git a/ChronoKeeper/src/ChronoKeeperInstance.cpp b/ChronoKeeper/src/ChronoKeeperInstance.cpp index d9063e554..9f16c688d 100644 --- a/ChronoKeeper/src/ChronoKeeperInstance.cpp +++ b/ChronoKeeper/src/ChronoKeeperInstance.cpp @@ -242,7 +242,8 @@ int main(int argc, char** argv) try { - margo_instance_id margo_id = margo_init(KEEPER_RECORDING_SERVICE_NA_STRING.c_str(), MARGO_SERVER_MODE, 1, 1); + int rpc_thread_count = static_cast(KEEPER_CONF.INGESTION_THREAD_COUNT); + margo_instance_id margo_id = margo_init(KEEPER_RECORDING_SERVICE_NA_STRING.c_str(), MARGO_SERVER_MODE, 1, rpc_thread_count); recordingEngine = new tl::engine(margo_id); std::stringstream s1; diff --git a/chrono_common/src/deprecated_ConfigurationManager.cpp b/chrono_common/src/deprecated_ConfigurationManager.cpp index faf79a19b..fb7ffdcaf 100644 --- a/chrono_common/src/deprecated_ConfigurationManager.cpp +++ b/chrono_common/src/deprecated_ConfigurationManager.cpp @@ -279,6 +279,12 @@ int chronolog::KeeperConfiguration::parseJsonConf(json_object* json_conf) int value = json_object_get_int(val); RECORDING_GROUP = (value >= 0 ? value : 0); } + else if(strcmp(key, "IngestionThreadCount") == 0) + { + assert(json_object_is_type(val, json_type_int)); + int value = json_object_get_int(val); + INGESTION_THREAD_COUNT = (value >= 1 ? value : 1); + } else if(strcmp(key, "KeeperRecordingService") == 0) { assert(json_object_is_type(val, json_type_object)); diff --git a/default_conf.json.in b/default_conf.json.in index 2a6f9e5f5..eb6fcdb56 100644 --- a/default_conf.json.in +++ b/default_conf.json.in @@ -40,6 +40,7 @@ }, "chrono_keeper": { "RecordingGroup": 7, + "IngestionThreadCount": 1, "KeeperRecordingService": { "rpc": { "protocol_conf": "ofi+sockets", From 88d96c78dbd00bf577481bb7e3920e078658f921 Mon Sep 17 00:00:00 2001 From: Kun Feng Date: Tue, 5 May 2026 16:48:19 -0500 Subject: [PATCH 02/12] Only log event string in trace log in Debug build --- ChronoKeeper/include/KeeperRecordingService.h | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/ChronoKeeper/include/KeeperRecordingService.h b/ChronoKeeper/include/KeeperRecordingService.h index 6de00d48c..314ff0576 100644 --- a/ChronoKeeper/include/KeeperRecordingService.h +++ b/ChronoKeeper/include/KeeperRecordingService.h @@ -39,9 +39,12 @@ class KeeperRecordingService: public tl::provider { // ClientId teller_id, StoryId story_id, // ChronoTick const& chrono_tick, std::string const& record) +#ifndef NDEBUG std::stringstream ss; ss << log_event; - LOG_DEBUG("[KeeperRecordingService] Recording event: {}", ss.str()); + LOG_TRACE("[KeeperRecordingService] Recording event: {}", ss.str()); +#endif + LOG_DEBUG("[KeeperRecordingService] Recording event in ULT={}, ES={}", tl::thread::self_id(), tl::xstream::self().get_rank()); theIngestionQueue.ingestLogEvent(log_event); request.respond(chronolog::CL_SUCCESS); } From 6fd197273c0ba50fd889464ece4844ce610e959c Mon Sep 17 00:00:00 2001 From: Kun Feng Date: Tue, 5 May 2026 17:03:17 -0500 Subject: [PATCH 03/12] Only log event string in trace log in Debug build --- ChronoKeeper/include/IngestionQueue.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ChronoKeeper/include/IngestionQueue.h b/ChronoKeeper/include/IngestionQueue.h index 5d62459fe..93e31d86d 100644 --- a/ChronoKeeper/include/IngestionQueue.h +++ b/ChronoKeeper/include/IngestionQueue.h @@ -59,12 +59,14 @@ class IngestionQueue void ingestLogEvent(LogEvent const& event) { +#ifndef NDEBUG std::stringstream ss; ss << event; - LOG_DEBUG("[IngestionQueue] Received event for StoryID={}: Event Details={}, HandleMapSize={}", + LOG_TRACE("[IngestionQueue] Received event for StoryID={}: Event Details={}, HandleMapSize={}", event.storyId, ss.str(), storyIngestionHandles.size()); +#endif auto ingestionHandle_iter = storyIngestionHandles.find(event.storyId); if(ingestionHandle_iter == storyIngestionHandles.end()) { From f37d71a05b6cbb0e72bf0c4952df75d995f45fa2 Mon Sep 17 00:00:00 2001 From: Kun Feng Date: Tue, 5 May 2026 17:04:39 -0500 Subject: [PATCH 04/12] Move IngestionThreadCount under KeeperRecordingService, and default to 4 --- .../include/ChronoKeeperConfiguration.h | 10 +++++--- .../src/ChronoKeeperConfiguration.cpp | 23 ++++++++++--------- ChronoKeeper/src/ChronoKeeperInstance.cpp | 18 +++++++-------- default_conf.json.in | 2 +- 4 files changed, 29 insertions(+), 24 deletions(-) diff --git a/ChronoKeeper/include/ChronoKeeperConfiguration.h b/ChronoKeeper/include/ChronoKeeperConfiguration.h index ef1aece5d..cb4c1f582 100644 --- a/ChronoKeeper/include/ChronoKeeperConfiguration.h +++ b/ChronoKeeper/include/ChronoKeeperConfiguration.h @@ -14,11 +14,16 @@ namespace chronolog { +struct KeeperRecordingServiceConf +{ + RPCProviderConf RPC_CONF; + uint32_t INGESTION_THREAD_COUNT = 1; +}; + struct KeeperConfiguration { uint32_t RECORDING_GROUP; - uint32_t INGESTION_THREAD_COUNT; - RPCProviderConf KEEPER_RECORDING_SERVICE_CONF; + KeeperRecordingServiceConf KEEPER_RECORDING_SERVICE_CONF; RPCProviderConf DATA_STORE_ADMIN_SERVICE_CONF; RPCProviderConf VISOR_REGISTRY_SERVICE_CONF; RPCProviderConf KEEPER_GRAPHER_DRAIN_SERVICE_CONF; @@ -29,7 +34,6 @@ struct KeeperConfiguration KeeperConfiguration() { RECORDING_GROUP = 0; - INGESTION_THREAD_COUNT = 1; KEEPER_GRAPHER_DRAIN_SERVICE_CONF.PROTO_CONF = "ofi+sockets"; KEEPER_GRAPHER_DRAIN_SERVICE_CONF.IP = "127.0.0.1"; KEEPER_GRAPHER_DRAIN_SERVICE_CONF.BASE_PORT = 9999; diff --git a/ChronoKeeper/src/ChronoKeeperConfiguration.cpp b/ChronoKeeper/src/ChronoKeeperConfiguration.cpp index 99866cdad..33665cd60 100644 --- a/ChronoKeeper/src/ChronoKeeperConfiguration.cpp +++ b/ChronoKeeper/src/ChronoKeeperConfiguration.cpp @@ -23,16 +23,6 @@ int chronolog::KeeperConfiguration::parseJsonConf(json_object* json_conf) int value = json_object_get_int(val); RECORDING_GROUP = (value >= 0 ? value : 0); } - else if(strcmp(key, "IngestionThreadCount") == 0) - { - if(!json_object_is_type(val, json_type_int)) - { - std::cerr << "[KeeperConfiguration] Invalid 'IngestionThreadCount': expected integer" << std::endl; - return chl::CL_ERR_INVALID_CONF; - } - int value = json_object_get_int(val); - INGESTION_THREAD_COUNT = (value >= 1 ? value : 1); - } else if(strcmp(key, "KeeperRecordingService") == 0) { if(!json_object_is_type(val, json_type_object)) @@ -45,8 +35,19 @@ int chronolog::KeeperConfiguration::parseJsonConf(json_object* json_conf) { if(strcmp(key, "rpc") == 0) { - if(KEEPER_RECORDING_SERVICE_CONF.parseJsonConf(val) != chl::CL_SUCCESS) + if(KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.parseJsonConf(val) != chl::CL_SUCCESS) + return chl::CL_ERR_INVALID_CONF; + } + else if(strcmp(key, "IngestionThreadCount") == 0) + { + if(!json_object_is_type(val, json_type_int)) + { + std::cerr << "[KeeperConfiguration] Invalid 'IngestionThreadCount': expected integer" + << std::endl; return chl::CL_ERR_INVALID_CONF; + } + int value = json_object_get_int(val); + KEEPER_RECORDING_SERVICE_CONF.INGESTION_THREAD_COUNT = (value >= 1 ? value : 1); } else { diff --git a/ChronoKeeper/src/ChronoKeeperInstance.cpp b/ChronoKeeper/src/ChronoKeeperInstance.cpp index 9f16c688d..260c64ab9 100644 --- a/ChronoKeeper/src/ChronoKeeperInstance.cpp +++ b/ChronoKeeper/src/ChronoKeeperInstance.cpp @@ -127,10 +127,10 @@ int main(int argc, char** argv) /// KeeperRecordingService setup ___________________________________________________________________________________ // Instantiate KeeperRecordingService - std::string KEEPER_RECORDING_SERVICE_PROTOCOL = KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.PROTO_CONF; - std::string KEEPER_RECORDING_SERVICE_IP = KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.IP; - uint16_t KEEPER_RECORDING_SERVICE_PORT = KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.BASE_PORT; - uint16_t recording_service_provider_id = KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.SERVICE_PROVIDER_ID; + std::string KEEPER_RECORDING_SERVICE_PROTOCOL = KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.PROTO_CONF; + std::string KEEPER_RECORDING_SERVICE_IP = KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.IP; + uint16_t KEEPER_RECORDING_SERVICE_PORT = KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.BASE_PORT; + uint16_t recording_service_provider_id = KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID; // validate ip address, instantiate Recording Service and create KeeperIdCard @@ -149,10 +149,10 @@ int main(int argc, char** argv) chronolog::RecordingGroupId keeper_group_id = KEEPER_CONF.RECORDING_GROUP; chronolog::KeeperIdCard keeperIdCard( keeper_group_id, - chronolog::ServiceId(KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.PROTO_CONF, - KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.IP, - KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.BASE_PORT, - KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.SERVICE_PROVIDER_ID)); + chronolog::ServiceId(KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.PROTO_CONF, + KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.IP, + KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.BASE_PORT, + KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID)); std::string KEEPER_RECORDING_SERVICE_NA_STRING; keeperIdCard.getRecordingServiceId().get_service_as_string(KEEPER_RECORDING_SERVICE_NA_STRING); @@ -242,7 +242,7 @@ int main(int argc, char** argv) try { - int rpc_thread_count = static_cast(KEEPER_CONF.INGESTION_THREAD_COUNT); + int rpc_thread_count = static_cast(KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.INGESTION_THREAD_COUNT); margo_instance_id margo_id = margo_init(KEEPER_RECORDING_SERVICE_NA_STRING.c_str(), MARGO_SERVER_MODE, 1, rpc_thread_count); recordingEngine = new tl::engine(margo_id); diff --git a/default_conf.json.in b/default_conf.json.in index eb6fcdb56..2ee30a949 100644 --- a/default_conf.json.in +++ b/default_conf.json.in @@ -40,8 +40,8 @@ }, "chrono_keeper": { "RecordingGroup": 7, - "IngestionThreadCount": 1, "KeeperRecordingService": { + "IngestionThreadCount": 4, "rpc": { "protocol_conf": "ofi+sockets", "service_ip": "127.0.0.1", From d97f569b0b8181b0cd01480c5d6f618296472860 Mon Sep 17 00:00:00 2001 From: Kun Feng Date: Tue, 5 May 2026 17:39:57 -0500 Subject: [PATCH 05/12] Add ingestion thread count config for Grapher --- .../include/ChronoGrapherConfiguration.h | 18 +++++++++++------ ChronoGrapher/src/ChronoGrapher.cpp | 20 ++++++++++--------- .../src/ChronoGrapherConfiguration.cpp | 13 +++++++++++- default_conf.json.in | 1 + .../keeper_grapher_ingest_test.cpp | 8 ++++---- 5 files changed, 40 insertions(+), 20 deletions(-) diff --git a/ChronoGrapher/include/ChronoGrapherConfiguration.h b/ChronoGrapher/include/ChronoGrapherConfiguration.h index 341a5e9d1..f0c110341 100644 --- a/ChronoGrapher/include/ChronoGrapherConfiguration.h +++ b/ChronoGrapher/include/ChronoGrapherConfiguration.h @@ -11,10 +11,16 @@ namespace chronolog { +struct KeeperGrapherDrainServiceConf +{ + RPCProviderConf RPC_CONF; + uint32_t INGESTION_THREAD_COUNT = 1; +}; + struct GrapherConfiguration { uint32_t RECORDING_GROUP{}; - RPCProviderConf KEEPER_GRAPHER_DRAIN_SERVICE_CONF; + KeeperGrapherDrainServiceConf KEEPER_GRAPHER_DRAIN_SERVICE_CONF; RPCProviderConf DATA_STORE_ADMIN_SERVICE_CONF; RPCProviderConf VISOR_REGISTRY_SERVICE_CONF; LogConf LOG_CONF; @@ -24,10 +30,10 @@ struct GrapherConfiguration GrapherConfiguration() { RECORDING_GROUP = 0; - KEEPER_GRAPHER_DRAIN_SERVICE_CONF.PROTO_CONF = "ofi+sockets"; - KEEPER_GRAPHER_DRAIN_SERVICE_CONF.IP = "127.0.0.1"; - KEEPER_GRAPHER_DRAIN_SERVICE_CONF.BASE_PORT = 9999; - KEEPER_GRAPHER_DRAIN_SERVICE_CONF.SERVICE_PROVIDER_ID = 99; + KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.PROTO_CONF = "ofi+sockets"; + KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.IP = "127.0.0.1"; + KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.BASE_PORT = 9999; + KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID = 99; DATA_STORE_ADMIN_SERVICE_CONF.PROTO_CONF = "ofi+sockets"; DATA_STORE_ADMIN_SERVICE_CONF.IP = "127.0.0.1"; @@ -51,7 +57,7 @@ struct GrapherConfiguration [[nodiscard]] std::string to_String() const { return "[CHRONO_GRAPHER_CONFIGURATION: RECORDING_GROUP: " + std::to_string(RECORDING_GROUP) + - ", KEEPER_GRAPHER_DRAIN_SERVICE_CONF: " + KEEPER_GRAPHER_DRAIN_SERVICE_CONF.to_String() + + ", KEEPER_GRAPHER_DRAIN_SERVICE_CONF: " + KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.to_String() + ", DATA_STORE_ADMIN_SERVICE_CONF: " + DATA_STORE_ADMIN_SERVICE_CONF.to_String() + ", VISOR_REGISTRY_SERVICE_CONF: " + VISOR_REGISTRY_SERVICE_CONF.to_String() + ", LOG_CONF: " + LOG_CONF.to_String() + ", DATA_STORE_CONF: " + DATA_STORE_CONF.to_String() + diff --git a/ChronoGrapher/src/ChronoGrapher.cpp b/ChronoGrapher/src/ChronoGrapher.cpp index 2f71fcb0f..592f144d7 100644 --- a/ChronoGrapher/src/ChronoGrapher.cpp +++ b/ChronoGrapher/src/ChronoGrapher.cpp @@ -129,15 +129,15 @@ int main(int argc, char** argv) // Instantiate GrapherRecordingService chronolog::RecordingGroupId recording_group_id = GRAPHER_CONF.RECORDING_GROUP; - std::string RECORDING_SERVICE_PROTOCOL = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.PROTO_CONF; - std::string RECORDING_SERVICE_IP = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.IP; - uint16_t RECORDING_SERVICE_PORT = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.BASE_PORT; - uint16_t recording_service_provider_id = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.SERVICE_PROVIDER_ID; + std::string RECORDING_SERVICE_PROTOCOL = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.PROTO_CONF; + std::string RECORDING_SERVICE_IP = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.IP; + uint16_t RECORDING_SERVICE_PORT = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.BASE_PORT; + uint16_t recording_service_provider_id = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID; - chl::ServiceId recordingServiceId(GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.PROTO_CONF, - GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.IP, - GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.BASE_PORT, - GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.SERVICE_PROVIDER_ID); + chl::ServiceId recordingServiceId(GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.PROTO_CONF, + GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.IP, + GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.BASE_PORT, + GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID); std::string RECORDING_SERVICE_NA_STRING; @@ -213,7 +213,9 @@ int main(int argc, char** argv) try { - margo_instance_id margo_id = margo_init(RECORDING_SERVICE_NA_STRING.c_str(), MARGO_SERVER_MODE, 1, 1); + int rpc_thread_count = static_cast(GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.INGESTION_THREAD_COUNT); + margo_instance_id margo_id = + margo_init(RECORDING_SERVICE_NA_STRING.c_str(), MARGO_SERVER_MODE, 1, rpc_thread_count); recordingEngine = new tl::engine(margo_id); std::stringstream s1; diff --git a/ChronoGrapher/src/ChronoGrapherConfiguration.cpp b/ChronoGrapher/src/ChronoGrapherConfiguration.cpp index 90950b76b..a86964b50 100644 --- a/ChronoGrapher/src/ChronoGrapherConfiguration.cpp +++ b/ChronoGrapher/src/ChronoGrapherConfiguration.cpp @@ -37,9 +37,20 @@ int chronolog::GrapherConfiguration::parseJsonConf(json_object* json_conf) { if(strcmp(key, "rpc") == 0) { - if(KEEPER_GRAPHER_DRAIN_SERVICE_CONF.parseJsonConf(val) != chl::CL_SUCCESS) + if(KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.parseJsonConf(val) != chl::CL_SUCCESS) return chl::CL_ERR_INVALID_CONF; } + else if(strcmp(key, "IngestionThreadCount") == 0) + { + if(!json_object_is_type(val, json_type_int)) + { + std::cerr << "[GrapherConfiguration] Invalid 'IngestionThreadCount': expected integer" + << std::endl; + return chl::CL_ERR_INVALID_CONF; + } + int value = json_object_get_int(val); + KEEPER_GRAPHER_DRAIN_SERVICE_CONF.INGESTION_THREAD_COUNT = (value >= 1 ? value : 1); + } else { std::cerr << "[GrapherConfiguration] Unknown KeeperGrapherDrainService configuration: " << key diff --git a/default_conf.json.in b/default_conf.json.in index 2ee30a949..2a5852f63 100644 --- a/default_conf.json.in +++ b/default_conf.json.in @@ -97,6 +97,7 @@ "chrono_grapher": { "RecordingGroup": 7, "KeeperGrapherDrainService": { + "IngestionThreadCount": 1, "rpc": { "protocol_conf": "ofi+sockets", "service_ip": "127.0.0.1", diff --git a/test/integration/keeper-grapher/keeper_grapher_ingest_test.cpp b/test/integration/keeper-grapher/keeper_grapher_ingest_test.cpp index b69ec068b..de5831617 100644 --- a/test/integration/keeper-grapher/keeper_grapher_ingest_test.cpp +++ b/test/integration/keeper-grapher/keeper_grapher_ingest_test.cpp @@ -146,9 +146,9 @@ int main(int argc, char** argv) /** * Keeper-push */ - std::string KEEPER_COLLECTOR_NA_STRING = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.PROTO_CONF + "://" + - GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.IP + ":" + - std::to_string(GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.BASE_PORT); + std::string KEEPER_COLLECTOR_NA_STRING = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.PROTO_CONF + "://" + + GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.IP + ":" + + std::to_string(GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.BASE_PORT); tl::engine extraction_engine = tl::engine(KEEPER_COLLECTOR_NA_STRING, THALLIUM_SERVER_MODE); std::stringstream ss; ss << extraction_engine.self(); @@ -164,7 +164,7 @@ int main(int argc, char** argv) tl_es_vec.push_back(std::move(es)); } - int service_provider_id = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.SERVICE_PROVIDER_ID; + int service_provider_id = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID; StoryChunkRecordService* story_chunk_recording_service = StoryChunkRecordService::CreateStoryChunkRecordService(extraction_engine, tl_pool, service_provider_id); LOG_DEBUG("[standalone_ingest_test] StoryChunkRecordService created"); From e9266564974c4ebb66ee5095bd861b8f731ea5e2 Mon Sep 17 00:00:00 2001 From: Kun Feng Date: Tue, 5 May 2026 17:48:29 -0500 Subject: [PATCH 06/12] Add ingestion thread count config for Player --- .../include/ChronoPlayerConfiguration.h | 18 +++++++++++------ ChronoPlayer/src/ChronoPlayer.cpp | 20 ++++++++++--------- .../src/ChronoPlayerConfiguration.cpp | 13 +++++++++++- default_conf.json.in | 1 + 4 files changed, 36 insertions(+), 16 deletions(-) diff --git a/ChronoPlayer/include/ChronoPlayerConfiguration.h b/ChronoPlayer/include/ChronoPlayerConfiguration.h index 1d7ce3c52..5f5e79b53 100644 --- a/ChronoPlayer/include/ChronoPlayerConfiguration.h +++ b/ChronoPlayer/include/ChronoPlayerConfiguration.h @@ -11,11 +11,17 @@ namespace chronolog { +struct PlaybackServiceConf +{ + RPCProviderConf RPC_CONF; + uint32_t INGESTION_THREAD_COUNT = 1; +}; + struct PlayerConfiguration { uint32_t RECORDING_GROUP; RPCProviderConf DATA_STORE_ADMIN_SERVICE_CONF; - RPCProviderConf PLAYBACK_SERVICE_CONF; + PlaybackServiceConf PLAYBACK_SERVICE_CONF; RPCProviderConf VISOR_REGISTRY_SERVICE_CONF; LogConf LOG_CONF; DataStoreConf DATA_STORE_CONF{}; @@ -29,10 +35,10 @@ struct PlayerConfiguration DATA_STORE_ADMIN_SERVICE_CONF.BASE_PORT = 2222; DATA_STORE_ADMIN_SERVICE_CONF.SERVICE_PROVIDER_ID = 22; - PLAYBACK_SERVICE_CONF.PROTO_CONF = "ofi+sockets"; - PLAYBACK_SERVICE_CONF.IP = "127.0.0.1"; - PLAYBACK_SERVICE_CONF.BASE_PORT = 2225; - PLAYBACK_SERVICE_CONF.SERVICE_PROVIDER_ID = 25; + PLAYBACK_SERVICE_CONF.RPC_CONF.PROTO_CONF = "ofi+sockets"; + PLAYBACK_SERVICE_CONF.RPC_CONF.IP = "127.0.0.1"; + PLAYBACK_SERVICE_CONF.RPC_CONF.BASE_PORT = 2225; + PLAYBACK_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID = 25; VISOR_REGISTRY_SERVICE_CONF.PROTO_CONF = "ofi+sockets"; VISOR_REGISTRY_SERVICE_CONF.IP = "127.0.0.1"; @@ -52,7 +58,7 @@ struct PlayerConfiguration { return "[CHRONO_PLAYER_CONFIGURATION: RECORDING_GROUP: " + std::to_string(RECORDING_GROUP) + ", DATA_STORE_ADMIN_SERVICE_CONF: " + DATA_STORE_ADMIN_SERVICE_CONF.to_String() + - ", PLAYBACK_SERVICE_CONF: " + PLAYBACK_SERVICE_CONF.to_String() + + ", PLAYBACK_SERVICE_CONF: " + PLAYBACK_SERVICE_CONF.RPC_CONF.to_String() + ", VISOR_REGISTRY_SERVICE_CONF: " + VISOR_REGISTRY_SERVICE_CONF.to_String() + ", LOG_CONF: " + LOG_CONF.to_String() + ", DATA_STORE_CONF: " + DATA_STORE_CONF.to_String() + ", READER_CONF: " + READER_CONF.to_String() + "]"; diff --git a/ChronoPlayer/src/ChronoPlayer.cpp b/ChronoPlayer/src/ChronoPlayer.cpp index 8ee858670..43ad2d290 100644 --- a/ChronoPlayer/src/ChronoPlayer.cpp +++ b/ChronoPlayer/src/ChronoPlayer.cpp @@ -87,17 +87,17 @@ int main(int argc, char** argv) } // Instantiate PlaybackService - std::string PLAYBACK_SERVICE_PROTOCOL = PLAYER_CONF.PLAYBACK_SERVICE_CONF.PROTO_CONF; - std::string PLAYBACK_SERVICE_IP = PLAYER_CONF.PLAYBACK_SERVICE_CONF.IP; - uint16_t PLAYBACK_SERVICE_PORT = PLAYER_CONF.PLAYBACK_SERVICE_CONF.BASE_PORT; - uint16_t playback_service_provider_id = PLAYER_CONF.PLAYBACK_SERVICE_CONF.SERVICE_PROVIDER_ID; + std::string PLAYBACK_SERVICE_PROTOCOL = PLAYER_CONF.PLAYBACK_SERVICE_CONF.RPC_CONF.PROTO_CONF; + std::string PLAYBACK_SERVICE_IP = PLAYER_CONF.PLAYBACK_SERVICE_CONF.RPC_CONF.IP; + uint16_t PLAYBACK_SERVICE_PORT = PLAYER_CONF.PLAYBACK_SERVICE_CONF.RPC_CONF.BASE_PORT; + uint16_t playback_service_provider_id = PLAYER_CONF.PLAYBACK_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID; // validate ip address, instantiate Playback Service and create IdCard - chronolog::ServiceId playbackServiceId(PLAYER_CONF.PLAYBACK_SERVICE_CONF.PROTO_CONF, - PLAYER_CONF.PLAYBACK_SERVICE_CONF.IP, - PLAYER_CONF.PLAYBACK_SERVICE_CONF.BASE_PORT, - PLAYER_CONF.PLAYBACK_SERVICE_CONF.SERVICE_PROVIDER_ID); + chronolog::ServiceId playbackServiceId(PLAYER_CONF.PLAYBACK_SERVICE_CONF.RPC_CONF.PROTO_CONF, + PLAYER_CONF.PLAYBACK_SERVICE_CONF.RPC_CONF.IP, + PLAYER_CONF.PLAYBACK_SERVICE_CONF.RPC_CONF.BASE_PORT, + PLAYER_CONF.PLAYBACK_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID); if(!playbackServiceId.is_valid()) { @@ -116,7 +116,9 @@ int main(int argc, char** argv) playbackServiceId.get_service_as_string(PLAYBACK_SERVICE_NA_STRING); - margo_instance_id playback_margo_id = margo_init(PLAYBACK_SERVICE_NA_STRING.c_str(), MARGO_SERVER_MODE, 1, 1); + int rpc_thread_count = static_cast(PLAYER_CONF.PLAYBACK_SERVICE_CONF.INGESTION_THREAD_COUNT); + margo_instance_id playback_margo_id = + margo_init(PLAYBACK_SERVICE_NA_STRING.c_str(), MARGO_SERVER_MODE, 1, rpc_thread_count); playbackEngine = new tl::engine(playback_margo_id); diff --git a/ChronoPlayer/src/ChronoPlayerConfiguration.cpp b/ChronoPlayer/src/ChronoPlayerConfiguration.cpp index b91fdbe26..a71c62d53 100644 --- a/ChronoPlayer/src/ChronoPlayerConfiguration.cpp +++ b/ChronoPlayer/src/ChronoPlayerConfiguration.cpp @@ -59,9 +59,20 @@ int chronolog::PlayerConfiguration::parseJsonConf(json_object* json_conf) { if(strcmp(key, "rpc") == 0) { - if(PLAYBACK_SERVICE_CONF.parseJsonConf(val) != chl::CL_SUCCESS) + if(PLAYBACK_SERVICE_CONF.RPC_CONF.parseJsonConf(val) != chl::CL_SUCCESS) return chl::CL_ERR_INVALID_CONF; } + else if(strcmp(key, "IngestionThreadCount") == 0) + { + if(!json_object_is_type(val, json_type_int)) + { + std::cerr << "[PlayerConfiguration] Invalid 'IngestionThreadCount': expected integer" + << std::endl; + return chl::CL_ERR_INVALID_CONF; + } + int value = json_object_get_int(val); + PLAYBACK_SERVICE_CONF.INGESTION_THREAD_COUNT = (value >= 1 ? value : 1); + } else { std::cerr << "[ConfigurationManager] [chrono_player] Unknown PlaybackQueryService configuration: " diff --git a/default_conf.json.in b/default_conf.json.in index 2a5852f63..39f1f15dd 100644 --- a/default_conf.json.in +++ b/default_conf.json.in @@ -153,6 +153,7 @@ } }, "PlaybackQueryService": { + "IngestionThreadCount": 1, "rpc": { "protocol_conf": "ofi+sockets", "service_ip": "127.0.0.1", From c9e3ba1159331fcfd321a6c07e620515d16a2a81 Mon Sep 17 00:00:00 2001 From: Kun Feng Date: Wed, 6 May 2026 15:07:42 -0500 Subject: [PATCH 07/12] Fix clang-format --- ChronoGrapher/src/ChronoGrapher.cpp | 3 ++- ChronoKeeper/include/KeeperRecordingService.h | 4 +++- ChronoKeeper/src/ChronoKeeperInstance.cpp | 3 ++- .../keeper-grapher/keeper_grapher_ingest_test.cpp | 7 ++++--- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/ChronoGrapher/src/ChronoGrapher.cpp b/ChronoGrapher/src/ChronoGrapher.cpp index 592f144d7..7d16b5f2e 100644 --- a/ChronoGrapher/src/ChronoGrapher.cpp +++ b/ChronoGrapher/src/ChronoGrapher.cpp @@ -132,7 +132,8 @@ int main(int argc, char** argv) std::string RECORDING_SERVICE_PROTOCOL = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.PROTO_CONF; std::string RECORDING_SERVICE_IP = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.IP; uint16_t RECORDING_SERVICE_PORT = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.BASE_PORT; - uint16_t recording_service_provider_id = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID; + uint16_t recording_service_provider_id = + GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID; chl::ServiceId recordingServiceId(GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.PROTO_CONF, GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.IP, diff --git a/ChronoKeeper/include/KeeperRecordingService.h b/ChronoKeeper/include/KeeperRecordingService.h index 314ff0576..f7cfa18fe 100644 --- a/ChronoKeeper/include/KeeperRecordingService.h +++ b/ChronoKeeper/include/KeeperRecordingService.h @@ -44,7 +44,9 @@ class KeeperRecordingService: public tl::provider ss << log_event; LOG_TRACE("[KeeperRecordingService] Recording event: {}", ss.str()); #endif - LOG_DEBUG("[KeeperRecordingService] Recording event in ULT={}, ES={}", tl::thread::self_id(), tl::xstream::self().get_rank()); + LOG_DEBUG("[KeeperRecordingService] Recording event in ULT={}, ES={}", + tl::thread::self_id(), + tl::xstream::self().get_rank()); theIngestionQueue.ingestLogEvent(log_event); request.respond(chronolog::CL_SUCCESS); } diff --git a/ChronoKeeper/src/ChronoKeeperInstance.cpp b/ChronoKeeper/src/ChronoKeeperInstance.cpp index 260c64ab9..88c57e81c 100644 --- a/ChronoKeeper/src/ChronoKeeperInstance.cpp +++ b/ChronoKeeper/src/ChronoKeeperInstance.cpp @@ -243,7 +243,8 @@ int main(int argc, char** argv) try { int rpc_thread_count = static_cast(KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.INGESTION_THREAD_COUNT); - margo_instance_id margo_id = margo_init(KEEPER_RECORDING_SERVICE_NA_STRING.c_str(), MARGO_SERVER_MODE, 1, rpc_thread_count); + margo_instance_id margo_id = + margo_init(KEEPER_RECORDING_SERVICE_NA_STRING.c_str(), MARGO_SERVER_MODE, 1, rpc_thread_count); recordingEngine = new tl::engine(margo_id); std::stringstream s1; diff --git a/test/integration/keeper-grapher/keeper_grapher_ingest_test.cpp b/test/integration/keeper-grapher/keeper_grapher_ingest_test.cpp index de5831617..3e9d1bf54 100644 --- a/test/integration/keeper-grapher/keeper_grapher_ingest_test.cpp +++ b/test/integration/keeper-grapher/keeper_grapher_ingest_test.cpp @@ -146,9 +146,10 @@ int main(int argc, char** argv) /** * Keeper-push */ - std::string KEEPER_COLLECTOR_NA_STRING = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.PROTO_CONF + "://" + - GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.IP + ":" + - std::to_string(GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.BASE_PORT); + std::string KEEPER_COLLECTOR_NA_STRING = + GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.PROTO_CONF + "://" + + GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.IP + ":" + + std::to_string(GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.BASE_PORT); tl::engine extraction_engine = tl::engine(KEEPER_COLLECTOR_NA_STRING, THALLIUM_SERVER_MODE); std::stringstream ss; ss << extraction_engine.self(); From 0ec24ab69ec68fcd4a960ee96d0b0d9e344f559b Mon Sep 17 00:00:00 2001 From: Kun Feng Date: Wed, 6 May 2026 15:27:23 -0500 Subject: [PATCH 08/12] Remove IngestionThreadCount config from deprecated conf manager --- chrono_common/src/deprecated_ConfigurationManager.cpp | 6 ------ 1 file changed, 6 deletions(-) diff --git a/chrono_common/src/deprecated_ConfigurationManager.cpp b/chrono_common/src/deprecated_ConfigurationManager.cpp index fb7ffdcaf..faf79a19b 100644 --- a/chrono_common/src/deprecated_ConfigurationManager.cpp +++ b/chrono_common/src/deprecated_ConfigurationManager.cpp @@ -279,12 +279,6 @@ int chronolog::KeeperConfiguration::parseJsonConf(json_object* json_conf) int value = json_object_get_int(val); RECORDING_GROUP = (value >= 0 ? value : 0); } - else if(strcmp(key, "IngestionThreadCount") == 0) - { - assert(json_object_is_type(val, json_type_int)); - int value = json_object_get_int(val); - INGESTION_THREAD_COUNT = (value >= 1 ? value : 1); - } else if(strcmp(key, "KeeperRecordingService") == 0) { assert(json_object_is_type(val, json_type_object)); From 4d31af51bff573ea1685d6b3337c19dfed21aa79 Mon Sep 17 00:00:00 2001 From: Kun Feng Date: Wed, 6 May 2026 15:30:21 -0500 Subject: [PATCH 09/12] Remove debug loggin in record_event() --- ChronoKeeper/include/KeeperRecordingService.h | 3 --- 1 file changed, 3 deletions(-) diff --git a/ChronoKeeper/include/KeeperRecordingService.h b/ChronoKeeper/include/KeeperRecordingService.h index f7cfa18fe..983b798a4 100644 --- a/ChronoKeeper/include/KeeperRecordingService.h +++ b/ChronoKeeper/include/KeeperRecordingService.h @@ -44,9 +44,6 @@ class KeeperRecordingService: public tl::provider ss << log_event; LOG_TRACE("[KeeperRecordingService] Recording event: {}", ss.str()); #endif - LOG_DEBUG("[KeeperRecordingService] Recording event in ULT={}, ES={}", - tl::thread::self_id(), - tl::xstream::self().get_rank()); theIngestionQueue.ingestLogEvent(log_event); request.respond(chronolog::CL_SUCCESS); } From 4daa7ff165ffa8bff2eafc1eaf1ef606aa0211b1 Mon Sep 17 00:00:00 2001 From: Kun Feng Date: Wed, 6 May 2026 15:34:29 -0500 Subject: [PATCH 10/12] Consistently initialize ingestion thread count in Keeper --- ChronoKeeper/include/ChronoKeeperConfiguration.h | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/ChronoKeeper/include/ChronoKeeperConfiguration.h b/ChronoKeeper/include/ChronoKeeperConfiguration.h index cb4c1f582..1f6681780 100644 --- a/ChronoKeeper/include/ChronoKeeperConfiguration.h +++ b/ChronoKeeper/include/ChronoKeeperConfiguration.h @@ -34,6 +34,13 @@ struct KeeperConfiguration KeeperConfiguration() { RECORDING_GROUP = 0; + + KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.PROTO_CONF = "ofi+sockets"; + KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.IP = "127.0.0.1"; + KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.BASE_PORT = 6666; + KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID = 66; + KEEPER_RECORDING_SERVICE_CONF.INGESTION_THREAD_COUNT = 4; + KEEPER_GRAPHER_DRAIN_SERVICE_CONF.PROTO_CONF = "ofi+sockets"; KEEPER_GRAPHER_DRAIN_SERVICE_CONF.IP = "127.0.0.1"; KEEPER_GRAPHER_DRAIN_SERVICE_CONF.BASE_PORT = 9999; From c55d4b172ffa73b76d3275e28eea1e7053b1d361 Mon Sep 17 00:00:00 2001 From: Kun Feng Date: Thu, 7 May 2026 00:01:14 -0500 Subject: [PATCH 11/12] Fix IngestionQueue data race on concurrent RPC handlers Co-Authored-By: Claude Opus 4.7 --- ChronoKeeper/include/IngestionQueue.h | 77 +++++++++++++++++++-------- 1 file changed, 54 insertions(+), 23 deletions(-) diff --git a/ChronoKeeper/include/IngestionQueue.h b/ChronoKeeper/include/IngestionQueue.h index 93e31d86d..c7d52eb7e 100644 --- a/ChronoKeeper/include/IngestionQueue.h +++ b/ChronoKeeper/include/IngestionQueue.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -32,8 +33,8 @@ class IngestionQueue void addStoryIngestionHandle(StoryId const& story_id, StoryIngestionHandle* ingestion_handle) { - std::lock_guard lock(ingestionQueueMutex); - storyIngestionHandles.emplace(std::pair(story_id, ingestion_handle)); + std::unique_lock lock(handleMapMutex); + storyIngestionHandles.emplace(story_id, ingestion_handle); LOG_DEBUG("[IngestionQueue] Added handle for StoryID={}: HandleAddress={}, StoryIngestionHandles={}, " "HandleMapSize={}", story_id, @@ -44,7 +45,7 @@ class IngestionQueue void removeIngestionHandle(StoryId const& story_id) { - std::lock_guard lock(ingestionQueueMutex); + std::unique_lock lock(handleMapMutex); if(storyIngestionHandles.erase(story_id)) { LOG_DEBUG("[IngestionQueue] Removed handle for StoryID={}. Current handle MapSize={}", @@ -57,38 +58,55 @@ class IngestionQueue } } + // Hot path: called concurrently by the keeper's RPC handler threads + // (rpc_thread_count = INGESTION_THREAD_COUNT). The map lookup is guarded + // by a shared lock so multiple ingest threads can resolve handles in + // parallel; only add/removeIngestionHandle take an exclusive lock. + // The handle itself is invoked outside the map lock so handler threads + // don't serialize on it -- StoryIngestionHandle::ingestEvent has its own + // per-story mutex. void ingestLogEvent(LogEvent const& event) { + StoryIngestionHandle* handle = nullptr; + { + std::shared_lock lock(handleMapMutex); #ifndef NDEBUG - std::stringstream ss; - ss << event; - LOG_TRACE("[IngestionQueue] Received event for StoryID={}: Event Details={}, HandleMapSize={}", - event.storyId, - ss.str(), - storyIngestionHandles.size()); + std::stringstream ss; + ss << event; + LOG_TRACE("[IngestionQueue] Received event for StoryID={}: Event Details={}, HandleMapSize={}", + event.storyId, + ss.str(), + storyIngestionHandles.size()); #endif - auto ingestionHandle_iter = storyIngestionHandles.find(event.storyId); - if(ingestionHandle_iter == storyIngestionHandles.end()) + auto ingestionHandle_iter = storyIngestionHandles.find(event.storyId); + if(ingestionHandle_iter != storyIngestionHandles.end()) + handle = ingestionHandle_iter->second; + } + + if(handle != nullptr) { - LOG_WARNING("[IngestionQueue] Orphan event for story {}. Storing for later processing.", event.storyId); - std::lock_guard lock(ingestionQueueMutex); - orphanEventQueue.push_back(event); + handle->ingestEvent(event); } else { - //individual StoryIngestionHandle has its own mutex - (*ingestionHandle_iter).second->ingestEvent(event); + LOG_WARNING("[IngestionQueue] Orphan event for story {}. Storing for later processing.", event.storyId); + std::lock_guard lock(orphanQueueMutex); + orphanEventQueue.push_back(event); } } + // Lock ordering when both mutexes are needed: orphanQueueMutex first, + // then handleMapMutex. ingestLogEvent never holds them simultaneously, so + // there is no inverse-order acquisition path. void drainOrphanEvents() { + std::lock_guard orphanLock(orphanQueueMutex); if(orphanEventQueue.empty()) { LOG_DEBUG("[IngestionQueue] Orphan event queue is empty. No actions taken."); return; } - std::lock_guard lock(ingestionQueueMutex); + std::shared_lock mapLock(handleMapMutex); for(EventDeque::iterator iter = orphanEventQueue.begin(); iter != orphanEventQueue.end();) { auto ingestionHandle_iter = storyIngestionHandles.find((*iter).storyId); @@ -107,17 +125,26 @@ class IngestionQueue LOG_DEBUG("[IngestionQueue] Drained {} orphan events into known handles.", orphanEventQueue.size()); } - bool is_empty() const { return (orphanEventQueue.empty() && storyIngestionHandles.empty()); } + bool is_empty() const + { + std::lock_guard orphanLock(orphanQueueMutex); + std::shared_lock mapLock(handleMapMutex); + return (orphanEventQueue.empty() && storyIngestionHandles.empty()); + } void shutDown() { - LOG_INFO("[IngestionQueue] Initiating shutdown. HandleMapSize={}, Orphan EventQueueSize={}", - storyIngestionHandles.size(), - orphanEventQueue.size()); + { + std::lock_guard orphanLock(orphanQueueMutex); + std::shared_lock mapLock(handleMapMutex); + LOG_INFO("[IngestionQueue] Initiating shutdown. HandleMapSize={}, Orphan EventQueueSize={}", + storyIngestionHandles.size(), + orphanEventQueue.size()); + } // last attempt to drain orphanEventQueue into known ingestionHandles drainOrphanEvents(); // disengage all handles - std::lock_guard lock(ingestionQueueMutex); + std::unique_lock lock(handleMapMutex); storyIngestionHandles.clear(); LOG_INFO("[IngestionQueue] Shutdown completed. All handles disengaged."); } @@ -127,11 +154,15 @@ class IngestionQueue IngestionQueue& operator=(IngestionQueue const&) = delete; - std::mutex ingestionQueueMutex; + // handleMapMutex is shared/exclusive: ingestLogEvent and drainOrphanEvents + // take shared locks (concurrent map lookups); add/remove take exclusive. + // mutable so const observers (is_empty) can take a shared lock. + mutable std::shared_mutex handleMapMutex; std::unordered_map storyIngestionHandles; // events for unknown stories or late events for closed stories will end up // in orphanEventQueue that we'll periodically try to drain into the DataStore + mutable std::mutex orphanQueueMutex; std::deque orphanEventQueue; //Timer to triger periodic attempt to drain orphanEventQueue and collect/log statistics From 9b1d9cdf3e633e7c4664daa84462a7f8b6f631d6 Mon Sep 17 00:00:00 2001 From: Kun Feng Date: Thu, 7 May 2026 10:11:27 -0500 Subject: [PATCH 12/12] Hold IngestionQueue shared lock across ingestEvent Co-Authored-By: Claude Opus 4.7 --- ChronoKeeper/include/IngestionQueue.h | 47 +++++++++++++-------------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/ChronoKeeper/include/IngestionQueue.h b/ChronoKeeper/include/IngestionQueue.h index c7d52eb7e..d58b1228e 100644 --- a/ChronoKeeper/include/IngestionQueue.h +++ b/ChronoKeeper/include/IngestionQueue.h @@ -62,37 +62,34 @@ class IngestionQueue // (rpc_thread_count = INGESTION_THREAD_COUNT). The map lookup is guarded // by a shared lock so multiple ingest threads can resolve handles in // parallel; only add/removeIngestionHandle take an exclusive lock. - // The handle itself is invoked outside the map lock so handler threads - // don't serialize on it -- StoryIngestionHandle::ingestEvent has its own - // per-story mutex. + // + // The shared lock is held across handle->ingestEvent so the retirement + // path (KeeperDataStore::retireDecayedPipelines: removeIngestionHandle + // followed by `delete pipeline`) cannot tear down the handle mid-call. + // Concurrent ingest threads still proceed in parallel since shared locks + // don't block each other; only the rare retire path waits. void ingestLogEvent(LogEvent const& event) { - StoryIngestionHandle* handle = nullptr; - { - std::shared_lock lock(handleMapMutex); + std::shared_lock lock(handleMapMutex); #ifndef NDEBUG - std::stringstream ss; - ss << event; - LOG_TRACE("[IngestionQueue] Received event for StoryID={}: Event Details={}, HandleMapSize={}", - event.storyId, - ss.str(), - storyIngestionHandles.size()); + std::stringstream ss; + ss << event; + LOG_TRACE("[IngestionQueue] Received event for StoryID={}: Event Details={}, HandleMapSize={}", + event.storyId, + ss.str(), + storyIngestionHandles.size()); #endif - auto ingestionHandle_iter = storyIngestionHandles.find(event.storyId); - if(ingestionHandle_iter != storyIngestionHandles.end()) - handle = ingestionHandle_iter->second; - } - - if(handle != nullptr) + auto ingestionHandle_iter = storyIngestionHandles.find(event.storyId); + if(ingestionHandle_iter != storyIngestionHandles.end()) { - handle->ingestEvent(event); - } - else - { - LOG_WARNING("[IngestionQueue] Orphan event for story {}. Storing for later processing.", event.storyId); - std::lock_guard lock(orphanQueueMutex); - orphanEventQueue.push_back(event); + ingestionHandle_iter->second->ingestEvent(event); + return; } + lock.unlock(); + + LOG_WARNING("[IngestionQueue] Orphan event for story {}. Storing for later processing.", event.storyId); + std::lock_guard orphan_lock(orphanQueueMutex); + orphanEventQueue.push_back(event); } // Lock ordering when both mutexes are needed: orphanQueueMutex first,