diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 7283fe96..522be7f3 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -111,6 +111,7 @@ func main() { }() tp = sdktp } + tracer := tp.Tracer(svcName) mqttPubSub, err := mqtt.NewPubSub(cfg.MQTTAddress, cfg.MQTTQoS, cfg.ClientID, cfg.ClientID, cfg.ClientKey, cfg.DomainID, cfg.ChannelID, cfg.MQTTTimeout, logger) diff --git a/cmd/proxy/main.go b/cmd/proxy/main.go index bba1942f..567b21d0 100644 --- a/cmd/proxy/main.go +++ b/cmd/proxy/main.go @@ -6,19 +6,28 @@ import ( "fmt" "log" "log/slog" + "net/http" + "net/url" "os" + "os/signal" + "syscall" "time" "github.com/absmach/propeller" "github.com/absmach/propeller/pkg/mqtt" "github.com/absmach/propeller/proxy" + "github.com/absmach/magistrala/pkg/jaeger" "github.com/caarlos0/env/v11" + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/otel/trace" + "go.opentelemetry.io/otel/trace/noop" "golang.org/x/sync/errgroup" ) const ( - svcName = "proxy" - configPath = "config.toml" + svcName = "proxy" + configPath = "config.toml" + defMetricsPort = "7071" ) type config struct { @@ -37,11 +46,13 @@ type config struct { Username string `env:"PROXY_REGISTRY_USERNAME" envDefault:""` Password string `env:"PROXY_REGISTRY_PASSWORD" envDefault:""` RegistryURL string `env:"PROXY_REGISTRY_URL,notEmpty"` + // Observability + MetricsPort string `env:"PROXY_METRICS_PORT" envDefault:"7071"` + OTELURL url.URL `env:"PROXY_OTEL_URL"` + TraceRatio float64 `env:"PROXY_TRACE_RATIO" envDefault:"0"` } func main() { - g, ctx := errgroup.WithContext(context.Background()) - cfg := config{} if err := env.Parse(&cfg); err != nil { log.Fatalf("failed to load configuration : %s", err.Error()) @@ -72,12 +83,37 @@ func main() { if err := level.UnmarshalText([]byte(cfg.LogLevel)); err != nil { log.Fatalf("failed to parse log level: %s", err.Error()) } + logHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ Level: level, }) logger := 
slog.New(logHandler) slog.SetDefault(logger) + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT) + defer stop() + g, ctx := errgroup.WithContext(ctx) + + var tp trace.TracerProvider + switch cfg.OTELURL { + case url.URL{}: + tp = noop.NewTracerProvider() + default: + sdktp, err := jaeger.NewProvider(ctx, svcName, cfg.OTELURL, "", cfg.TraceRatio) + if err != nil { + logger.Error("failed to initialize jaeger tracing", slog.String("error", err.Error())) + //nolint:gocritic + os.Exit(1) + } + defer func() { + if err := sdktp.Shutdown(ctx); err != nil { + logger.Error("error shutting down tracer provider", slog.Any("error", err)) + } + }() + tp = sdktp + } + _ = tp + mqttPubSub, err := mqtt.NewPubSub(cfg.MQTTAddress, cfg.MQTTQoS, cfg.ClientID, cfg.ClientID, cfg.ClientKey, cfg.DomainID, cfg.ChannelID, cfg.MQTTTimeout, logger) if err != nil { logger.Error("failed to initialize mqtt client", slog.Any("error", err)) @@ -118,6 +154,27 @@ func main() { slog.Info("successfully subscribed to topic") + // Start Prometheus metrics server + metricsPort := cfg.MetricsPort + if metricsPort == "" { + metricsPort = defMetricsPort + } + g.Go(func() error { + mux := http.NewServeMux() + mux.Handle("/metrics", promhttp.Handler()) + mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("OK")) + }) + server := &http.Server{ + Addr: ":" + metricsPort, + Handler: mux, + } + logger.Info("starting metrics server", slog.String("port", metricsPort)) + + return server.ListenAndServe() + }) + g.Go(func() error { return service.StreamHTTP(ctx) }) diff --git a/deploy/otel-collector-config.yaml b/deploy/otel-collector-config.yaml new file mode 100644 index 00000000..aac050d8 --- /dev/null +++ b/deploy/otel-collector-config.yaml @@ -0,0 +1,138 @@ +# OpenTelemetry Collector Configuration for Propeller +# This configuration receives telemetry data from Propeller services and exports +# it to 
various backends for visualization and alerting. +# +# Usage: +# docker run --rm -p 4317:4317 -p 4318:4318 -p 8889:8889 \ +# -v $(pwd)/deploy/otel-collector-config.yaml:/etc/otelcol/config.yaml \ +# otel/opentelemetry-collector-contrib:latest + +receivers: + # OTLP receiver for traces, metrics, and logs from Propeller services + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + + # Prometheus receiver to scrape metrics from services + prometheus: + config: + scrape_configs: + - job_name: 'propeller-manager' + scrape_interval: 15s + static_configs: + - targets: ['manager:7070'] + metrics_path: '/metrics' + + - job_name: 'propeller-proxy' + scrape_interval: 15s + static_configs: + - targets: ['proxy:7071'] + metrics_path: '/metrics' + +processors: + # Batch processor for efficient export + batch: + timeout: 1s + send_batch_size: 1024 + send_batch_max_size: 2048 + + # Memory limiter to prevent OOM + memory_limiter: + check_interval: 1s + limit_mib: 512 + spike_limit_mib: 128 + + # Resource processor to add common attributes + resource: + attributes: + - key: deployment.environment + value: ${PROPELLER_ENV:-development} + action: upsert + - key: service.namespace + value: propeller + action: upsert + + # Attributes processor for filtering sensitive data + attributes: + actions: + - key: client_key + action: delete + - key: password + action: delete + - key: token + action: delete + +exporters: + # Debug exporter for local development + debug: + verbosity: detailed + sampling_initial: 5 + sampling_thereafter: 200 + + # Prometheus exporter for metrics (scraped by Prometheus/Grafana) + prometheus: + endpoint: "0.0.0.0:8889" + namespace: propeller + const_labels: + collector: otel + + # OTLP exporter for traces (to Jaeger, Tempo, etc.) 
+ otlp/traces: + endpoint: ${JAEGER_ENDPOINT:-jaeger:4317} + tls: + insecure: true + + # Loki exporter for logs (optional) + # Uncomment if using Grafana Loki + # loki: + # endpoint: ${LOKI_ENDPOINT:-http://loki:3100/loki/api/v1/push} + # labels: + # attributes: + # service.name: "service_name" + # service.namespace: "namespace" + +extensions: + # Health check extension + health_check: + endpoint: 0.0.0.0:13133 + + # Performance profiler (pprof) + pprof: + endpoint: 0.0.0.0:1777 + + # zPages for debugging + zpages: + endpoint: 0.0.0.0:55679 + +service: + extensions: [health_check, pprof, zpages] + + pipelines: + # Traces pipeline + traces: + receivers: [otlp] + processors: [memory_limiter, batch, resource, attributes] + exporters: [otlp/traces, debug] + + # Metrics pipeline + metrics: + receivers: [otlp, prometheus] + processors: [memory_limiter, batch, resource] + exporters: [prometheus, debug] + + # Logs pipeline + logs: + receivers: [otlp] + processors: [memory_limiter, batch, resource, attributes] + exporters: [debug] + # Uncomment to enable Loki export: + # exporters: [loki, debug] + + telemetry: + logs: + level: info + metrics: + address: 0.0.0.0:8888 diff --git a/docker/compose.observability.yaml b/docker/compose.observability.yaml new file mode 100644 index 00000000..651d4430 --- /dev/null +++ b/docker/compose.observability.yaml @@ -0,0 +1,90 @@ +# Observability Stack for Propeller +# This compose file extends the main compose.yaml with observability services. 
+# +# Usage: +# docker-compose -f compose.yaml -f compose.observability.yaml up -d +# +# Or with make: +# make start-observability + +name: "propeller-observability" + +networks: + supermq-base-net: + external: true + +services: + otel-collector: + image: docker.io/otel/opentelemetry-collector-contrib:0.96.0 + container_name: propeller-otel-collector + restart: unless-stopped + command: ["--config=/etc/otelcol/config.yaml"] + networks: + - supermq-base-net + ports: + - "4317:4317" # OTLP gRPC receiver + - "4318:4318" # OTLP HTTP receiver + - "8889:8889" # Prometheus exporter + - "13133:13133" # Health check + - "55679:55679" # zPages + volumes: + - ../deploy/otel-collector-config.yaml:/etc/otelcol/config.yaml:ro + environment: + - PROPELLER_ENV=${PROPELLER_ENV:-development} + - JAEGER_ENDPOINT=jaeger:4317 + depends_on: + - jaeger + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:13133/"] + interval: 30s + timeout: 10s + retries: 3 + + # Prometheus - Metrics collection and storage + prometheus: + image: docker.io/prom/prometheus:v3.3.0 + container_name: propeller-prometheus + restart: unless-stopped + networks: + - supermq-base-net + ports: + - "9090:9090" + volumes: + - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml:ro + - prometheus-data:/prometheus + command: + - '--config.file=/etc/prometheus/prometheus.yml' + - '--storage.tsdb.path=/prometheus' + - '--storage.tsdb.retention.time=15d' + - '--web.enable-lifecycle' + - '--web.enable-admin-api' + healthcheck: + test: ["CMD", "wget", "-q", "--spider", "http://localhost:9090/-/healthy"] + interval: 30s + timeout: 10s + retries: 3 + + # Grafana - Visualization and dashboards + grafana: + image: docker.io/grafana/grafana:12.0.1 + container_name: propeller-grafana + restart: unless-stopped + networks: + - supermq-base-net + ports: + - "3000:3000" + volumes: + - grafana-data:/var/lib/grafana + - ./grafana/provisioning:/etc/grafana/provisioning:ro + environment: + - 
GF_SECURITY_ADMIN_USER=${GRAFANA_ADMIN_USER:-admin} + - GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_ADMIN_PASSWORD:-admin} + - GF_USERS_ALLOW_SIGN_UP=false + - GF_FEATURE_TOGGLES_ENABLE=traceqlEditor + depends_on: + - prometheus + - jaeger + +volumes: + prometheus-data: + grafana-data: diff --git a/docker/grafana/provisioning/datasources/datasources.yml b/docker/grafana/provisioning/datasources/datasources.yml new file mode 100644 index 00000000..e2e75256 --- /dev/null +++ b/docker/grafana/provisioning/datasources/datasources.yml @@ -0,0 +1,26 @@ +# Grafana Data Sources for Propeller Observability +apiVersion: 1 + +datasources: + # Prometheus for metrics + - name: Prometheus + type: prometheus + access: proxy + url: http://prometheus:9090 + isDefault: true + editable: true + jsonData: + timeInterval: "15s" + httpMethod: POST + + # Jaeger for traces + - name: Jaeger + type: jaeger + access: proxy + url: http://jaeger:16686 + editable: true + jsonData: + tracesToLogs: + datasourceUid: prometheus + filterByTraceID: true + filterBySpanID: false diff --git a/docker/prometheus/prometheus.yml b/docker/prometheus/prometheus.yml new file mode 100644 index 00000000..8ba591bd --- /dev/null +++ b/docker/prometheus/prometheus.yml @@ -0,0 +1,73 @@ +# Prometheus configuration for Propeller observability +global: + scrape_interval: 15s + evaluation_interval: 15s + + external_labels: + monitor: 'propeller' + +# Alertmanager configuration (optional) +# alerting: +# alertmanagers: +# - static_configs: +# - targets: +# - alertmanager:9093 + +# Rule files (optional) +# rule_files: +# - "first_rules.yml" +# - "second_rules.yml" + +scrape_configs: + # Prometheus self-monitoring + - job_name: 'prometheus' + static_configs: + - targets: ['localhost:9090'] + + # OpenTelemetry Collector metrics + - job_name: 'otel-collector' + static_configs: + - targets: ['otel-collector:8888'] + metrics_path: '/metrics' + + # Propeller Manager service + - job_name: 'propeller-manager' + static_configs: + - 
targets: ['manager:7070'] + metrics_path: '/metrics' + relabel_configs: + - source_labels: [__address__] + target_label: instance + replacement: 'manager' + + # Propeller Proxy service + - job_name: 'propeller-proxy' + static_configs: + - targets: ['proxy:7071'] + metrics_path: '/metrics' + relabel_configs: + - source_labels: [__address__] + target_label: instance + replacement: 'proxy' + + # OTEL Collector Prometheus exporter (for OTLP metrics) + - job_name: 'propeller-otlp-metrics' + static_configs: + - targets: ['otel-collector:8889'] + metrics_path: '/metrics' + + # Jaeger metrics + - job_name: 'jaeger' + static_configs: + - targets: ['jaeger:14269'] + + # Proplet (Rust) metrics - for local development + # In production, proplets register dynamically via service discovery + - job_name: 'proplet' + static_configs: + - targets: ['proplet:7072'] + metrics_path: '/metrics' + relabel_configs: + - source_labels: [__address__] + target_label: instance + replacement: 'proplet' diff --git a/embed-proplet/src/mqtt_client.c b/embed-proplet/src/mqtt_client.c index 5a02cd05..7e9e071b 100644 --- a/embed-proplet/src/mqtt_client.c +++ b/embed-proplet/src/mqtt_client.c @@ -1653,6 +1653,63 @@ void publish_active_task_metrics(const char *domain_id, const char *channel_id, } } +void publish_wasm_metrics(const char *domain_id, const char *channel_id, + const char *proplet_id) { + wasm_metrics_t metrics; + + if (task_monitor_get_wasm_metrics(&metrics) != 0) { + LOG_DBG("No WASM metrics available"); + return; + } + + /* Skip publishing if no executions have occurred */ + if (metrics.execution_count == 0 && metrics.error_count == 0) { + return; + } + + cJSON *root = cJSON_CreateObject(); + if (root == NULL) { + LOG_ERR("Failed to create JSON for WASM metrics"); + return; + } + + cJSON_AddStringToObject(root, "proplet_id", proplet_id); + cJSON_AddNumberToObject(root, "timestamp", (double)k_uptime_get()); + + cJSON *wasm_obj = cJSON_AddObjectToObject(root, "wasm_metrics"); + if 
(wasm_obj != NULL) { + cJSON_AddNumberToObject(wasm_obj, "execution_count", + (double)metrics.execution_count); + cJSON_AddNumberToObject(wasm_obj, "total_execution_us", + (double)metrics.total_execution_us); + cJSON_AddNumberToObject(wasm_obj, "error_count", + (double)metrics.error_count); + cJSON_AddNumberToObject(wasm_obj, "oom_count", + (double)metrics.oom_count); + cJSON_AddNumberToObject(wasm_obj, "last_execution_us", + (double)metrics.last_execution_us); + cJSON_AddNumberToObject(wasm_obj, "max_execution_us", + (double)metrics.max_execution_us); + + /* Calculate average execution time */ + if (metrics.execution_count > 0) { + double avg_execution_us = (double)metrics.total_execution_us / + (double)metrics.execution_count; + cJSON_AddNumberToObject(wasm_obj, "avg_execution_us", avg_execution_us); + } + } + + char *payload = cJSON_PrintUnformatted(root); + if (payload != NULL) { + if (publish(domain_id, channel_id, METRICS_TOPIC_TEMPLATE, payload) == 0) { + LOG_DBG("Published WASM metrics"); + } + cJSON_free(payload); + } + + cJSON_Delete(root); +} + void mqtt_client_process(void) { if (mqtt_connected) { int ret = diff --git a/embed-proplet/src/mqtt_client.h b/embed-proplet/src/mqtt_client.h index 43aeba8c..7baeb68e 100644 --- a/embed-proplet/src/mqtt_client.h +++ b/embed-proplet/src/mqtt_client.h @@ -209,6 +209,19 @@ void publish_task_metrics(const char *domain_id, const char *channel_id, void publish_active_task_metrics(const char *domain_id, const char *channel_id, const char *proplet_id); +/** + * @brief Publish WASM execution metrics. + * + * Publishes aggregate WASM execution statistics (execution count, timing, errors). + * + * @param domain_id Domain ID used for topic generation. + * @param channel_id Channel ID used for topic generation. + * @param proplet_id Proplet identity. + */ +void publish_wasm_metrics(const char *domain_id, + const char *channel_id, + const char *proplet_id); + /** * @brief Process incoming MQTT messages and maintain keepalive. 
*/ diff --git a/embed-proplet/src/task_monitor.c b/embed-proplet/src/task_monitor.c index 90af13d0..9d892218 100644 --- a/embed-proplet/src/task_monitor.c +++ b/embed-proplet/src/task_monitor.c @@ -33,6 +33,10 @@ static monitored_task_t g_monitored_tasks[MAX_MONITORED_TASKS]; static bool g_initialized = false; static K_MUTEX_DEFINE(g_task_monitor_mutex); +/* Global WASM execution metrics */ +static wasm_metrics_t g_wasm_metrics = {0}; +static K_MUTEX_DEFINE(g_wasm_metrics_mutex); + static int find_task_slot(const char *task_id); static int find_free_slot(void); static void collect_current_metrics(process_metrics_t *metrics, @@ -369,3 +373,73 @@ static void collect_current_metrics(process_metrics_t *metrics, metrics->thread_count = 1; #endif } + +/* WASM execution metrics functions */ + +uint64_t task_monitor_wasm_start(void) +{ + return (uint64_t)k_uptime_ticks(); +} + +void task_monitor_wasm_end(uint64_t start_ticks) +{ + uint64_t end_ticks = (uint64_t)k_uptime_ticks(); + uint32_t elapsed_us = (uint32_t)k_ticks_to_us_floor64(end_ticks - start_ticks); + + k_mutex_lock(&g_wasm_metrics_mutex, K_FOREVER); + + g_wasm_metrics.execution_count++; + g_wasm_metrics.total_execution_us += elapsed_us; + g_wasm_metrics.last_execution_us = elapsed_us; + + if (elapsed_us > g_wasm_metrics.max_execution_us) { + g_wasm_metrics.max_execution_us = elapsed_us; + } + + k_mutex_unlock(&g_wasm_metrics_mutex); + + LOG_DBG("WASM execution completed in %u us (total: %u executions)", + elapsed_us, g_wasm_metrics.execution_count); +} + +void task_monitor_wasm_error(void) +{ + k_mutex_lock(&g_wasm_metrics_mutex, K_FOREVER); + g_wasm_metrics.error_count++; + k_mutex_unlock(&g_wasm_metrics_mutex); + + LOG_WRN("WASM execution error recorded (total errors: %u)", + g_wasm_metrics.error_count); +} + +void task_monitor_wasm_oom(void) +{ + k_mutex_lock(&g_wasm_metrics_mutex, K_FOREVER); + g_wasm_metrics.oom_count++; + g_wasm_metrics.error_count++; + k_mutex_unlock(&g_wasm_metrics_mutex); + + 
LOG_ERR("WASM OOM error recorded (total OOM: %u)", g_wasm_metrics.oom_count); +} + +int task_monitor_get_wasm_metrics(wasm_metrics_t *metrics) +{ + if (metrics == NULL) { + return -EINVAL; + } + + k_mutex_lock(&g_wasm_metrics_mutex, K_FOREVER); + memcpy(metrics, &g_wasm_metrics, sizeof(wasm_metrics_t)); + k_mutex_unlock(&g_wasm_metrics_mutex); + + return 0; +} + +void task_monitor_reset_wasm_metrics(void) +{ + k_mutex_lock(&g_wasm_metrics_mutex, K_FOREVER); + memset(&g_wasm_metrics, 0, sizeof(g_wasm_metrics)); + k_mutex_unlock(&g_wasm_metrics_mutex); + + LOG_INF("WASM metrics reset"); +} diff --git a/embed-proplet/src/task_monitor.h b/embed-proplet/src/task_monitor.h index 8dc6fb2b..8674f3bd 100644 --- a/embed-proplet/src/task_monitor.h +++ b/embed-proplet/src/task_monitor.h @@ -11,6 +11,18 @@ extern "C" { #define MAX_MONITORED_TASKS 4 #define MAX_TASK_ID_LEN 64 +/** + * @brief WASM-specific execution metrics. + */ +typedef struct { + uint32_t execution_count; /**< Number of WASM executions completed */ + uint64_t total_execution_us; /**< Total WASM execution time in microseconds */ + uint32_t error_count; /**< Number of WASM execution errors */ + uint32_t oom_count; /**< Number of out-of-memory errors */ + uint32_t last_execution_us; /**< Duration of last WASM execution in microseconds */ + uint32_t max_execution_us; /**< Maximum WASM execution time in microseconds */ +} wasm_metrics_t; + /** * @brief Process-level metrics for a single sample. */ @@ -128,6 +140,45 @@ int task_monitor_get_active_count(void); */ int task_monitor_get_active_task_id_at(int index, char *task_id_out); +/** + * @brief Record a WASM execution start. + * + * Call this just before starting WASM execution to begin timing. + * + * @return Start timestamp in kernel ticks (for use with task_monitor_wasm_end). + */ +uint64_t task_monitor_wasm_start(void); + +/** + * @brief Record a successful WASM execution completion. + * + * @param start_us Start timestamp (in kernel ticks) from task_monitor_wasm_start(). 
+ */ +void task_monitor_wasm_end(uint64_t start_us); + +/** + * @brief Record a WASM execution error. + */ +void task_monitor_wasm_error(void); + +/** + * @brief Record an out-of-memory error during WASM execution. + */ +void task_monitor_wasm_oom(void); + +/** + * @brief Get global WASM execution metrics. + * + * @param metrics Pointer to wasm_metrics_t to populate. + * @return 0 on success, negative errno on failure. + */ +int task_monitor_get_wasm_metrics(wasm_metrics_t *metrics); + +/** + * @brief Reset WASM execution metrics. + */ +void task_monitor_reset_wasm_metrics(void); + #ifdef __cplusplus } #endif diff --git a/embed-proplet/src/wasm_handler.c b/embed-proplet/src/wasm_handler.c index e11b930d..57e27f03 100644 --- a/embed-proplet/src/wasm_handler.c +++ b/embed-proplet/src/wasm_handler.c @@ -167,6 +167,8 @@ void execute_wasm_module(const char *task_id, const uint8_t *wasm_data, if (!module_inst) { LOG_ERR("Failed to instantiate WASM module: %s", error_buf); + /* Record potential OOM error (most common instantiation failure) */ + task_monitor_wasm_oom(); if (monitoring_started) { task_monitor_stop(task_id); @@ -256,8 +258,7 @@ void execute_wasm_module(const char *task_id, const uint8_t *wasm_data, wasm_runtime_create_exec_env(module_inst, 16 * 1024); if (!exec_env) { - - + task_monitor_wasm_error(); LOG_ERR("Failed to create execution environment for WASM module."); if (monitoring_started) { @@ -275,6 +276,9 @@ void execute_wasm_module(const char *task_id, const uint8_t *wasm_data, return; } + /* Start WASM execution timing */ + uint64_t wasm_start = task_monitor_wasm_start(); + if (!wasm_runtime_call_wasm_a(exec_env, func, result_count, results, n_args, args)) { @@ -283,13 +287,18 @@ void execute_wasm_module(const char *task_id, const uint8_t *wasm_data, snprintf(error_msg, sizeof(error_msg), "WASM execution failed: %s", exception ? 
exception : "Unknown error"); LOG_ERR("Error invoking WASM function: %s", error_msg); - + + /* Record WASM execution error */ + task_monitor_wasm_error(); + extern const char *channel_id; extern const char *domain_id; publish_results_with_error(domain_id, channel_id, task_id, NULL, error_msg); } else { + /* Record successful WASM execution with timing */ + task_monitor_wasm_end(wasm_start); char results_string[MAX_RESULTS * 16] = {0}; for (uint32_t i = 0; i < result_count; i++) diff --git a/manager/middleware/logging.go b/manager/middleware/logging.go index a96c9cee..bef31125 100644 --- a/manager/middleware/logging.go +++ b/manager/middleware/logging.go @@ -63,6 +63,7 @@ func (lm *loggingMiddleware) GetPropletSDF(ctx context.Context, id string) (resp return lm.svc.GetPropletSDF(ctx, id) } + func (lm *loggingMiddleware) ListProplets(ctx context.Context, offset, limit uint64, status string) (resp proplet.PropletPage, err error) { defer func(begin time.Time) { args := []any{ diff --git a/manager/middleware/metrics.go b/manager/middleware/metrics.go index 648f578e..7b5a6a59 100644 --- a/manager/middleware/metrics.go +++ b/manager/middleware/metrics.go @@ -45,6 +45,7 @@ func (mm *metricsMiddleware) GetPropletSDF(ctx context.Context, id string) (sdf. 
return mm.svc.GetPropletSDF(ctx, id) } + func (mm *metricsMiddleware) ListProplets(ctx context.Context, offset, limit uint64, status string) (proplet.PropletPage, error) { defer func(begin time.Time) { mm.counter.With("method", "list-proplets").Add(1) diff --git a/manager/middleware/tracing.go b/manager/middleware/tracing.go index d377639d..94fd647e 100644 --- a/manager/middleware/tracing.go +++ b/manager/middleware/tracing.go @@ -48,6 +48,7 @@ func (tm *tracing) ListProplets(ctx context.Context, offset, limit uint64, statu )) defer span.End() + return tm.svc.ListProplets(ctx, offset, limit, status) } diff --git a/manager/mocks/service.go b/manager/mocks/service.go index c4db18b5..f4c0f7a0 100644 --- a/manager/mocks/service.go +++ b/manager/mocks/service.go @@ -1300,6 +1300,7 @@ func (_mock *MockService) ListProplets(ctx context.Context, offset uint64, limit } else { r1 = ret.Error(1) } + return r0, r1 } diff --git a/manager/service.go b/manager/service.go index 5fd37494..39d61b78 100644 --- a/manager/service.go +++ b/manager/service.go @@ -21,6 +21,7 @@ import ( "github.com/absmach/propeller/pkg/job" "github.com/absmach/propeller/pkg/maps" "github.com/absmach/propeller/pkg/mqtt" + "github.com/absmach/propeller/pkg/observability" "github.com/absmach/propeller/pkg/proplet" "github.com/absmach/propeller/pkg/scheduler" "github.com/absmach/propeller/pkg/sdf" @@ -242,9 +243,18 @@ func (svc *service) CreateTask(ctx context.Context, t task.Task) (task.Task, err t, err := svc.taskRepo.Create(ctx, t) if err != nil { + observability.RecordError(observability.ErrorTypeTask) + return task.Task{}, err } + // Record task creation metric + kind := observability.TaskKindStandard + if t.Kind == task.TaskKindFederated { + kind = observability.TaskKindFederated + } + observability.RecordTaskCreated(kind) + if t.Schedule != "" && svc.cronScheduler != nil { if err := svc.cronScheduler.ScheduleTask(ctx, t.ID); err != nil { svc.logger.WarnContext(ctx, "failed to schedule task in cron 
scheduler", "error", err, "task_id", t.ID) @@ -299,12 +309,24 @@ func (svc *service) CreateWorkflow(ctx context.Context, tasks []task.Task) ([]ta created := &createdTasks[j] _ = svc.taskRepo.Delete(ctx, created.ID) } + observability.RecordError(observability.ErrorTypeWorkflow) return nil, fmt.Errorf("failed to create task %s: %w", t.ID, err) } createdTasks = append(createdTasks, created) } + // Record workflow and task creation metrics + observability.RecordWorkflowCreated() + //nolint:gocritic + for _, t := range createdTasks { + kind := observability.TaskKindStandard + if t.Kind == task.TaskKindFederated { + kind = observability.TaskKindFederated + } + observability.RecordTaskCreated(kind) + } + return createdTasks, nil } @@ -364,12 +386,24 @@ func (svc *service) CreateJob(ctx context.Context, name string, tasks []task.Tas if svc.jobRepo != nil { _ = svc.jobRepo.Delete(ctx, jobID) } + observability.RecordError(observability.ErrorTypeJob) return "", nil, fmt.Errorf("failed to create task %s: %w", t.ID, err) } createdTasks = append(createdTasks, created) } + // Record job and task creation metrics + observability.RecordJobCreated() + //nolint:gocritic + for _, t := range createdTasks { + kind := observability.TaskKindStandard + if t.Kind == task.TaskKindFederated { + kind = observability.TaskKindFederated + } + observability.RecordTaskCreated(kind) + } + return jobID, createdTasks, nil } @@ -734,6 +768,9 @@ func (svc *service) StartTask(ctx context.Context, taskID string) error { return err } + // Record task started metric + observability.RecordTaskStarted() + return nil } @@ -1184,18 +1221,29 @@ func (svc *service) handle(ctx context.Context) func(topic string, msg map[strin return func(topic string, msg map[string]any) error { switch topic { case svc.baseTopic + "/control/proplet/create": + observability.RecordMQTTReceived(observability.TopicTypeDiscovery) if err := svc.createPropletHandler(ctx, msg); err != nil { return err } svc.logger.InfoContext(ctx, 
"successfully created proplet") case svc.baseTopic + "/control/proplet/alive": + observability.RecordMQTTReceived(observability.TopicTypeAlive) + return svc.updateLivenessHandler(ctx, msg) case svc.baseTopic + "/control/proplet/results": + observability.RecordMQTTReceived(observability.TopicTypeResults) + return svc.updateResultsHandler(ctx, msg) case svc.baseTopic + "/control/proplet/task_metrics": + observability.RecordMQTTReceived(observability.TopicTypeMetrics) + return svc.handleTaskMetrics(ctx, msg) case svc.baseTopic + "/control/proplet/metrics": + observability.RecordMQTTReceived(observability.TopicTypeMetrics) + return svc.handlePropletMetrics(ctx, msg) + default: + observability.RecordMQTTReceived(observability.TopicTypeControl) } return nil @@ -1231,9 +1279,14 @@ func (svc *service) createPropletHandler(ctx context.Context, msg map[string]any }, } if err := svc.propletRepo.Create(ctx, p); err != nil { + observability.RecordError(observability.ErrorTypeProplet) + return err } + // Record proplet registration metric + observability.RecordPropletRegistered() + return nil } @@ -1295,6 +1348,19 @@ func (svc *service) updateResultsHandler(ctx context.Context, msg map[string]any return err } + // Record task completion metrics + if t.State == task.Failed { + observability.RecordTaskCompleted(observability.TaskStatusFailed) + } else { + observability.RecordTaskCompleted(observability.TaskStatusSuccess) + } + + // Record execution duration if start time is available + if !t.StartTime.IsZero() { + duration := t.FinishTime.Sub(t.StartTime).Seconds() + observability.ObserveWasmExecution(t.Name, duration) + } + if t.JobID == "" { if err := svc.coordinator.OnTaskCompletion(ctx, taskID); err != nil { svc.logger.ErrorContext(ctx, "failed to trigger workflow coordinator", "task_id", taskID, "error", err) diff --git a/pkg/mqtt/tracing.go b/pkg/mqtt/tracing.go new file mode 100644 index 00000000..51a3dbbe --- /dev/null +++ b/pkg/mqtt/tracing.go @@ -0,0 +1,148 @@ +package 
mqtt + +import ( + "context" + "encoding/json" + + "go.opentelemetry.io/otel/trace" +) + +const ( + // TraceIDKey is the JSON key for trace ID in MQTT payloads. + TraceIDKey = "_trace_id" + // SpanIDKey is the JSON key for span ID in MQTT payloads. + SpanIDKey = "_span_id" + // TraceFlagsKey is the JSON key for trace flags in MQTT payloads. + TraceFlagsKey = "_trace_flags" +) + +// TraceContext represents trace context that can be embedded in MQTT messages. +type TraceContext struct { + TraceID string `json:"_trace_id,omitempty"` + SpanID string `json:"_span_id,omitempty"` + TraceFlags string `json:"_trace_flags,omitempty"` +} + +// InjectTraceContext injects trace context from the context into a map payload. +// This modifies the payload in place by adding trace context fields. +func InjectTraceContext(ctx context.Context, payload map[string]any) { + span := trace.SpanFromContext(ctx) + if span == nil { + return + } + + spanCtx := span.SpanContext() + if !spanCtx.IsValid() { + return + } + + payload[TraceIDKey] = spanCtx.TraceID().String() + payload[SpanIDKey] = spanCtx.SpanID().String() + payload[TraceFlagsKey] = spanCtx.TraceFlags().String() +} + +// InjectTraceContextJSON injects trace context into a JSON payload byte slice. +// Returns the modified payload or the original if injection fails. 
+func InjectTraceContextJSON(ctx context.Context, payload []byte) []byte { + span := trace.SpanFromContext(ctx) + if span == nil { + return payload + } + + spanCtx := span.SpanContext() + if !spanCtx.IsValid() { + return payload + } + + // Parse the existing JSON + var data map[string]any + if err := json.Unmarshal(payload, &data); err != nil { + // Not valid JSON or not an object, return as-is + return payload + } + + // Inject trace context + data[TraceIDKey] = spanCtx.TraceID().String() + data[SpanIDKey] = spanCtx.SpanID().String() + data[TraceFlagsKey] = spanCtx.TraceFlags().String() + + // Re-marshal + modified, err := json.Marshal(data) + if err != nil { + return payload + } + + return modified +} + +// ExtractTraceContext extracts trace context from a map payload. +// Returns a new context with the extracted trace context, or the original context if not found. +func ExtractTraceContext(ctx context.Context, payload map[string]any) context.Context { + traceIDStr, ok := payload[TraceIDKey].(string) + if !ok || traceIDStr == "" { + return ctx + } + + spanIDStr, ok := payload[SpanIDKey].(string) + if !ok || spanIDStr == "" { + return ctx + } + + traceID, err := trace.TraceIDFromHex(traceIDStr) + if err != nil { + return ctx + } + + spanID, err := trace.SpanIDFromHex(spanIDStr) + if err != nil { + return ctx + } + + // Parse trace flags if present + var flags trace.TraceFlags + if flagsStr, ok := payload[TraceFlagsKey].(string); ok && len(flagsStr) == 2 { + // TraceFlags is a single byte represented as 2 hex chars + var flagByte byte + for i := range 2 { + c := flagsStr[i] + switch { + case c >= '0' && c <= '9': + flagByte = flagByte*16 + (c - '0') + case c >= 'a' && c <= 'f': + flagByte = flagByte*16 + (c - 'a' + 10) + case c >= 'A' && c <= 'F': + flagByte = flagByte*16 + (c - 'A' + 10) + } + } + flags = trace.TraceFlags(flagByte) + } + + // Create a span context from the extracted data + spanCtx := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: 
traceID, + SpanID: spanID, + TraceFlags: flags, + Remote: true, + }) + + return trace.ContextWithRemoteSpanContext(ctx, spanCtx) +} + +// ExtractTraceContextJSON extracts trace context from a JSON payload. +// Returns a new context with the extracted trace context, or the original context if not found. +func ExtractTraceContextJSON(ctx context.Context, payload []byte) context.Context { + var data map[string]any + if err := json.Unmarshal(payload, &data); err != nil { + return ctx + } + + return ExtractTraceContext(ctx, data) +} + +// RemoveTraceContext removes trace context fields from a map payload. +// Useful when forwarding messages without exposing internal trace context. +func RemoveTraceContext(payload map[string]any) { + delete(payload, TraceIDKey) + delete(payload, SpanIDKey) + delete(payload, TraceFlagsKey) +} diff --git a/pkg/observability/config.go b/pkg/observability/config.go new file mode 100644 index 00000000..79e4dafb --- /dev/null +++ b/pkg/observability/config.go @@ -0,0 +1,48 @@ +package observability + +import "time" + +// Config holds unified observability configuration for all Propeller services. 
+type Config struct { + // Metrics settings + MetricsEnabled bool `env:"PROPELLER_METRICS_ENABLED" envDefault:"true"` + MetricsPort int `env:"PROPELLER_METRICS_PORT" envDefault:"9090"` + + // Logging settings + LogExportEnabled bool `env:"PROPELLER_LOG_EXPORT_ENABLED" envDefault:"false"` + LogLevel string `env:"PROPELLER_LOG_LEVEL" envDefault:"info"` + + // Service identification + ServiceName string `env:"OTEL_SERVICE_NAME"` + ServiceVersion string `env:"OTEL_SERVICE_VERSION" envDefault:"dev"` + + // Export settings + ExportTimeout time.Duration `env:"PROPELLER_EXPORT_TIMEOUT" envDefault:"10s"` + ExportInterval time.Duration `env:"PROPELLER_EXPORT_INTERVAL" envDefault:"15s"` + + // Retry settings for graceful degradation + MaxRetries int `env:"PROPELLER_OTEL_MAX_RETRIES" envDefault:"3"` + RetryInterval time.Duration `env:"PROPELLER_OTEL_RETRY_INTERVAL" envDefault:"5s"` +} + +// DefaultConfig returns a Config with sensible defaults. +func DefaultConfig() Config { + return Config{ + MetricsEnabled: true, + MetricsPort: 9090, + LogExportEnabled: false, + LogLevel: "info", + ServiceVersion: "dev", + ExportTimeout: 10 * time.Second, + ExportInterval: 15 * time.Second, + MaxRetries: 3, + RetryInterval: 5 * time.Second, + } +} + +// WithServiceName sets the service name for the config. +func (c Config) WithServiceName(name string) Config { + c.ServiceName = name + + return c +} diff --git a/pkg/observability/doc.go b/pkg/observability/doc.go new file mode 100644 index 00000000..81f2d0a8 --- /dev/null +++ b/pkg/observability/doc.go @@ -0,0 +1,51 @@ +// Package observability provides unified observability primitives for the Propeller platform. 
+// +// This package provides: +// - Application-level Prometheus metrics for Manager, Proxy, and Proplet services +// - Standardized log field names for consistent structured logging +// - Tracing utilities for span creation and context propagation +// - Utilities for trace context injection in logs +// +// # Metrics +// +// The package defines Prometheus metrics for tracking: +// - Task lifecycle events (created, started, completed, failed) +// - Proplet registrations and connections +// - Job and workflow execution +// - WASM execution duration +// - MQTT message throughput +// - Proxy container fetch operations +// +// Example usage: +// +// import "github.com/absmach/propeller/pkg/observability" +// +// // Record a task creation +// observability.RecordTaskCreated(observability.TaskKindStandard) +// +// // Record task completion +// observability.RecordTaskCompleted(observability.TaskStatusSuccess) +// +// // Observe WASM execution time +// observability.ObserveWasmExecution("my-task", 1.5) // 1.5 seconds +// +// # Tracing +// +// Distributed tracing is configured via the Jaeger provider from supermq. +// This package provides helper functions for span creation and context propagation. +// +// # Logging +// +// Create a logger with trace context injection: +// +// logger := observability.NewLogger(slog.LevelInfo, "manager", true) +// +// Use standardized field names: +// +// logger.Info("task started", +// slog.String(observability.FieldTaskID, taskID), +// slog.String(observability.FieldPropletID, propletID), +// ) +// +//nolint:godoclint +package observability diff --git a/pkg/observability/fields.go b/pkg/observability/fields.go new file mode 100644 index 00000000..38f2a23d --- /dev/null +++ b/pkg/observability/fields.go @@ -0,0 +1,256 @@ +package observability + +// Standard field names for structured logging across all Propeller services. +// Using consistent field names enables better log aggregation and querying. 
+ +// FieldTaskID is the task identifier field. +const FieldTaskID = "task_id" + +// FieldPropletID is the proplet identifier field. +const FieldPropletID = "proplet_id" + +// FieldJobID is the job identifier field. +const FieldJobID = "job_id" + +// FieldWorkflowID is the workflow identifier field. +const FieldWorkflowID = "workflow_id" + +// FieldRoundID is the FL round identifier field. +const FieldRoundID = "round_id" + +// FieldAppName is the application name field. +const FieldAppName = "app_name" + +// FieldTraceID is the trace identifier field for distributed tracing. +const FieldTraceID = "trace_id" + +// FieldSpanID is the span identifier field for distributed tracing. +const FieldSpanID = "span_id" + +// FieldDuration is the duration field in milliseconds. +const FieldDuration = "duration_ms" + +// FieldTimestamp is the timestamp field. +const FieldTimestamp = "timestamp" + +// FieldStartTime is the start time field. +const FieldStartTime = "start_time" + +// FieldEndTime is the end time field. +const FieldEndTime = "end_time" + +// FieldUptime is the uptime field in seconds. +const FieldUptime = "uptime_seconds" + +// FieldError is the error message field. +const FieldError = "error" + +// FieldErrorType is the error type/category field. +const FieldErrorType = "error_type" + +// FieldStatus is the status field. +const FieldStatus = "status" + +// FieldState is the state field. +const FieldState = "state" + +// FieldCount is the count field. +const FieldCount = "count" + +// FieldOffset is the pagination offset field. +const FieldOffset = "offset" + +// FieldLimit is the pagination limit field. +const FieldLimit = "limit" + +// FieldTotal is the total count field. +const FieldTotal = "total" + +// FieldSize is the size in bytes field. +const FieldSize = "size_bytes" + +// FieldChunkNum is the chunk number field. +const FieldChunkNum = "chunk_num" + +// FieldTotalChunks is the total chunks field. 
+const FieldTotalChunks = "total_chunks" + +// FieldComponent is the component name field. +const FieldComponent = "component" + +// FieldService is the service name field. +const FieldService = "service" + +// FieldMethod is the method name field. +const FieldMethod = "method" + +// FieldOperation is the operation name field. +const FieldOperation = "operation" + +// FieldTopic is the MQTT topic field. +const FieldTopic = "topic" + +// FieldQoS is the MQTT QoS field. +const FieldQoS = "qos" + +// FieldPayload is the payload size field. +const FieldPayload = "payload_size" + +// FieldCPUPercent is the CPU usage percentage field. +const FieldCPUPercent = "cpu_percent" + +// FieldMemoryBytes is the memory usage in bytes field. +const FieldMemoryBytes = "memory_bytes" + +// FieldMemoryPercent is the memory usage percentage field. +const FieldMemoryPercent = "memory_percent" + +// FieldDiskRead is the disk read bytes field. +const FieldDiskRead = "disk_read_bytes" + +// FieldDiskWrite is the disk write bytes field. +const FieldDiskWrite = "disk_write_bytes" + +// FieldThreadCount is the thread count field. +const FieldThreadCount = "thread_count" + +// FieldTaskKind is the task kind/type field. +const FieldTaskKind = "task_kind" + +// FieldExecutionMode is the execution mode field. +const FieldExecutionMode = "execution_mode" + +// FieldWasmRuntime is the WASM runtime field. +const FieldWasmRuntime = "wasm_runtime" + +// FieldWasmModuleSize is the WASM module size field. +const FieldWasmModuleSize = "wasm_module_size" + +// FieldRemoteAddr is the remote address field. +const FieldRemoteAddr = "remote_addr" + +// FieldHTTPMethod is the HTTP method field. +const FieldHTTPMethod = "http_method" + +// FieldHTTPPath is the HTTP path field. +const FieldHTTPPath = "http_path" + +// FieldHTTPStatus is the HTTP status code field. +const FieldHTTPStatus = "http_status" + +// Component names for consistent service identification. 
+const ( + // ComponentManager is the manager service. + ComponentManager = "manager" + // ComponentProxy is the proxy service. + ComponentProxy = "proxy" + // ComponentProplet is the proplet service. + ComponentProplet = "proplet" + // ComponentScheduler is the scheduler component. + ComponentScheduler = "scheduler" + // ComponentStorage is the storage component. + ComponentStorage = "storage" + // ComponentMQTT is the MQTT component. + ComponentMQTT = "mqtt" + // ComponentHTTP is the HTTP component. + ComponentHTTP = "http" + // ComponentWorkflow is the workflow component. + ComponentWorkflow = "workflow" + // ComponentFL is the federated learning component. + ComponentFL = "fl" + // ComponentCron is the cron scheduler component. + ComponentCron = "cron" +) + +// Operation names for tracing spans. +const ( + // OpCreateTask is the create task operation. + OpCreateTask = "create-task" + // OpStartTask is the start task operation. + OpStartTask = "start-task" + // OpStopTask is the stop task operation. + OpStopTask = "stop-task" + // OpGetTask is the get task operation. + OpGetTask = "get-task" + // OpListTasks is the list tasks operation. + OpListTasks = "list-tasks" + // OpUpdateTask is the update task operation. + OpUpdateTask = "update-task" + // OpDeleteTask is the delete task operation. + OpDeleteTask = "delete-task" + // OpGetTaskResults is the get task results operation. + OpGetTaskResults = "get-task-results" + // OpGetTaskMetrics is the get task metrics operation. + OpGetTaskMetrics = "get-task-metrics" + + // OpCreateProplet is the create proplet operation. + OpCreateProplet = "create-proplet" + // OpGetProplet is the get proplet operation. + OpGetProplet = "get-proplet" + // OpListProplets is the list proplets operation. + OpListProplets = "list-proplets" + // OpDeleteProplet is the delete proplet operation. + OpDeleteProplet = "delete-proplet" + // OpSelectProplet is the select proplet operation. 
+ OpSelectProplet = "select-proplet" + // OpGetPropletMetrics is the get proplet metrics operation. + OpGetPropletMetrics = "get-proplet-metrics" + + // OpCreateJob is the create job operation. + OpCreateJob = "create-job" + // OpStartJob is the start job operation. + OpStartJob = "start-job" + // OpStopJob is the stop job operation. + OpStopJob = "stop-job" + // OpGetJob is the get job operation. + OpGetJob = "get-job" + // OpListJobs is the list jobs operation. + OpListJobs = "list-jobs" + + // OpCreateWorkflow is the create workflow operation. + OpCreateWorkflow = "create-workflow" + + // OpConfigureExperiment is the configure FL experiment operation. + OpConfigureExperiment = "configure-experiment" + // OpGetFLTask is the get FL task operation. + OpGetFLTask = "get-fl-task" + // OpPostFLUpdate is the post FL update operation. + OpPostFLUpdate = "post-fl-update" + // OpGetRoundStatus is the get round status operation. + OpGetRoundStatus = "get-round-status" + + // OpFetchContainer is the fetch container operation. + OpFetchContainer = "fetch-container" + // OpSendChunk is the send chunk operation. + OpSendChunk = "send-chunk" + + // OpSubscribe is the MQTT subscribe operation. + OpSubscribe = "subscribe" + // OpPublish is the MQTT publish operation. + OpPublish = "publish" + // OpHandleMessage is the handle message operation. + OpHandleMessage = "handle-message" + + // OpExecuteWasm is the execute WASM operation. + OpExecuteWasm = "execute-wasm" + // OpLoadModule is the load module operation. + OpLoadModule = "load-module" +) + +// MQTT topic type labels for metrics. +const ( + // TopicTypeControl is the control topic type. + TopicTypeControl = "control" + // TopicTypeResults is the results topic type. + TopicTypeResults = "results" + // TopicTypeMetrics is the metrics topic type. + TopicTypeMetrics = "metrics" + // TopicTypeDiscovery is the discovery topic type. + TopicTypeDiscovery = "discovery" + // TopicTypeAlive is the alive/heartbeat topic type. 
+ TopicTypeAlive = "alive" + // TopicTypeRegistry is the registry topic type. + TopicTypeRegistry = "registry" + // TopicTypeFL is the federated learning topic type. + TopicTypeFL = "fl" +) diff --git a/pkg/observability/logging.go b/pkg/observability/logging.go new file mode 100644 index 00000000..99e44545 --- /dev/null +++ b/pkg/observability/logging.go @@ -0,0 +1,193 @@ +package observability + +import ( + "context" + "log/slog" + "os" + "sync" + "time" + + "go.opentelemetry.io/otel/trace" +) + +// TracingHandler wraps an slog.Handler to inject trace context into log records. +type TracingHandler struct { + handler slog.Handler +} + +// NewTracingHandler creates a new handler that injects trace context. +func NewTracingHandler(handler slog.Handler) *TracingHandler { + return &TracingHandler{handler: handler} +} + +// Enabled reports whether the handler handles records at the given level. +func (h *TracingHandler) Enabled(ctx context.Context, level slog.Level) bool { + return h.handler.Enabled(ctx, level) +} + +// Handle adds trace context to the record and delegates to the wrapped handler. +func (h *TracingHandler) Handle(ctx context.Context, r slog.Record) error { + span := trace.SpanFromContext(ctx) + if span.SpanContext().IsValid() { + r.AddAttrs( + slog.String(FieldTraceID, span.SpanContext().TraceID().String()), + slog.String(FieldSpanID, span.SpanContext().SpanID().String()), + ) + } + + return h.handler.Handle(ctx, r) +} + +// WithAttrs returns a new Handler with the given attributes. +func (h *TracingHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + return &TracingHandler{handler: h.handler.WithAttrs(attrs)} +} + +// WithGroup returns a new Handler with the given group name. +func (h *TracingHandler) WithGroup(name string) slog.Handler { + return &TracingHandler{handler: h.handler.WithGroup(name)} +} + +// NewLogger creates a new logger with the given level and optional trace injection. 
+func NewLogger(level slog.Level, serviceName string, injectTraceContext bool) *slog.Logger { + opts := &slog.HandlerOptions{ + Level: level, + ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr { + // Add service name to all logs + if a.Key == slog.TimeKey { + return slog.Attr{ + Key: a.Key, + Value: slog.StringValue(a.Value.Time().Format(time.RFC3339Nano)), + } + } + + return a + }, + } + + baseHandler := slog.NewJSONHandler(os.Stdout, opts) + + // Add service name as default attribute + handlerWithService := baseHandler.WithAttrs([]slog.Attr{ + slog.String(FieldService, serviceName), + }) + + handler := handlerWithService + if injectTraceContext { + handler = NewTracingHandler(handlerWithService) + } + + return slog.New(handler) +} + +// ParseLogLevel parses a string log level into slog.Level. +func ParseLogLevel(level string) (slog.Level, error) { + var l slog.Level + err := l.UnmarshalText([]byte(level)) + + return l, err +} + +// BufferedLogExporter buffers logs for batch export. +type BufferedLogExporter struct { + mu sync.Mutex + records []LogRecord + batchSize int + exportFn func([]LogRecord) error +} + +// LogRecord represents a log entry for export. +type LogRecord struct { + Timestamp time.Time + Level slog.Level + Message string + Attributes map[string]any + TraceID string + SpanID string + ServiceName string +} + +// NewBufferedLogExporter creates a new buffered log exporter. +func NewBufferedLogExporter(batchSize int, exportFn func([]LogRecord) error) *BufferedLogExporter { + return &BufferedLogExporter{ + records: make([]LogRecord, 0, batchSize), + batchSize: batchSize, + exportFn: exportFn, + } +} + +// Add adds a log record to the buffer, flushing if needed. +func (e *BufferedLogExporter) Add(record LogRecord) error { + e.mu.Lock() + defer e.mu.Unlock() + + e.records = append(e.records, record) + + if len(e.records) >= e.batchSize { + return e.flushLocked() + } + + return nil +} + +// Flush exports all buffered records. 
+func (e *BufferedLogExporter) Flush() error { + e.mu.Lock() + defer e.mu.Unlock() + + return e.flushLocked() +} + +func (e *BufferedLogExporter) flushLocked() error { + if len(e.records) == 0 { + return nil + } + + records := e.records + e.records = make([]LogRecord, 0, e.batchSize) + + if e.exportFn != nil { + return e.exportFn(records) + } + + return nil +} + +// ContextLogger returns a logger with context-specific fields. +func ContextLogger(logger *slog.Logger, ctx context.Context, fields ...any) *slog.Logger { + args := make([]any, 0, len(fields)+4) + args = append(args, fields...) + + // Add trace context if available + span := trace.SpanFromContext(ctx) + if span.SpanContext().IsValid() { + args = append(args, + slog.String(FieldTraceID, span.SpanContext().TraceID().String()), + slog.String(FieldSpanID, span.SpanContext().SpanID().String()), + ) + } + + return logger.With(args...) +} + +// LogOperation logs the start and end of an operation with timing. +func LogOperation(logger *slog.Logger, operation string) func(err error) { + start := time.Now() + logger.Info("operation started", slog.String(FieldOperation, operation)) + + return func(err error) { + duration := time.Since(start) + if err != nil { + logger.Error("operation failed", + slog.String(FieldOperation, operation), + slog.Duration(FieldDuration, duration), + slog.String(FieldError, err.Error()), + ) + } else { + logger.Info("operation completed", + slog.String(FieldOperation, operation), + slog.Duration(FieldDuration, duration), + ) + } + } +} diff --git a/pkg/observability/metrics.go b/pkg/observability/metrics.go new file mode 100644 index 00000000..a458dde3 --- /dev/null +++ b/pkg/observability/metrics.go @@ -0,0 +1,386 @@ +// Package observability provides unified observability primitives for the Propeller platform. +// It includes application-level metrics, logging utilities, and tracing configuration +// with support for OpenTelemetry Collector as the backend. 
+// +//nolint:godoclint +package observability + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +const ( + namespace = "propeller" +) + +// Manager metrics track task and proplet lifecycle events. +var ( + // TasksCreatedTotal counts the total number of tasks created, labeled by kind. + TasksCreatedTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "manager", + Name: "tasks_created_total", + Help: "Total number of tasks created", + }, + []string{"kind"}, + ) + + // TasksStartedTotal counts the total number of tasks started. + TasksStartedTotal = promauto.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "manager", + Name: "tasks_started_total", + Help: "Total number of tasks started", + }, + ) + + // TasksCompletedTotal counts completed tasks, labeled by status (success/failed). + TasksCompletedTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "manager", + Name: "tasks_completed_total", + Help: "Total number of tasks completed", + }, + []string{"status"}, + ) + + // ActiveTasks tracks currently running tasks. + ActiveTasks = promauto.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "manager", + Name: "active_tasks", + Help: "Number of currently active tasks", + }, + ) + + // PropletRegistrations counts proplet registration events. + PropletRegistrations = promauto.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "manager", + Name: "proplet_registrations_total", + Help: "Total number of proplet registrations", + }, + ) + + // PropletDeregistrations counts proplet deregistration/leave events. 
+ PropletDeregistrations = promauto.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "manager", + Name: "proplet_deregistrations_total", + Help: "Total number of proplet deregistrations", + }, + ) + + // JobsCreatedTotal counts jobs created. + JobsCreatedTotal = promauto.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "manager", + Name: "jobs_created_total", + Help: "Total number of jobs created", + }, + ) + + // JobsCompletedTotal counts completed jobs, labeled by status. + JobsCompletedTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "manager", + Name: "jobs_completed_total", + Help: "Total number of jobs completed", + }, + []string{"status"}, + ) + + // WorkflowsCreatedTotal counts workflows created. + WorkflowsCreatedTotal = promauto.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "manager", + Name: "workflows_created_total", + Help: "Total number of workflows created", + }, + ) + + // WasmExecutionDuration tracks WASM task execution duration. + WasmExecutionDuration = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: "manager", + Name: "wasm_execution_duration_seconds", + Help: "WASM task execution duration in seconds", + Buckets: prometheus.ExponentialBuckets(0.1, 2, 12), // 0.1s to ~400s + }, + []string{"task_name"}, + ) + + // ErrorsTotal counts errors by type. + ErrorsTotal = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "manager", + Name: "errors_total", + Help: "Total number of errors by type", + }, + []string{"error_type"}, + ) + + // MQTTMessagesReceived counts MQTT messages received by topic type. 
+ MQTTMessagesReceived = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "manager", + Name: "mqtt_messages_received_total", + Help: "Total number of MQTT messages received", + }, + []string{"topic_type"}, + ) + + // MQTTMessagesPublished counts MQTT messages published by topic type. + MQTTMessagesPublished = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "manager", + Name: "mqtt_messages_published_total", + Help: "Total number of MQTT messages published", + }, + []string{"topic_type"}, + ) + + // FLRoundsTotal counts federated learning rounds. + FLRoundsTotal = promauto.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "manager", + Name: "fl_rounds_total", + Help: "Total number of federated learning rounds", + }, + ) + + // FLUpdatesReceived counts FL updates received from proplets. + FLUpdatesReceived = promauto.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "manager", + Name: "fl_updates_received_total", + Help: "Total number of federated learning updates received", + }, + ) + + // ConnectedProplets tracks currently connected proplets. + ConnectedProplets = promauto.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "manager", + Name: "connected_proplets", + Help: "Number of currently connected proplets", + }, + ) +) + +// Proxy metrics track container fetching and chunk transfer. +var ( + // ProxyContainerFetchesTotal counts container fetch requests. + ProxyContainerFetchesTotal = promauto.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "proxy", + Name: "container_fetches_total", + Help: "Total number of container fetch requests", + }, + ) + + // ProxyContainerFetchErrors counts failed container fetches. 
+ ProxyContainerFetchErrors = promauto.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "proxy", + Name: "container_fetch_errors_total", + Help: "Total number of container fetch errors", + }, + ) + + // ProxyChunksSentTotal counts chunks sent via MQTT. + ProxyChunksSentTotal = promauto.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "proxy", + Name: "chunks_sent_total", + Help: "Total number of WASM chunks sent via MQTT", + }, + ) + + // ProxyActiveFetches tracks concurrent fetch operations. + ProxyActiveFetches = promauto.NewGauge( + prometheus.GaugeOpts{ + Namespace: namespace, + Subsystem: "proxy", + Name: "active_fetches", + Help: "Number of active container fetch operations", + }, + ) + + // ProxyFetchDuration tracks container fetch duration. + ProxyFetchDuration = promauto.NewHistogram( + prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: "proxy", + Name: "fetch_duration_seconds", + Help: "Container fetch duration in seconds", + Buckets: prometheus.ExponentialBuckets(0.5, 2, 10), // 0.5s to ~500s + }, + ) + + // ProxyBytesTransferred tracks total bytes transferred. + ProxyBytesTransferred = promauto.NewCounter( + prometheus.CounterOpts{ + Namespace: namespace, + Subsystem: "proxy", + Name: "bytes_transferred_total", + Help: "Total bytes transferred for WASM modules", + }, + ) +) + +// Error type constants for consistent labeling. +const ( + ErrorTypeMQTT = "mqtt" + ErrorTypeStorage = "storage" + ErrorTypeTask = "task" + ErrorTypeProplet = "proplet" + ErrorTypeWorkflow = "workflow" + ErrorTypeJob = "job" + ErrorTypeFL = "fl" + ErrorTypeValidation = "validation" +) + +// Task status constants for consistent labeling. +const ( + TaskStatusSuccess = "success" + TaskStatusFailed = "failed" + TaskStatusTimeout = "timeout" +) + +// Task kind constants for consistent labeling. 
+const ( + TaskKindStandard = "standard" + TaskKindFederated = "federated" +) + +// RecordTaskCreated increments the task created counter. +func RecordTaskCreated(kind string) { + TasksCreatedTotal.WithLabelValues(kind).Inc() +} + +// RecordTaskStarted increments the task started counter and active tasks gauge. +func RecordTaskStarted() { + TasksStartedTotal.Inc() + ActiveTasks.Inc() +} + +// RecordTaskCompleted increments the completed counter and decrements active tasks. +func RecordTaskCompleted(status string) { + TasksCompletedTotal.WithLabelValues(status).Inc() + ActiveTasks.Dec() +} + +// RecordError increments the error counter for the given type. +func RecordError(errorType string) { + ErrorsTotal.WithLabelValues(errorType).Inc() +} + +// RecordPropletRegistered increments registration counter and connected gauge. +func RecordPropletRegistered() { + PropletRegistrations.Inc() + ConnectedProplets.Inc() +} + +// RecordPropletDeregistered increments deregistration counter and decrements connected gauge. +func RecordPropletDeregistered() { + PropletDeregistrations.Inc() + ConnectedProplets.Dec() +} + +// RecordJobCreated increments the job created counter. +func RecordJobCreated() { + JobsCreatedTotal.Inc() +} + +// RecordJobCompleted increments the job completed counter with status. +func RecordJobCompleted(status string) { + JobsCompletedTotal.WithLabelValues(status).Inc() +} + +// RecordWorkflowCreated increments the workflow created counter. +func RecordWorkflowCreated() { + WorkflowsCreatedTotal.Inc() +} + +// RecordMQTTReceived increments the MQTT received counter. +func RecordMQTTReceived(topicType string) { + MQTTMessagesReceived.WithLabelValues(topicType).Inc() +} + +// RecordMQTTPublished increments the MQTT published counter. +func RecordMQTTPublished(topicType string) { + MQTTMessagesPublished.WithLabelValues(topicType).Inc() +} + +// RecordFLRound increments the FL rounds counter. 
+func RecordFLRound() { + FLRoundsTotal.Inc() +} + +// RecordFLUpdate increments the FL updates counter. +func RecordFLUpdate() { + FLUpdatesReceived.Inc() +} + +// ObserveWasmExecution records the WASM execution duration. +func ObserveWasmExecution(taskName string, durationSeconds float64) { + WasmExecutionDuration.WithLabelValues(taskName).Observe(durationSeconds) +} + +// Proxy metric helpers. + +// RecordContainerFetch increments the container fetch counter. +func RecordContainerFetch() { + ProxyContainerFetchesTotal.Inc() +} + +// RecordContainerFetchError increments the fetch error counter. +func RecordContainerFetchError() { + ProxyContainerFetchErrors.Inc() +} + +// RecordChunkSent increments the chunks sent counter. +func RecordChunkSent() { + ProxyChunksSentTotal.Inc() +} + +// RecordActiveFetchStart increments the active fetches gauge. +func RecordActiveFetchStart() { + ProxyActiveFetches.Inc() +} + +// RecordActiveFetchEnd decrements the active fetches gauge. +func RecordActiveFetchEnd() { + ProxyActiveFetches.Dec() +} + +// ObserveFetchDuration records the container fetch duration. +func ObserveFetchDuration(durationSeconds float64) { + ProxyFetchDuration.Observe(durationSeconds) +} + +// RecordBytesTransferred adds to the bytes transferred counter. 
+func RecordBytesTransferred(bytes float64) { + ProxyBytesTransferred.Add(bytes) +} diff --git a/pkg/observability/metrics_test.go b/pkg/observability/metrics_test.go new file mode 100644 index 00000000..4994e7aa --- /dev/null +++ b/pkg/observability/metrics_test.go @@ -0,0 +1,135 @@ +//nolint:testpackage +package observability + +import ( + "testing" + + "github.com/prometheus/client_golang/prometheus" +) + +//nolint:paralleltest +func TestMetricsRegistered(t *testing.T) { + // Verify that metrics are registered by checking they can be collected + tests := []struct { + name string + collect func() prometheus.Collector + }{ + {"TasksCreatedTotal", func() prometheus.Collector { return TasksCreatedTotal }}, + {"TasksStartedTotal", func() prometheus.Collector { return TasksStartedTotal }}, + {"TasksCompletedTotal", func() prometheus.Collector { return TasksCompletedTotal }}, + {"ActiveTasks", func() prometheus.Collector { return ActiveTasks }}, + {"PropletRegistrations", func() prometheus.Collector { return PropletRegistrations }}, + {"PropletDeregistrations", func() prometheus.Collector { return PropletDeregistrations }}, + {"JobsCreatedTotal", func() prometheus.Collector { return JobsCreatedTotal }}, + {"JobsCompletedTotal", func() prometheus.Collector { return JobsCompletedTotal }}, + {"WorkflowsCreatedTotal", func() prometheus.Collector { return WorkflowsCreatedTotal }}, + {"WasmExecutionDuration", func() prometheus.Collector { return WasmExecutionDuration }}, + {"ErrorsTotal", func() prometheus.Collector { return ErrorsTotal }}, + {"ConnectedProplets", func() prometheus.Collector { return ConnectedProplets }}, + {"ProxyContainerFetchesTotal", func() prometheus.Collector { return ProxyContainerFetchesTotal }}, + {"ProxyContainerFetchErrors", func() prometheus.Collector { return ProxyContainerFetchErrors }}, + {"ProxyChunksSentTotal", func() prometheus.Collector { return ProxyChunksSentTotal }}, + {"ProxyActiveFetches", func() prometheus.Collector { return 
ProxyActiveFetches }}, + {"ProxyFetchDuration", func() prometheus.Collector { return ProxyFetchDuration }}, + {"ProxyBytesTransferred", func() prometheus.Collector { return ProxyBytesTransferred }}, + } + + //nolint:paralleltest + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + collector := tt.collect() + if collector == nil { + t.Errorf("%s metric is nil", tt.name) + } + }) + } +} + +//nolint:paralleltest +func TestRecordTaskCreated(t *testing.T) { + // Record a task and verify no panic + RecordTaskCreated(TaskKindStandard) + RecordTaskCreated(TaskKindFederated) +} + +//nolint:paralleltest +func TestRecordTaskLifecycle(t *testing.T) { + // Test task lifecycle recording + RecordTaskStarted() + RecordTaskCompleted(TaskStatusSuccess) + + RecordTaskStarted() + RecordTaskCompleted(TaskStatusFailed) +} + +//nolint:paralleltest +func TestRecordPropletLifecycle(t *testing.T) { + // Test proplet lifecycle recording + RecordPropletRegistered() + RecordPropletDeregistered() +} + +//nolint:paralleltest +func TestRecordJobLifecycle(t *testing.T) { + // Test job lifecycle recording + RecordJobCreated() + RecordJobCompleted(TaskStatusSuccess) +} + +//nolint:paralleltest +func TestRecordWorkflowCreated(t *testing.T) { + RecordWorkflowCreated() +} + +//nolint:paralleltest +func TestRecordError(t *testing.T) { + RecordError(ErrorTypeMQTT) + RecordError(ErrorTypeStorage) + RecordError(ErrorTypeTask) + RecordError(ErrorTypeProplet) +} + +//nolint:paralleltest +func TestObserveWasmExecution(t *testing.T) { + ObserveWasmExecution("test-task", 1.5) +} + +//nolint:paralleltest +func TestRecordMQTT(t *testing.T) { + RecordMQTTReceived(TopicTypeControl) + RecordMQTTPublished(TopicTypeResults) +} + +//nolint:paralleltest +func TestRecordFL(t *testing.T) { + RecordFLRound() + RecordFLUpdate() +} + +//nolint:paralleltest +func TestProxyMetrics(t *testing.T) { + RecordContainerFetch() + RecordContainerFetchError() + RecordChunkSent() + RecordActiveFetchStart() + 
RecordActiveFetchEnd() + ObserveFetchDuration(2.5) + RecordBytesTransferred(1024.0) +} + +//nolint:paralleltest +func TestConstants(t *testing.T) { + // Verify constants are defined + constants := []string{ + ErrorTypeMQTT, ErrorTypeStorage, ErrorTypeTask, ErrorTypeProplet, + ErrorTypeWorkflow, ErrorTypeJob, ErrorTypeFL, ErrorTypeValidation, + TaskStatusSuccess, TaskStatusFailed, TaskStatusTimeout, + TaskKindStandard, TaskKindFederated, + } + + for _, c := range constants { + if c == "" { + t.Error("empty constant found") + } + } +} diff --git a/pkg/observability/tracing.go b/pkg/observability/tracing.go new file mode 100644 index 00000000..a7d06ce4 --- /dev/null +++ b/pkg/observability/tracing.go @@ -0,0 +1,64 @@ +package observability + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// StartSpan creates a new span with the given name and attributes. +func StartSpan(ctx context.Context, tracer trace.Tracer, name string, attrs ...attribute.KeyValue) (context.Context, trace.Span) { + //nolint:spancheck // Helper function deliberately returning the unended span directly to the caller. + return tracer.Start(ctx, name, trace.WithAttributes(attrs...)) +} + +// SpanFromContext returns the current span from context. +func SpanFromContext(ctx context.Context) trace.Span { + return trace.SpanFromContext(ctx) +} + +// AddSpanAttributes adds attributes to the current span. +func AddSpanAttributes(ctx context.Context, attrs ...attribute.KeyValue) { + span := trace.SpanFromContext(ctx) + span.SetAttributes(attrs...) +} + +// RecordSpanError records an error on the current span. +func RecordSpanError(ctx context.Context, err error) { + span := trace.SpanFromContext(ctx) + span.RecordError(err) +} + +// Common attribute helpers for consistent span attributes. + +// TaskAttributes returns common task-related span attributes. 
+func TaskAttributes(taskID, taskName string) []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String(FieldTaskID, taskID), + attribute.String("task_name", taskName), + } +} + +// PropletAttributes returns common proplet-related span attributes. +func PropletAttributes(propletID string) []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String(FieldPropletID, propletID), + } +} + +// JobAttributes returns common job-related span attributes. +func JobAttributes(jobID, jobName string) []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.String(FieldJobID, jobID), + attribute.String("job_name", jobName), + } +} + +// PaginationAttributes returns pagination-related span attributes. +func PaginationAttributes(offset, limit uint64) []attribute.KeyValue { + return []attribute.KeyValue{ + attribute.Int64(FieldOffset, int64(offset)), + attribute.Int64(FieldLimit, int64(limit)), + } +} diff --git a/proplet/Cargo.lock b/proplet/Cargo.lock index 3ea44bc1..4be40a30 100644 --- a/proplet/Cargo.lock +++ b/proplet/Cargo.lock @@ -365,8 +365,8 @@ dependencies = [ "axum-core", "bytes", "futures-util", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "http-body-util", "itoa", "matchit", @@ -375,7 +375,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "serde_core", - "sync_wrapper", + "sync_wrapper 1.0.2", "tower", "tower-layer", "tower-service", @@ -389,12 +389,12 @@ checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" dependencies = [ "bytes", "futures-core", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "http-body-util", "mime", "pin-project-lite", - "sync_wrapper", + "sync_wrapper 1.0.2", "tower-layer", "tower-service", ] @@ -1193,6 +1193,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"82b8f8f868b36967f9606790d1903570de9ceaf870a7bf9fbbd3016d636a2cb2" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-deque" version = "0.8.6" @@ -2137,6 +2146,25 @@ dependencies = [ "subtle", ] +[[package]] +name = "h2" +version = "0.3.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0beca50380b1fc32983fc1cb4587bfa4bb9e78fc259aad4a0032d2080309222d" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http 0.2.12", + "indexmap 2.13.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "h2" version = "0.4.13" @@ -2148,7 +2176,7 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http", + "http 1.4.0", "indexmap 2.13.0", "slab", "tokio", @@ -2207,6 +2235,30 @@ dependencies = [ "hashbrown 0.15.5", ] +[[package]] +name = "headers" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" +dependencies = [ + "base64 0.21.7", + "bytes", + "headers-core", + "http 0.2.12", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http 0.2.12", +] + [[package]] name = "heck" version = "0.5.0" @@ -2267,6 +2319,17 @@ dependencies = [ "digest", ] +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http" version = "1.4.0" @@ -2286,6 +2349,17 @@ dependencies = [ "memchr", ] +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", +] + [[package]] name = "http-body" version = "1.0.1" @@ -2293,7 +2367,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http", + "http 1.4.0", ] [[package]] @@ -2304,8 +2378,8 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", "futures-core", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -2330,6 +2404,30 @@ dependencies = [ "typenum", ] +[[package]] +name = "hyper" +version = "0.14.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2 0.3.27", + "http 0.2.12", + "http-body 0.4.6", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "socket2 0.5.10", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "hyper" version = "1.8.1" @@ -2340,9 +2438,9 @@ dependencies = [ "bytes", "futures-channel", "futures-core", - "h2", - "http", - "http-body", + "h2 0.4.13", + "http 1.4.0", + "http-body 1.0.1", "httparse", "httpdate", "itoa", @@ -2359,8 +2457,8 @@ version = "0.27.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" dependencies = [ - "http", - "hyper", + "http 1.4.0", + "hyper 1.8.1", "hyper-util", "rustls 0.23.37", "rustls-native-certs", @@ -2377,7 +2475,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" dependencies = [ - "hyper", + "hyper 1.8.1", "hyper-util", "pin-project-lite", "tokio", @@ 
-2392,7 +2490,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ "bytes", "http-body-util", - "hyper", + "hyper 1.8.1", "hyper-util", "native-tls", "tokio", @@ -2410,15 +2508,15 @@ dependencies = [ "bytes", "futures-channel", "futures-util", - "http", - "http-body", - "hyper", + "http 1.4.0", + "http-body 1.0.1", + "hyper 1.8.1", "ipnet", "libc", "percent-encoding", "pin-project-lite", "socket2 0.6.3", - "system-configuration", + "system-configuration 0.7.0", "tokio", "tower-service", "tracing", @@ -2660,6 +2758,12 @@ dependencies = [ "hybrid-array", ] +[[package]] +name = "integer-encoding" +version = "3.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bb03732005da905c88227371639bf1ad885cc712789c011c31c5fb3ab3ccf02" + [[package]] name = "io-extras" version = "0.18.4" @@ -2925,7 +3029,7 @@ dependencies = [ "jwt-simple", "kbs-types", "log", - "reqwest", + "reqwest 0.12.28", "resource_uri", "serde", "serde_json", @@ -3311,9 +3415,9 @@ dependencies = [ "base64 0.22.1", "chrono", "getrandom 0.2.17", - "http", + "http 1.4.0", "rand 0.8.5", - "reqwest", + "reqwest 0.12.28", "serde", "serde_json", "serde_path_to_error", @@ -3362,14 +3466,14 @@ dependencies = [ "bytes", "chrono", "futures-util", - "http", + "http 1.4.0", "http-auth", "jwt", "lazy_static", "oci-spec 0.8.4", "olpc-cjson", "regex", - "reqwest", + "reqwest 0.12.28", "serde", "serde_json", "sha2", @@ -3476,7 +3580,7 @@ dependencies = [ "dyn-clone", "ed25519-dalek", "hmac", - "http", + "http 1.4.0", "itertools 0.10.5", "log", "oauth2", @@ -3550,6 +3654,86 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e32339a5dc40459130b3bd269e9892439f55b33e772d2a9d402a789baaf4e8a" +dependencies = [ + "futures-core", + "futures-sink", + "indexmap 2.13.0", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror 1.0.69", + 
"urlencoding", +] + +[[package]] +name = "opentelemetry-http" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f51189ce8be654f9b5f7e70e49967ed894e84a06fc35c6c042e64ac1fc5399e" +dependencies = [ + "async-trait", + "bytes", + "http 0.2.12", + "opentelemetry", + "reqwest 0.11.27", +] + +[[package]] +name = "opentelemetry-jaeger" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e617c66fd588e40e0dbbd66932fdc87393095b125d4459b1a3a10feb1712f8a1" +dependencies = [ + "async-trait", + "futures-core", + "futures-util", + "headers", + "http 0.2.12", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", + "reqwest 0.11.27", + "thrift", + "tokio", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5774f1ef1f982ef2a447f6ee04ec383981a3ab99c8e77a1a7b30182e65bbc84" +dependencies = [ + "opentelemetry", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.21.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f16aec8a98a457a52664d69e0091bac3a0abd18ead9b641cb00202ba4e0efe4" +dependencies = [ + "async-trait", + "crossbeam-channel", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry", + "ordered-float 4.6.0", + "percent-encoding", + "rand 0.8.5", + "thiserror 1.0.69", + "tokio", + "tokio-stream", +] + [[package]] name = "ordered-float" version = "2.10.1" @@ -3559,6 +3743,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-float" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7bb71e1b3fa6ca1c61f383464aaf2bb0e2f8e772a1f01d486832464de363b951" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-multimap" version = "0.7.3" @@ -3976,6 +4169,21 @@ dependencies = [ 
"unicode-ident", ] +[[package]] +name = "prometheus" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "memchr", + "parking_lot", + "protobuf", + "thiserror 1.0.69", +] + [[package]] name = "proplet" version = "0.4.0" @@ -3989,14 +4197,19 @@ dependencies = [ "elastic-tee-hal", "futures-util", "hex", - "hyper", + "hyper 1.8.1", "hyper-util", "image-rs", "kbs_protocol", + "lazy_static", "libc", "oci-client", "oci-spec 0.9.0", - "reqwest", + "opentelemetry", + "opentelemetry-jaeger", + "opentelemetry_sdk", + "prometheus", + "reqwest 0.12.28", "resource_uri", "rumqttc", "serde", @@ -4007,6 +4220,7 @@ dependencies = [ "tokio", "toml", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "url", "uuid", @@ -4040,6 +4254,12 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "protobuf" +version = "2.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" + [[package]] name = "protos" version = "0.1.0" @@ -4106,7 +4326,7 @@ dependencies = [ "thiserror 2.0.18", "tokio", "tracing", - "web-time", + "web-time 1.1.0", ] [[package]] @@ -4127,7 +4347,7 @@ dependencies = [ "thiserror 2.0.18", "tinyvec", "tracing", - "web-time", + "web-time 1.1.0", ] [[package]] @@ -4345,6 +4565,42 @@ version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" +[[package]] +name = "reqwest" +version = "0.11.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" +dependencies = [ + "base64 0.21.7", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2 0.3.27", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.32", 
+ "ipnet", + "js-sys", + "log", + "mime", + "once_cell", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 0.1.2", + "system-configuration 0.5.1", + "tokio", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + [[package]] name = "reqwest" version = "0.12.28" @@ -4359,11 +4615,11 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.4.13", + "http 1.4.0", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.8.1", "hyper-rustls", "hyper-tls", "hyper-util", @@ -4381,7 +4637,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 1.0.2", "tokio", "tokio-native-tls", "tokio-rustls 0.26.4", @@ -4618,7 +4874,7 @@ version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be040f8b0a225e40375822a563fa9524378b9d63112f53e19ffff34df5d33fdd" dependencies = [ - "web-time", + "web-time 1.1.0", "zeroize", ] @@ -4801,7 +5057,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f3a1a3341211875ef120e117ea7fd5228530ae7e7036a779fdc9117be6b3282c" dependencies = [ - "ordered-float", + "ordered-float 2.10.1", "serde", ] @@ -5008,7 +5264,7 @@ dependencies = [ "pkcs8", "rand 0.8.5", "regex", - "reqwest", + "reqwest 0.12.28", "rsa", "rustls-pki-types", "rustls-webpki 0.103.9", @@ -5209,6 +5465,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "sync_wrapper" version = "1.0.2" @@ -5243,6 +5505,17 @@ dependencies = [ "windows", ] +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation 0.9.4", + "system-configuration-sys 0.5.0", +] + [[package]] name = "system-configuration" version = "0.7.0" @@ -5251,7 +5524,17 @@ checksum = "a13f3d0daba03132c0aa9767f98351b3488edc2c100cda2d2ec2b04f3d8d3c8b" dependencies = [ "bitflags 2.11.0", "core-foundation 0.9.4", - "system-configuration-sys", + "system-configuration-sys 0.6.0", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", ] [[package]] @@ -5357,6 +5640,28 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + +[[package]] +name = "thrift" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e54bc85fc7faa8bc175c4bab5b92ba8d9a3ce893d0e9f42cc455c8ab16a9e09" +dependencies = [ + "byteorder", + "integer-encoding", + "log", + "ordered-float 2.10.1", + "threadpool", +] + [[package]] name = "time" version = "0.3.47" @@ -5575,17 +5880,17 @@ dependencies = [ "axum", "base64 0.22.1", "bytes", - "h2", - "http", - "http-body", + "h2 0.4.13", + "http 1.4.0", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.8.1", "hyper-timeout", "hyper-util", "percent-encoding", "pin-project", "socket2 0.6.3", - "sync_wrapper", + "sync_wrapper 1.0.2", "tokio", "tokio-stream", "tower", @@ -5616,7 +5921,7 @@ dependencies = [ "indexmap 2.13.0", "pin-project-lite", "slab", - "sync_wrapper", + "sync_wrapper 1.0.2", "tokio", "tokio-util", "tower-layer", @@ -5633,8 +5938,8 @@ dependencies = [ "bitflags 2.11.0", "bytes", "futures-util", - 
"http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "iri-string", "pin-project-lite", "tower", @@ -5698,6 +6003,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c67ac25c5407e7b961fafc6f7e9aa5958fd297aada2d20fa2ae1737357e55596" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time 0.2.4", +] + [[package]] name = "tracing-subscriber" version = "0.3.22" @@ -5820,6 +6143,12 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf8_iter" version = "1.0.4" @@ -6426,10 +6755,10 @@ dependencies = [ "async-trait", "bytes", "futures", - "http", - "http-body", + "http 1.4.0", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.8.1", "rustls 0.22.4", "tokio", "tokio-rustls 0.25.0", @@ -6494,6 +6823,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa30049b1c872b72c89866d458eae9f20380ab280ffd1b1e18df2d3e2d98cfe0" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "web-time" version = "1.1.0" @@ -6724,6 +7063,15 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -6760,6 +7108,21 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = 
"0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + [[package]] name = "windows-targets" version = "0.52.6" @@ -6802,6 +7165,12 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -6814,6 +7183,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -6826,6 +7201,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -6850,6 +7231,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -6862,6 +7249,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -6874,6 +7267,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -6886,6 +7285,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" @@ -6907,6 +7312,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "winx" version = "0.36.4" @@ -6927,9 +7342,9 @@ dependencies = [ "base64 0.22.1", "deadpool", "futures", - 
"http", + "http 1.4.0", "http-body-util", - "hyper", + "hyper 1.8.1", "hyper-util", "log", "once_cell", diff --git a/proplet/Cargo.toml b/proplet/Cargo.toml index f6f6eeae..1bf20382 100644 --- a/proplet/Cargo.toml +++ b/proplet/Cargo.toml @@ -9,7 +9,11 @@ rumqttc = "0.25.1" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" toml = "0.9.8" -wasmtime = { version = "42.0.1", features = ["component-model", "async", "wave"] } +wasmtime = { version = "42.0.1", features = [ + "component-model", + "async", + "wave", +] } wasmtime-wasi = "42.0.1" wasmtime-wasi-http = "42.0.1" hyper = { version = "1.7", features = ["full"] } @@ -29,18 +33,49 @@ libc = "0.2" socket2 = { version = "0.5", features = ["all"] } reqwest = { version = "0.12", features = ["json"] } +# Observability - Prometheus metrics +prometheus = "0.13" +lazy_static = "1.4" + +# Observability - OpenTelemetry (optional feature) +opentelemetry = { version = "0.21", optional = true } +opentelemetry_sdk = { version = "0.21", features = [ + "rt-tokio", +], optional = true } +opentelemetry-jaeger = { version = "0.20", features = [ + "rt-tokio", + "reqwest_collector_client", +], optional = true } +tracing-opentelemetry = { version = "0.22", optional = true } + # TEE & Encryption Support (default feature) -attestation-agent = { git = "https://github.com/rodneyosodo/guest-components", branch = "enable-wasm-workloads", default-features = false, features = ["rust-crypto", "kbs"] } -image-rs = { git = "https://github.com/rodneyosodo/guest-components", branch = "enable-wasm-workloads", default-features = false, features = ["encryption-ring", "keywrap-grpc", "oci-client-rustls", "signature-cosign-rustls", "futures"] } -kbs_protocol = { git = "https://github.com/rodneyosodo/guest-components", branch = "enable-wasm-workloads", features = ["background_check", "rust-crypto"], default-features = false } -oci-client = { version = "0.15", default-features = false, features = ["rustls-tls"] } +attestation-agent = { 
git = "https://github.com/rodneyosodo/guest-components", branch = "enable-wasm-workloads", default-features = false, features = [ + "rust-crypto", + "kbs", +] } +image-rs = { git = "https://github.com/rodneyosodo/guest-components", branch = "enable-wasm-workloads", default-features = false, features = [ + "encryption-ring", + "keywrap-grpc", + "oci-client-rustls", + "signature-cosign-rustls", + "futures", +] } +kbs_protocol = { git = "https://github.com/rodneyosodo/guest-components", branch = "enable-wasm-workloads", features = [ + "background_check", + "rust-crypto", +], default-features = false } +oci-client = { version = "0.15", default-features = false, features = [ + "rustls-tls", +] } oci-spec = { version = "0.9.0" } resource_uri = { git = "https://github.com/rodneyosodo/guest-components", branch = "enable-wasm-workloads" } hex = { version = "0.4" } futures-util = { version = "0.3" } # ELASTIC TEE HAL — hardware abstraction layer for TEE workloads -elastic-tee-hal = { git = "https://github.com/elasticproject-eu/wasmhal", default-features = false, features = ["amd-sev"] } +elastic-tee-hal = { git = "https://github.com/elasticproject-eu/wasmhal", default-features = false, features = [ + "amd-sev", +] } wasm-wave = "0.244.0" [dev-dependencies] @@ -49,6 +84,12 @@ tokio = { version = "1.42", features = ["full"] } [features] default = [] +otlp = [ + "opentelemetry", + "opentelemetry_sdk", + "opentelemetry-jaeger", + "tracing-opentelemetry", +] [profile.release] strip = "symbols" diff --git a/proplet/src/config.rs b/proplet/src/config.rs index 223b13d0..110d57f3 100644 --- a/proplet/src/config.rs +++ b/proplet/src/config.rs @@ -47,6 +47,9 @@ pub struct PropletConfig { pub http_enabled: bool, pub preopened_dirs: Vec, pub http_proxy_port: u16, + pub metrics_port: u16, + pub jaeger_endpoint: Option, + pub jaeger_enabled: bool, pub description: Option, pub tags: Vec, pub location: Option, @@ -82,6 +85,9 @@ impl Default for PropletConfig { http_enabled: false, 
preopened_dirs: Vec::new(), http_proxy_port: 8222, + metrics_port: 7072, + jaeger_endpoint: None, + jaeger_enabled: false, description: None, tags: Vec::new(), location: None, @@ -315,6 +321,22 @@ impl PropletConfig { } } + if let Ok(val) = env::var("PROPLET_METRICS_PORT") { + if let Ok(port) = val.parse() { + config.metrics_port = port; + } + } + + if let Ok(val) = env::var("JAEGER_ENDPOINT") { + if !val.is_empty() { + config.jaeger_endpoint = Some(val); + } + } + + if let Ok(val) = env::var("PROPELLER_TRACING_ENABLED") { + config.jaeger_enabled = val.to_lowercase() == "true" || val == "1"; + } + if let Ok(val) = env::var("PROPLET_DIRS") { config.preopened_dirs = val .split(':') diff --git a/proplet/src/main.rs b/proplet/src/main.rs index 12aa9bd2..313a4947 100644 --- a/proplet/src/main.rs +++ b/proplet/src/main.rs @@ -4,6 +4,7 @@ mod hal_linker; mod metrics; mod monitoring; mod mqtt; +mod observability; mod runtime; mod service; mod task_handler; @@ -18,11 +19,76 @@ use crate::runtime::wasmtime_runtime::WasmtimeRuntime; use crate::runtime::Runtime; use crate::service::PropletService; use anyhow::Result; +use hyper::server::conn::http1; +use hyper::service::service_fn; +use hyper::{body::Incoming, Method, Request, Response, StatusCode}; +use hyper_util::rt::TokioIo; +use std::convert::Infallible; +use std::net::SocketAddr; use std::sync::Arc; +use tokio::net::TcpListener; use tokio::sync::mpsc; -use tracing::{debug, info, Level}; +use tracing::{debug, error, info, Level}; use tracing_subscriber::FmtSubscriber; +/// HTTP handler for metrics and health endpoints +async fn metrics_handler(req: Request) -> Result, Infallible> { + match (req.method(), req.uri().path()) { + (&Method::GET, "/metrics") => { + let metrics = observability::encode_metrics().unwrap_or_default(); + Ok(Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "text/plain; charset=utf-8") + .body(metrics) + .unwrap()) + } + (&Method::GET, "/health") => Ok(Response::builder() + 
.status(StatusCode::OK) + .body("OK".to_string()) + .unwrap()), + _ => Ok(Response::builder() + .status(StatusCode::NOT_FOUND) + .body("Not Found".to_string()) + .unwrap()), + } +} + +/// Start the metrics HTTP server +async fn start_metrics_server(port: u16) { + let addr = SocketAddr::from(([0, 0, 0, 0], port)); + + let listener = match TcpListener::bind(addr).await { + Ok(l) => l, + Err(e) => { + error!("Failed to bind metrics server to {}: {}", addr, e); + return; + } + }; + + info!("Metrics server listening on http://{}/metrics", addr); + + loop { + let (stream, _) = match listener.accept().await { + Ok(conn) => conn, + Err(e) => { + error!("Failed to accept connection: {}", e); + continue; + } + }; + + let io = TokioIo::new(stream); + + tokio::spawn(async move { + if let Err(e) = http1::Builder::new() + .serve_connection(io, service_fn(metrics_handler)) + .await + { + debug!("Error serving connection: {}", e); + } + }); + } +} + #[tokio::main] async fn main() -> Result<()> { let config = @@ -37,15 +103,27 @@ async fn main() -> Result<()> { _ => Level::INFO, }; - let subscriber = FmtSubscriber::builder() - .with_max_level(log_level) - .with_target(false) - .with_thread_ids(false) - .with_file(false) - .with_line_number(false) - .finish(); - - tracing::subscriber::set_global_default(subscriber)?; + if config.jaeger_enabled { + let tracing_config = observability::tracing::TracingConfig { + jaeger_endpoint: config.jaeger_endpoint.clone(), + service_name: "proplet".to_string(), + }; + // Ensure RUST_LOG matches configured log level for EnvFilter in init_tracing + if std::env::var("RUST_LOG").is_err() { + std::env::set_var("RUST_LOG", &config.log_level); + } + observability::tracing::init_tracing(tracing_config)?; + } else { + let subscriber = FmtSubscriber::builder() + .with_max_level(log_level) + .with_target(false) + .with_thread_ids(false) + .with_file(false) + .with_line_number(false) + .finish(); + + tracing::subscriber::set_global_default(subscriber)?; + } 
debug!("Proplet configuration: {:?}", config); @@ -110,6 +188,12 @@ async fn main() -> Result<()> { Arc::new(PropletService::new(config.clone(), pubsub, runtime)) }; + // Start metrics HTTP server + let metrics_port = config.metrics_port; + tokio::spawn(async move { + start_metrics_server(metrics_port).await; + }); + let shutdown_handle = tokio::spawn(async move { tokio::signal::ctrl_c() .await diff --git a/proplet/src/observability/metrics.rs b/proplet/src/observability/metrics.rs new file mode 100644 index 00000000..0eb360bb --- /dev/null +++ b/proplet/src/observability/metrics.rs @@ -0,0 +1,185 @@ +use lazy_static::lazy_static; +use prometheus::{ + register_counter, register_counter_vec, register_gauge, register_histogram_vec, Counter, + CounterVec, Encoder, Gauge, HistogramVec, TextEncoder, +}; + +lazy_static! { + /// Total number of tasks executed successfully + pub static ref TASKS_EXECUTED_TOTAL: Counter = register_counter!( + "proplet_tasks_executed_total", + "Total number of tasks executed successfully" + ) + .unwrap(); + + /// Total number of tasks that failed + pub static ref TASKS_FAILED_TOTAL: Counter = register_counter!( + "proplet_tasks_failed_total", + "Total number of tasks that failed" + ) + .unwrap(); + + /// Number of currently active tasks + pub static ref ACTIVE_TASKS: Gauge = register_gauge!( + "proplet_active_tasks", + "Number of currently running tasks" + ) + .unwrap(); + + /// WASM execution duration in seconds + pub static ref WASM_EXECUTION_DURATION: HistogramVec = register_histogram_vec!( + "proplet_wasm_execution_duration_seconds", + "Duration of WASM task execution in seconds", + &["task_name"], + vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0, 60.0] + ) + .unwrap(); + + /// WASM memory usage in bytes + pub static ref WASM_MEMORY_BYTES: Gauge = register_gauge!( + "proplet_wasm_memory_bytes", + "Current WASM memory usage in bytes" + ) + .unwrap(); + + /// Total chunks received from registry + pub static 
ref CHUNKS_RECEIVED_TOTAL: Counter = register_counter!( + "proplet_chunks_received_total", + "Total number of chunks received from registry" + ) + .unwrap(); + + /// Total binaries assembled from chunks + pub static ref BINARIES_ASSEMBLED_TOTAL: Counter = register_counter!( + "proplet_binaries_assembled_total", + "Total number of WASM binaries assembled from chunks" + ) + .unwrap(); + + /// MQTT messages received by topic type + pub static ref MQTT_MESSAGES_RECEIVED: CounterVec = register_counter_vec!( + "proplet_mqtt_messages_received_total", + "Total MQTT messages received by topic type", + &["topic_type"] + ) + .unwrap(); + + /// MQTT messages published by topic type + pub static ref MQTT_MESSAGES_PUBLISHED: CounterVec = register_counter_vec!( + "proplet_mqtt_messages_published_total", + "Total MQTT messages published by topic type", + &["topic_type"] + ) + .unwrap(); + + /// Errors by type + pub static ref ERRORS_TOTAL: CounterVec = register_counter_vec!( + "proplet_errors_total", + "Total errors by type", + &["error_type"] + ) + .unwrap(); +} + +// Topic type constants for labeling +pub const TOPIC_CONTROL: &str = "control"; +pub const TOPIC_REGISTRY: &str = "registry"; +pub const TOPIC_RESULTS: &str = "results"; + +// Error type constants +pub const ERROR_DECODE: &str = "decode"; +pub const ERROR_RUNTIME: &str = "runtime"; +pub const ERROR_MQTT: &str = "mqtt"; +pub const ERROR_CHUNK: &str = "chunk"; + +/// Record a task starting +pub fn record_task_started() { + ACTIVE_TASKS.inc(); +} + +/// Record a task completing successfully +pub fn record_task_completed() { + ACTIVE_TASKS.dec(); + TASKS_EXECUTED_TOTAL.inc(); +} + +/// Record a task failing +pub fn record_task_failed() { + ACTIVE_TASKS.dec(); + TASKS_FAILED_TOTAL.inc(); +} + +/// Observe WASM execution duration +pub fn observe_wasm_execution(task_name: &str, duration_secs: f64) { + WASM_EXECUTION_DURATION + .with_label_values(&[task_name]) + .observe(duration_secs); +} + +/// Update WASM memory gauge 
+#[allow(dead_code)] +pub fn set_wasm_memory(bytes: f64) { + WASM_MEMORY_BYTES.set(bytes); +} + +/// Record chunk received +pub fn record_chunk_received() { + CHUNKS_RECEIVED_TOTAL.inc(); +} + +/// Record binary assembled +pub fn record_binary_assembled() { + BINARIES_ASSEMBLED_TOTAL.inc(); +} + +/// Record MQTT message received +pub fn record_mqtt_received(topic_type: &str) { + MQTT_MESSAGES_RECEIVED + .with_label_values(&[topic_type]) + .inc(); +} + +/// Record MQTT message published +pub fn record_mqtt_published(topic_type: &str) { + MQTT_MESSAGES_PUBLISHED + .with_label_values(&[topic_type]) + .inc(); +} + +/// Record an error +pub fn record_error(error_type: &str) { + ERRORS_TOTAL.with_label_values(&[error_type]).inc(); +} + +/// Encode metrics as Prometheus text format +pub fn encode_metrics() -> Result<String, prometheus::Error> { + let encoder = TextEncoder::new(); + let metric_families = prometheus::gather(); + let mut buffer = Vec::new(); + encoder.encode(&metric_families, &mut buffer)?; + Ok(String::from_utf8(buffer).unwrap_or_default()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_task_metrics() { + record_task_started(); + record_task_completed(); + record_task_started(); + record_task_failed(); + } + + #[test] + fn test_wasm_execution() { + observe_wasm_execution("test_task", 1.5); + } + + #[test] + fn test_encode_metrics() { + let result = encode_metrics(); + assert!(result.is_ok()); + } +} diff --git a/proplet/src/observability/mod.rs b/proplet/src/observability/mod.rs new file mode 100644 index 00000000..9abbacec --- /dev/null +++ b/proplet/src/observability/mod.rs @@ -0,0 +1,5 @@ +pub mod metrics; +pub mod tracing; + +pub use metrics::*; +pub use tracing::*; diff --git a/proplet/src/observability/tracing.rs b/proplet/src/observability/tracing.rs new file mode 100644 index 00000000..685cdad3 --- /dev/null +++ b/proplet/src/observability/tracing.rs @@ -0,0 +1,99 @@ +//! OpenTelemetry tracing support for the proplet. +//! +//!
This module provides OTLP tracing export when the `otlp` feature is enabled. + +use tracing::info; + +/// Tracing configuration. +#[derive(Debug, Clone)] +#[allow(dead_code)] +pub struct TracingConfig { + /// Jaeger endpoint (e.g., "http://localhost:14268/api/traces") + pub jaeger_endpoint: Option<String>, + /// Service name for tracing + pub service_name: String, +} + +impl Default for TracingConfig { + fn default() -> Self { + Self { + jaeger_endpoint: None, + service_name: "proplet".to_string(), + } + } +} + +/// Initialize tracing with optional OTLP export. +/// +/// When the `otlp` feature is enabled and an endpoint is configured, +/// traces will be exported to the OpenTelemetry Collector. +#[cfg(feature = "otlp")] +pub fn init_tracing(config: TracingConfig) -> anyhow::Result<()> { + use opentelemetry_sdk::{runtime, trace as sdktrace, Resource}; + use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; + + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + + if let Some(endpoint) = config.jaeger_endpoint { + let tracer = opentelemetry_jaeger::new_collector_pipeline() + .with_endpoint(&endpoint) + .with_reqwest() + .with_service_name(config.service_name.clone()) + .with_trace_config( + sdktrace::Config::default().with_resource(Resource::new(vec![ + opentelemetry::KeyValue::new("service.name", config.service_name.clone()), + ])), + ) + .install_batch(runtime::Tokio)?; + + let telemetry_layer = tracing_opentelemetry::layer().with_tracer(tracer); + + tracing_subscriber::registry() + .with(env_filter) + .with(tracing_subscriber::fmt::layer()) + .with(telemetry_layer) + .init(); + + info!("Jaeger tracing initialized, endpoint: {}", endpoint); + } else { + // Standard tracing without Jaeger + tracing_subscriber::registry() + .with(env_filter) + .with(tracing_subscriber::fmt::layer()) + .init(); + + info!("Tracing initialized (Jaeger disabled)"); + } + + Ok(()) +} + +/// Initialize tracing without OTLP
export (when feature is disabled). +#[cfg(not(feature = "otlp"))] +pub fn init_tracing(_config: TracingConfig) -> anyhow::Result<()> { + use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; + + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + + tracing_subscriber::registry() + .with(env_filter) + .with(tracing_subscriber::fmt::layer()) + .init(); + + info!("Tracing initialized (OTLP feature not enabled)"); + + Ok(()) +} + +/// Shutdown tracing and flush pending spans. +#[cfg(feature = "otlp")] +pub fn shutdown_tracing() { + opentelemetry::global::shutdown_tracer_provider(); + info!("OTLP tracing shutdown complete"); +} + +/// Shutdown tracing (no-op when OTLP is disabled). +#[cfg(not(feature = "otlp"))] +pub fn shutdown_tracing() { + // No-op when OTLP is disabled +} diff --git a/proplet/src/service.rs b/proplet/src/service.rs index 7e86a928..a709f5e7 100644 --- a/proplet/src/service.rs +++ b/proplet/src/service.rs @@ -2,6 +2,9 @@ use crate::config::PropletConfig; use crate::metrics::MetricsCollector; use crate::monitoring::{system::SystemMonitor, ProcessMonitor}; use crate::mqtt::{build_topic, MqttMessage, PubSub}; +use crate::observability::{ + self, ERROR_CHUNK, ERROR_DECODE, ERROR_RUNTIME, TOPIC_CONTROL, TOPIC_REGISTRY, TOPIC_RESULTS, +}; use crate::runtime::{Runtime, RuntimeContext, StartConfig}; use crate::types::*; use anyhow::{Context, Result}; @@ -411,10 +414,13 @@ impl PropletService { } if msg.topic.contains("control/manager/start") { + observability::record_mqtt_received(TOPIC_CONTROL); self.handle_start_command(msg).await } else if msg.topic.contains("control/manager/stop") { + observability::record_mqtt_received(TOPIC_CONTROL); self.handle_stop_command(msg).await } else if msg.topic.contains("registry/server") { + observability::record_mqtt_received(TOPIC_REGISTRY); self.handle_chunk(msg).await } else { debug!("Ignoring message from unknown topic: {}", msg.topic); @@ -425,6 
+431,7 @@ impl PropletService { async fn handle_start_command(&self, msg: MqttMessage) -> Result<()> { let req: StartRequest = msg.decode().map_err(|e| { error!("Failed to decode start request: {}", e); + observability::record_error(ERROR_DECODE); if let Ok(payload_str) = String::from_utf8(msg.payload.clone()) { error!("Payload was: {}", payload_str); } @@ -464,6 +471,7 @@ impl PropletService { use std::collections::hash_map::Entry; if let Entry::Vacant(e) = tasks.entry(req.id.clone()) { e.insert(TaskState::Running); + observability::record_task_started(); } else { warn!( "Task {} is already running, ignoring duplicate start command", @@ -762,7 +770,10 @@ impl PropletService { // Update config.env with the latest env (including MODEL_DATA and DATASET_DATA) config.env = env.clone(); + let execution_start = std::time::Instant::now(); let result = runtime.start_app(ctx, config).await; + let execution_duration = execution_start.elapsed().as_secs_f64(); + observability::observe_wasm_execution(&task_name, execution_duration); if let Some(handle) = monitor_handle { let _ = handle.await; @@ -777,10 +788,13 @@ impl PropletService { "Task {} completed successfully. 
Result: {}", task_id, result_str ); + observability::record_task_completed(); (result_str, None) } Err(e) => { error!("Task {} failed: {:#}", task_id, e); + observability::record_task_failed(); + observability::record_error(ERROR_RUNTIME); (String::new(), Some(e.to_string())) } }; @@ -871,8 +885,10 @@ impl PropletService { if let Err(e) = pubsub.publish(&topic, &fl_result, qos).await { error!("Failed to publish FL result for task {}: {}", task_id, e); + observability::record_error(observability::ERROR_MQTT); } else { info!("Successfully published FL update for task {}", task_id); + observability::record_mqtt_published(TOPIC_RESULTS); } } else { let result_msg = ResultMessage { @@ -888,8 +904,10 @@ impl PropletService { if let Err(e) = pubsub.publish(&topic, &result_msg, qos).await { error!("Failed to publish result for task {}: {}", task_id, e); + observability::record_error(observability::ERROR_MQTT); } else { info!("Successfully published result for task {}", task_id); + observability::record_mqtt_published(TOPIC_RESULTS); } } @@ -914,7 +932,11 @@ impl PropletService { } async fn handle_chunk(&self, msg: MqttMessage) -> Result<()> { - let chunk: Chunk = msg.decode()?; + let chunk: Chunk = msg.decode().inspect_err(|_| { + observability::record_error(ERROR_DECODE); + })?; + + observability::record_chunk_received(); debug!( "Received chunk {}/{} for app '{}'", @@ -934,6 +956,7 @@ impl PropletService { "Chunk total_chunks mismatch for '{}': expected {}, got {}", chunk.app_name, state.total_chunks, chunk.total_chunks ); + observability::record_error(ERROR_CHUNK); return Err(anyhow::anyhow!( "Chunk total_chunks mismatch for '{}'", chunk.app_name @@ -1017,6 +1040,7 @@ impl PropletService { state.total_chunks ); + observability::record_binary_assembled(); assembly.remove(app_name); return Ok(Some(binary)); diff --git a/proxy/middleware/doc.go b/proxy/middleware/doc.go new file mode 100644 index 00000000..b8873e50 --- /dev/null +++ b/proxy/middleware/doc.go @@ -0,0 +1,3 
@@ +// Package middleware provides cross-cutting concerns for the Proxy service. +// It includes logging, metrics, and tracing middleware that wrap the service interface. +package middleware diff --git a/proxy/service.go b/proxy/service.go index daffd2f2..8b55f5fc 100644 --- a/proxy/service.go +++ b/proxy/service.go @@ -5,8 +5,10 @@ import ( "fmt" "log/slog" "sync" + "time" pkgmqtt "github.com/absmach/propeller/pkg/mqtt" + "github.com/absmach/propeller/pkg/observability" "github.com/absmach/propeller/pkg/proplet" ) @@ -77,11 +79,18 @@ func (s *ProxyService) StreamHTTP(ctx context.Context) error { s.mu.Unlock() go func(name string) { + startTime := time.Now() + observability.RecordContainerFetch() + observability.RecordActiveFetchStart() + defer func() { s.mu.Lock() delete(s.fetching, name) s.activeFetches-- s.mu.Unlock() + + observability.RecordActiveFetchEnd() + observability.ObserveFetchDuration(time.Since(startTime).Seconds()) }() s.logger.Info("fetching container from registry", @@ -92,6 +101,7 @@ func (s *ProxyService) StreamHTTP(ctx context.Context) error { s.logger.Error("failed to fetch container", slog.String("container", name), slog.Any("error", err)) + observability.RecordContainerFetchError() return } @@ -100,6 +110,13 @@ func (s *ProxyService) StreamHTTP(ctx context.Context) error { slog.String("container", name), slog.Int("total_chunks", len(chunks))) + // Calculate total bytes for metrics + var totalBytes int + for _, chunk := range chunks { + totalBytes += len(chunk.Data) + } + observability.RecordBytesTransferred(float64(totalBytes)) + for _, chunk := range chunks { select { case s.dataChan <- chunk: @@ -133,6 +150,9 @@ func (s *ProxyService) StreamMQTT(ctx context.Context) error { continue } + // Record chunk sent metric + observability.RecordChunkSent() + containerChunks[chunk.AppName]++ if containerChunks[chunk.AppName] == chunk.TotalChunks {