diff --git a/.env.example b/.env.example
index 1b0b61d..ae00235 100644
--- a/.env.example
+++ b/.env.example
@@ -38,6 +38,12 @@
# Observability (optional)
# AGENT_VAULT_LOG_LEVEL=info # info (default) | debug — debug emits one line per proxied request (no secret values)
+# Request-log retention (optional) — controls the per-vault audit log of proxied requests.
+# Bodies and query strings are never stored; only method/host/path/status/latency metadata.
+# AGENT_VAULT_LOGS_MAX_AGE_HOURS=168 # Drop rows older than this (default 168 = 7 days).
+# AGENT_VAULT_LOGS_MAX_ROWS_PER_VAULT=10000 # Keep at most this many rows per vault (default 10000).
+# AGENT_VAULT_LOGS_RETENTION_LOCK=false # When true, ignore any owner UI overrides (operator pin).
+
# Rate limiting (optional) — tiered limits with sensible defaults.
# Profile: default | strict (≈0.5×) | loose (≈2×) | off (disable all limits).
# AGENT_VAULT_RATELIMIT_PROFILE=default
diff --git a/README.md b/README.md
index f934697..61c3c31 100644
--- a/README.md
+++ b/README.md
@@ -24,6 +24,7 @@ Agent Vault takes a different approach: **Agent Vault never reveals vault-stored
- **Self-service access** - Agents discover available services at runtime and [propose access](https://docs.agent-vault.dev/learn/proposals) for anything missing. You review and approve in your browser with one click. Any service can be toggled on/off without losing its configuration — disabled services return `403 service_disabled` until re-enabled.
- **Encrypted at rest** - Credentials are encrypted with AES-256-GCM using a random data encryption key (DEK). An optional master password wraps the DEK via Argon2id — change the password without re-encrypting credentials. Passwordless mode available for PaaS deploys. [Learn more](https://docs.agent-vault.dev/learn/credentials)
- **Multi-user, multi-vault** - Role-based access control with instance and vault-level [permissions](https://docs.agent-vault.dev/learn/permissions). Invite teammates, scope agents to specific [vaults](https://docs.agent-vault.dev/learn/vaults), and audit everything.
+- **Request logs** - Every proxied request is persisted per vault with method, host, path, status, latency, and the credential key names involved — never bodies, headers, or query strings. View them in the **Logs** tab. Retention defaults to 7 days / 10k rows per vault and is owner-tunable.
diff --git a/cmd/server.go b/cmd/server.go
index 1ba1232..cc89c6d 100644
--- a/cmd/server.go
+++ b/cmd/server.go
@@ -23,6 +23,7 @@ import (
"github.com/Infisical/agent-vault/internal/notify"
"github.com/Infisical/agent-vault/internal/oauth"
"github.com/Infisical/agent-vault/internal/pidfile"
+ "github.com/Infisical/agent-vault/internal/requestlog"
"github.com/Infisical/agent-vault/internal/server"
"github.com/Infisical/agent-vault/internal/store"
"github.com/spf13/cobra"
@@ -145,6 +146,8 @@ var serverCmd = &cobra.Command{
oauthProviders := loadOAuthProviders(baseURL)
srv := server.New(addr, db, masterKey.Key(), notifier, initialized, baseURL, oauthProviders, logger)
srv.SetSkills(skillCLI, skillHTTP)
+ shutdownLogs := attachLogSink(srv, db, logger)
+ defer shutdownLogs()
if err := attachMITMIfEnabled(srv, host, mitmPort, masterKey.Key()); err != nil {
return err
}
@@ -178,11 +181,33 @@ func attachMITMIfEnabled(srv *server.Server, host string, mitmPort int, masterKe
BaseURL: srv.BaseURL(),
Logger: srv.Logger(),
RateLimit: srv.RateLimit(),
+ LogSink: srv.LogSink(),
},
))
return nil
}
+// attachLogSink wires the request-log pipeline: a SQLiteSink with async
+// batching feeds persistent storage, and a retention goroutine trims old
+// rows. Returns a shutdown function the caller runs after Start()
+// returns to flush pending records and stop retention.
+func attachLogSink(srv *server.Server, db *store.SQLiteStore, logger *slog.Logger) func() {
+ sink := requestlog.NewSQLiteSink(db, logger, requestlog.SQLiteSinkConfig{})
+ srv.AttachLogSink(sink)
+
+ retentionCtx, cancelRetention := context.WithCancel(context.Background())
+ go requestlog.RunRetention(retentionCtx, db, logger)
+
+ return func() {
+ cancelRetention()
+ flushCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := sink.Close(flushCtx); err != nil {
+ fmt.Fprintf(os.Stderr, "warning: request_log sink flush: %v\n", err)
+ }
+ }
+}
+
// promptOwnerSetup interactively creates the owner account.
// masterPassword is optional — if provided, the admin password is checked against it.
func promptOwnerSetup(cmd *cobra.Command, db *store.SQLiteStore, masterPassword []byte) error {
@@ -438,6 +463,8 @@ func runDetachedChild(host, addr string, mitmPort int, logger *slog.Logger) erro
oauthProviders := loadOAuthProviders(baseURL)
srv := server.New(addr, db, key, notifier, initialized, baseURL, oauthProviders, logger)
srv.SetSkills(skillCLI, skillHTTP)
+ shutdownLogs := attachLogSink(srv, db, logger)
+ defer shutdownLogs()
if err := attachMITMIfEnabled(srv, host, mitmPort, key); err != nil {
return err
}
diff --git a/cmd/skill_cli.md b/cmd/skill_cli.md
index 19150c1..737d722 100644
--- a/cmd/skill_cli.md
+++ b/cmd/skill_cli.md
@@ -142,6 +142,10 @@ Key fields (JSON mode):
**Check status:** `GET {AGENT_VAULT_ADDR}/v1/proposals/{id}` with `Authorization: Bearer {AGENT_VAULT_SESSION_TOKEN}` -- returns `pending`, `applied`, `rejected`, or `expired`
+## Request Logs
+
+Agent Vault keeps a per-vault audit log of proxied requests (method, host, path, status, latency -- never bodies or query strings). The CLI does not wrap this yet; fetch via the HTTP API: `GET {AGENT_VAULT_ADDR}/v1/vaults/{vault}/logs` with `Authorization: Bearer {AGENT_VAULT_SESSION_TOKEN}`. Requires vault `member` or `admin` role. See `skill_http.md` for query params.
+
## Building Code That Needs Credentials
When you are writing or modifying application code that requires secrets or API keys (e.g. `process.env.STRIPE_KEY`, `os.Getenv("DB_PASSWORD")`), use Agent Vault to ensure those credentials are tracked and available.
diff --git a/cmd/skill_http.md b/cmd/skill_http.md
index d1d44e9..79154eb 100644
--- a/cmd/skill_http.md
+++ b/cmd/skill_http.md
@@ -134,6 +134,17 @@ Key fields:
**List proposals:** `GET {AGENT_VAULT_ADDR}/v1/proposals?status=pending`
+## Request Logs
+
+Agent Vault persists a per-request audit log for each vault (method, host, path, status, latency, matched service, credential key names -- **never** bodies or query strings). Useful for debugging "did the request go through?" and inspecting traffic patterns. Requires vault `member` or `admin` role.
+
+```
+GET {AGENT_VAULT_ADDR}/v1/vaults/{vault}/logs
+Authorization: Bearer {AGENT_VAULT_SESSION_TOKEN}
+```
+
+Query params: `ingress` (`explicit`|`mitm`), `status_bucket` (`2xx`|`3xx`|`4xx`|`5xx`|`err`), `service`, `limit` (default 50, max 200), `before=<id>` (page back), `after=<id>` (tail forward for new rows). Response: `{ "logs": [...], "next_cursor": <id|null>, "latest_id": <id> }`.
+
## Building Code That Needs Credentials
When you are writing or modifying application code that requires secrets or API keys (e.g. `process.env.STRIPE_KEY`, `os.Getenv("DB_PASSWORD")`), use Agent Vault to ensure those credentials are tracked and available.
diff --git a/docs/reference/cli.mdx b/docs/reference/cli.mdx
index ebb5c47..13922cd 100644
--- a/docs/reference/cli.mdx
+++ b/docs/reference/cli.mdx
@@ -46,6 +46,9 @@ description: "Complete reference for all Agent Vault CLI commands."
| `AGENT_VAULT_RATELIMIT_PROFILE` | Rate-limit profile: `default`, `strict`, `loose`, or `off`. Affects anonymous auth, token-redeem, proxy, authenticated CRUD, and the global in-flight / RPS ceilings. |
| `AGENT_VAULT_RATELIMIT_LOCK` | When `true`, the rate-limit section in the Manage Instance UI is read-only and UI overrides are ignored. Use when you want limits pinned to env vars on PaaS. |
| `AGENT_VAULT_RATELIMIT_<TIER>_<KNOB>` | Fine-grained per-tier overrides. `<TIER>` ∈ `AUTH`, `PROXY`, `AUTHED`, `GLOBAL`. `<KNOB>` ∈ `RATE`, `BURST`, `WINDOW`, `MAX`, `CONCURRENCY`. Env-set knobs always beat UI overrides. |
+ | `AGENT_VAULT_LOGS_MAX_AGE_HOURS` | Retention ceiling for the per-vault request log. Default `168` (7 days). Rows older than this are trimmed by a background job every 15 minutes. Only non-secret metadata is stored. |
+ | `AGENT_VAULT_LOGS_MAX_ROWS_PER_VAULT` | Per-vault row cap for the request log. Default `10000`. Whichever limit (age or rows) fills first wins. Set `0` to disable the cap. |
+ | `AGENT_VAULT_LOGS_RETENTION_LOCK` | When `true`, owner-UI overrides for log retention are ignored and env values (or defaults) are pinned. |
diff --git a/docs/self-hosting/environment-variables.mdx b/docs/self-hosting/environment-variables.mdx
index eccfe2e..cf5784a 100644
--- a/docs/self-hosting/environment-variables.mdx
+++ b/docs/self-hosting/environment-variables.mdx
@@ -17,6 +17,9 @@ description: "All environment variables used to configure Agent Vault"
| `AGENT_VAULT_RATELIMIT_PROFILE` | `default` | Rate-limit profile: `default`, `strict` (≈0.5× the defaults), `loose` (≈2×), or `off` (disable all limits). Affects every tier — anonymous auth, token-redeem, proxy, authenticated CRUD, global in-flight. Owners can override per-tier in **Manage Instance → Settings → Rate Limiting** unless `AGENT_VAULT_RATELIMIT_LOCK=true`. |
| `AGENT_VAULT_RATELIMIT_LOCK` | `false` | When `true`, the rate-limit UI in **Manage Instance** is read-only and UI overrides are ignored. Use on PaaS deployments (Fly.io, Cloud Run) when the operator wants limits pinned to env vars. |
| `AGENT_VAULT_RATELIMIT_<TIER>_<KNOB>` | — | Fine-grained per-tier overrides. `<TIER>` is one of `AUTH` (unauthenticated endpoints), `PROXY` (proxy + MITM), `AUTHED` (everything behind requireAuth), `GLOBAL` (server-wide backstop). `<KNOB>` is one of `RATE` (tokens/sec), `BURST` (bucket depth), `WINDOW` (duration like `5m`), `MAX` (sliding-window event cap), `CONCURRENCY` (semaphore slots). Env-set knobs always take precedence over UI overrides. |
+| `AGENT_VAULT_LOGS_MAX_AGE_HOURS` | `168` | Retention for the per-vault request log (surfaced in **Vault → Logs**). Rows older than this many hours are trimmed by a background job every 15 minutes. Only secret-free metadata is stored (method, host, path, status, latency, matched service, credential key names) — never bodies or query strings. |
+| `AGENT_VAULT_LOGS_MAX_ROWS_PER_VAULT` | `10000` | Per-vault row cap for the request log. Whichever limit (age or rows) hits first wins, so heavy-traffic vaults retain a shorter window than the time-based TTL alone would suggest. Set to `0` to disable the row cap. |
+| `AGENT_VAULT_LOGS_RETENTION_LOCK` | `false` | When `true`, any owner-UI overrides for log retention are ignored and env values (or defaults) are pinned. Use when you want retention limits controlled only by the operator. |
Master password resolution order:
diff --git a/internal/brokercore/logging.go b/internal/brokercore/logging.go
index 7817f60..14dc63a 100644
--- a/internal/brokercore/logging.go
+++ b/internal/brokercore/logging.go
@@ -5,12 +5,27 @@ import (
"time"
)
+// Ingress labels identify which entrypoint handled a proxied request.
+// Persisted into request logs and filterable by the Logs UI, so a typo
+// at any call site would silently desync filters from the real data.
+const (
+ IngressExplicit = "explicit"
+ IngressMITM = "mitm"
+)
+
+// Actor types identify the principal behind a proxied request. Same
+// reason for constants as the ingress labels above.
+const (
+ ActorTypeUser = "user"
+ ActorTypeAgent = "agent"
+)
+
// ProxyEvent is the shape of a single structured per-request log line
// emitted by both the explicit /proxy/ handler and the transparent MITM
// forward handler. It is intentionally shallow and contains only
// non-secret metadata — no header values, no bodies, no query strings.
type ProxyEvent struct {
- Ingress string // "explicit" | "mitm"
+ Ingress string // one of IngressExplicit, IngressMITM
Method string // HTTP method from the agent request
Host string // target host (with port if present)
Path string // r.URL.Path only — no query, no fragment
diff --git a/internal/mitm/connect.go b/internal/mitm/connect.go
index bc594f1..2d5742c 100644
--- a/internal/mitm/connect.go
+++ b/internal/mitm/connect.go
@@ -23,14 +23,31 @@ func mitmConnectIPKey(r *http.Request) string {
return "mitm:" + host
}
+// isLoopbackPeer reports whether the HTTP request came from a loopback
+// peer (127.0.0.0/8 or ::1). Used to skip the CONNECT flood gate for
+// local `vault run` clients — a single agent legitimately opens dozens
+// of CONNECTs (one per distinct upstream host) on startup, and a
+// cooperating or higher-privilege local process can trivially DoS the
+// proxy by other means regardless, so rate-limiting loopback only
+// breaks legitimate agents without adding defense.
+func isLoopbackPeer(r *http.Request) bool {
+ host, _, err := net.SplitHostPort(r.RemoteAddr)
+ if err != nil || host == "" {
+ host = r.RemoteAddr
+ }
+ ip := net.ParseIP(host)
+ return ip != nil && ip.IsLoopback()
+}
+
// handleConnect terminates a CONNECT tunnel and serves HTTP/1.1 off the
// resulting TLS connection. The upstream target is taken from the
// CONNECT request line (r.Host) and captured in a closure so subsequent
// Host-header rewrites by the client cannot redirect the tunnel.
func (p *Proxy) handleConnect(w http.ResponseWriter, r *http.Request) {
// Gate before ParseProxyAuth + session lookup so a bad-auth flood
- // can't burn CPU. Per-IP on the raw TCP peer.
- if p.rateLimit != nil {
+ // can't burn CPU. Per-IP on the raw TCP peer, shared with the
+ // TierAuth budget. Loopback is exempt — see isLoopbackPeer.
+ if p.rateLimit != nil && !isLoopbackPeer(r) {
if d := p.rateLimit.Allow(ratelimit.TierAuth, mitmConnectIPKey(r)); !d.Allow {
ratelimit.WriteDenial(w, d, "Too many CONNECT attempts")
return
diff --git a/internal/mitm/connect_test.go b/internal/mitm/connect_test.go
new file mode 100644
index 0000000..ef6f6fe
--- /dev/null
+++ b/internal/mitm/connect_test.go
@@ -0,0 +1,29 @@
+package mitm
+
+import (
+ "net/http"
+ "testing"
+)
+
+func TestIsLoopbackPeer(t *testing.T) {
+ cases := []struct {
+ remote string
+ want bool
+ }{
+ {"127.0.0.1:54321", true},
+ {"127.1.2.3:54321", true},
+ {"[::1]:54321", true},
+ {"10.0.0.5:54321", false},
+ {"172.17.0.1:54321", false},
+ {"203.0.113.9:54321", false},
+ {"[2001:db8::1]:54321", false},
+ {"", false},
+ {"not-an-address", false},
+ }
+ for _, tc := range cases {
+ r := &http.Request{RemoteAddr: tc.remote}
+ if got := isLoopbackPeer(r); got != tc.want {
+ t.Errorf("isLoopbackPeer(%q) = %v, want %v", tc.remote, got, tc.want)
+ }
+ }
+}
diff --git a/internal/mitm/forward.go b/internal/mitm/forward.go
index 9daeb50..cba665e 100644
--- a/internal/mitm/forward.go
+++ b/internal/mitm/forward.go
@@ -9,8 +9,24 @@ import (
"github.com/Infisical/agent-vault/internal/brokercore"
"github.com/Infisical/agent-vault/internal/ratelimit"
+ "github.com/Infisical/agent-vault/internal/requestlog"
)
+// actorFromScope returns the (type, id) pair used in request log rows.
+// Empty strings when neither principal is set on the scope.
+func actorFromScope(scope *brokercore.ProxyScope) (string, string) {
+ if scope == nil {
+ return "", ""
+ }
+ if scope.UserID != "" {
+ return brokercore.ActorTypeUser, scope.UserID
+ }
+ if scope.AgentID != "" {
+ return brokercore.ActorTypeAgent, scope.AgentID
+ }
+ return "", ""
+}
+
// forwardHandler returns an http.Handler that forwards each request to
// target (the host:port captured from the original CONNECT line). Using
// a closed-over target rather than r.Host defeats post-tunnel host
@@ -20,13 +36,17 @@ func (p *Proxy) forwardHandler(target, host string, scope *brokercore.ProxyScope
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
event := brokercore.ProxyEvent{
- Ingress: "mitm",
+ Ingress: brokercore.IngressMITM,
Method: r.Method,
Host: target,
Path: r.URL.Path,
}
+ actorType, actorID := actorFromScope(scope)
emit := func(status int, errCode string) {
event.Emit(p.logger, start, status, errCode)
+ if p.logSink != nil && scope != nil {
+ p.logSink.Record(r.Context(), requestlog.FromEvent(event, scope.VaultID, actorType, actorID))
+ }
}
// Shares one budget with /proxy so switching ingress can't bypass.
diff --git a/internal/mitm/proxy.go b/internal/mitm/proxy.go
index 1759c0d..e74fd71 100644
--- a/internal/mitm/proxy.go
+++ b/internal/mitm/proxy.go
@@ -28,6 +28,7 @@ import (
"github.com/Infisical/agent-vault/internal/ca"
"github.com/Infisical/agent-vault/internal/netguard"
"github.com/Infisical/agent-vault/internal/ratelimit"
+ "github.com/Infisical/agent-vault/internal/requestlog"
)
// Proxy is a transparent MITM proxy. It is safe to start at most once;
@@ -43,6 +44,7 @@ type Proxy struct {
baseURL string // externally-reachable control-plane URL for help links
logger *slog.Logger
rateLimit *ratelimit.Registry // shared with the HTTP server; nil = no-op
+ logSink requestlog.Sink // never nil (Nop default); shared with HTTP ingress
}
// Options carries the dependencies a Proxy needs. BaseURL is the
@@ -58,6 +60,7 @@ type Options struct {
BaseURL string
Logger *slog.Logger
RateLimit *ratelimit.Registry
+ LogSink requestlog.Sink // nil → Nop
}
// New builds a Proxy bound to addr. The returned Proxy does not begin
@@ -73,6 +76,10 @@ func New(addr string, opts Options) *Proxy {
ResponseHeaderTimeout: 30 * time.Second,
}
+ sink := opts.LogSink
+ if sink == nil {
+ sink = requestlog.Nop{}
+ }
p := &Proxy{
ca: opts.CA,
sessions: opts.Sessions,
@@ -81,6 +88,7 @@ func New(addr string, opts Options) *Proxy {
baseURL: opts.BaseURL,
logger: opts.Logger,
rateLimit: opts.RateLimit,
+ logSink: sink,
}
p.tlsConfig = &tls.Config{
diff --git a/internal/requestlog/retention.go b/internal/requestlog/retention.go
new file mode 100644
index 0000000..b468319
--- /dev/null
+++ b/internal/requestlog/retention.go
@@ -0,0 +1,189 @@
+package requestlog
+
+import (
+ "context"
+ "database/sql"
+ "encoding/json"
+ "errors"
+ "log/slog"
+ "os"
+ "strconv"
+ "time"
+
+ "github.com/Infisical/agent-vault/internal/store"
+)
+
+// Defaults shipped with the built-in SQLite sink. Both limits apply;
+// whichever is hit first trims rows. Owner can tune via the
+// "logs_retention" instance setting or pin via env vars.
+const (
+ DefaultMaxAge = 7 * 24 * time.Hour
+ DefaultMaxRowsPerVault = 10_000
+ DefaultRetentionTick = 15 * time.Minute
+
+ // SettingKey is the instance_settings key holding the JSON payload.
+ SettingKey = "logs_retention"
+
+ envMaxAgeHours = "AGENT_VAULT_LOGS_MAX_AGE_HOURS"
+ envMaxRows = "AGENT_VAULT_LOGS_MAX_ROWS_PER_VAULT"
+ envRetentionLock = "AGENT_VAULT_LOGS_RETENTION_LOCK"
+)
+
+// RetentionConfig controls the background cleanup job. Zero-valued
+// limits disable that dimension (e.g. MaxAge == 0 skips time-based
+// trimming).
+type RetentionConfig struct {
+ MaxAge time.Duration
+ MaxRowsPerVault int64
+ Tick time.Duration
+}
+
+// retentionSettingPayload is the JSON blob persisted at SettingKey.
+type retentionSettingPayload struct {
+ MaxAgeHours *float64 `json:"max_age_hours,omitempty"`
+ MaxRowsPerVault *int64 `json:"max_rows_per_vault,omitempty"`
+}
+
+// retentionStore is the narrow store surface the retention loop needs.
+type retentionStore interface {
+ GetSetting(ctx context.Context, key string) (string, error)
+ DeleteOldRequestLogs(ctx context.Context, before time.Time) (int64, error)
+ TrimRequestLogsToCap(ctx context.Context, vaultID string, cap int64) (int64, error)
+ VaultIDsWithLogs(ctx context.Context) ([]string, error)
+}
+
+// ResolveRetention returns the effective RetentionConfig. Precedence:
+// env (when lock is set) > instance setting > env defaults > built-in
+// defaults. Absent setting is not an error.
+func ResolveRetention(ctx context.Context, s retentionStore) RetentionConfig {
+ cfg := RetentionConfig{
+ MaxAge: DefaultMaxAge,
+ MaxRowsPerVault: DefaultMaxRowsPerVault,
+ Tick: DefaultRetentionTick,
+ }
+ envCfg, envSet := loadRetentionFromEnv()
+ applyEnv := func(base *RetentionConfig) {
+ if envSet.age {
+ base.MaxAge = envCfg.MaxAge
+ }
+ if envSet.rows {
+ base.MaxRowsPerVault = envCfg.MaxRowsPerVault
+ }
+ }
+
+ if os.Getenv(envRetentionLock) == "true" {
+ applyEnv(&cfg)
+ return cfg
+ }
+
+ // Instance setting layers on top of env defaults; env-set values
+ // still win when present.
+ payload, present, _ := loadRetentionSetting(ctx, s)
+ if present {
+ if payload.MaxAgeHours != nil {
+ cfg.MaxAge = time.Duration(*payload.MaxAgeHours * float64(time.Hour))
+ }
+ if payload.MaxRowsPerVault != nil {
+ cfg.MaxRowsPerVault = *payload.MaxRowsPerVault
+ }
+ }
+ applyEnv(&cfg)
+ return cfg
+}
+
+type envRetentionMask struct {
+ age, rows bool
+}
+
+func loadRetentionFromEnv() (RetentionConfig, envRetentionMask) {
+ var (
+ cfg RetentionConfig
+ mask envRetentionMask
+ )
+ if raw := os.Getenv(envMaxAgeHours); raw != "" {
+ if v, err := strconv.ParseFloat(raw, 64); err == nil && v >= 0 {
+ cfg.MaxAge = time.Duration(v * float64(time.Hour))
+ mask.age = true
+ }
+ }
+ if raw := os.Getenv(envMaxRows); raw != "" {
+ if v, err := strconv.ParseInt(raw, 10, 64); err == nil && v >= 0 {
+ cfg.MaxRowsPerVault = v
+ mask.rows = true
+ }
+ }
+ return cfg, mask
+}
+
+func loadRetentionSetting(ctx context.Context, s retentionStore) (retentionSettingPayload, bool, error) {
+ raw, err := s.GetSetting(ctx, SettingKey)
+ if err != nil {
+ if errors.Is(err, sql.ErrNoRows) {
+ return retentionSettingPayload{}, false, nil
+ }
+ return retentionSettingPayload{}, false, err
+ }
+ if raw == "" {
+ return retentionSettingPayload{}, false, nil
+ }
+ var p retentionSettingPayload
+ if err := json.Unmarshal([]byte(raw), &p); err != nil {
+ return retentionSettingPayload{}, false, err
+ }
+ return p, true, nil
+}
+
+// RunRetention ticks on cfg.Tick and trims request_logs until ctx is
+// done. Blocks; callers typically run it in a goroutine.
+func RunRetention(ctx context.Context, s store.Store, logger *slog.Logger) {
+ cfg := ResolveRetention(ctx, s)
+ tick := cfg.Tick
+ if tick <= 0 {
+ tick = DefaultRetentionTick
+ }
+ ticker := time.NewTicker(tick)
+ defer ticker.Stop()
+
+ trimOnce(ctx, s, logger, cfg)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ cfg = ResolveRetention(ctx, s)
+ trimOnce(ctx, s, logger, cfg)
+ }
+ }
+}
+
+func trimOnce(ctx context.Context, s store.Store, logger *slog.Logger, cfg RetentionConfig) {
+ if cfg.MaxAge > 0 {
+ before := time.Now().Add(-cfg.MaxAge)
+ if n, err := s.DeleteOldRequestLogs(ctx, before); err != nil {
+ if logger != nil {
+ logger.Warn("request_logs ttl trim failed", "err", err.Error())
+ }
+ } else if n > 0 && logger != nil {
+ logger.Debug("request_logs ttl trimmed", "rows", n, "before", before)
+ }
+ }
+
+ if cfg.MaxRowsPerVault > 0 {
+ vaults, err := s.VaultIDsWithLogs(ctx)
+ if err != nil {
+ if logger != nil {
+ logger.Warn("request_logs cap list vaults failed", "err", err.Error())
+ }
+ return
+ }
+ for _, v := range vaults {
+ if n, err := s.TrimRequestLogsToCap(ctx, v, cfg.MaxRowsPerVault); err != nil {
+ if logger != nil {
+ logger.Warn("request_logs cap trim failed", "vault", v, "err", err.Error())
+ }
+ } else if n > 0 && logger != nil {
+ logger.Debug("request_logs cap trimmed", "vault", v, "rows", n)
+ }
+ }
+ }
+}
diff --git a/internal/requestlog/sink.go b/internal/requestlog/sink.go
new file mode 100644
index 0000000..9c9991f
--- /dev/null
+++ b/internal/requestlog/sink.go
@@ -0,0 +1,104 @@
+// Package requestlog persists per-request broker metadata (method, host,
+// path, status, latency) so operators can audit what traffic flowed
+// through a vault. Bodies, headers, and query strings are never
+// captured. A Sink is the pluggable output layer: the shipped
+// implementation writes to SQLite, but future sinks (HTTP webhooks,
+// stdout JSONL) satisfy the same interface without changes to the
+// proxy hot path.
+package requestlog
+
+import (
+ "context"
+
+ "github.com/Infisical/agent-vault/internal/brokercore"
+ "github.com/Infisical/agent-vault/internal/store"
+)
+
+// Record is the in-memory shape handed to sinks. It is a superset of
+// brokercore.ProxyEvent with the actor/vault context the handler knows
+// but brokercore does not. Adapters convert Record → store.RequestLog
+// at the SQLite boundary.
+type Record struct {
+ VaultID string
+ ActorType string
+ ActorID string
+ Ingress string
+ Method string
+ Host string
+ Path string
+ MatchedService string
+ CredentialKeys []string
+ Status int
+ LatencyMs int64
+ ErrorCode string
+}
+
+// Sink accepts records on the hot proxy path. Implementations MUST NOT
+// block meaningfully: Record runs inline with every proxied request.
+// Return is void by design — sinks are fire-and-forget; durability
+// guarantees live inside the implementation.
+type Sink interface {
+ Record(ctx context.Context, r Record)
+}
+
+// Nop is a Sink that discards records. Safe default when the server
+// is constructed without a configured sink (tests, tooling).
+type Nop struct{}
+
+// Record implements Sink.
+func (Nop) Record(context.Context, Record) {}
+
+// MultiSink fans each record out to every wrapped sink. Later sinks
+// (HTTP webhook, stdout JSONL) stack here without touching the proxy.
+type MultiSink []Sink
+
+// Record implements Sink by forwarding to each wrapped sink in order.
+func (m MultiSink) Record(ctx context.Context, r Record) {
+ for _, s := range m {
+ if s == nil {
+ continue
+ }
+ s.Record(ctx, r)
+ }
+}
+
+// FromEvent lifts a brokercore.ProxyEvent plus the actor/vault context
+// the ingress handler knows (but brokercore does not) into a Record.
+// Callers pass the terminal event — after ProxyEvent.Emit has filled in
+// Status, Err, and TotalMs.
+func FromEvent(ev brokercore.ProxyEvent, vaultID, actorType, actorID string) Record {
+ return Record{
+ VaultID: vaultID,
+ ActorType: actorType,
+ ActorID: actorID,
+ Ingress: ev.Ingress,
+ Method: ev.Method,
+ Host: ev.Host,
+ Path: ev.Path,
+ MatchedService: ev.MatchedService,
+ CredentialKeys: ev.CredentialKeys,
+ Status: ev.Status,
+ LatencyMs: ev.TotalMs,
+ ErrorCode: ev.Err,
+ }
+}
+
+// toStoreRow converts a Record to the persisted shape. Kept in the
+// package (not on store.RequestLog) so the store package stays free of
+// requestlog imports.
+func toStoreRow(r Record) store.RequestLog {
+ return store.RequestLog{
+ VaultID: r.VaultID,
+ ActorType: r.ActorType,
+ ActorID: r.ActorID,
+ Ingress: r.Ingress,
+ Method: r.Method,
+ Host: r.Host,
+ Path: r.Path,
+ MatchedService: r.MatchedService,
+ CredentialKeys: r.CredentialKeys,
+ Status: r.Status,
+ LatencyMs: r.LatencyMs,
+ ErrorCode: r.ErrorCode,
+ }
+}
diff --git a/internal/requestlog/sqlite_sink.go b/internal/requestlog/sqlite_sink.go
new file mode 100644
index 0000000..0c608e9
--- /dev/null
+++ b/internal/requestlog/sqlite_sink.go
@@ -0,0 +1,203 @@
+package requestlog
+
+import (
+ "context"
+ "log/slog"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/Infisical/agent-vault/internal/store"
+)
+
+// Defaults tuned for the near-real-time tailing UI: batches land inside
+// one ~3s UI poll interval while keeping SQLite writes coalesced under
+// bursty traffic. Operators can override via SQLiteSinkConfig.
+const (
+ defaultBufferSize = 1024
+ defaultBatchSize = 100
+ defaultFlushEvery = 250 * time.Millisecond
+ defaultShutdownWait = 3 * time.Second
+)
+
+// sqliteStore is the narrow surface SQLiteSink needs; lets tests
+// substitute a fake without standing up the full Store interface.
+type sqliteStore interface {
+ InsertRequestLogs(ctx context.Context, rows []store.RequestLog) error
+}
+
+// SQLiteSinkConfig controls the SQLiteSink's batching behavior. Zero
+// fields fall back to sensible defaults.
+type SQLiteSinkConfig struct {
+ BufferSize int
+ BatchSize int
+ FlushEvery time.Duration
+ ShutdownWait time.Duration
+
+ // OnCommit is invoked (if non-nil) after a batch is successfully
+ // inserted. Reserved for the future broadcaster that will feed an
+ // SSE endpoint — unused today. Callback runs on the worker
+ // goroutine, so implementations must not block.
+ OnCommit func(batch []Record)
+}
+
+// SQLiteSink buffers records in a bounded channel and flushes them to
+// SQLite in batches. Non-blocking on the hot path: if the buffer is
+// full, the record is dropped and the drop counter is incremented.
+type SQLiteSink struct {
+ store sqliteStore
+ logger *slog.Logger
+ cfg SQLiteSinkConfig
+ in chan Record
+ done chan struct{}
+ wg sync.WaitGroup
+ dropped atomic.Uint64
+}
+
+// NewSQLiteSink constructs a sink and starts its background worker.
+// Call Close to flush and stop.
+func NewSQLiteSink(s sqliteStore, logger *slog.Logger, cfg SQLiteSinkConfig) *SQLiteSink {
+ if cfg.BufferSize <= 0 {
+ cfg.BufferSize = defaultBufferSize
+ }
+ if cfg.BatchSize <= 0 {
+ cfg.BatchSize = defaultBatchSize
+ }
+ if cfg.FlushEvery <= 0 {
+ cfg.FlushEvery = defaultFlushEvery
+ }
+ if cfg.ShutdownWait <= 0 {
+ cfg.ShutdownWait = defaultShutdownWait
+ }
+ sk := &SQLiteSink{
+ store: s,
+ logger: logger,
+ cfg: cfg,
+ in: make(chan Record, cfg.BufferSize),
+ done: make(chan struct{}),
+ }
+ sk.wg.Add(1)
+ go sk.run()
+ return sk
+}
+
+// Record implements Sink. Non-blocking: drops if the buffer is full.
+func (s *SQLiteSink) Record(_ context.Context, r Record) {
+ select {
+ case s.in <- r:
+ default:
+ n := s.dropped.Add(1)
+ // Warn on the first drop of each power-of-two to surface
+ // overload without flooding the log under sustained pressure.
+ if s.logger != nil && isLogBoundary(n) {
+ s.logger.Warn("request_log buffer overflow: dropping records",
+ "total_dropped", n,
+ "buffer_size", s.cfg.BufferSize,
+ )
+ }
+ }
+}
+
+// Dropped returns the total records dropped due to buffer overflow
+// since construction. Exposed for metrics and tests.
+func (s *SQLiteSink) Dropped() uint64 { return s.dropped.Load() }
+
+// Close drains pending records and stops the worker. Honors the parent
+// context for its own deadline; falls back to ShutdownWait.
+func (s *SQLiteSink) Close(ctx context.Context) error {
+ close(s.done)
+
+ wait := s.cfg.ShutdownWait
+ waitCtx, cancel := context.WithTimeout(ctx, wait)
+ defer cancel()
+
+ doneCh := make(chan struct{})
+ go func() {
+ s.wg.Wait()
+ close(doneCh)
+ }()
+ select {
+ case <-doneCh:
+ return nil
+ case <-waitCtx.Done():
+ return waitCtx.Err()
+ }
+}
+
+func (s *SQLiteSink) run() {
+ defer s.wg.Done()
+
+ batch := make([]Record, 0, s.cfg.BatchSize)
+ ticker := time.NewTicker(s.cfg.FlushEvery)
+ defer ticker.Stop()
+
+ flush := func() {
+ if len(batch) == 0 {
+ return
+ }
+ s.commit(batch)
+ batch = batch[:0]
+ }
+
+ for {
+ select {
+ case <-s.done:
+ // Drain whatever is already buffered before exiting.
+ for {
+ select {
+ case r := <-s.in:
+ batch = append(batch, r)
+ if len(batch) >= s.cfg.BatchSize {
+ s.commit(batch)
+ batch = batch[:0]
+ }
+ default:
+ flush()
+ return
+ }
+ }
+ case <-ticker.C:
+ flush()
+ case r := <-s.in:
+ batch = append(batch, r)
+ if len(batch) >= s.cfg.BatchSize {
+ s.commit(batch)
+ batch = batch[:0]
+ }
+ }
+ }
+}
+
+// commit persists batch and fires the OnCommit hook on success. Errors
+// are logged; we do not retry — losing a small slice of logs is
+// acceptable, but blocking the worker on a failing DB is not.
+func (s *SQLiteSink) commit(batch []Record) {
+ rows := make([]store.RequestLog, len(batch))
+ for i, r := range batch {
+ rows[i] = toStoreRow(r)
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+ if err := s.store.InsertRequestLogs(ctx, rows); err != nil {
+ if s.logger != nil {
+ s.logger.Error("request_logs insert failed",
+ "err", err.Error(),
+ "batch_size", len(batch),
+ )
+ }
+ return
+ }
+ if s.cfg.OnCommit != nil {
+ // Worker reuses `batch` after commit returns; hand the
+ // callback its own copy so slow consumers don't see mutations.
+ cp := make([]Record, len(batch))
+ copy(cp, batch)
+ s.cfg.OnCommit(cp)
+ }
+}
+
+// isLogBoundary reports true when n is a power of two. Used to throttle
+// overflow warnings to 1, 2, 4, 8, ... dropped records.
+func isLogBoundary(n uint64) bool {
+ return n > 0 && (n&(n-1)) == 0
+}
diff --git a/internal/server/handle_logs.go b/internal/server/handle_logs.go
new file mode 100644
index 0000000..2ce6b8d
--- /dev/null
+++ b/internal/server/handle_logs.go
@@ -0,0 +1,141 @@
+package server
+
+import (
+ "fmt"
+ "net/http"
+ "strconv"
+ "time"
+
+ "github.com/Infisical/agent-vault/internal/store"
+)
+
+const (
+ logsDefaultLimit = 50
+ logsMaxLimit = 200
+)
+
+type logItem struct {
+ ID int64 `json:"id"`
+ Ingress string `json:"ingress"`
+ Method string `json:"method"`
+ Host string `json:"host"`
+ Path string `json:"path"`
+ MatchedService string `json:"matched_service"`
+ CredentialKeys []string `json:"credential_keys"`
+ Status int `json:"status"`
+ LatencyMs int64 `json:"latency_ms"`
+ ErrorCode string `json:"error_code"`
+ ActorType string `json:"actor_type"`
+ ActorID string `json:"actor_id"`
+ CreatedAt string `json:"created_at"`
+}
+
+func (s *Server) handleVaultLogsList(w http.ResponseWriter, r *http.Request) {
+ ctx := r.Context()
+ name := r.PathValue("name")
+
+ ns, err := s.store.GetVault(ctx, name)
+ if err != nil || ns == nil {
+ jsonError(w, http.StatusNotFound, fmt.Sprintf("Vault %q not found", name))
+ return
+ }
+
+ // Logs live alongside Proposals: member+ can read, proxy-only (agents)
+ // cannot. Owner-scoped instance-wide logs will be a separate handler.
+ if _, err := s.requireVaultMember(w, r, ns.ID); err != nil {
+ return
+ }
+
+ q := r.URL.Query()
+ opts := store.ListRequestLogsOpts{
+ VaultID: &ns.ID,
+ Ingress: q.Get("ingress"),
+ StatusBucket: q.Get("status_bucket"),
+ MatchedService: q.Get("service"),
+ }
+
+ if raw := q.Get("before"); raw != "" {
+ if v, err := strconv.ParseInt(raw, 10, 64); err == nil && v > 0 {
+ opts.Before = v
+ }
+ }
+ if raw := q.Get("after"); raw != "" {
+ if v, err := strconv.ParseInt(raw, 10, 64); err == nil && v > 0 {
+ opts.After = v
+ }
+ }
+ if opts.Before > 0 && opts.After > 0 {
+ jsonError(w, http.StatusBadRequest, "before and after are mutually exclusive")
+ return
+ }
+
+ limit := logsDefaultLimit
+ if raw := q.Get("limit"); raw != "" {
+ if v, err := strconv.Atoi(raw); err == nil && v > 0 {
+ limit = v
+ }
+ }
+ if limit > logsMaxLimit {
+ limit = logsMaxLimit
+ }
+ opts.Limit = limit
+
+ rows, err := s.store.ListRequestLogs(ctx, opts)
+ if err != nil {
+ jsonError(w, http.StatusInternalServerError, "Failed to list logs")
+ return
+ }
+
+ // Tail queries come back ASC (so bursts > page size don't lose the
+ // oldest rows); flip to DESC so the response contract stays
+ // newest-first regardless of cursor direction.
+ if opts.After > 0 {
+ for i, j := 0, len(rows)-1; i < j; i, j = i+1, j-1 {
+ rows[i], rows[j] = rows[j], rows[i]
+ }
+ }
+
+ items := make([]logItem, len(rows))
+ var latestID int64
+ for i, r := range rows {
+ if i == 0 {
+ latestID = r.ID
+ }
+ items[i] = logItem{
+ ID: r.ID,
+ Ingress: r.Ingress,
+ Method: r.Method,
+ Host: r.Host,
+ Path: r.Path,
+ MatchedService: r.MatchedService,
+ CredentialKeys: r.CredentialKeys,
+ Status: r.Status,
+ LatencyMs: r.LatencyMs,
+ ErrorCode: r.ErrorCode,
+ ActorType: r.ActorType,
+ ActorID: r.ActorID,
+ CreatedAt: r.CreatedAt.Format(time.RFC3339),
+ }
+ }
+
+ // When the caller is tailing (passed after=) and no new rows exist, we
+ // echo back their cursor so the next poll stays on the same high-water
+ // mark without a separate round-trip to learn "still nothing new".
+ if latestID == 0 {
+ latestID = opts.After
+ }
+
+ // next_cursor is the id to pass as `before` on the next page going back
+ // in time. Nil when we don't have a full page (end of history) or when
+ // the caller is tailing forward (`after` set).
+ var nextCursor any
+ if opts.After == 0 && len(rows) == limit {
+ nextCursor = rows[len(rows)-1].ID
+ }
+
+ jsonOK(w, map[string]any{
+ "logs": items,
+ "next_cursor": nextCursor,
+ "latest_id": latestID,
+ })
+}
diff --git a/internal/server/handle_proxy.go b/internal/server/handle_proxy.go
index 178820f..f6e7ddb 100644
--- a/internal/server/handle_proxy.go
+++ b/internal/server/handle_proxy.go
@@ -11,6 +11,7 @@ import (
"github.com/Infisical/agent-vault/internal/brokercore"
"github.com/Infisical/agent-vault/internal/netguard"
"github.com/Infisical/agent-vault/internal/ratelimit"
+ "github.com/Infisical/agent-vault/internal/requestlog"
"github.com/Infisical/agent-vault/internal/store"
)
@@ -107,12 +108,25 @@ func (s *Server) resolveVaultForSession(w http.ResponseWriter, r *http.Request,
func (s *Server) handleProxy(w http.ResponseWriter, r *http.Request) {
start := time.Now()
event := brokercore.ProxyEvent{
- Ingress: "explicit",
+ Ingress: brokercore.IngressExplicit,
Method: r.Method,
Path: r.URL.Path,
}
+ // Captured as the handler resolves the request; Emit hands off the
+ // terminal snapshot to both the slog line and the log sink.
+ var (
+ logVaultID string
+ logActorType string
+ logActorID string
+ )
emit := func(status int, errCode string) {
event.Emit(s.logger, start, status, errCode)
+ // Skip persistence for early-exit paths before vault resolution:
+ // an empty vault_id violates the FK constraint and would roll
+ // back the entire 250ms batch, losing valid records alongside.
+ if s.logSink != nil && logVaultID != "" {
+ s.logSink.Record(r.Context(), requestlog.FromEvent(event, logVaultID, logActorType, logActorID))
+ }
}
// 1. Parse target host and path from /proxy/{target_host}/{path...}
@@ -156,6 +170,12 @@ func (s *Server) handleProxy(w http.ResponseWriter, r *http.Request) {
emit(0, "vault_error")
return // error already written
}
+ logVaultID = ns.ID
+ if sess.UserID != "" {
+ logActorType, logActorID = brokercore.ActorTypeUser, sess.UserID
+ } else if sess.AgentID != "" {
+ logActorType, logActorID = brokercore.ActorTypeAgent, sess.AgentID
+ }
// Enforced post-vault-resolution; scope isn't known until here.
scope := &brokercore.ProxyScope{UserID: sess.UserID, AgentID: sess.AgentID}
diff --git a/internal/server/server.go b/internal/server/server.go
index 8cb4d05..8c41af6 100644
--- a/internal/server/server.go
+++ b/internal/server/server.go
@@ -23,6 +23,7 @@ import (
"github.com/Infisical/agent-vault/internal/oauth"
"github.com/Infisical/agent-vault/internal/pidfile"
"github.com/Infisical/agent-vault/internal/ratelimit"
+ "github.com/Infisical/agent-vault/internal/requestlog"
"github.com/Infisical/agent-vault/internal/store"
)
@@ -64,6 +65,7 @@ type Server struct {
mitm *mitm.Proxy // transparent MITM proxy; nil only when --mitm-port 0
logger *slog.Logger // structured logger for per-request observability
rateLimit *ratelimit.Registry // tiered rate limiter; shared with the MITM ingress
+ logSink requestlog.Sink // per-request persistence sink; never nil (Nop default)
}
// RateLimit returns the server's rate-limit registry. Exported so the
@@ -75,6 +77,20 @@ func (s *Server) RateLimit() *ratelimit.Registry { return s.rateLimit }
// stops it alongside the HTTP server.
func (s *Server) AttachMITM(p *mitm.Proxy) { s.mitm = p }
+// AttachLogSink swaps the per-request log sink. Safe to call once at
+// startup, before the HTTP server begins accepting connections. nil
+// resets to a Nop sink.
+func (s *Server) AttachLogSink(sink requestlog.Sink) {
+ if sink == nil {
+ sink = requestlog.Nop{}
+ }
+ s.logSink = sink
+}
+
+// LogSink returns the server's log sink. Shared with the MITM ingress so
+// both paths feed the same pipeline.
+func (s *Server) LogSink() requestlog.Sink { return s.logSink }
+
// SessionResolver returns a brokercore.SessionResolver backed by this
// server's store.
func (s *Server) SessionResolver() brokercore.SessionResolver {
@@ -236,6 +252,13 @@ type Store interface {
CreateAgentToken(ctx context.Context, agentID string, expiresAt *time.Time) (*store.Session, error)
CountAllOwners(ctx context.Context) (int, error)
+ // Request logs
+ InsertRequestLogs(ctx context.Context, rows []store.RequestLog) error
+ ListRequestLogs(ctx context.Context, opts store.ListRequestLogsOpts) ([]store.RequestLog, error)
+ DeleteOldRequestLogs(ctx context.Context, before time.Time) (int64, error)
+ TrimRequestLogsToCap(ctx context.Context, vaultID string, cap int64) (int64, error)
+ VaultIDsWithLogs(ctx context.Context) ([]string, error)
+
Close() error
}
@@ -549,6 +572,7 @@ func New(addr string, store Store, encKey []byte, notifier *notify.Notifier, ini
oauthProviders: oauthProviders,
logger: logger,
rateLimit: rl,
+ logSink: requestlog.Nop{},
}
ipAuth := s.tier(ratelimit.TierAuth, s.ipKeyer())
@@ -649,6 +673,7 @@ func New(addr string, store Store, encKey []byte, notifier *notify.Notifier, ini
mux.HandleFunc("DELETE /v1/vaults/{name}/services/{host}", s.requireInitialized(s.requireAuth(actorAuthed(s.handleServiceRemove))))
mux.HandleFunc("DELETE /v1/vaults/{name}/services", s.requireInitialized(s.requireAuth(actorAuthed(s.handleServicesClear))))
mux.HandleFunc("GET /v1/vaults/{name}/services/credential-usage", s.requireInitialized(s.requireAuth(actorAuthed(s.handleServicesCredentialUsage))))
+ mux.HandleFunc("GET /v1/vaults/{name}/logs", s.requireInitialized(s.requireAuth(actorAuthed(s.handleVaultLogsList))))
// Public static reads — immutable payloads with no credentials on
// the wire. TierGlobal is the only useful backstop; TierAuth would
// punish `vault run` (CA fetch per invocation) and the dashboard
diff --git a/internal/server/server_test.go b/internal/server/server_test.go
index ac328df..c9b5716 100644
--- a/internal/server/server_test.go
+++ b/internal/server/server_test.go
@@ -460,6 +460,27 @@ func (m *mockStore) UpdateAgentInviteVaultRole(_ context.Context, inviteID int,
func (m *mockStore) Close() error { return nil }
+// --- Request log stubs (unused in server tests; storage-level tests
+// live in the store package). ---
+
+func (m *mockStore) InsertRequestLogs(_ context.Context, _ []store.RequestLog) error {
+ return nil
+}
+
+func (m *mockStore) ListRequestLogs(_ context.Context, _ store.ListRequestLogsOpts) ([]store.RequestLog, error) {
+ return nil, nil
+}
+
+func (m *mockStore) DeleteOldRequestLogs(_ context.Context, _ time.Time) (int64, error) {
+ return 0, nil
+}
+
+func (m *mockStore) TrimRequestLogsToCap(_ context.Context, _ string, _ int64) (int64, error) {
+ return 0, nil
+}
+
+func (m *mockStore) VaultIDsWithLogs(_ context.Context) ([]string, error) { return nil, nil }
+
// --- Multi-user permission model mocks ---
func (m *mockStore) GetUserByID(_ context.Context, id string) (*store.User, error) {
diff --git a/internal/store/migrations/039_request_logs.sql b/internal/store/migrations/039_request_logs.sql
new file mode 100644
index 0000000..ca19342
--- /dev/null
+++ b/internal/store/migrations/039_request_logs.sql
@@ -0,0 +1,19 @@
+CREATE TABLE request_logs (
+ id INTEGER PRIMARY KEY AUTOINCREMENT,
+ vault_id TEXT NOT NULL REFERENCES vaults(id) ON DELETE CASCADE,
+ actor_type TEXT NOT NULL DEFAULT '',
+ actor_id TEXT NOT NULL DEFAULT '',
+ ingress TEXT NOT NULL,
+ method TEXT NOT NULL,
+ host TEXT NOT NULL,
+ path TEXT NOT NULL,
+ matched_service TEXT NOT NULL DEFAULT '',
+ credential_keys TEXT NOT NULL DEFAULT '[]',
+ status INTEGER NOT NULL DEFAULT 0,
+ latency_ms INTEGER NOT NULL DEFAULT 0,
+ error_code TEXT NOT NULL DEFAULT '',
+ created_at TEXT NOT NULL DEFAULT (datetime('now'))
+);
+
+CREATE INDEX idx_request_logs_vault_id_desc ON request_logs(vault_id, id DESC);
+CREATE INDEX idx_request_logs_id_desc ON request_logs(id DESC);
diff --git a/internal/store/sqlite.go b/internal/store/sqlite.go
index 68638d4..74b29ab 100644
--- a/internal/store/sqlite.go
+++ b/internal/store/sqlite.go
@@ -6,6 +6,7 @@ import (
"crypto/sha256"
"database/sql"
"encoding/hex"
+ "encoding/json"
"fmt"
"io"
"os"
@@ -2741,3 +2742,213 @@ func newUUID() string {
func newSessionToken() string { return newPrefixedToken("av_sess_") }
func newAgentToken() string { return newPrefixedToken("av_agt_") }
+
+// --- Request Logs ---
+
+// InsertRequestLogs persists a batch of request logs inside a single
+// transaction. Credential key names are stored as a JSON array.
+// Callers are expected to pre-filter out anything secret; the store does
+// not validate fields beyond the column types.
+func (s *SQLiteStore) InsertRequestLogs(ctx context.Context, rows []RequestLog) error {
+ if len(rows) == 0 {
+ return nil
+ }
+ tx, err := s.db.BeginTx(ctx, nil)
+ if err != nil {
+ return fmt.Errorf("beginning request_logs tx: %w", err)
+ }
+ defer func() { _ = tx.Rollback() }()
+
+ stmt, err := tx.PrepareContext(ctx, `
+ INSERT INTO request_logs
+ (vault_id, actor_type, actor_id, ingress, method, host, path,
+ matched_service, credential_keys, status, latency_ms, error_code, created_at)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
+ if err != nil {
+ return fmt.Errorf("preparing request_logs insert: %w", err)
+ }
+ defer func() { _ = stmt.Close() }()
+
+ for _, r := range rows {
+ keys := r.CredentialKeys
+ if keys == nil {
+ keys = []string{}
+ }
+ keysJSON, err := json.Marshal(keys)
+ if err != nil {
+ return fmt.Errorf("marshaling credential_keys: %w", err)
+ }
+ createdAt := r.CreatedAt
+ if createdAt.IsZero() {
+ createdAt = time.Now()
+ }
+ if _, err := stmt.ExecContext(ctx,
+ r.VaultID, r.ActorType, r.ActorID, r.Ingress, r.Method, r.Host, r.Path,
+ r.MatchedService, string(keysJSON), r.Status, r.LatencyMs, r.ErrorCode,
+ createdAt.UTC().Format(time.DateTime),
+ ); err != nil {
+ return fmt.Errorf("inserting request_log: %w", err)
+ }
+ }
+ if err := tx.Commit(); err != nil {
+ return fmt.Errorf("committing request_logs: %w", err)
+ }
+ return nil
+}
+
+// ListRequestLogs returns logs matching opts, newest first. Pagination is
+// cursor-based via opts.Before (historical) or opts.After (tailing).
+// opts.Limit defaults to 50 when <= 0; callers must cap the upper bound.
+func (s *SQLiteStore) ListRequestLogs(ctx context.Context, opts ListRequestLogsOpts) ([]RequestLog, error) {
+ var (
+ where []string
+ args []any
+ )
+ if opts.VaultID != nil {
+ where = append(where, "vault_id = ?")
+ args = append(args, *opts.VaultID)
+ }
+ if opts.Ingress != "" {
+ where = append(where, "ingress = ?")
+ args = append(args, opts.Ingress)
+ }
+ if opts.MatchedService != "" {
+ where = append(where, "matched_service = ?")
+ args = append(args, opts.MatchedService)
+ }
+ switch opts.StatusBucket {
+ case "2xx":
+ where = append(where, "status >= 200 AND status < 300")
+ case "3xx":
+ where = append(where, "status >= 300 AND status < 400")
+ case "4xx":
+ where = append(where, "status >= 400 AND status < 500")
+ case "5xx":
+ where = append(where, "status >= 500 AND status < 600")
+ case "err":
+ where = append(where, "(error_code != '' OR status >= 400)")
+ }
+ if opts.Before > 0 {
+ where = append(where, "id < ?")
+ args = append(args, opts.Before)
+ }
+ if opts.After > 0 {
+ where = append(where, "id > ?")
+ args = append(args, opts.After)
+ }
+
+ limit := opts.Limit
+ if limit <= 0 {
+ limit = 50
+ }
+
+ query := `SELECT id, vault_id, actor_type, actor_id, ingress, method, host, path,
+ matched_service, credential_keys, status, latency_ms, error_code, created_at
+ FROM request_logs`
+ if len(where) > 0 {
+ query += " WHERE " + strings.Join(where, " AND ") // #nosec G202 -- where entries are static predicate strings; all user input flows through args as ? placeholders
+ }
+ // Tailing (After > 0) scans ASC so bursts larger than the page are
+ // consumed oldest-first — a DESC LIMIT would skip the oldest new
+ // rows and silently lose them on the next poll. Historical paging
+ // (Before, or no cursor) stays DESC for newest-first display.
+ if opts.After > 0 {
+ query += " ORDER BY id ASC LIMIT ?"
+ } else {
+ query += " ORDER BY id DESC LIMIT ?"
+ }
+ args = append(args, limit)
+
+ rows, err := s.db.QueryContext(ctx, query, args...)
+ if err != nil {
+ return nil, fmt.Errorf("listing request_logs: %w", err)
+ }
+ defer func() { _ = rows.Close() }()
+
+ var out []RequestLog
+ for rows.Next() {
+ var rl RequestLog
+ var keysJSON, createdAt string
+ if err := rows.Scan(
+ &rl.ID, &rl.VaultID, &rl.ActorType, &rl.ActorID, &rl.Ingress,
+ &rl.Method, &rl.Host, &rl.Path, &rl.MatchedService, &keysJSON,
+ &rl.Status, &rl.LatencyMs, &rl.ErrorCode, &createdAt,
+ ); err != nil {
+ return nil, fmt.Errorf("scanning request_log: %w", err)
+ }
+ if keysJSON != "" {
+ _ = json.Unmarshal([]byte(keysJSON), &rl.CredentialKeys)
+ }
+ rl.CreatedAt, _ = time.Parse(time.DateTime, createdAt)
+ out = append(out, rl)
+ }
+ return out, rows.Err()
+}
+
+// DeleteOldRequestLogs deletes rows older than before across all vaults.
+// Returns the number of rows affected.
+func (s *SQLiteStore) DeleteOldRequestLogs(ctx context.Context, before time.Time) (int64, error) {
+ res, err := s.db.ExecContext(ctx,
+ `DELETE FROM request_logs WHERE created_at < ?`,
+ before.UTC().Format(time.DateTime),
+ )
+ if err != nil {
+ return 0, fmt.Errorf("deleting old request_logs: %w", err)
+ }
+ n, _ := res.RowsAffected()
+ return n, nil
+}
+
+// TrimRequestLogsToCap keeps at most cap rows for vaultID, deleting the
+// oldest beyond that ceiling. Returns rows deleted. Short-circuits when
+// the vault is under the cap so steady-state calls do no index-walk work.
+func (s *SQLiteStore) TrimRequestLogsToCap(ctx context.Context, vaultID string, cap int64) (int64, error) {
+ if cap <= 0 {
+ return 0, nil
+ }
+ var count int64
+ if err := s.db.QueryRowContext(ctx,
+ `SELECT COUNT(*) FROM request_logs WHERE vault_id = ?`, vaultID,
+ ).Scan(&count); err != nil {
+ return 0, fmt.Errorf("counting request_logs: %w", err)
+ }
+ if count <= cap {
+ return 0, nil
+ }
+ res, err := s.db.ExecContext(ctx, `
+ DELETE FROM request_logs
+ WHERE vault_id = ?
+ AND id <= (
+ SELECT id FROM request_logs
+ WHERE vault_id = ?
+ ORDER BY id DESC
+ LIMIT 1 OFFSET ?
+ )`,
+ vaultID, vaultID, cap,
+ )
+ if err != nil {
+ return 0, fmt.Errorf("trimming request_logs: %w", err)
+ }
+ n, _ := res.RowsAffected()
+ return n, nil
+}
+
+// VaultIDsWithLogs returns the distinct vault IDs that have at least one
+// persisted request log. Used by the retention ticker to scope per-vault
+// trimming without iterating every vault.
+func (s *SQLiteStore) VaultIDsWithLogs(ctx context.Context) ([]string, error) {
+ rows, err := s.db.QueryContext(ctx, `SELECT DISTINCT vault_id FROM request_logs`)
+ if err != nil {
+ return nil, fmt.Errorf("listing log vault ids: %w", err)
+ }
+ defer func() { _ = rows.Close() }()
+ var out []string
+ for rows.Next() {
+ var id string
+ if err := rows.Scan(&id); err != nil {
+ return nil, err
+ }
+ out = append(out, id)
+ }
+ return out, rows.Err()
+}
diff --git a/internal/store/sqlite_test.go b/internal/store/sqlite_test.go
index 75dbb5f..2caa1ec 100644
--- a/internal/store/sqlite_test.go
+++ b/internal/store/sqlite_test.go
@@ -28,8 +28,8 @@ func TestOpenAndMigrate(t *testing.T) {
if err != nil {
t.Fatalf("querying schema_migrations: %v", err)
}
- if version != 38 {
- t.Fatalf("expected migration version 38, got %d", version)
+ if version != 39 {
+ t.Fatalf("expected migration version 39, got %d", version)
}
}
@@ -1785,3 +1785,67 @@ func TestExpirePendingPasswordResets(t *testing.T) {
t.Fatalf("expected 1 pending after expiry, got %d", count)
}
}
+
+// TestListRequestLogsTailOrdering is a regression test: when a burst
+// larger than the page size lands between polls, the tail query must
+// consume the oldest rows first so subsequent polls can advance the
+// cursor through the whole burst without gaps. Before the ASC fix,
+// `ORDER BY id DESC LIMIT N` returned the *newest* N rows and silently
+// lost the older ones on the next poll.
+func TestListRequestLogsTailOrdering(t *testing.T) {
+ s := openTestDB(t)
+ ctx := context.Background()
+
+ ns, err := s.CreateVault(ctx, "logs")
+ if err != nil {
+ t.Fatalf("CreateVault: %v", err)
+ }
+
+ // Insert 10 rows so we can page through with Limit=3.
+ rows := make([]RequestLog, 10)
+ for i := range rows {
+ rows[i] = RequestLog{
+ VaultID: ns.ID,
+ Ingress: "explicit",
+ Method: "GET",
+ Host: "api.example.com",
+ Path: "/",
+ Status: 200,
+ }
+ }
+ if err := s.InsertRequestLogs(ctx, rows); err != nil {
+ t.Fatalf("InsertRequestLogs: %v", err)
+ }
+
+ // Historical page (no cursor) returns newest-first.
+ page, err := s.ListRequestLogs(ctx, ListRequestLogsOpts{VaultID: &ns.ID, Limit: 3})
+ if err != nil {
+ t.Fatalf("initial list: %v", err)
+ }
+ if len(page) != 3 {
+ t.Fatalf("initial page size = %d, want 3", len(page))
+ }
+ if page[0].ID <= page[1].ID || page[1].ID <= page[2].ID {
+ t.Fatalf("historical page not DESC: %v", []int64{page[0].ID, page[1].ID, page[2].ID})
+ }
+
+ // Tail from an id boundary: returns rows (boundary, boundary+Limit]
+ // in ASC order so a subsequent poll with after=boundary+Limit picks
+ // up from there with no gap. Before the fix, the query was
+ // `ORDER BY id DESC LIMIT N`, which returned the newest N rows above
+ // the boundary and silently dropped the older ones.
+ boundary := page[2].ID - 1
+ tail, err := s.ListRequestLogs(ctx, ListRequestLogsOpts{VaultID: &ns.ID, After: boundary, Limit: 3})
+ if err != nil {
+ t.Fatalf("tail: %v", err)
+ }
+ if len(tail) != 3 {
+ t.Fatalf("tail size = %d, want 3", len(tail))
+ }
+ if tail[0].ID >= tail[1].ID || tail[1].ID >= tail[2].ID {
+ t.Fatalf("tail not ASC: %v", []int64{tail[0].ID, tail[1].ID, tail[2].ID})
+ }
+ if tail[0].ID != boundary+1 {
+ t.Fatalf("tail should start at id %d, got %d", boundary+1, tail[0].ID)
+ }
+}
diff --git a/internal/store/store.go b/internal/store/store.go
index 43fc01d..0b4e12e 100644
--- a/internal/store/store.go
+++ b/internal/store/store.go
@@ -124,6 +124,39 @@ type EncryptedCredential struct {
Nonce []byte
}
+// RequestLog is a persisted record of a single proxied request. Secret-free
+// by construction: no header values, no bodies, no query strings — only
+// metadata already safe to log (see internal/brokercore/logging.go).
+type RequestLog struct {
+ ID int64
+ VaultID string
+ ActorType string // brokercore.ActorType{User,Agent} or ""
+ ActorID string
+ Ingress string // brokercore.Ingress{Explicit,MITM}
+ Method string
+ Host string
+ Path string
+ MatchedService string
+ CredentialKeys []string
+ Status int
+ LatencyMs int64
+ ErrorCode string
+ CreatedAt time.Time
+}
+
+// ListRequestLogsOpts controls the ListRequestLogs query.
+// Exactly one of Before or After may be set (both zero returns the newest page).
+// VaultID == nil means "all vaults" — reserved for a future owner-only endpoint.
+type ListRequestLogsOpts struct {
+ VaultID *string
+ Ingress string // "", "explicit", "mitm"
+ StatusBucket string // "", "2xx", "3xx", "4xx", "5xx", "err"
+ MatchedService string
+ Before int64 // rows with id < Before (pagination going back)
+ After int64 // rows with id > After (polling for new rows)
+ Limit int // capped at 200 by handler; store trusts caller
+}
+
// Invite represents a named agent invite with optional vault pre-assignments.
// All invites create named, instance-level agents on redemption.
type Invite struct {
@@ -384,6 +417,13 @@ type Store interface {
SetSetting(ctx context.Context, key, value string) error
GetAllSettings(ctx context.Context) (map[string]string, error)
+ // Request logs
+ InsertRequestLogs(ctx context.Context, rows []RequestLog) error
+ ListRequestLogs(ctx context.Context, opts ListRequestLogsOpts) ([]RequestLog, error)
+ DeleteOldRequestLogs(ctx context.Context, before time.Time) (int64, error)
+ TrimRequestLogsToCap(ctx context.Context, vaultID string, cap int64) (int64, error)
+ VaultIDsWithLogs(ctx context.Context) ([]string, error)
+
// Lifecycle
Close() error
}
diff --git a/web/src/components/LogsView.tsx b/web/src/components/LogsView.tsx
new file mode 100644
index 0000000..07d322a
--- /dev/null
+++ b/web/src/components/LogsView.tsx
@@ -0,0 +1,360 @@
+import { useEffect, useMemo, useRef, useState } from "react";
+import { apiFetch } from "../lib/api";
+import { ErrorBanner, LoadingSpinner, timeAgo } from "./shared";
+import DataTable, { type Column } from "./DataTable";
+import Button from "./Button";
+import Modal from "./Modal";
+
+export interface LogEntry {
+ id: number;
+ ingress: string;
+ method: string;
+ host: string;
+ path: string;
+ matched_service: string;
+ credential_keys: string[];
+ status: number;
+ latency_ms: number;
+ error_code: string;
+ actor_type: string;
+ actor_id: string;
+ created_at: string;
+}
+
+interface LogsResponse {
+ logs: LogEntry[];
+ next_cursor: number | null;
+ latest_id: number;
+}
+
+type StatusFilter = "all" | "errors";
+
+interface LogsViewProps {
+ /** Absolute URL the view hits; e.g. `/v1/vaults/my-vault/logs`. */
+ endpoint: string;
+ /** Page size. Defaults to 50 (server caps at 200). */
+ limit?: number;
+ /** How often to poll for new rows. Defaults to 3000 ms; set to 0 to disable. */
+ pollMs?: number;
+ title?: string;
+ description?: string;
+}
+
+export default function LogsView({
+ endpoint,
+ limit = 50,
+ pollMs = 3000,
+ title = "Request Logs",
+ description = "Recent proxied requests. Bodies and query strings are never recorded.",
+}: LogsViewProps) {
+ const [rows, setRows] = useState<LogEntry[]>([]);
+ const [loading, setLoading] = useState(true);
+ const [error, setError] = useState("");
+ const [statusFilter, setStatusFilter] = useState<StatusFilter>("all");
+ const [nextCursor, setNextCursor] = useState<number | null>(null);
+ const [loadingMore, setLoadingMore] = useState(false);
+ const [paused, setPaused] = useState(false);
+ const [selected, setSelected] = useState<LogEntry | null>(null);
+
+ const latestIdRef = useRef(0);
+ const initializedRef = useRef(false);
+ const abortRef = useRef<AbortController | null>(null);
+
+ const filterQS = useMemo(() => {
+ const parts: string[] = [`limit=${limit}`];
+ if (statusFilter === "errors") parts.push("status_bucket=err");
+ return parts.join("&");
+ }, [statusFilter, limit]);
+
+ // Reset when filters change.
+ useEffect(() => {
+ latestIdRef.current = 0;
+ initializedRef.current = false;
+ setRows([]);
+ setNextCursor(null);
+ setError("");
+ setLoading(true);
+ loadInitial();
+ // eslint-disable-next-line react-hooks/exhaustive-deps
+ }, [endpoint, filterQS]);
+
+ // Poll for new rows (tailing).
+ useEffect(() => {
+ if (pollMs <= 0 || paused) return;
+ const id = setInterval(() => {
+ pollNew();
+ }, pollMs);
+ return () => clearInterval(id);
+ // eslint-disable-next-line react-hooks/exhaustive-deps
+ }, [endpoint, filterQS, pollMs, paused]);
+
+ async function loadInitial() {
+ abortRef.current?.abort();
+ const controller = new AbortController();
+ abortRef.current = controller;
+ try {
+ const resp = await apiFetch(`${endpoint}?${filterQS}`, { signal: controller.signal });
+ if (!resp.ok) {
+ const data = await resp.json().catch(() => ({}));
+ setError(data.error || "Failed to load logs.");
+ return;
+ }
+ const data: LogsResponse = await resp.json();
+ setRows(data.logs ?? []);
+ setNextCursor(data.next_cursor);
+ latestIdRef.current = data.latest_id || 0;
+ initializedRef.current = true;
+ } catch (err) {
+ if (err instanceof DOMException && err.name === "AbortError") return;
+ setError("Network error.");
+ } finally {
+ setLoading(false);
+ }
+ }
+
+ async function pollNew() {
+ // Gate on the initial load completing, not on cursor > 0 — an empty
+ // vault legitimately reports latest_id=0 and still needs polls so
+ // the first row shows up without a reload.
+ if (!initializedRef.current) return;
+ try {
+ const resp = await apiFetch(`${endpoint}?${filterQS}&after=${latestIdRef.current}`);
+ if (!resp.ok) return;
+ const data: LogsResponse = await resp.json();
+ const fresh = data.logs ?? [];
+ if (fresh.length > 0) {
+ setRows((prev) => [...fresh, ...prev]);
+ }
+ // Guard against out-of-order poll responses rolling the cursor back.
+ if (data.latest_id > latestIdRef.current) {
+ latestIdRef.current = data.latest_id;
+ }
+ } catch {
+ // ignore; poll errors are silent
+ }
+ }
+
+ async function loadMore() {
+ if (nextCursor === null) return;
+ setLoadingMore(true);
+ try {
+ const resp = await apiFetch(`${endpoint}?${filterQS}&before=${nextCursor}`);
+ if (!resp.ok) return;
+ const data: LogsResponse = await resp.json();
+ const older = data.logs ?? [];
+ if (older.length > 0) {
+ setRows((prev) => [...prev, ...older]);
+ }
+ setNextCursor(data.next_cursor);
+ } finally {
+ setLoadingMore(false);
+ }
+ }
+
+ const columns: Column<LogEntry>[] = [
+ {
+ key: "time",
+ header: "Time",
+ render: (r) => (
+ <span title={r.created_at}>
+ {timeAgo(r.created_at)}
+ </span>
+ ),
+ },
+ {
+ key: "method",
+ header: "Method",
+ render: (r) => (
+ <span>{r.method}</span>
+ ),
+ },
+ {
+ key: "endpoint",
+ header: "Endpoint",
+ render: (r) => (
+
- Works with Claude Code, Cursor, ChatGPT, and other chat-based agents.
+ Works with Claude Code, Cursor, ChatGPT, and other chat-based agents. For agents you can't paste into, see Manual setup.
>
) : (
- <>
-
- Copy the token to configure the agent directly via environment variables.
-
+ Agent Vault's transparent proxy presents TLS leaves signed by its own CA. Save the certificate and trust it so your agent's HTTPS client can verify those leaves.
+