Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion src/cmd/tests/load_config_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::time::Duration;

use cmd::options::GreptimeOptions;
use common_base::memory_limit::MemoryLimit;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_config::{Configurable, DEFAULT_DATA_HOME, ENV_VAR_SEP};
use common_options::datanode::{ClientOptions, DatanodeClientOptions};
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, DEFAULT_OTLP_HTTP_ENDPOINT, LoggingOptions};
use common_wal::config::DatanodeWalConfig;
Expand Down Expand Up @@ -311,3 +311,25 @@ fn test_load_standalone_example_config() {
};
similar_asserts::assert_eq!(options, expected);
}

#[test]
fn test_load_heartbeat_env_vars_from_env() {
let env_prefix = "HEARTBEAT_ENV_VARS_UT";
let env_key = [env_prefix, "HEARTBEAT_ENV_VARS"].join(ENV_VAR_SEP);

temp_env::with_var(env_key, Some("AZ,REGION"), || {
let expected = vec!["AZ".to_string(), "REGION".to_string()];

let datanode =
GreptimeOptions::<DatanodeOptions>::load_layered_options(None, env_prefix).unwrap();
similar_asserts::assert_eq!(datanode.component.heartbeat_env_vars, expected);

let frontend =
GreptimeOptions::<FrontendOptions>::load_layered_options(None, env_prefix).unwrap();
similar_asserts::assert_eq!(frontend.component.heartbeat_env_vars, expected);

let standalone =
GreptimeOptions::<StandaloneOptions>::load_layered_options(None, env_prefix).unwrap();
similar_asserts::assert_eq!(standalone.component.heartbeat_env_vars, expected);
});
}
61 changes: 61 additions & 0 deletions src/common/meta/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::hash::{DefaultHasher, Hash, Hasher};
use std::str::FromStr;
Expand Down Expand Up @@ -134,6 +135,9 @@ pub struct NodeInfo {
// The node build hostname
#[serde(default)]
pub hostname: String,
/// Environment variables reported by the node.
#[serde(default)]
pub env_vars: HashMap<String, String>,
}

#[derive(Debug, Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize, PartialOrd, Ord)]
Expand Down Expand Up @@ -355,6 +359,7 @@ mod tests {
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: "test_hostname".to_string(),
env_vars: Default::default(),
};

let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
Expand Down Expand Up @@ -451,6 +456,7 @@ mod tests {
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: "test_hostname".to_string(),
env_vars: Default::default(),
};

let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
Expand All @@ -464,4 +470,59 @@ mod tests {
} if workloads.types == vec![7]
);
}

#[test]
fn test_node_info_backward_compatible_without_env_vars() {
// Simulate a NodeInfo serialized before env_vars was added
let raw = r#"{
"peer":{"id":1,"addr":"127.0.0.1"},
"last_activity_ts":123,
"status":{"Datanode":{"rcus":0,"wcus":0,"leader_regions":0,"follower_regions":0,"workloads":{"types":[0]}}},
"version":"",
"git_commit":"",
"start_time_ms":1,
"total_cpu_millicores":0,
"total_memory_bytes":0,
"cpu_usage_millicores":0,
"memory_usage_bytes":0,
"hostname":"test"
}"#;

let node_info: NodeInfo = raw.parse().unwrap();
assert!(node_info.env_vars.is_empty());
}

#[test]
fn test_node_info_with_env_vars_round_trip() {
let mut env_vars = HashMap::new();
env_vars.insert("AZ".to_string(), "us-east-1a".to_string());

let node_info = NodeInfo {
peer: Peer {
id: 1,
addr: "127.0.0.1".to_string(),
},
last_activity_ts: 123,
status: NodeStatus::Datanode(DatanodeStatus {
rcus: 0,
wcus: 0,
leader_regions: 0,
follower_regions: 0,
workloads: DatanodeWorkloads { types: vec![] },
}),
version: "".to_string(),
git_commit: "".to_string(),
start_time_ms: 1,
total_cpu_millicores: 0,
total_memory_bytes: 0,
cpu_usage_millicores: 0,
memory_usage_bytes: 0,
hostname: "test".to_string(),
env_vars,
};

let node_info_bytes: Vec<u8> = node_info.try_into().unwrap();
let new_node_info: NodeInfo = node_info_bytes.try_into().unwrap();
assert_eq!(new_node_info.env_vars.get("AZ").unwrap(), "us-east-1a");
}
}
74 changes: 73 additions & 1 deletion src/common/meta/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::str::FromStr;

use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, RequestHeader};
Expand Down Expand Up @@ -371,6 +371,48 @@ impl GcStat {
}
}

/// Environment variables reported by a node in heartbeat messages.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct EnvVars {
pub vars: HashMap<String, String>,
}

impl EnvVars {
pub const ENV_VARS_KEY: &str = "__env_vars";

pub fn new(vars: HashMap<String, String>) -> Self {
Self { vars }
}

/// Read the configured env var keys from the environment and build an EnvVars.
pub fn from_config(keys: &[String]) -> Self {
let vars = keys
.iter()
.filter_map(|key| std::env::var(key).ok().map(|value| (key.clone(), value)))
.collect();
Self { vars }
}

pub fn into_extensions(&self, extensions: &mut HashMap<String, Vec<u8>>) {
if self.vars.is_empty() {
return;
}
let bytes = serde_json::to_vec(self).unwrap_or_default();
extensions.insert(Self::ENV_VARS_KEY.to_string(), bytes);
}

pub fn from_extensions(extensions: &HashMap<String, Vec<u8>>) -> Result<Option<Self>> {
extensions
.get(Self::ENV_VARS_KEY)
.map(|bytes| {
serde_json::from_slice(bytes).with_context(|_| DeserializeFromJsonSnafu {
input: String::from_utf8_lossy(bytes).to_string(),
})
})
.transpose()
}
}

/// The key of the datanode stat in the memory store.
///
/// The format is `__meta_datanode_stat-0-{node_id}`.
Expand Down Expand Up @@ -596,4 +638,34 @@ mod tests {
assert_eq!(stat.region_stats.len(), 1);
assert_eq!(stat.region_stats[0].role, RegionRole::StagingLeader);
}

#[test]
fn test_env_vars_round_trip() {
let mut vars = HashMap::new();
vars.insert("AZ".to_string(), "us-east-1a".to_string());
vars.insert("REGION".to_string(), "us-east-1".to_string());
let env_vars = EnvVars::new(vars);

let mut extensions = HashMap::new();
env_vars.into_extensions(&mut extensions);

let extracted = EnvVars::from_extensions(&extensions).unwrap().unwrap();
assert_eq!(extracted.vars.get("AZ").unwrap(), "us-east-1a");
assert_eq!(extracted.vars.get("REGION").unwrap(), "us-east-1");
}

#[test]
fn test_env_vars_empty_not_written() {
let env_vars = EnvVars::default();
let mut extensions = HashMap::new();
env_vars.into_extensions(&mut extensions);
assert!(extensions.is_empty());
}

#[test]
fn test_env_vars_from_extensions_missing() {
let extensions = HashMap::new();
let result = EnvVars::from_extensions(&extensions).unwrap();
assert!(result.is_none());
}
}
11 changes: 10 additions & 1 deletion src/datanode/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ pub struct DatanodeOptions {
pub query: QueryOptions,
pub memory: MemoryOptions,

/// Environment variable keys to read and report in heartbeat messages.
/// The values of these env vars at startup will be sent to metasrv.
pub heartbeat_env_vars: Vec<String>,

/// Deprecated options, please use the new options instead.
#[deprecated(note = "Please use `grpc.bind_addr` instead.")]
pub rpc_addr: Option<String>,
Expand Down Expand Up @@ -136,6 +140,7 @@ impl Default for DatanodeOptions {
tracing: TracingOptions::default(),
query: QueryOptions::default(),
memory: MemoryOptions::default(),
heartbeat_env_vars: vec![],

// Deprecated options
rpc_addr: None,
Expand All @@ -149,7 +154,11 @@ impl Default for DatanodeOptions {

impl Configurable for DatanodeOptions {
fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["meta_client.metasrv_addrs", "wal.broker_endpoints"])
Some(&[
"heartbeat_env_vars",
"meta_client.metasrv_addrs",
"wal.broker_endpoints",
])
}
}

Expand Down
10 changes: 7 additions & 3 deletions src/datanode/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use api::v1::meta::heartbeat_request::NodeWorkloads;
use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
use common_base::Plugins;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::datanode::REGION_STATISTIC_KEY;
use common_meta::datanode::{EnvVars, REGION_STATISTIC_KEY};
use common_meta::distributed_time_constants::BASE_HEARTBEAT_INTERVAL;
use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
Expand Down Expand Up @@ -66,6 +66,7 @@ pub struct HeartbeatTask {
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
region_alive_keeper: Arc<RegionAliveKeeper>,
resource_stat: ResourceStatRef,
env_vars: EnvVars,
}

impl Drop for HeartbeatTask {
Expand Down Expand Up @@ -114,6 +115,7 @@ impl HeartbeatTask {
resp_handler_executor,
region_alive_keeper,
resource_stat,
env_vars: EnvVars::from_config(&opts.heartbeat_env_vars),
})
}

Expand Down Expand Up @@ -258,6 +260,8 @@ impl HeartbeatTask {
.mito_engine()
.context(RegionEngineNotFoundSnafu { name: "mito" })?
.gc_limiter();
let mut env_var_extensions = HashMap::new();
self.env_vars.into_extensions(&mut env_var_extensions);

common_runtime::spawn_hb(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
Expand Down Expand Up @@ -300,7 +304,7 @@ impl HeartbeatTask {
if let Some(message) = message {
match outgoing_message_to_mailbox_message(message) {
Ok(message) => {
let mut extensions = heartbeat_request.extensions.clone();
let mut extensions = env_var_extensions.clone();
let gc_stat = gc_limiter.gc_stat();
gc_stat.into_extensions(&mut extensions);

Expand Down Expand Up @@ -328,7 +332,7 @@ impl HeartbeatTask {
let now = Instant::now();
let duration_since_epoch = (now - epoch).as_millis() as u64;

let mut extensions = heartbeat_request.extensions.clone();
let mut extensions = env_var_extensions.clone();
let gc_stat = gc_limiter.gc_stat();
gc_stat.into_extensions(&mut extensions);

Expand Down
5 changes: 4 additions & 1 deletion src/frontend/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ pub struct FrontendOptions {
pub memory: MemoryOptions,
/// The event recorder options.
pub event_recorder: EventRecorderOptions,
/// Environment variable keys to read and report in heartbeat messages.
pub heartbeat_env_vars: Vec<String>,
}

impl Default for FrontendOptions {
Expand Down Expand Up @@ -101,13 +103,14 @@ impl Default for FrontendOptions {
slow_query: SlowQueryOptions::default(),
memory: MemoryOptions::default(),
event_recorder: EventRecorderOptions::default(),
heartbeat_env_vars: vec![],
}
}
}

impl Configurable for FrontendOptions {
fn env_list_keys() -> Option<&'static [&'static str]> {
Some(&["meta_client.metasrv_addrs"])
Some(&["heartbeat_env_vars", "meta_client.metasrv_addrs"])
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/frontend/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod tests;
use std::sync::Arc;

use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
use common_meta::datanode::EnvVars;
use common_meta::heartbeat::handler::{
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
};
Expand Down Expand Up @@ -45,6 +46,7 @@ pub struct HeartbeatTask {
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
start_time_ms: u64,
resource_stat: ResourceStatRef,
env_vars: EnvVars,
}

impl HeartbeatTask {
Expand All @@ -67,6 +69,7 @@ impl HeartbeatTask {
resp_handler_executor,
start_time_ms: common_time::util::current_time_millis() as u64,
resource_stat,
env_vars: EnvVars::from_config(&opts.heartbeat_env_vars),
}
}

Expand Down Expand Up @@ -202,17 +205,22 @@ impl HeartbeatTask {
let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
let resource_stat = self.resource_stat.clone();
let env_vars = self.env_vars.clone();
common_runtime::spawn_hb(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);

let mut extensions = std::collections::HashMap::new();
env_vars.into_extensions(&mut extensions);

let heartbeat_request = HeartbeatRequest {
peer: self_peer,
info: Self::build_node_info(
start_time_ms,
total_cpu_millicores,
total_memory_bytes,
),
extensions,
..Default::default()
};

Expand Down
Loading
Loading