Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions packages/dfos-cli/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,23 @@ func New(baseURL string) *Client {

// RelayInfo is the response from /.well-known/dfos-relay.
type RelayInfo struct {
DID string `json:"did"`
Protocol string `json:"protocol"`
Version string `json:"version"`
Proof bool `json:"proof"`
Content bool `json:"content"`
Profile string `json:"profile,omitempty"`
DID string `json:"did"`
Protocol string `json:"protocol"`
Version string `json:"version"`
Capabilities RelayCapabilities `json:"capabilities"`
Profile string `json:"profile,omitempty"`

// Convenience accessors populated after unmarshal.
Proof bool `json:"-"`
Content bool `json:"-"`
}

// RelayCapabilities are the nested capability flags from the well-known response.
type RelayCapabilities struct {
Proof bool `json:"proof"`
Content bool `json:"content"`
Log bool `json:"log"`
Documents bool `json:"documents"`
}

// IdentityResponse is the response from GET /identities/:did.
Expand Down Expand Up @@ -100,6 +111,8 @@ func (c *Client) GetRelayInfo() (*RelayInfo, error) {
if err := json.NewDecoder(resp.Body).Decode(&info); err != nil {
return nil, err
}
info.Proof = info.Capabilities.Proof
info.Content = info.Capabilities.Content
return &info, nil
}

Expand Down
14 changes: 7 additions & 7 deletions packages/dfos-web-relay-go/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (r *Relay) verifyContentAccess(requesterDID string, creatorDID string, requ
resolveKey := CreateKeyResolver(r.store)

// 2. check stored public credentials
publicCreds, _ := r.store.GetPublicCredentials(requestedResource)
publicCreds, _ := r.readStore.GetPublicCredentials(requestedResource)
for _, credJws := range publicCreds {
if err := r.verifyCredentialForAccess(credJws, resolveKey, requestedResource, action, creatorDID, ""); err == nil {
return ""
Expand Down Expand Up @@ -109,7 +109,7 @@ func (r *Relay) verifyCredentialForAccess(credJws string, resolveKey dfos.KeyRes
issuerDID := kid[:strings.Index(kid, "#")]

// check issuer identity is not deleted
issuerIdentity, _ := r.store.GetIdentityChain(issuerDID)
issuerIdentity, _ := r.readStore.GetIdentityChain(issuerDID)
if issuerIdentity != nil && issuerIdentity.State.IsDeleted {
return fmt.Errorf("credential issuer identity is deleted")
}
Expand All @@ -129,7 +129,7 @@ func (r *Relay) verifyCredentialForAccess(credJws string, resolveKey dfos.KeyRes
}

// check leaf revocation
revoked, _ := r.store.IsCredentialRevoked(verified.Iss, verified.CID)
revoked, _ := r.readStore.IsCredentialRevoked(verified.Iss, verified.CID)
if revoked {
return fmt.Errorf("credential is revoked")
}
Expand Down Expand Up @@ -203,7 +203,7 @@ func (r *Relay) verifyDelegationChain(childJws string, prf []string, childAtt []
parentIssuerDID := pKid[:strings.Index(pKid, "#")]

// check parent issuer identity is not deleted
parentIdentity, _ := r.store.GetIdentityChain(parentIssuerDID)
parentIdentity, _ := r.readStore.GetIdentityChain(parentIssuerDID)
if parentIdentity != nil && parentIdentity.State.IsDeleted {
return fmt.Errorf("parent credential issuer identity is deleted")
}
Expand All @@ -219,7 +219,7 @@ func (r *Relay) verifyDelegationChain(childJws string, prf []string, childAtt []
}

// check revocation at every level
prevoked, _ := r.store.IsCredentialRevoked(pVerified.Iss, pVerified.CID)
prevoked, _ := r.readStore.IsCredentialRevoked(pVerified.Iss, pVerified.CID)
if prevoked {
return fmt.Errorf("parent credential in delegation chain is revoked")
}
Expand Down Expand Up @@ -327,7 +327,7 @@ func (r *Relay) matchesResource(att []dfos.AttEntry, resource string, action str
// manifestLookup resolves which contentIds a manifest indexes by reading its
// head document blob and extracting entries values.
func (r *Relay) manifestLookup(manifestContentID string) []string {
chain, err := r.store.GetContentChain(manifestContentID)
chain, err := r.readStore.GetContentChain(manifestContentID)
if err != nil || chain == nil {
return nil
}
Expand All @@ -336,7 +336,7 @@ func (r *Relay) manifestLookup(manifestContentID string) []string {
}
docCID := *chain.State.CurrentDocumentCID

blob, _ := r.store.GetBlob(BlobKey{CreatorDID: chain.State.CreatorDID, DocumentCID: docCID})
blob, _ := r.readStore.GetBlob(BlobKey{CreatorDID: chain.State.CreatorDID, DocumentCID: docCID})
if blob == nil {
return nil
}
Expand Down
12 changes: 11 additions & 1 deletion packages/dfos-web-relay-go/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ type BatchableStore interface {

// Relay is a DFOS web relay — the core verification and storage engine.
type Relay struct {
store Store
store Store // ingestion store — sees write transactions for within-batch reads
readStore Store // HTTP read store — always uses WAL read pool, never races on tx
did string
profileArtifactJWS string
contentEnabled bool
Expand Down Expand Up @@ -53,8 +54,17 @@ func NewRelay(opts RelayOptions) (*Relay, error) {
logger = slog.Default()
}

// If the store supports it, create a read-only view for HTTP handlers
// that never races on the write transaction. Falls back to the main store
// for non-SQLite backends (e.g. in-memory test store).
readStore := opts.Store
if sqlStore, ok := opts.Store.(*SQLiteStore); ok {
readStore = sqlStore.ReadStore()
}

return &Relay{
store: opts.Store,
readStore: readStore,
did: identity.DID,
profileArtifactJWS: identity.ProfileArtifactJWS,
contentEnabled: contentEnabled,
Expand Down
38 changes: 19 additions & 19 deletions packages/dfos-web-relay-go/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (r *Relay) handlePostOperations(w http.ResponseWriter, req *http.Request) {

func (r *Relay) handleGetOperation(w http.ResponseWriter, req *http.Request) {
cid := req.PathValue("cid")
op, err := r.store.GetOperation(cid)
op, err := r.readStore.GetOperation(cid)
if storeErr(w, err) {
return
}
Expand All @@ -120,7 +120,7 @@ func (r *Relay) handleGetOperation(w http.ResponseWriter, req *http.Request) {

func (r *Relay) handleGetIdentity(w http.ResponseWriter, req *http.Request) {
did := req.PathValue("did")
chain, err := r.store.GetIdentityChain(did)
chain, err := r.readStore.GetIdentityChain(did)
if storeErr(w, err) {
return
}
Expand All @@ -147,7 +147,7 @@ func (r *Relay) handleGetIdentity(w http.ResponseWriter, req *http.Request) {
}
after = *page.Cursor
}
chain, _ = r.store.GetIdentityChain(did)
chain, _ = r.readStore.GetIdentityChain(did)
if chain != nil {
break
}
Expand All @@ -167,7 +167,7 @@ func (r *Relay) handleGetIdentity(w http.ResponseWriter, req *http.Request) {

func (r *Relay) handleIdentityLog(w http.ResponseWriter, req *http.Request) {
did := req.PathValue("did")
chain, err := r.store.GetIdentityChain(did)
chain, err := r.readStore.GetIdentityChain(did)
if storeErr(w, err) {
return
}
Expand Down Expand Up @@ -234,7 +234,7 @@ func (r *Relay) handleIdentityLog(w http.ResponseWriter, req *http.Request) {

func (r *Relay) handleGetContent(w http.ResponseWriter, req *http.Request) {
contentID := req.PathValue("contentId")
chain, err := r.store.GetContentChain(contentID)
chain, err := r.readStore.GetContentChain(contentID)
if storeErr(w, err) {
return
}
Expand All @@ -261,7 +261,7 @@ func (r *Relay) handleGetContent(w http.ResponseWriter, req *http.Request) {
}
after = *page.Cursor
}
chain, _ = r.store.GetContentChain(contentID)
chain, _ = r.readStore.GetContentChain(contentID)
if chain != nil {
break
}
Expand All @@ -282,7 +282,7 @@ func (r *Relay) handleGetContent(w http.ResponseWriter, req *http.Request) {

func (r *Relay) handleContentLog(w http.ResponseWriter, req *http.Request) {
contentID := req.PathValue("contentId")
chain, err := r.store.GetContentChain(contentID)
chain, err := r.readStore.GetContentChain(contentID)
if storeErr(w, err) {
return
}
Expand Down Expand Up @@ -348,12 +348,12 @@ func (r *Relay) handleContentLog(w http.ResponseWriter, req *http.Request) {
func (r *Relay) handleGetCountersignatures(w http.ResponseWriter, req *http.Request) {
cid := req.PathValue("cid")

op, err := r.store.GetOperation(cid)
op, err := r.readStore.GetOperation(cid)
if storeErr(w, err) {
return
}
if op == nil {
cs, csErr := r.store.GetCountersignatures(cid)
cs, csErr := r.readStore.GetCountersignatures(cid)
if storeErr(w, csErr) {
return
}
Expand All @@ -368,7 +368,7 @@ func (r *Relay) handleGetCountersignatures(w http.ResponseWriter, req *http.Requ
return
}

cs, csErr := r.store.GetCountersignatures(cid)
cs, csErr := r.readStore.GetCountersignatures(cid)
if storeErr(w, csErr) {
return
}
Expand All @@ -381,7 +381,7 @@ func (r *Relay) handleGetCountersignatures(w http.ResponseWriter, req *http.Requ
func (r *Relay) handleOperationCountersignatures(w http.ResponseWriter, req *http.Request) {
cid := req.PathValue("cid")

op, err := r.store.GetOperation(cid)
op, err := r.readStore.GetOperation(cid)
if storeErr(w, err) {
return
}
Expand All @@ -390,7 +390,7 @@ func (r *Relay) handleOperationCountersignatures(w http.ResponseWriter, req *htt
return
}

cs, csErr := r.store.GetCountersignatures(cid)
cs, csErr := r.readStore.GetCountersignatures(cid)
if storeErr(w, csErr) {
return
}
Expand All @@ -406,7 +406,7 @@ func (r *Relay) handleOperationCountersignatures(w http.ResponseWriter, req *htt

func (r *Relay) handleGetBeacon(w http.ResponseWriter, req *http.Request) {
did := req.PathValue("did")
beacon, err := r.store.GetBeacon(did)
beacon, err := r.readStore.GetBeacon(did)
if storeErr(w, err) {
return
}
Expand Down Expand Up @@ -435,7 +435,7 @@ func (r *Relay) handleGetLog(w http.ResponseWriter, req *http.Request) {
after := req.URL.Query().Get("after")
limit := parseLimit(req, 100, 1000)

entries, cursor, err := r.store.ReadLog(after, limit)
entries, cursor, err := r.readStore.ReadLog(after, limit)
if storeErr(w, err) {
return
}
Expand Down Expand Up @@ -475,7 +475,7 @@ func (r *Relay) handlePutBlob(w http.ResponseWriter, req *http.Request) {
}

// verify chain exists
chain, err := r.store.GetContentChain(contentID)
chain, err := r.readStore.GetContentChain(contentID)
if storeErr(w, err) {
return
}
Expand Down Expand Up @@ -566,7 +566,7 @@ func (r *Relay) readBlob(w http.ResponseWriter, req *http.Request, contentID, re
return
}

chain, err := r.store.GetContentChain(contentID)
chain, err := r.readStore.GetContentChain(contentID)
if storeErr(w, err) {
return
}
Expand Down Expand Up @@ -613,7 +613,7 @@ func (r *Relay) readBlob(w http.ResponseWriter, req *http.Request, contentID, re
return
}

blob, _ := r.store.GetBlob(BlobKey{CreatorDID: chain.State.CreatorDID, DocumentCID: documentCID})
blob, _ := r.readStore.GetBlob(BlobKey{CreatorDID: chain.State.CreatorDID, DocumentCID: documentCID})
if blob == nil {
writeError(w, 404, "blob not found")
return
Expand Down Expand Up @@ -644,7 +644,7 @@ func (r *Relay) handleGetDocuments(w http.ResponseWriter, req *http.Request) {
}

// verify chain exists
chain, err := r.store.GetContentChain(contentID)
chain, err := r.readStore.GetContentChain(contentID)
if storeErr(w, err) {
return
}
Expand All @@ -663,7 +663,7 @@ func (r *Relay) handleGetDocuments(w http.ResponseWriter, req *http.Request) {
after := req.URL.Query().Get("after")
limit := parseLimit(req, 100, 1000)

docs, cursor, err := r.store.GetDocuments(contentID, after, limit)
docs, cursor, err := r.readStore.GetDocuments(contentID, after, limit)
if storeErr(w, err) {
return
}
Expand Down
28 changes: 22 additions & 6 deletions packages/dfos-web-relay-go/store_sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,17 @@ CREATE INDEX IF NOT EXISTS idx_public_credentials_exp ON public_credentials(exp)
`

// SQLiteStore is a durable Store backed by SQLite.
//
// The readOnly flag controls readerDB() behavior. When false (default for the
// ingestion store), readerDB() returns the active write transaction so that
// within-batch reads see uncommitted writes. When true (for the HTTP read
// store), readerDB() always returns the WAL read pool — safe for concurrent
// use while ingestion holds a write transaction.
type SQLiteStore struct {
db *sql.DB // write connection (single writer)
readDB *sql.DB // read connection pool (concurrent reads)
tx *sql.Tx // active write batch transaction, if any
db *sql.DB // write connection (single writer)
readDB *sql.DB // read connection pool (concurrent reads)
tx *sql.Tx // active write batch transaction, if any
readOnly bool // if true, readerDB() never returns tx
}

// writerDB returns the active transaction if one exists, otherwise the raw db.
Expand All @@ -123,15 +130,24 @@ func (s *SQLiteStore) writerDB() dbConn {
return s.db
}

// readerDB returns the active transaction if one exists (so reads see
// uncommitted writes within a batch), otherwise the read connection pool.
// readerDB returns the read connection to use. For the ingestion store
// (readOnly=false), returns the active transaction if one exists so within-
// batch reads see uncommitted writes. For the HTTP read store (readOnly=true),
// always returns the WAL read pool.
func (s *SQLiteStore) readerDB() dbConn {
if s.tx != nil {
if !s.readOnly && s.tx != nil {
return s.tx
}
return s.readDB
}

// ReadStore returns a Store that shares this store's database connections but
// always reads from the WAL read pool, never from an active write transaction.
// Use this for HTTP handlers that run concurrently with ingestion.
func (s *SQLiteStore) ReadStore() *SQLiteStore {
return &SQLiteStore{db: s.db, readDB: s.readDB, readOnly: true}
}

// dbConn is the common interface between *sql.DB and *sql.Tx.
type dbConn interface {
Exec(query string, args ...any) (sql.Result, error)
Expand Down
Loading