Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion pkg/pusher/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,10 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (
if s.shallowReceipt(op.identityAddress) {
return true, err
}
if err := s.storer.Report(ctx, op.Chunk, storage.ChunkSynced); err != nil {
// Retry budget exhausted: no peer in the correct neighborhood stored the
// chunk. Report CouldNotSync rather than Synced to avoid falsely marking
// the chunk as delivered.
if err := s.storer.Report(ctx, op.Chunk, storage.ChunkCouldNotSync); err != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing to ChunkCouldNotSync will cause some false reporting because the ChunkCouldNotSync state is not checked and updated in the uploadstore.go -> Report function. Maybe you would also need to include ChunkCouldNotSync in that switch statement.

loggerV1.Error(err, "pusher: failed to report sync status")
return true, err
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/pushsync/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type metrics struct {
ShallowReceiptDepth *prometheus.CounterVec
ShallowReceipt prometheus.Counter
OverdraftRefresh prometheus.Counter
OutOfDepthStoring prometheus.Counter
}

func newMetrics() metrics {
Expand Down Expand Up @@ -153,6 +154,12 @@ func newMetrics() metrics {
Name: "overdraft_refresh",
Help: "Total number of times peers were skipped due to overdraft, requiring a wait to refresh balance.",
}),
OutOfDepthStoring: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "out_of_depth_storing",
Help: "Total number of times a chunk was refused because it was outside the neighborhood (ErrWantSelf with proximity < radius).",
}),
}
}

Expand Down
43 changes: 34 additions & 9 deletions pkg/pushsync/pushsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,14 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)

switch receipt, err := ps.pushToClosest(ctx, chunk, false); {
case errors.Is(err, topology.ErrWantSelf):
// Only store if we are actually within our neighborhood. If the chunk
// is outside our AOR we would store it in a low bin and unreserve()
// would evict it almost immediately, leaving the chunk nowhere on the
// network while the origin believes it was delivered.
if swarm.Proximity(ps.address.Bytes(), chunkAddress.Bytes()) < rad {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if you have doubling set on, and the chunk is saved into a sister neighborhood? Should we have here a check based on the committed depth?

ps.metrics.OutOfDepthStoring.Inc()
return ErrOutOfDepthStoring
}
stored, reason = true, "want self"
return store(ctx)
case err == nil:
Expand Down Expand Up @@ -361,10 +369,11 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
ps.metrics.TotalRequests.Inc()

var (
sentErrorsLeft = 1
preemptiveTicker <-chan time.Time
inflight int
parallelForwards = maxMultiplexForwards
sentErrorsLeft = 1
preemptiveTicker <-chan time.Time
inflight int
parallelForwards = maxMultiplexForwards
shallowReceiptResult *pb.Receipt
)

if origin {
Expand Down Expand Up @@ -419,11 +428,19 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
if skip.PruneExpiresAfter(idAddress, overDraftRefresh) == 0 { // no overdraft peers, we have depleted ALL peers
if inflight == 0 {
if ps.fullNode {
// If a peer already has the chunk (even at wrong depth), don't
// store locally — the chunk is closer to its neighbourhood than us.
if shallowReceiptResult != nil {
Comment thread
acud marked this conversation as resolved.
Outdated
return shallowReceiptResult, ErrShallowReceipt
}
if cac.Valid(ch) {
go ps.unwrap(ch)
}
return nil, topology.ErrWantSelf
}
if shallowReceiptResult != nil {
return shallowReceiptResult, ErrShallowReceipt
}
ps.logger.Debug("no peers left", "chunk_address", ch.Address(), "error", err)
return nil, err
}
Expand Down Expand Up @@ -486,12 +503,17 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
return result.receipt, nil
}

switch err := ps.checkReceipt(result.receipt); {
case err == nil:
if err := ps.checkReceipt(result.receipt); err == nil {
return result.receipt, nil
case errors.Is(err, ErrShallowReceipt):
ps.errSkip.Add(idAddress, result.peer, skiplistDur)
return result.receipt, err
} else if errors.Is(err, ErrShallowReceipt) {
// Treat shallow receipt like any other failure: exhaust the full
// error budget and wait for any other inflight parallel pushes
// (e.g. multiplex forwards) before giving up. Only return
// ErrShallowReceipt once the entire budget is spent.
shallowReceiptResult = result.receipt
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you've removed the skiplist addition line that was on the left side of the diff (L493). any particular reason?

Copy link
Copy Markdown
Member Author

@gacevicljubisa gacevicljubisa Mar 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually it is not removed, it is on the common failure path at pushsync.go:524

result.err = err
Copy link
Copy Markdown
Contributor

@acud acud Mar 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if I understand this fully, but this statement and the next one in the else block seem to be assigned but never used. Where is result.err used?

} else {
result.err = err
}
}

Expand All @@ -505,6 +527,9 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
}
}

if shallowReceiptResult != nil {
return shallowReceiptResult, ErrShallowReceipt
}
return nil, ErrNoPush
}

Expand Down
78 changes: 64 additions & 14 deletions pkg/pushsync/pushsync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,29 +208,48 @@ func TestSocListener(t *testing.T) {
waitOnRecordAndTest(t, closestPeer, recorder, sch2.Address(), nil)
}

// TestShallowReceipt forces the peer to send back a shallow receipt to a pushsync request. In return, the origin node returns the error along with the received receipt.
// TestShallowReceipt verifies that when a storer node stores a chunk legitimately
// within its own AOR but the origin node has a stricter radius, the origin
// correctly identifies and returns ErrShallowReceipt together with the receipt.
func TestShallowReceipt(t *testing.T) {
t.Parallel()
// chunk data to upload
chunk := testingc.FixtureChunk("7000")

var highPO uint8 = 31
key, err := crypto.GenerateSecp256k1Key()
if err != nil {
t.Fatal(err)
}

// create a pivot node and a mocked closest node
pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000") // base is 0000
closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000") // binary 0110 -> po 1
signer := crypto.NewDefaultSigner(key)

// peer is the node responding to the chunk receipt message
// mock should return ErrWantSelf since there's no one to forward to
psPeer, _ := createPushSyncNodeWithRadius(t, closestPeer, defaultPrices, nil, nil, defaultSigner(chunk), highPO, 0, mock.WithClosestPeerErr(topology.ErrWantSelf))
pubKey, err := signer.PublicKey()
if err != nil {
t.Fatal(err)
}

closestPeer, err := crypto.NewOverlayAddress(*pubKey, 1, blockHash.Bytes())
if err != nil {
t.Fatal(err)
}

// Storer stores within its own AOR (proximity > storerRadius → qualifies).
// The origin has a much higher radius, so it always considers the receipt shallow.
storerRadius := 1
chunkProximity := 0
pivotRadius := 31
pivotTolerance := uint8(0)

pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")

chunk := testingc.GenerateValidRandomChunkAt(t, closestPeer, chunkProximity)

// storer: proximity > storerRadius → within AOR → stores and sends receipt
psPeer, _ := createPushSyncNodeWithRadius(t, closestPeer, defaultPrices, nil, nil, signer, uint8(storerRadius), 0, mock.WithClosestPeerErr(topology.ErrWantSelf))

recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode))

// pivot node needs the streamer since the chunk is intercepted by
// the chunk worker, then gets sent by opening a new stream
psPivot, _ := createPushSyncNodeWithRadius(t, pivotNode, defaultPrices, recorder, nil, defaultSigner(chunk), highPO, 0, mock.WithClosestPeer(closestPeer))
// pivot: stricter radius → origin considers the receipt shallow
psPivot, _ := createPushSyncNodeWithRadius(t, pivotNode, defaultPrices, recorder, nil, nil, uint8(pivotRadius), pivotTolerance, mock.WithClosestPeer(closestPeer))

// Trigger the sending of chunk to the closest node
receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
if !errors.Is(err, pushsync.ErrShallowReceipt) {
t.Fatalf("got %v, want %v", err, pushsync.ErrShallowReceipt)
Expand All @@ -247,6 +266,37 @@ func TestShallowReceipt(t *testing.T) {
waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil)
}

// TestOutOfDepthStoring verifies that when a storer is forced (ErrWantSelf) but
// the chunk is outside its AOR, it refuses to store and returns an error rather
// than storing a chunk it will immediately evict.
func TestOutOfDepthStoring(t *testing.T) {
t.Parallel()

// Fixture chunk whose address starts with 0x70... — far from both node
// addresses below, so its proximity to the storer is well below highPO.
chunk := testingc.FixtureChunk("7000")

// Radius used for both nodes; with proximity(storer, chunk) < highPO the
// chunk is far outside the storer's area of responsibility.
var highPO uint8 = 31

// Storer address has very low proximity to the chunk; its radius is highPO.
// It has no closer peers (ErrWantSelf) but MUST refuse to store because
// the chunk is far outside its AOR.
pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")
closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")

// Storer node: topology mock reports no closer peer, forcing the
// ErrWantSelf path where the out-of-depth check applies.
psPeer, _ := createPushSyncNodeWithRadius(t, closestPeer, defaultPrices, nil, nil, defaultSigner(chunk), highPO, 0, mock.WithClosestPeerErr(topology.ErrWantSelf))

// Record the stream between pivot and storer so the pivot's push reaches
// the storer's protocol handler in-process.
recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode))

// Origin (pivot) node: always forwards to closestPeer via the recorder.
psPivot, _ := createPushSyncNodeWithRadius(t, pivotNode, defaultPrices, recorder, nil, defaultSigner(chunk), highPO, 0, mock.WithClosestPeer(closestPeer))

_, err := psPivot.PushChunkToClosest(context.Background(), chunk)

// The storer correctly refused to store, so the origin exhausted its peers
// without any shallow receipt. No ErrShallowReceipt should be returned.
// NOTE(review): this only rules out ErrShallowReceipt — err == nil would
// also pass. Consider additionally asserting the expected terminal error
// (presumably ErrNoPush or a propagated ErrOutOfDepthStoring) — TODO confirm.
if errors.Is(err, pushsync.ErrShallowReceipt) {
t.Fatal("got ErrShallowReceipt, but storer should have refused to store out-of-depth chunk")
}
}

// TestShallowReceiptTolerance sends back a shallow receipt but because of the tolerance level, the origin node accepts the receipts.
func TestShallowReceiptTolerance(t *testing.T) {
t.Parallel()
Expand Down
Loading