From d1970ec5ffdd21b4a033bf7015249a0c19e1086c Mon Sep 17 00:00:00 2001 From: Brandon Date: Wed, 15 Apr 2026 10:07:54 -0500 Subject: [PATCH] fix: CLI capabilities parsing and Go relay SQLite read/write race Two fixes: 1. CLI: RelayInfo struct now correctly reads capabilities from the nested `capabilities` object in the well-known response instead of expecting top-level `proof`/`content` fields (which always deserialized as false). 2. Go relay: HTTP read handlers (routes, auth) now use a read-only SQLiteStore view that always reads from the WAL read connection pool, never from an active write transaction. Previously, concurrent HTTP reads could grab the ingestion transaction via readerDB(), then fail with "transaction has already been committed" when ingestion committed the batch mid-query. Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/dfos-cli/internal/client/client.go | 25 ++++++++++---- packages/dfos-web-relay-go/auth.go | 14 ++++---- packages/dfos-web-relay-go/relay.go | 12 ++++++- packages/dfos-web-relay-go/routes.go | 38 ++++++++++----------- packages/dfos-web-relay-go/store_sqlite.go | 28 +++++++++++---- 5 files changed, 78 insertions(+), 39 deletions(-) diff --git a/packages/dfos-cli/internal/client/client.go b/packages/dfos-cli/internal/client/client.go index b543b92..b99e679 100644 --- a/packages/dfos-cli/internal/client/client.go +++ b/packages/dfos-cli/internal/client/client.go @@ -25,12 +25,23 @@ func New(baseURL string) *Client { // RelayInfo is the response from /.well-known/dfos-relay. type RelayInfo struct { - DID string `json:"did"` - Protocol string `json:"protocol"` - Version string `json:"version"` - Proof bool `json:"proof"` - Content bool `json:"content"` - Profile string `json:"profile,omitempty"` + DID string `json:"did"` + Protocol string `json:"protocol"` + Version string `json:"version"` + Capabilities RelayCapabilities `json:"capabilities"` + Profile string `json:"profile,omitempty"` + + // Convenience accessors populated after unmarshal. + Proof bool `json:"-"` + Content bool `json:"-"` +} + +// RelayCapabilities are the nested capability flags from the well-known response. +type RelayCapabilities struct { + Proof bool `json:"proof"` + Content bool `json:"content"` + Log bool `json:"log"` + Documents bool `json:"documents"` } // IdentityResponse is the response from GET /identities/:did. @@ -100,6 +111,8 @@ func (c *Client) GetRelayInfo() (*RelayInfo, error) { if err := json.NewDecoder(resp.Body).Decode(&info); err != nil { return nil, err } + info.Proof = info.Capabilities.Proof + info.Content = info.Capabilities.Content return &info, nil } diff --git a/packages/dfos-web-relay-go/auth.go b/packages/dfos-web-relay-go/auth.go index 85b1d2a..a7b6f8a 100644 --- a/packages/dfos-web-relay-go/auth.go +++ b/packages/dfos-web-relay-go/auth.go @@ -70,7 +70,7 @@ func (r *Relay) verifyContentAccess(requesterDID string, creatorDID string, requ resolveKey := CreateKeyResolver(r.store) // 2. check stored public credentials - publicCreds, _ := r.store.GetPublicCredentials(requestedResource) + publicCreds, _ := r.readStore.GetPublicCredentials(requestedResource) for _, credJws := range publicCreds { if err := r.verifyCredentialForAccess(credJws, resolveKey, requestedResource, action, creatorDID, ""); err == nil { return "" @@ -109,7 +109,7 @@ func (r *Relay) verifyCredentialForAccess(credJws string, resolveKey dfos.KeyRes issuerDID := kid[:strings.Index(kid, "#")] // check issuer identity is not deleted - issuerIdentity, _ := r.store.GetIdentityChain(issuerDID) + issuerIdentity, _ := r.readStore.GetIdentityChain(issuerDID) if issuerIdentity != nil && issuerIdentity.State.IsDeleted { return fmt.Errorf("credential issuer identity is deleted") } @@ -129,7 +129,7 @@ func (r *Relay) verifyCredentialForAccess(credJws string, resolveKey dfos.KeyRes } // check leaf revocation - revoked, _ := r.store.IsCredentialRevoked(verified.Iss, verified.CID) + revoked, _ := r.readStore.IsCredentialRevoked(verified.Iss, verified.CID) if revoked { return fmt.Errorf("credential is revoked") } @@ -203,7 +203,7 @@ func (r *Relay) verifyDelegationChain(childJws string, prf []string, childAtt [] parentIssuerDID := pKid[:strings.Index(pKid, "#")] // check parent issuer identity is not deleted - parentIdentity, _ := r.store.GetIdentityChain(parentIssuerDID) + parentIdentity, _ := r.readStore.GetIdentityChain(parentIssuerDID) if parentIdentity != nil && parentIdentity.State.IsDeleted { return fmt.Errorf("parent credential issuer identity is deleted") } @@ -219,7 +219,7 @@ func (r *Relay) verifyDelegationChain(childJws string, prf []string, childAtt [] } // check revocation at every level - prevoked, _ := r.store.IsCredentialRevoked(pVerified.Iss, pVerified.CID) + prevoked, _ := r.readStore.IsCredentialRevoked(pVerified.Iss, pVerified.CID) if prevoked { return fmt.Errorf("parent credential in delegation chain is revoked") } @@ -327,7 +327,7 @@ func (r *Relay) matchesResource(att []dfos.AttEntry, resource string, action str // manifestLookup resolves which contentIds a manifest indexes by reading its // head document blob and extracting entries values. func (r *Relay) manifestLookup(manifestContentID string) []string { - chain, err := r.store.GetContentChain(manifestContentID) + chain, err := r.readStore.GetContentChain(manifestContentID) if err != nil || chain == nil { return nil } @@ -336,7 +336,7 @@ func (r *Relay) manifestLookup(manifestContentID string) []string { } docCID := *chain.State.CurrentDocumentCID - blob, _ := r.store.GetBlob(BlobKey{CreatorDID: chain.State.CreatorDID, DocumentCID: docCID}) + blob, _ := r.readStore.GetBlob(BlobKey{CreatorDID: chain.State.CreatorDID, DocumentCID: docCID}) if blob == nil { return nil } diff --git a/packages/dfos-web-relay-go/relay.go b/packages/dfos-web-relay-go/relay.go index 7363a01..506c514 100644 --- a/packages/dfos-web-relay-go/relay.go +++ b/packages/dfos-web-relay-go/relay.go @@ -18,7 +18,8 @@ type BatchableStore interface { // Relay is a DFOS web relay — the core verification and storage engine. type Relay struct { - store Store + store Store // ingestion store — sees write transactions for within-batch reads + readStore Store // HTTP read store — always uses WAL read pool, never races on tx did string profileArtifactJWS string contentEnabled bool @@ -53,8 +54,17 @@ func NewRelay(opts RelayOptions) (*Relay, error) { logger = slog.Default() } + // If the store supports it, create a read-only view for HTTP handlers + // that never races on the write transaction. Falls back to the main store + // for non-SQLite backends (e.g. in-memory test store). + readStore := opts.Store + if sqlStore, ok := opts.Store.(*SQLiteStore); ok { + readStore = sqlStore.ReadStore() + } + return &Relay{ store: opts.Store, + readStore: readStore, did: identity.DID, profileArtifactJWS: identity.ProfileArtifactJWS, contentEnabled: contentEnabled, diff --git a/packages/dfos-web-relay-go/routes.go b/packages/dfos-web-relay-go/routes.go index 6562234..5aa8567 100644 --- a/packages/dfos-web-relay-go/routes.go +++ b/packages/dfos-web-relay-go/routes.go @@ -103,7 +103,7 @@ func (r *Relay) handlePostOperations(w http.ResponseWriter, req *http.Request) { func (r *Relay) handleGetOperation(w http.ResponseWriter, req *http.Request) { cid := req.PathValue("cid") - op, err := r.store.GetOperation(cid) + op, err := r.readStore.GetOperation(cid) if storeErr(w, err) { return } @@ -120,7 +120,7 @@ func (r *Relay) handleGetOperation(w http.ResponseWriter, req *http.Request) { func (r *Relay) handleGetIdentity(w http.ResponseWriter, req *http.Request) { did := req.PathValue("did") - chain, err := r.store.GetIdentityChain(did) + chain, err := r.readStore.GetIdentityChain(did) if storeErr(w, err) { return } @@ -147,7 +147,7 @@ func (r *Relay) handleGetIdentity(w http.ResponseWriter, req *http.Request) { } after = *page.Cursor } - chain, _ = r.store.GetIdentityChain(did) + chain, _ = r.readStore.GetIdentityChain(did) if chain != nil { break } @@ -167,7 +167,7 @@ func (r *Relay) handleGetIdentity(w http.ResponseWriter, req *http.Request) { func (r *Relay) handleIdentityLog(w http.ResponseWriter, req *http.Request) { did := req.PathValue("did") - chain, err := r.store.GetIdentityChain(did) + chain, err := r.readStore.GetIdentityChain(did) if storeErr(w, err) { return } @@ -234,7 +234,7 @@ func (r *Relay) handleIdentityLog(w http.ResponseWriter, req *http.Request) { func (r *Relay) handleGetContent(w http.ResponseWriter, req *http.Request) { contentID := req.PathValue("contentId") - chain, err := r.store.GetContentChain(contentID) + chain, err := r.readStore.GetContentChain(contentID) if storeErr(w, err) { return } @@ -261,7 +261,7 @@ func (r *Relay) handleGetContent(w http.ResponseWriter, req *http.Request) { } after = *page.Cursor } - chain, _ = r.store.GetContentChain(contentID) + chain, _ = r.readStore.GetContentChain(contentID) if chain != nil { break } @@ -282,7 +282,7 @@ func (r *Relay) handleGetContent(w http.ResponseWriter, req *http.Request) { func (r *Relay) handleContentLog(w http.ResponseWriter, req *http.Request) { contentID := req.PathValue("contentId") - chain, err := r.store.GetContentChain(contentID) + chain, err := r.readStore.GetContentChain(contentID) if storeErr(w, err) { return } @@ -348,12 +348,12 @@ func (r *Relay) handleContentLog(w http.ResponseWriter, req *http.Request) { func (r *Relay) handleGetCountersignatures(w http.ResponseWriter, req *http.Request) { cid := req.PathValue("cid") - op, err := r.store.GetOperation(cid) + op, err := r.readStore.GetOperation(cid) if storeErr(w, err) { return } if op == nil { - cs, csErr := r.store.GetCountersignatures(cid) + cs, csErr := r.readStore.GetCountersignatures(cid) if storeErr(w, csErr) { return } @@ -368,7 +368,7 @@ func (r *Relay) handleGetCountersignatures(w http.ResponseWriter, req *http.Requ return } - cs, csErr := r.store.GetCountersignatures(cid) + cs, csErr := r.readStore.GetCountersignatures(cid) if storeErr(w, csErr) { return } @@ -381,7 +381,7 @@ func (r *Relay) handleGetCountersignatures(w http.ResponseWriter, req *http.Requ func (r *Relay) handleOperationCountersignatures(w http.ResponseWriter, req *http.Request) { cid := req.PathValue("cid") - op, err := r.store.GetOperation(cid) + op, err := r.readStore.GetOperation(cid) if storeErr(w, err) { return } @@ -390,7 +390,7 @@ func (r *Relay) handleOperationCountersignatures(w http.ResponseWriter, req *htt return } - cs, csErr := r.store.GetCountersignatures(cid) + cs, csErr := r.readStore.GetCountersignatures(cid) if storeErr(w, csErr) { return } @@ -406,7 +406,7 @@ func (r *Relay) handleOperationCountersignatures(w http.ResponseWriter, req *htt func (r *Relay) handleGetBeacon(w http.ResponseWriter, req *http.Request) { did := req.PathValue("did") - beacon, err := r.store.GetBeacon(did) + beacon, err := r.readStore.GetBeacon(did) if storeErr(w, err) { return } @@ -435,7 +435,7 @@ func (r *Relay) handleGetLog(w http.ResponseWriter, req *http.Request) { after := req.URL.Query().Get("after") limit := parseLimit(req, 100, 1000) - entries, cursor, err := r.store.ReadLog(after, limit) + entries, cursor, err := r.readStore.ReadLog(after, limit) if storeErr(w, err) { return } @@ -475,7 +475,7 @@ func (r *Relay) handlePutBlob(w http.ResponseWriter, req *http.Request) { } // verify chain exists - chain, err := r.store.GetContentChain(contentID) + chain, err := r.readStore.GetContentChain(contentID) if storeErr(w, err) { return } @@ -566,7 +566,7 @@ func (r *Relay) readBlob(w http.ResponseWriter, req *http.Request, contentID, re return } - chain, err := r.store.GetContentChain(contentID) + chain, err := r.readStore.GetContentChain(contentID) if storeErr(w, err) { return } @@ -613,7 +613,7 @@ func (r *Relay) readBlob(w http.ResponseWriter, req *http.Request, contentID, re return } - blob, _ := r.store.GetBlob(BlobKey{CreatorDID: chain.State.CreatorDID, DocumentCID: documentCID}) + blob, _ := r.readStore.GetBlob(BlobKey{CreatorDID: chain.State.CreatorDID, DocumentCID: documentCID}) if blob == nil { writeError(w, 404, "blob not found") return @@ -644,7 +644,7 @@ func (r *Relay) handleGetDocuments(w http.ResponseWriter, req *http.Request) { } // verify chain exists - chain, err := r.store.GetContentChain(contentID) + chain, err := r.readStore.GetContentChain(contentID) if storeErr(w, err) { return } @@ -663,7 +663,7 @@ func (r *Relay) handleGetDocuments(w http.ResponseWriter, req *http.Request) { after := req.URL.Query().Get("after") limit := parseLimit(req, 100, 1000) - docs, cursor, err := r.store.GetDocuments(contentID, after, limit) + docs, cursor, err := r.readStore.GetDocuments(contentID, after, limit) if storeErr(w, err) { return } diff --git a/packages/dfos-web-relay-go/store_sqlite.go b/packages/dfos-web-relay-go/store_sqlite.go index 4ab93cf..8398c82 100644 --- a/packages/dfos-web-relay-go/store_sqlite.go +++ b/packages/dfos-web-relay-go/store_sqlite.go @@ -109,10 +109,17 @@ CREATE INDEX IF NOT EXISTS idx_public_credentials_exp ON public_credentials(exp) ` // SQLiteStore is a durable Store backed by SQLite. +// +// The readOnly flag controls readerDB() behavior. When false (default for the +// ingestion store), readerDB() returns the active write transaction so that +// within-batch reads see uncommitted writes. When true (for the HTTP read +// store), readerDB() always returns the WAL read pool — safe for concurrent +// use while ingestion holds a write transaction. type SQLiteStore struct { - db *sql.DB // write connection (single writer) - readDB *sql.DB // read connection pool (concurrent reads) - tx *sql.Tx // active write batch transaction, if any + db *sql.DB // write connection (single writer) + readDB *sql.DB // read connection pool (concurrent reads) + tx *sql.Tx // active write batch transaction, if any + readOnly bool // if true, readerDB() never returns tx } // writerDB returns the active transaction if one exists, otherwise the raw db. @@ -123,15 +130,24 @@ func (s *SQLiteStore) writerDB() dbConn { return s.db } -// readerDB returns the active transaction if one exists (so reads see -// uncommitted writes within a batch), otherwise the read connection pool. +// readerDB returns the read connection to use. For the ingestion store +// (readOnly=false), returns the active transaction if one exists so within- +// batch reads see uncommitted writes. For the HTTP read store (readOnly=true), +// always returns the WAL read pool. func (s *SQLiteStore) readerDB() dbConn { - if s.tx != nil { + if !s.readOnly && s.tx != nil { return s.tx } return s.readDB } +// ReadStore returns a Store that shares this store's database connections but +// always reads from the WAL read pool, never from an active write transaction. +// Use this for HTTP handlers that run concurrently with ingestion. +func (s *SQLiteStore) ReadStore() *SQLiteStore { + return &SQLiteStore{db: s.db, readDB: s.readDB, readOnly: true} +} + // dbConn is the common interface between *sql.DB and *sql.Tx. type dbConn interface { Exec(query string, args ...any) (sql.Result, error)