Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func main() {
}()
tp = sdktp
}

tracer := tp.Tracer(svcName)

mqttPubSub, err := mqtt.NewPubSub(cfg.MQTTAddress, cfg.MQTTQoS, cfg.ClientID, cfg.ClientID, cfg.ClientKey, cfg.DomainID, cfg.ChannelID, cfg.MQTTTimeout, logger)
Expand Down
65 changes: 61 additions & 4 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,28 @@
"fmt"
"log"
"log/slog"
"net/http"
"net/url"
"os"
"os/signal"
"syscall"
"time"

Check failure on line 15 in cmd/proxy/main.go

View workflow job for this annotation

GitHub Actions / Go Lint and Build

File is not properly formatted (gci)
"github.com/absmach/propeller"
"github.com/absmach/propeller/pkg/mqtt"
"github.com/absmach/propeller/proxy"
"github.com/absmach/magistrala/pkg/jaeger"
"github.com/caarlos0/env/v11"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"golang.org/x/sync/errgroup"
)

const (
svcName = "proxy"
configPath = "config.toml"
svcName = "proxy"
configPath = "config.toml"
defMetricsPort = "7071"
)

type config struct {
Expand All @@ -37,11 +46,13 @@
Username string `env:"PROXY_REGISTRY_USERNAME" envDefault:""`
Password string `env:"PROXY_REGISTRY_PASSWORD" envDefault:""`
RegistryURL string `env:"PROXY_REGISTRY_URL,notEmpty"`
// Observability
MetricsPort string `env:"PROXY_METRICS_PORT" envDefault:"7071"`
OTELURL url.URL `env:"PROXY_OTEL_URL"`
TraceRatio float64 `env:"PROXY_TRACE_RATIO" envDefault:"0"`
}

func main() {
g, ctx := errgroup.WithContext(context.Background())

cfg := config{}
if err := env.Parse(&cfg); err != nil {
log.Fatalf("failed to load configuration : %s", err.Error())
Expand Down Expand Up @@ -72,12 +83,37 @@
if err := level.UnmarshalText([]byte(cfg.LogLevel)); err != nil {
log.Fatalf("failed to parse log level: %s", err.Error())
}

logHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: level,
})
logger := slog.New(logHandler)
slog.SetDefault(logger)

ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, syscall.SIGINT)
defer stop()
g, ctx := errgroup.WithContext(ctx)

var tp trace.TracerProvider
switch cfg.OTELURL {
case url.URL{}:
tp = noop.NewTracerProvider()
default:
sdktp, err := jaeger.NewProvider(ctx, svcName, cfg.OTELURL, "", cfg.TraceRatio)
if err != nil {
logger.Error("failed to initialize jaeger tracing", slog.String("error", err.Error()))
//nolint:gocritic
os.Exit(1)
}
defer func() {
if err := sdktp.Shutdown(ctx); err != nil {
logger.Error("error shutting down tracer provider", slog.Any("error", err))
}
}()
tp = sdktp
}
_ = tp

mqttPubSub, err := mqtt.NewPubSub(cfg.MQTTAddress, cfg.MQTTQoS, cfg.ClientID, cfg.ClientID, cfg.ClientKey, cfg.DomainID, cfg.ChannelID, cfg.MQTTTimeout, logger)
if err != nil {
logger.Error("failed to initialize mqtt client", slog.Any("error", err))
Expand Down Expand Up @@ -118,6 +154,27 @@

slog.Info("successfully subscribed to topic")

// Start Prometheus metrics server
metricsPort := cfg.MetricsPort
if metricsPort == "" {
metricsPort = defMetricsPort
}
g.Go(func() error {
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.Handler())
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("OK"))
})
server := &http.Server{
Addr: ":" + metricsPort,
Handler: mux,
}
logger.Info("starting metrics server", slog.String("port", metricsPort))

return server.ListenAndServe()
})

g.Go(func() error {
return service.StreamHTTP(ctx)
})
Expand Down
138 changes: 138 additions & 0 deletions deploy/otel-collector-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# OpenTelemetry Collector Configuration for Propeller
# This configuration receives telemetry data from Propeller services and exports
# it to various backends for visualization and alerting.
#
# Usage:
# docker run --rm -p 4317:4317 -p 4318:4318 -p 8889:8889 \
# -v $(pwd)/deploy/otel-collector-config.yaml:/etc/otelcol/config.yaml \
# otel/opentelemetry-collector-contrib:latest

receivers:
# OTLP receiver for traces, metrics, and logs from Propeller services
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318

# Prometheus receiver to scrape metrics from services
prometheus:
config:
scrape_configs:
- job_name: 'propeller-manager'
scrape_interval: 15s
static_configs:
- targets: ['manager:7070']
metrics_path: '/metrics'

- job_name: 'propeller-proxy'
scrape_interval: 15s
static_configs:
- targets: ['proxy:7071']
metrics_path: '/metrics'

processors:
# Batch processor for efficient export
batch:
timeout: 1s
send_batch_size: 1024
send_batch_max_size: 2048

# Memory limiter to prevent OOM
memory_limiter:
check_interval: 1s
limit_mib: 512
spike_limit_mib: 128

# Resource processor to add common attributes
resource:
attributes:
- key: deployment.environment
value: ${PROPELLER_ENV:-development}
action: upsert
- key: service.namespace
value: propeller
action: upsert

# Attributes processor for filtering sensitive data
attributes:
actions:
- key: client_key
action: delete
- key: password
action: delete
- key: token
action: delete

exporters:
# Debug exporter for local development
debug:
verbosity: detailed
sampling_initial: 5
sampling_thereafter: 200

# Prometheus exporter for metrics (scraped by Prometheus/Grafana)
prometheus:
endpoint: "0.0.0.0:8889"
namespace: propeller
const_labels:
collector: otel

# OTLP exporter for traces (to Jaeger, Tempo, etc.)
otlp/traces:
endpoint: ${JAEGER_ENDPOINT:-jaeger:4317}
tls:
insecure: true

# Loki exporter for logs (optional)
# Uncomment if using Grafana Loki
# loki:
# endpoint: ${LOKI_ENDPOINT:-http://loki:3100/loki/api/v1/push}
# labels:
# attributes:
# service.name: "service_name"
# service.namespace: "namespace"

extensions:
# Health check extension
health_check:
endpoint: 0.0.0.0:13133

# Performance profiler (pprof)
pprof:
endpoint: 0.0.0.0:1777

# zPages for debugging
zpages:
endpoint: 0.0.0.0:55679

service:
extensions: [health_check, pprof, zpages]

pipelines:
# Traces pipeline
traces:
receivers: [otlp]
processors: [memory_limiter, batch, resource, attributes]
exporters: [otlp/traces, debug]

# Metrics pipeline
metrics:
receivers: [otlp, prometheus]
processors: [memory_limiter, batch, resource]
exporters: [prometheus, debug]

# Logs pipeline
logs:
receivers: [otlp]
processors: [memory_limiter, batch, resource, attributes]
exporters: [debug]
# Uncomment to enable Loki export:
# exporters: [loki, debug]

telemetry:
logs:
level: info
metrics:
address: 0.0.0.0:8888
90 changes: 90 additions & 0 deletions docker/compose.observability.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Observability Stack for Propeller
# This compose file extends the main compose.yaml with observability services.
#
# Usage:
# docker-compose -f compose.yaml -f compose.observability.yaml up -d
#
# Or with make:
# make start-observability

name: "propeller-observability"

networks:
supermq-base-net:
external: true

services:
otel-collector:
image: docker.io/otel/opentelemetry-collector-contrib:0.96.0
container_name: propeller-otel-collector
restart: unless-stopped
command: ["--config=/etc/otelcol/config.yaml"]
networks:
- supermq-base-net
ports:
- "4317:4317" # OTLP gRPC receiver
- "4318:4318" # OTLP HTTP receiver
- "8889:8889" # Prometheus exporter
- "13133:13133" # Health check
- "55679:55679" # zPages
volumes:
- ../deploy/otel-collector-config.yaml:/etc/otelcol/config.yaml:ro
environment:
- PROPELLER_ENV=${PROPELLER_ENV:-development}
- JAEGER_ENDPOINT=jaeger:4317
depends_on:
- jaeger
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:13133/"]
interval: 30s
timeout: 10s
retries: 3

# Prometheus - Metrics collection and storage
prometheus:
image: docker.io/prom/prometheus:v3.3.0
container_name: propeller-prometheus
restart: unless-stopped
networks:
- supermq-base-net
ports:
- "9090:9090"
volumes:
- ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml:ro
- prometheus-data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.path=/prometheus'
- '--storage.tsdb.retention.time=15d'
- '--web.enable-lifecycle'
- '--web.enable-admin-api'
healthcheck:
test: ["CMD", "wget", "-q", "--spider", "http://localhost:9090/-/healthy"]
interval: 30s
timeout: 10s
retries: 3

# Grafana - Visualization and dashboards
grafana:
image: docker.io/grafana/grafana:12.0.1
container_name: propeller-grafana
restart: unless-stopped
networks:
- supermq-base-net
ports:
- "3000:3000"
volumes:
- grafana-data:/var/lib/grafana
- ./grafana/provisioning:/etc/grafana/provisioning:ro
environment:
- GF_SECURITY_ADMIN_USER=${GRAFANA_ADMIN_USER:-admin}
- GF_SECURITY_ADMIN_PASSWORD=${GRAFANA_ADMIN_PASSWORD:-admin}
- GF_USERS_ALLOW_SIGN_UP=false
- GF_FEATURE_TOGGLES_ENABLE=traceqlEditor
depends_on:
- prometheus
- jaeger

volumes:
prometheus-data:
grafana-data:
26 changes: 26 additions & 0 deletions docker/grafana/provisioning/datasources/datasources.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Grafana Data Sources for Propeller Observability
apiVersion: 1

datasources:
# Prometheus for metrics
- name: Prometheus
type: prometheus
access: proxy
url: http://prometheus:9090
isDefault: true
editable: true
jsonData:
timeInterval: "15s"
httpMethod: POST

# Jaeger for traces
- name: Jaeger
type: jaeger
access: proxy
url: http://jaeger:16686
editable: true
jsonData:
tracesToLogs:
datasourceUid: prometheus
filterByTraceID: true
filterBySpanID: false
Loading
Loading