Skip to content
Open
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
a7e1881
DualEndpointExtractorRDMA implementation
ibrodkin Nov 20, 2025
cd503ae
ConfigurationManager refactoring
ibrodkin Dec 15, 2025
c6abf40
Configuration management refactoring
ibrodkin Dec 18, 2025
2a6d6ad
some more of ConfigurationManager refactoring
ibrodkin Dec 18, 2025
fc08a59
minor tweak
ibrodkin Apr 14, 2026
5a84f4c
rebased on latest develop
ibrodkin Apr 14, 2026
d8ecce9
fixed merged chrono-player tests :
ibrodkin Apr 14, 2026
bc3b265
clang formatting
ibrodkin Apr 14, 2026
9a5ae41
Enhance ConfigurationManager with proper resource management and init…
iameneko Apr 14, 2026
ec87dd3
Refactor configuration return values to use CL_SUCCESS constant
iameneko Apr 14, 2026
e93ea4d
Enhance configuration validation in parseJsonConf methods
iameneko Apr 14, 2026
a74b12c
Improve error handling in ConfigurationManager for JSON parsing
iameneko Apr 14, 2026
173d690
rebase on recent 498-configuration-manager... branch
ibrodkin Apr 15, 2026
a956724
DualEndpointChunkExtractorRDMA
ibrodkin Apr 15, 2026
937caea
added ExtractionModule configuration block for ChronoKeeper
ibrodkin Apr 15, 2026
f36d544
rebased on develop
ibrodkin Apr 15, 2026
baae299
213-extractors: clang formatting
ibrodkin Apr 16, 2026
4f61446
213-extractors: clang formatting
ibrodkin Apr 16, 2026
5ba2c81
213-extractors: move old ExtractorBase out of chrono_common
ibrodkin Apr 17, 2026
bcda09e
213-extractors: remove cassert from Configuration headers
ibrodkin Apr 20, 2026
8d69382
213-extractors: added ExtractionModuleConfiguration
ibrodkin Apr 20, 2026
c100e41
213-extractors: added reset functions for extractors
ibrodkin Apr 24, 2026
2c0537d
213-extractors: added initialized state to StoryChunkExtractionModule…
ibrodkin Apr 24, 2026
be64854
213-extractors: working ExtractionModuleConfiguration test
ibrodkin Apr 24, 2026
fa11daf
213- configurable ExtractionChain
ibrodkin Apr 29, 2026
e34a6bc
213-extractors: new ExtractionModule * ExtractionChain implementations…
ibrodkin May 1, 2026
2997826
modified deploy*sh to populate ExtractionModule segments for Keeper…
ibrodkin May 2, 2026
a0da80e
213-extractors : refined ExtractionModule shutdown procedure
ibrodkin May 4, 2026
cb8525d
Merge branch 'develop' into 213-dual-rdma-extractor
ibrodkin Apr 16, 2026
f3da4a8
213-extractors: clang formatting
ibrodkin May 5, 2026
0a1845c
213-extractors: fixed the test build
ibrodkin May 5, 2026
da509d8
small build fixes
ibrodkin May 5, 2026
f3f40d8
213-extractors: clang formatting
ibrodkin May 5, 2026
5e15fa5
fixed extraction_chain_config_test
ibrodkin May 6, 2026
b0cd83c
213-extractors: temporary fix
ibrodkin May 6, 2026
003701a
213- receiver_service_id.is_valid()
ibrodkin May 7, 2026
0ffa911
213: ExtractionModule shutdown
ibrodkin May 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ChronoGrapher/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ add_executable(chrono_grapher)
target_include_directories(chrono_grapher PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/include
${CMAKE_SOURCE_DIR}/chrono_common/include
${HDF5_INCLUDE_DIRS}
)

target_sources(chrono_grapher PRIVATE
Expand Down
31 changes: 0 additions & 31 deletions ChronoGrapher/include/CSVFileChunkExtractor.h

This file was deleted.

20 changes: 12 additions & 8 deletions ChronoGrapher/include/ChronoGrapherConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <json-c/json.h>

#include <ConfigurationBlocks.h>
#include <ExtractionModuleConfiguration.h>

namespace chronolog
{
Expand All @@ -19,7 +20,7 @@ struct GrapherConfiguration
RPCProviderConf VISOR_REGISTRY_SERVICE_CONF;
LogConf LOG_CONF;
DataStoreConf DATA_STORE_CONF{};
ExtractorReaderConf EXTRACTOR_CONF;
ExtractionModuleConfiguration EXTRACTION_MODULE_CONF;

GrapherConfiguration()
{
Expand All @@ -44,18 +45,21 @@ struct GrapherConfiguration
DATA_STORE_CONF.acceptance_window_secs = 180;
DATA_STORE_CONF.inactive_story_delay_secs = 300;

EXTRACTOR_CONF.story_files_dir = "/tmp/";
EXTRACTION_MODULE_CONF.extraction_stream_count = 1;
}

int parseJsonConf(json_object*);
[[nodiscard]] std::string to_String() const
{
return "[CHRONO_GRAPHER_CONFIGURATION: RECORDING_GROUP: " + std::to_string(RECORDING_GROUP) +
", KEEPER_GRAPHER_DRAIN_SERVICE_CONF: " + KEEPER_GRAPHER_DRAIN_SERVICE_CONF.to_String() +
", DATA_STORE_ADMIN_SERVICE_CONF: " + DATA_STORE_ADMIN_SERVICE_CONF.to_String() +
", VISOR_REGISTRY_SERVICE_CONF: " + VISOR_REGISTRY_SERVICE_CONF.to_String() +
", LOG_CONF: " + LOG_CONF.to_String() + ", DATA_STORE_CONF: " + DATA_STORE_CONF.to_String() +
", EXTRACTOR_CONF: " + EXTRACTOR_CONF.to_String() + "]";
std::string a_string = "[CHRONO_GRAPHER_CONFIGURATION: RECORDING_GROUP: " + std::to_string(RECORDING_GROUP) +
", KEEPER_GRAPHER_DRAIN_SERVICE_CONF: " + KEEPER_GRAPHER_DRAIN_SERVICE_CONF.to_String() +
", DATA_STORE_ADMIN_SERVICE_CONF: " + DATA_STORE_ADMIN_SERVICE_CONF.to_String() +
", VISOR_REGISTRY_SERVICE_CONF: " + VISOR_REGISTRY_SERVICE_CONF.to_String() +
", LOG_CONF: " + LOG_CONF.to_String() +
", DATA_STORE_CONF: " + DATA_STORE_CONF.to_String() + ", ";

a_string += EXTRACTION_MODULE_CONF.to_string(a_string) + "]";
return a_string;
}
};

Expand Down
110 changes: 110 additions & 0 deletions ChronoGrapher/include/GrapherExtractionChain.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#ifndef CHRONO_GRAPHER_EXTRACTION_CHAIN
#define CHRONO_GRAPHER_EXTRACTION_CHAIN

#include <variant>
#include <vector>
#include <thallium.hpp>

#include <chronolog_errcode.h>
#include <ChunkLoggingExtractor.h>
#include <ChunkExtractorCSV.h>
#include <HDF5FileChunkExtractor.h>
#include <ExtractionModuleConfiguration.h>

namespace tl = thallium;

namespace chronolog
{
using Extractor = std::variant<LoggingExtractor, StoryChunkExtractorCSV, HDF5FileChunkExtractor>;

/**
 * Ordered collection of StoryChunk extractors used by ChronoGrapher.
 *
 * Each element is one alternative of the Extractor variant; a chunk handed to
 * process_chunk() is offered to every extractor in insertion order.
 *
 * The class owns no raw resources, so it relies on the compiler-generated
 * destructor / copy / move operations (the vector releases the extractors).
 */
class ChronoGrapherExtractionChain
{
    std::vector<Extractor> theExtractors;

public:
    ChronoGrapherExtractionChain() = default;

    /// Appends an extractor to the end of the chain (taken by value and moved in).
    void add_extractor(Extractor e) { theExtractors.push_back(std::move(e)); }

    /**
     * Passes the chunk to every extractor in the chain.
     *
     * A failing extractor does not stop the chain: the remaining extractors
     * are still invoked.
     *
     * @param chunk chunk forwarded unchanged to each extractor's process_chunk()
     * @return CL_SUCCESS if every extractor succeeded (or the chain is empty),
     *         otherwise the code returned by the last extractor that failed
     */
    int process_chunk(StoryChunk* chunk)
    {
        int chain_result = CL_SUCCESS;

        for(auto& e: theExtractors)
        {
            int const extractor_result =
                    std::visit([chunk](auto& extractor) -> int { return extractor.process_chunk(chunk); }, e);

            if(CL_SUCCESS != extractor_result)
            {
                // remember the failure, but keep going for the other extractors
                chain_result = extractor_result;
            }
        }
        return chain_result;
    }

    /**
     * @return true only when the chain holds at least one extractor and every
     *         extractor reports is_active(); false for an empty chain
     */
    bool is_active_chain() const
    {
        if(theExtractors.empty())
        {
            return false;
        }

        for(const auto& e: theExtractors)
        {
            bool const active = std::visit([](const auto& extractor) -> bool { return extractor.is_active(); }, e);

            // a single inactive extractor makes the whole chain inactive
            if(!active)
            {
                return false;
            }
        }

        return true;
    }

    /**
     * Builds the chain from the extractor entries of the configuration.
     *
     * Recognized entry names are "csv_extractor", "hdf5_extractor" and
     * "logging_extractor"; entries with any other name are skipped silently.
     * Entries are handled in the iteration order of extraction_conf.extractors.
     *
     * Processing stops at the first extractor whose reset() fails; extractors
     * added before the failure remain in the chain.
     *
     * @param extraction_conf configuration whose `extractors` entries
     *                        (name, extractor configuration) are applied
     * @param service_id      passed to the StoryChunkExtractorCSV constructor
     * @return CL_SUCCESS, or the error code of the reset() call that failed
     */
    int activate(ExtractionModuleConfiguration const& extraction_conf, ServiceId const& service_id)
    {
        int ret_value = CL_SUCCESS;

        for(const auto& entry: extraction_conf.extractors)
        {
            const auto& extractor_name = entry.first;
            const auto& extractor_conf = entry.second;

            if(extractor_name == "csv_extractor")
            {
                StoryChunkExtractorCSV csv_extractor(service_id);
                ret_value = csv_extractor.reset(extractor_conf);
                if(CL_SUCCESS != ret_value)
                {
                    break;
                }
                // the local is not used afterwards: move it instead of copying
                add_extractor(std::move(csv_extractor));
            }
            else if(extractor_name == "hdf5_extractor")
            {
                HDF5FileChunkExtractor hdf5_extractor;
                ret_value = hdf5_extractor.reset(extractor_conf);
                if(CL_SUCCESS != ret_value)
                {
                    break;
                }
                add_extractor(std::move(hdf5_extractor));
            }
            else if(extractor_name == "logging_extractor")
            {
                add_extractor(LoggingExtractor());
            }
        }

        return ret_value;
    }
};

} // namespace chronolog

#endif
9 changes: 7 additions & 2 deletions ChronoGrapher/include/HDF5FileChunkExtractor.h
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#ifndef CHRONOLOG_HDF5_FILE_CHUNK_EXTRACTOR_H
#define CHRONOLOG_HDF5_FILE_CHUNK_EXTRACTOR_H

#include <filesystem>
#include <string>


namespace chronolog
{

Expand All @@ -12,12 +12,17 @@ class StoryChunk;
class HDF5FileChunkExtractor
{
public:
HDF5FileChunkExtractor(std::string const& hdf5_files_root_dir);
HDF5FileChunkExtractor(std::string const& hdf5_archive_dir = "/tmp");

~HDF5FileChunkExtractor();

int process_chunk(StoryChunk*);

int reset(std::string const& hdf5_archive_dir);
int reset(json_object*);

bool is_active() const { return (std::filesystem::exists(rootDirectory)); }

private:
std::string rootDirectory;
};
Expand Down
50 changes: 0 additions & 50 deletions ChronoGrapher/src/CSVFileChunkExtractor.cpp

This file was deleted.

34 changes: 22 additions & 12 deletions ChronoGrapher/src/ChronoGrapher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@
#include <DataStoreAdminService.h>
#include <ConfigurationManager.h>
#include <StoryChunkExtractionModule.h>
#include <ChunkLoggingExtractor.h>
#include <ChunkExtractorCSV.h>
#include <HDF5FileChunkExtractor.h>
#include <cmd_arg_parse.h>
#include <ChronoGrapherConfiguration.h>
#include <GrapherExtractionChain.h>

namespace chl = chronolog;
namespace tl = thallium;
Expand Down Expand Up @@ -158,18 +156,30 @@ int main(int argc, char** argv)

LOG_INFO("[ChronoGrapher] GrapherIdCard: {}", chl::to_string(processIdCard));

// Instantiate MemoryDataStore & ExtractionModule
chronolog::ChunkIngestionQueue ingestionQueue;
// Instantiate StoryChunkExtractionModule

std::string log_string;
GRAPHER_CONF.EXTRACTION_MODULE_CONF.to_string(log_string);
LOG_INFO("[ChronoGrapherInstance] Initializing StoryChunkExtractionModule with {}", log_string);

chronolog::StoryChunkExtractionModule<chronolog::ChronoGrapherExtractionChain> theExtractionModule;

std::string archive_directory = GRAPHER_CONF.EXTRACTOR_CONF.story_files_dir;
theExtractionModule.getExtractionChain().activate(GRAPHER_CONF.EXTRACTION_MODULE_CONF,
processIdCard.getRecordingServiceId());

chl::LoggingExtractor logging_extractor;
chl::HDF5FileChunkExtractor hdf5_extractor(archive_directory);
theExtractionModule.initialize(GRAPHER_CONF.EXTRACTION_MODULE_CONF.extraction_stream_count);

chronolog::StoryChunkExtractionModule extractionModule(logging_extractor, hdf5_extractor);
if(!theExtractionModule.is_initialized())
{
LOG_ERROR("[ChronoGrapher] StoryChunkExtractionModule failed to initialize, exiting");
return (-1);
}

// Instantiate MemoryDataStore
chronolog::ChunkIngestionQueue ingestionQueue;

chronolog::GrapherDataStore theDataStore(ingestionQueue,
extractionModule.getExtractionQueue(),
theExtractionModule.getExtractionQueue(),
GRAPHER_CONF.DATA_STORE_CONF.max_story_chunk_size,
GRAPHER_CONF.DATA_STORE_CONF.story_chunk_duration_secs,
GRAPHER_CONF.DATA_STORE_CONF.acceptance_window_secs,
Expand Down Expand Up @@ -291,7 +301,7 @@ int main(int argc, char** argv)
tl::abt scope;
theDataStore.startDataCollection(3);
// start extraction streams & threads
extractionModule.startExtraction(2);
theExtractionModule.startExtraction();

/// Main loop for sending stats message until receiving SIGTERM ____________________________________________________
// now we are ready to ingest records coming from the storyteller clients ....
Expand All @@ -318,7 +328,7 @@ int main(int argc, char** argv)
theDataStore.shutdownDataCollection();
// Shutdown extraction module
// drain extractionQueue and stop extraction xStreams
extractionModule.shutdownExtraction();
theExtractionModule.shutdownExtraction();
// these are not probably needed as thallium handles the engine finalization...
// recordingEngine.finalize();
// collectionEngine.finalize();
Expand Down
27 changes: 9 additions & 18 deletions ChronoGrapher/src/ChronoGrapherConfiguration.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include <chronolog_errcode.h>
#include <ConfigurationBlocks.h>
#include <ChronoGrapherConfiguration.h>

#include <ExtractionModuleConfiguration.h>

namespace chl = chronolog;

Expand Down Expand Up @@ -113,31 +113,22 @@ int chronolog::GrapherConfiguration::parseJsonConf(json_object* json_conf)
if(DATA_STORE_CONF.parseJsonConf(data_store_conf) != chl::CL_SUCCESS)
return chl::CL_ERR_INVALID_CONF;
}
else if(strcmp(key, "Extractors") == 0)
else if(strcmp(key, "ExtractionModule") == 0)
{
if(!json_object_is_type(val, json_type_object))
{
std::cerr << "[GrapherConfiguration] Invalid 'Extractors': expected object" << std::endl;
std::cerr << "[GrapherConfiguration] Invalid 'ExtractionModule' segment: expected json object"
<< std::endl;
return chl::CL_ERR_INVALID_CONF;
}
json_object* extractors = json_object_object_get(json_conf, "Extractors");
json_object_object_foreach(extractors, key, val)
json_object* extraction_module_json_object = json_object_object_get(json_conf, "ExtractionModule");
if(EXTRACTION_MODULE_CONF.parse_json_object(extraction_module_json_object) != chl::CL_SUCCESS)
{
if(strcmp(key, "story_files_dir") == 0)
{
if(!json_object_is_type(val, json_type_string))
{
std::cerr << "[GrapherConfiguration] Invalid 'story_files_dir': expected string" << std::endl;
return chl::CL_ERR_INVALID_CONF;
}
EXTRACTOR_CONF.story_files_dir = json_object_get_string(val);
}
else
{
std::cerr << "[GrapherConfiguration] Unknown Extractors configuration " << key << std::endl;
}
return chl::CL_ERR_INVALID_CONF;
std::cerr << "[GrapherConfiguration] Error parsing ExtractionModule configuration: " << std::endl;
}
}

else
{
std::cerr << "[GrapherConfiguration] Unknown Grapher configuration " << key << std::endl;
Expand Down
Loading
Loading