Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions ChronoGrapher/include/ChronoGrapherConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,16 @@
namespace chronolog
{

struct KeeperGrapherDrainServiceConf
{
RPCProviderConf RPC_CONF;
uint32_t INGESTION_THREAD_COUNT = 1;
};

struct GrapherConfiguration
{
uint32_t RECORDING_GROUP{};
RPCProviderConf KEEPER_GRAPHER_DRAIN_SERVICE_CONF;
KeeperGrapherDrainServiceConf KEEPER_GRAPHER_DRAIN_SERVICE_CONF;
RPCProviderConf DATA_STORE_ADMIN_SERVICE_CONF;
RPCProviderConf VISOR_REGISTRY_SERVICE_CONF;
LogConf LOG_CONF;
Expand All @@ -24,10 +30,10 @@ struct GrapherConfiguration
GrapherConfiguration()
{
RECORDING_GROUP = 0;
KEEPER_GRAPHER_DRAIN_SERVICE_CONF.PROTO_CONF = "ofi+sockets";
KEEPER_GRAPHER_DRAIN_SERVICE_CONF.IP = "127.0.0.1";
KEEPER_GRAPHER_DRAIN_SERVICE_CONF.BASE_PORT = 9999;
KEEPER_GRAPHER_DRAIN_SERVICE_CONF.SERVICE_PROVIDER_ID = 99;
KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.PROTO_CONF = "ofi+sockets";
KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.IP = "127.0.0.1";
KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.BASE_PORT = 9999;
KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID = 99;

DATA_STORE_ADMIN_SERVICE_CONF.PROTO_CONF = "ofi+sockets";
DATA_STORE_ADMIN_SERVICE_CONF.IP = "127.0.0.1";
Expand All @@ -51,7 +57,7 @@ struct GrapherConfiguration
[[nodiscard]] std::string to_String() const
{
return "[CHRONO_GRAPHER_CONFIGURATION: RECORDING_GROUP: " + std::to_string(RECORDING_GROUP) +
", KEEPER_GRAPHER_DRAIN_SERVICE_CONF: " + KEEPER_GRAPHER_DRAIN_SERVICE_CONF.to_String() +
", KEEPER_GRAPHER_DRAIN_SERVICE_CONF: " + KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.to_String() +
", DATA_STORE_ADMIN_SERVICE_CONF: " + DATA_STORE_ADMIN_SERVICE_CONF.to_String() +
", VISOR_REGISTRY_SERVICE_CONF: " + VISOR_REGISTRY_SERVICE_CONF.to_String() +
", LOG_CONF: " + LOG_CONF.to_String() + ", DATA_STORE_CONF: " + DATA_STORE_CONF.to_String() +
Expand Down
21 changes: 12 additions & 9 deletions ChronoGrapher/src/ChronoGrapher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,16 @@ int main(int argc, char** argv)

// Instantiate GrapherRecordingService
chronolog::RecordingGroupId recording_group_id = GRAPHER_CONF.RECORDING_GROUP;
std::string RECORDING_SERVICE_PROTOCOL = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.PROTO_CONF;
std::string RECORDING_SERVICE_IP = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.IP;
uint16_t RECORDING_SERVICE_PORT = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.BASE_PORT;
uint16_t recording_service_provider_id = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.SERVICE_PROVIDER_ID;
std::string RECORDING_SERVICE_PROTOCOL = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.PROTO_CONF;
std::string RECORDING_SERVICE_IP = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.IP;
uint16_t RECORDING_SERVICE_PORT = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.BASE_PORT;
uint16_t recording_service_provider_id =
GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID;

chl::ServiceId recordingServiceId(GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.PROTO_CONF,
GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.IP,
GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.BASE_PORT,
GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.SERVICE_PROVIDER_ID);
chl::ServiceId recordingServiceId(GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.PROTO_CONF,
GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.IP,
GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.BASE_PORT,
GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID);


std::string RECORDING_SERVICE_NA_STRING;
Expand Down Expand Up @@ -213,7 +214,9 @@ int main(int argc, char** argv)

try
{
margo_instance_id margo_id = margo_init(RECORDING_SERVICE_NA_STRING.c_str(), MARGO_SERVER_MODE, 1, 1);
int rpc_thread_count = static_cast<int>(GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.INGESTION_THREAD_COUNT);
margo_instance_id margo_id =
margo_init(RECORDING_SERVICE_NA_STRING.c_str(), MARGO_SERVER_MODE, 1, rpc_thread_count);
Comment thread
fkengun marked this conversation as resolved.
recordingEngine = new tl::engine(margo_id);

std::stringstream s1;
Expand Down
13 changes: 12 additions & 1 deletion ChronoGrapher/src/ChronoGrapherConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,20 @@ int chronolog::GrapherConfiguration::parseJsonConf(json_object* json_conf)
{
if(strcmp(key, "rpc") == 0)
{
if(KEEPER_GRAPHER_DRAIN_SERVICE_CONF.parseJsonConf(val) != chl::CL_SUCCESS)
if(KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.parseJsonConf(val) != chl::CL_SUCCESS)
return chl::CL_ERR_INVALID_CONF;
}
else if(strcmp(key, "IngestionThreadCount") == 0)
{
if(!json_object_is_type(val, json_type_int))
{
std::cerr << "[GrapherConfiguration] Invalid 'IngestionThreadCount': expected integer"
<< std::endl;
return chl::CL_ERR_INVALID_CONF;
}
int value = json_object_get_int(val);
KEEPER_GRAPHER_DRAIN_SERVICE_CONF.INGESTION_THREAD_COUNT = (value >= 1 ? value : 1);
}
else
{
std::cerr << "[GrapherConfiguration] Unknown KeeperGrapherDrainService configuration: " << key
Expand Down
8 changes: 7 additions & 1 deletion ChronoKeeper/include/ChronoKeeperConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,16 @@

namespace chronolog
{
struct KeeperRecordingServiceConf
{
RPCProviderConf RPC_CONF;
uint32_t INGESTION_THREAD_COUNT = 1;
};

struct KeeperConfiguration
{
uint32_t RECORDING_GROUP;
RPCProviderConf KEEPER_RECORDING_SERVICE_CONF;
KeeperRecordingServiceConf KEEPER_RECORDING_SERVICE_CONF;
RPCProviderConf DATA_STORE_ADMIN_SERVICE_CONF;
Comment thread
fkengun marked this conversation as resolved.
RPCProviderConf VISOR_REGISTRY_SERVICE_CONF;
RPCProviderConf KEEPER_GRAPHER_DRAIN_SERVICE_CONF;
Expand Down
4 changes: 3 additions & 1 deletion ChronoKeeper/include/IngestionQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,14 @@ class IngestionQueue

void ingestLogEvent(LogEvent const& event)
{
#ifndef NDEBUG
std::stringstream ss;
ss << event;
LOG_DEBUG("[IngestionQueue] Received event for StoryID={}: Event Details={}, HandleMapSize={}",
LOG_TRACE("[IngestionQueue] Received event for StoryID={}: Event Details={}, HandleMapSize={}",
event.storyId,
ss.str(),
storyIngestionHandles.size());
#endif
auto ingestionHandle_iter = storyIngestionHandles.find(event.storyId);
if(ingestionHandle_iter == storyIngestionHandles.end())
{
Expand Down
7 changes: 6 additions & 1 deletion ChronoKeeper/include/KeeperRecordingService.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,14 @@ class KeeperRecordingService: public tl::provider<KeeperRecordingService>
{
// ClientId teller_id, StoryId story_id,
// ChronoTick const& chrono_tick, std::string const& record)
#ifndef NDEBUG
std::stringstream ss;
ss << log_event;
LOG_DEBUG("[KeeperRecordingService] Recording event: {}", ss.str());
LOG_TRACE("[KeeperRecordingService] Recording event: {}", ss.str());
#endif
LOG_DEBUG("[KeeperRecordingService] Recording event in ULT={}, ES={}",
Comment thread
fkengun marked this conversation as resolved.
Outdated
tl::thread::self_id(),
tl::xstream::self().get_rank());
theIngestionQueue.ingestLogEvent(log_event);
request.respond(chronolog::CL_SUCCESS);
}
Expand Down
13 changes: 12 additions & 1 deletion ChronoKeeper/src/ChronoKeeperConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,20 @@ int chronolog::KeeperConfiguration::parseJsonConf(json_object* json_conf)
{
if(strcmp(key, "rpc") == 0)
{
if(KEEPER_RECORDING_SERVICE_CONF.parseJsonConf(val) != chl::CL_SUCCESS)
if(KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.parseJsonConf(val) != chl::CL_SUCCESS)
return chl::CL_ERR_INVALID_CONF;
}
else if(strcmp(key, "IngestionThreadCount") == 0)
{
if(!json_object_is_type(val, json_type_int))
{
std::cerr << "[KeeperConfiguration] Invalid 'IngestionThreadCount': expected integer"
<< std::endl;
return chl::CL_ERR_INVALID_CONF;
}
int value = json_object_get_int(val);
KEEPER_RECORDING_SERVICE_CONF.INGESTION_THREAD_COUNT = (value >= 1 ? value : 1);
}
else
{
std::cerr << "[KeeperConfiguration] Unknown KeeperRecordingService "
Expand Down
20 changes: 11 additions & 9 deletions ChronoKeeper/src/ChronoKeeperInstance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@ int main(int argc, char** argv)

/// KeeperRecordingService setup ___________________________________________________________________________________
// Instantiate KeeperRecordingService
std::string KEEPER_RECORDING_SERVICE_PROTOCOL = KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.PROTO_CONF;
std::string KEEPER_RECORDING_SERVICE_IP = KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.IP;
uint16_t KEEPER_RECORDING_SERVICE_PORT = KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.BASE_PORT;
uint16_t recording_service_provider_id = KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.SERVICE_PROVIDER_ID;
std::string KEEPER_RECORDING_SERVICE_PROTOCOL = KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.PROTO_CONF;
std::string KEEPER_RECORDING_SERVICE_IP = KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.IP;
uint16_t KEEPER_RECORDING_SERVICE_PORT = KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.BASE_PORT;
uint16_t recording_service_provider_id = KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID;


// validate ip address, instantiate Recording Service and create KeeperIdCard
Expand All @@ -149,10 +149,10 @@ int main(int argc, char** argv)
chronolog::RecordingGroupId keeper_group_id = KEEPER_CONF.RECORDING_GROUP;
chronolog::KeeperIdCard keeperIdCard(
keeper_group_id,
chronolog::ServiceId(KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.PROTO_CONF,
KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.IP,
KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.BASE_PORT,
KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.SERVICE_PROVIDER_ID));
chronolog::ServiceId(KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.PROTO_CONF,
KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.IP,
KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.BASE_PORT,
KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID));

std::string KEEPER_RECORDING_SERVICE_NA_STRING;
keeperIdCard.getRecordingServiceId().get_service_as_string(KEEPER_RECORDING_SERVICE_NA_STRING);
Expand Down Expand Up @@ -242,7 +242,9 @@ int main(int argc, char** argv)

try
{
margo_instance_id margo_id = margo_init(KEEPER_RECORDING_SERVICE_NA_STRING.c_str(), MARGO_SERVER_MODE, 1, 1);
int rpc_thread_count = static_cast<int>(KEEPER_CONF.KEEPER_RECORDING_SERVICE_CONF.INGESTION_THREAD_COUNT);
Comment thread
fkengun marked this conversation as resolved.
margo_instance_id margo_id =
margo_init(KEEPER_RECORDING_SERVICE_NA_STRING.c_str(), MARGO_SERVER_MODE, 1, rpc_thread_count);
recordingEngine = new tl::engine(margo_id);

std::stringstream s1;
Expand Down
18 changes: 12 additions & 6 deletions ChronoPlayer/include/ChronoPlayerConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,17 @@
namespace chronolog
{

struct PlaybackServiceConf
{
RPCProviderConf RPC_CONF;
uint32_t INGESTION_THREAD_COUNT = 1;
};

struct PlayerConfiguration
{
uint32_t RECORDING_GROUP;
RPCProviderConf DATA_STORE_ADMIN_SERVICE_CONF;
RPCProviderConf PLAYBACK_SERVICE_CONF;
PlaybackServiceConf PLAYBACK_SERVICE_CONF;
RPCProviderConf VISOR_REGISTRY_SERVICE_CONF;
LogConf LOG_CONF;
DataStoreConf DATA_STORE_CONF{};
Expand All @@ -29,10 +35,10 @@ struct PlayerConfiguration
DATA_STORE_ADMIN_SERVICE_CONF.BASE_PORT = 2222;
DATA_STORE_ADMIN_SERVICE_CONF.SERVICE_PROVIDER_ID = 22;

PLAYBACK_SERVICE_CONF.PROTO_CONF = "ofi+sockets";
PLAYBACK_SERVICE_CONF.IP = "127.0.0.1";
PLAYBACK_SERVICE_CONF.BASE_PORT = 2225;
PLAYBACK_SERVICE_CONF.SERVICE_PROVIDER_ID = 25;
PLAYBACK_SERVICE_CONF.RPC_CONF.PROTO_CONF = "ofi+sockets";
PLAYBACK_SERVICE_CONF.RPC_CONF.IP = "127.0.0.1";
PLAYBACK_SERVICE_CONF.RPC_CONF.BASE_PORT = 2225;
PLAYBACK_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID = 25;

VISOR_REGISTRY_SERVICE_CONF.PROTO_CONF = "ofi+sockets";
VISOR_REGISTRY_SERVICE_CONF.IP = "127.0.0.1";
Expand All @@ -52,7 +58,7 @@ struct PlayerConfiguration
{
return "[CHRONO_PLAYER_CONFIGURATION: RECORDING_GROUP: " + std::to_string(RECORDING_GROUP) +
", DATA_STORE_ADMIN_SERVICE_CONF: " + DATA_STORE_ADMIN_SERVICE_CONF.to_String() +
", PLAYBACK_SERVICE_CONF: " + PLAYBACK_SERVICE_CONF.to_String() +
", PLAYBACK_SERVICE_CONF: " + PLAYBACK_SERVICE_CONF.RPC_CONF.to_String() +
", VISOR_REGISTRY_SERVICE_CONF: " + VISOR_REGISTRY_SERVICE_CONF.to_String() +
", LOG_CONF: " + LOG_CONF.to_String() + ", DATA_STORE_CONF: " + DATA_STORE_CONF.to_String() +
", READER_CONF: " + READER_CONF.to_String() + "]";
Expand Down
20 changes: 11 additions & 9 deletions ChronoPlayer/src/ChronoPlayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,17 +87,17 @@ int main(int argc, char** argv)
}

// Instantiate PlaybackService
std::string PLAYBACK_SERVICE_PROTOCOL = PLAYER_CONF.PLAYBACK_SERVICE_CONF.PROTO_CONF;
std::string PLAYBACK_SERVICE_IP = PLAYER_CONF.PLAYBACK_SERVICE_CONF.IP;
uint16_t PLAYBACK_SERVICE_PORT = PLAYER_CONF.PLAYBACK_SERVICE_CONF.BASE_PORT;
uint16_t playback_service_provider_id = PLAYER_CONF.PLAYBACK_SERVICE_CONF.SERVICE_PROVIDER_ID;
std::string PLAYBACK_SERVICE_PROTOCOL = PLAYER_CONF.PLAYBACK_SERVICE_CONF.RPC_CONF.PROTO_CONF;
std::string PLAYBACK_SERVICE_IP = PLAYER_CONF.PLAYBACK_SERVICE_CONF.RPC_CONF.IP;
uint16_t PLAYBACK_SERVICE_PORT = PLAYER_CONF.PLAYBACK_SERVICE_CONF.RPC_CONF.BASE_PORT;
uint16_t playback_service_provider_id = PLAYER_CONF.PLAYBACK_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID;

// validate ip address, instantiate Playback Service and create IdCard

chronolog::ServiceId playbackServiceId(PLAYER_CONF.PLAYBACK_SERVICE_CONF.PROTO_CONF,
PLAYER_CONF.PLAYBACK_SERVICE_CONF.IP,
PLAYER_CONF.PLAYBACK_SERVICE_CONF.BASE_PORT,
PLAYER_CONF.PLAYBACK_SERVICE_CONF.SERVICE_PROVIDER_ID);
chronolog::ServiceId playbackServiceId(PLAYER_CONF.PLAYBACK_SERVICE_CONF.RPC_CONF.PROTO_CONF,
PLAYER_CONF.PLAYBACK_SERVICE_CONF.RPC_CONF.IP,
PLAYER_CONF.PLAYBACK_SERVICE_CONF.RPC_CONF.BASE_PORT,
PLAYER_CONF.PLAYBACK_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID);

if(!playbackServiceId.is_valid())
{
Expand All @@ -116,7 +116,9 @@ int main(int argc, char** argv)

playbackServiceId.get_service_as_string(PLAYBACK_SERVICE_NA_STRING);

margo_instance_id playback_margo_id = margo_init(PLAYBACK_SERVICE_NA_STRING.c_str(), MARGO_SERVER_MODE, 1, 1);
int rpc_thread_count = static_cast<int>(PLAYER_CONF.PLAYBACK_SERVICE_CONF.INGESTION_THREAD_COUNT);
margo_instance_id playback_margo_id =
margo_init(PLAYBACK_SERVICE_NA_STRING.c_str(), MARGO_SERVER_MODE, 1, rpc_thread_count);

playbackEngine = new tl::engine(playback_margo_id);

Expand Down
13 changes: 12 additions & 1 deletion ChronoPlayer/src/ChronoPlayerConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,20 @@ int chronolog::PlayerConfiguration::parseJsonConf(json_object* json_conf)
{
if(strcmp(key, "rpc") == 0)
{
if(PLAYBACK_SERVICE_CONF.parseJsonConf(val) != chl::CL_SUCCESS)
if(PLAYBACK_SERVICE_CONF.RPC_CONF.parseJsonConf(val) != chl::CL_SUCCESS)
return chl::CL_ERR_INVALID_CONF;
}
else if(strcmp(key, "IngestionThreadCount") == 0)
{
if(!json_object_is_type(val, json_type_int))
{
std::cerr << "[PlayerConfiguration] Invalid 'IngestionThreadCount': expected integer"
<< std::endl;
return chl::CL_ERR_INVALID_CONF;
}
int value = json_object_get_int(val);
PLAYBACK_SERVICE_CONF.INGESTION_THREAD_COUNT = (value >= 1 ? value : 1);
}
else
{
std::cerr << "[ConfigurationManager] [chrono_player] Unknown PlaybackQueryService configuration: "
Expand Down
6 changes: 6 additions & 0 deletions chrono_common/src/deprecated_ConfigurationManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,12 @@ int chronolog::KeeperConfiguration::parseJsonConf(json_object* json_conf)
int value = json_object_get_int(val);
RECORDING_GROUP = (value >= 0 ? value : 0);
}
else if(strcmp(key, "IngestionThreadCount") == 0)
{
assert(json_object_is_type(val, json_type_int));
int value = json_object_get_int(val);
INGESTION_THREAD_COUNT = (value >= 1 ? value : 1);
Comment thread
fkengun marked this conversation as resolved.
Outdated
}
else if(strcmp(key, "KeeperRecordingService") == 0)
{
assert(json_object_is_type(val, json_type_object));
Expand Down
3 changes: 3 additions & 0 deletions default_conf.json.in
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"chrono_keeper": {
"RecordingGroup": 7,
"KeeperRecordingService": {
"IngestionThreadCount": 4,
"rpc": {
"protocol_conf": "ofi+sockets",
"service_ip": "127.0.0.1",
Expand Down Expand Up @@ -96,6 +97,7 @@
"chrono_grapher": {
"RecordingGroup": 7,
"KeeperGrapherDrainService": {
"IngestionThreadCount": 1,
"rpc": {
"protocol_conf": "ofi+sockets",
"service_ip": "127.0.0.1",
Expand Down Expand Up @@ -151,6 +153,7 @@
}
},
"PlaybackQueryService": {
"IngestionThreadCount": 1,
"rpc": {
"protocol_conf": "ofi+sockets",
"service_ip": "127.0.0.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,10 @@ int main(int argc, char** argv)
/**
* Keeper-push
*/
std::string KEEPER_COLLECTOR_NA_STRING = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.PROTO_CONF + "://" +
GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.IP + ":" +
std::to_string(GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.BASE_PORT);
std::string KEEPER_COLLECTOR_NA_STRING =
GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.PROTO_CONF + "://" +
GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.IP + ":" +
std::to_string(GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.BASE_PORT);
tl::engine extraction_engine = tl::engine(KEEPER_COLLECTOR_NA_STRING, THALLIUM_SERVER_MODE);
std::stringstream ss;
ss << extraction_engine.self();
Expand All @@ -164,7 +165,7 @@ int main(int argc, char** argv)
tl_es_vec.push_back(std::move(es));
}

int service_provider_id = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.SERVICE_PROVIDER_ID;
int service_provider_id = GRAPHER_CONF.KEEPER_GRAPHER_DRAIN_SERVICE_CONF.RPC_CONF.SERVICE_PROVIDER_ID;
StoryChunkRecordService* story_chunk_recording_service =
StoryChunkRecordService::CreateStoryChunkRecordService(extraction_engine, tl_pool, service_provider_id);
LOG_DEBUG("[standalone_ingest_test] StoryChunkRecordService created");
Expand Down
Loading