diff --git a/pkg/node/node.go b/pkg/node/node.go
index 35b78d4dc05..d0f6d050cc2 100644
--- a/pkg/node/node.go
+++ b/pkg/node/node.go
@@ -291,7 +291,6 @@ func NewBee(
 	if o.ReserveCapacityDoubling < 0 || o.ReserveCapacityDoubling > maxAllowedDoubling {
 		return nil, fmt.Errorf("config reserve capacity doubling has to be between default: 0 and maximum: %d", maxAllowedDoubling)
 	}
-	shallowReceiptTolerance := maxAllowedDoubling - o.ReserveCapacityDoubling
 
 	reserveCapacity := (1 << o.ReserveCapacityDoubling) * storer.DefaultReserveCapacity
 
@@ -1047,7 +1046,7 @@ func NewBee(
 		}
 	}
 
-	pushSyncProtocol := pushsync.New(swarmAddress, networkID, nonce, p2ps, localStore, waitNetworkRFunc, kad, o.FullNodeMode && !o.BootnodeMode, pssService.TryUnwrap, gsocService.Handle, validStamp, logger, acc, pricer, signer, tracer, detector, uint8(shallowReceiptTolerance))
+	pushSyncProtocol := pushsync.New(swarmAddress, networkID, nonce, p2ps, localStore, waitNetworkRFunc, kad, o.FullNodeMode && !o.BootnodeMode, pssService.TryUnwrap, gsocService.Handle, validStamp, logger, acc, pricer, signer, tracer, detector)
 	b.pushSyncCloser = pushSyncProtocol
 
 	// set the pushSyncer in the PSS
diff --git a/pkg/pusher/metrics.go b/pkg/pusher/metrics.go
index 0a92951b42b..30c0233e2fa 100644
--- a/pkg/pusher/metrics.go
+++ b/pkg/pusher/metrics.go
@@ -10,12 +10,13 @@ import (
 )
 
 type metrics struct {
-	TotalToPush      prometheus.Counter
-	TotalSynced      prometheus.Counter
-	TotalErrors      prometheus.Counter
-	MarkAndSweepTime prometheus.Histogram
-	SyncTime         prometheus.Histogram
-	ErrorTime        prometheus.Histogram
+	TotalToPush       prometheus.Counter
+	TotalSynced       prometheus.Counter
+	TotalCouldNotSync prometheus.Counter
+	TotalErrors       prometheus.Counter
+	MarkAndSweepTime  prometheus.Histogram
+	SyncTime          prometheus.Histogram
+	ErrorTime         prometheus.Histogram
 }
 
 func newMetrics() metrics {
@@ -34,6 +35,12 @@ func newMetrics() metrics {
 			Name:      "total_synced",
 			Help:      "Total chunks synced successfully with valid receipts.",
 		}),
+		TotalCouldNotSync: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: m.Namespace,
+			Subsystem: subsystem,
+			Name:      "total_could_not_sync",
+			Help:      "Total chunks abandoned after exhausting retries with no valid receipt (shallow receipt or no peer in the correct neighbourhood).",
+		}),
 		TotalErrors: prometheus.NewCounter(prometheus.CounterOpts{
 			Namespace: m.Namespace,
 			Subsystem: subsystem,
diff --git a/pkg/pusher/pusher.go b/pkg/pusher/pusher.go
index 9c4073f1fcc..9f369176ffe 100644
--- a/pkg/pusher/pusher.go
+++ b/pkg/pusher/pusher.go
@@ -276,11 +276,14 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (
 			loggerV1.Error(err, "pusher: failed reporting chunk")
 			return true, err
 		}
+		s.attempts.delete(op.identityAddress)
 	case errors.Is(err, pushsync.ErrShallowReceipt):
 		if s.shallowReceipt(op.identityAddress) {
 			return true, err
 		}
-		if err := s.storer.Report(ctx, op.Chunk, storage.ChunkSynced); err != nil {
+		// budget exhausted; report CouldNotSync so the chunk isn't marked delivered
+		s.metrics.TotalCouldNotSync.Inc()
+		if err := s.storer.Report(ctx, op.Chunk, storage.ChunkCouldNotSync); err != nil {
 			loggerV1.Error(err, "pusher: failed to report sync status")
 			return true, err
 		}
@@ -289,6 +292,7 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (
 			loggerV1.Error(err, "pusher: failed to report sync status")
 			return true, err
 		}
+		s.attempts.delete(op.identityAddress)
 	default:
 		loggerV1.Error(err, "pusher: failed PushChunkToClosest")
 		return true, err
 	}
@@ -330,13 +334,16 @@ func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) err
 		if err != nil {
 			loggerV1.Error(err, "pusher: failed to store chunk")
 		}
+		s.attempts.delete(op.identityAddress)
 	case errors.Is(err, pushsync.ErrShallowReceipt):
 		if s.shallowReceipt(op.identityAddress) {
 			return err
 		}
-		// out of attempts for retry, swallow error
-		err = nil
-	case err != nil:
+		// budget exhausted; propagate err instead of falsely reporting success
+		s.metrics.TotalCouldNotSync.Inc()
+	case err == nil:
+		s.attempts.delete(op.identityAddress)
+	default:
 		loggerV1.Error(err, "pusher: failed PushChunkToClosest")
 	}
 
diff --git a/pkg/pushsync/metrics.go b/pkg/pushsync/metrics.go
index 758a6f072a5..b21bedffa84 100644
--- a/pkg/pushsync/metrics.go
+++ b/pkg/pushsync/metrics.go
@@ -29,6 +29,7 @@ type metrics struct {
 	ShallowReceiptDepth *prometheus.CounterVec
 	ShallowReceipt      prometheus.Counter
 	OverdraftRefresh    prometheus.Counter
+	OutOfDepthStoring   prometheus.Counter
 }
 
 func newMetrics() metrics {
@@ -153,6 +154,12 @@ func newMetrics() metrics {
 			Name:      "overdraft_refresh",
 			Help:      "Total number of times peers were skipped due to overdraft, requiring a wait to refresh balance.",
 		}),
+		OutOfDepthStoring: prometheus.NewCounter(prometheus.CounterOpts{
+			Namespace: m.Namespace,
+			Subsystem: subsystem,
+			Name:      "out_of_depth_storing",
+			Help:      "Total number of times a chunk was refused because it was outside the neighborhood (ErrWantSelf with proximity < radius).",
+		}),
 	}
 }
diff --git a/pkg/pushsync/pushsync.go b/pkg/pushsync/pushsync.go
index 05c4aef3d41..8e4610bb764 100644
--- a/pkg/pushsync/pushsync.go
+++ b/pkg/pushsync/pushsync.go
@@ -79,27 +79,25 @@ type Storer interface {
 }
 
 type PushSync struct {
-	address        swarm.Address
-	networkID      uint64
-	radius         func() (uint8, error)
-	nonce          []byte
-	streamer       p2p.StreamerDisconnecter
-	store          Storer
-	topologyDriver topology.Driver
-	unwrap         func(swarm.Chunk)
-	gsocHandler    func(*soc.SOC)
-	logger         log.Logger
-	accounting     accounting.Interface
-	pricer         pricer.Interface
-	metrics        metrics
-	tracer         *tracing.Tracer
-	validStamp     postage.ValidStampFn
-	signer         crypto.Signer
-	fullNode       bool
-	errSkip        *skippeers.List
-	stabilizer     stabilization.Subscriber
-
-	shallowReceiptTolerance uint8
+	address                 swarm.Address
+	networkID               uint64
+	radius                  func() (uint8, error)
+	nonce                   []byte
+	streamer                p2p.StreamerDisconnecter
+	store                   Storer
+	topologyDriver          topology.Driver
+	unwrap                  func(swarm.Chunk)
+	gsocHandler             func(*soc.SOC)
+	logger                  log.Logger
+	accounting              accounting.Interface
+	pricer                  pricer.Interface
+	metrics                 metrics
+	tracer                  *tracing.Tracer
+	validStamp              postage.ValidStampFn
+	signer                  crypto.Signer
+	fullNode                bool
+	errSkip                 *skippeers.List
+	stabilizer              stabilization.Subscriber
 	overDraftRefreshLimiter *rate.Limiter
 }
 
@@ -128,7 +126,6 @@ func New(
 	signer crypto.Signer,
 	tracer *tracing.Tracer,
 	stabilizer stabilization.Subscriber,
-	shallowReceiptTolerance uint8,
 ) *PushSync {
 	ps := &PushSync{
 		address:                 address,
@@ -149,7 +146,6 @@ func New(
 		signer:                  signer,
 		errSkip:                 skippeers.NewList(time.Minute),
 		stabilizer:              stabilizer,
-		shallowReceiptTolerance: shallowReceiptTolerance,
 		overDraftRefreshLimiter: rate.NewLimiter(rate.Every(time.Second), 1),
 	}
 
@@ -299,6 +295,12 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)
 
 	switch receipt, err := ps.pushToClosest(ctx, chunk, false); {
 	case errors.Is(err, topology.ErrWantSelf):
+		// Out-of-AOR chunks are unreachable via retrieval even when not
+		// evicted; let the origin try the next peer instead.
+		if swarm.Proximity(ps.address.Bytes(), chunkAddress.Bytes()) < rad {
+			ps.metrics.OutOfDepthStoring.Inc()
+			return ErrOutOfDepthStoring
+		}
 		stored, reason = true, "want self"
 		return store(ctx)
 	case err == nil:
@@ -361,10 +363,12 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
 	ps.metrics.TotalRequests.Inc()
 
 	var (
-		sentErrorsLeft   = 1
-		preemptiveTicker <-chan time.Time
-		inflight         int
-		parallelForwards = maxMultiplexForwards
+		sentErrorsLeft       = 1
+		preemptiveTicker     <-chan time.Time
+		inflight             int
+		parallelForwards     = maxMultiplexForwards
+		shallowReceiptResult *pb.Receipt
+		shallowReceiptPO     uint8
 	)
 
 	if origin {
@@ -415,33 +419,45 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
 		// For non-origin peers, if the chunk is not within depth, they may store the chunk if they are the closest peer to the chunk.
 		fullSkip := append(skip.ChunkPeers(idAddress), ps.errSkip.ChunkPeers(idAddress)...)
 		peer, err := ps.closestPeer(ch.Address(), origin, fullSkip)
-		if errors.Is(err, topology.ErrNotFound) {
-			if skip.PruneExpiresAfter(idAddress, overDraftRefresh) == 0 { // no overdraft peers, we have depleted ALL peers
-				if inflight == 0 {
-					if ps.fullNode {
-						if cac.Valid(ch) {
-							go ps.unwrap(ch)
-						}
-						return nil, topology.ErrWantSelf
-					}
-					ps.logger.Debug("no peers left", "chunk_address", ch.Address(), "error", err)
-					return nil, err
+
+		// ErrWantSelf on a forwarder can mean closer peers exist but are
+		// overdraft-skipped; wait for refresh before falling back to self.
+		if errors.Is(err, topology.ErrNotFound) || errors.Is(err, topology.ErrWantSelf) {
+			if skip.PruneExpiresAfter(idAddress, overDraftRefresh) > 0 {
+				ps.metrics.OverdraftRefresh.Inc()
+				if ps.overDraftRefreshLimiter.Allow() {
+					ps.logger.Debug("sleeping to refresh overdraft balance")
 				}
-				continue // there is still an inflight request, wait for it's result
-			}
-			ps.metrics.OverdraftRefresh.Inc()
-			if ps.overDraftRefreshLimiter.Allow() {
-				ps.logger.Debug("sleeping to refresh overdraft balance")
+				select {
+				case <-time.After(overDraftRefresh):
+					retry()
+					continue
+				case <-ctx.Done():
+					return nil, ctx.Err()
+				}
 			}
+		}
 
-			select {
-			case <-time.After(overDraftRefresh):
-				retry()
-				continue
-			case <-ctx.Done():
-				return nil, ctx.Err()
+		if errors.Is(err, topology.ErrNotFound) {
+			if inflight == 0 {
+				if ps.fullNode {
+					if cac.Valid(ch) {
+						go ps.unwrap(ch)
+					}
+					// prefer a shallow peer over self-store when one exists
+					if shallowReceiptResult != nil {
+						return shallowReceiptResult, ErrShallowReceipt
+					}
+					return nil, topology.ErrWantSelf
+				}
+				if shallowReceiptResult != nil {
+					return shallowReceiptResult, ErrShallowReceipt
+				}
+				ps.logger.Debug("no peers left", "chunk_address", ch.Address(), "error", err)
+				return nil, err
 			}
+			continue // there is still an inflight request, wait for it's result
 		}
 
 		if err != nil {
@@ -486,12 +502,19 @@
 				return result.receipt, nil
 			}
 
-			switch err := ps.checkReceipt(result.receipt); {
+			// Cache the best (highest-PO) shallow receipt and exhaust the
+			// budget; surface ErrShallowReceipt only when nothing better lands.
+			switch po, err := ps.checkReceipt(result.receipt); {
 			case err == nil:
 				return result.receipt, nil
 			case errors.Is(err, ErrShallowReceipt):
-				ps.errSkip.Add(idAddress, result.peer, skiplistDur)
-				return result.receipt, err
+				if shallowReceiptResult == nil || po > shallowReceiptPO {
+					shallowReceiptResult = result.receipt
+					shallowReceiptPO = po
+				}
+				fallthrough
+			default:
+				result.err = err
 			}
 		}
@@ -505,6 +528,9 @@
 		}
 	}
 
+	if shallowReceiptResult != nil {
+		return shallowReceiptResult, ErrShallowReceipt
+	}
 	return nil, ErrNoPush
 }
 
@@ -564,42 +590,40 @@ func (ps *PushSync) push(parentCtx context.Context, resultChan chan<- receiptRes
 	err = action.Apply()
 }
 
-func (ps *PushSync) checkReceipt(receipt *pb.Receipt) error {
+// checkReceipt validates the receipt and returns the storer-to-chunk PO so
+// callers can rank shallow receipts; PO is zero on signature errors. Strict
+// po >= rad: a chunk is not synced until it lands within the AOR.
+func (ps *PushSync) checkReceipt(receipt *pb.Receipt) (uint8, error) {
 	addr := swarm.NewAddress(receipt.Address)
 
 	publicKey, err := crypto.Recover(receipt.Signature, addr.Bytes())
 	if err != nil {
-		return fmt.Errorf("pushsync: receipt recover: %w", err)
+		return 0, fmt.Errorf("pushsync: receipt recover: %w", err)
 	}
 
 	peer, err := crypto.NewOverlayAddress(*publicKey, ps.networkID, receipt.Nonce)
 	if err != nil {
-		return fmt.Errorf("pushsync: receipt storer address: %w", err)
+		return 0, fmt.Errorf("pushsync: receipt storer address: %w", err)
 	}
 
 	po := swarm.Proximity(addr.Bytes(), peer.Bytes())
 
 	r, err := ps.radius()
 	if err != nil {
-		return fmt.Errorf("pushsync: storage radius: %w", err)
-	}
-
-	var tolerance uint8
-	if r >= ps.shallowReceiptTolerance { // check for underflow of uint8
-		tolerance = r - ps.shallowReceiptTolerance
+		return po, fmt.Errorf("pushsync: storage radius: %w", err)
 	}
 
-	if po < tolerance || uint32(po) < receipt.StorageRadius {
+	if po < r || uint32(po) < receipt.StorageRadius {
 		ps.metrics.ShallowReceiptDepth.WithLabelValues(strconv.Itoa(int(po))).Inc()
 		ps.metrics.ShallowReceipt.Inc()
 		ps.logger.Debug("shallow receipt", "chunk_address", addr, "peer_address", peer, "proximity_order", po, "peer_radius", receipt.StorageRadius, "self_radius", r)
-		return ErrShallowReceipt
+		return po, ErrShallowReceipt
 	}
 
 	ps.metrics.ReceiptDepth.WithLabelValues(strconv.Itoa(int(po))).Inc()
 	ps.logger.Debug("chunk pushed", "chunk_address", addr, "peer_address", peer, "proximity_order", po)
 
-	return nil
+	return po, nil
 }
 
 func (ps *PushSync) pushChunkToPeer(ctx context.Context, peer swarm.Address, ch swarm.Chunk) (receipt *pb.Receipt, err error) {
diff --git a/pkg/pushsync/pushsync_integration_test.go b/pkg/pushsync/pushsync_integration_test.go
index e9b2fa30f11..001033111b7 100644
--- a/pkg/pushsync/pushsync_integration_test.go
+++ b/pkg/pushsync/pushsync_integration_test.go
@@ -70,7 +70,6 @@ func TestPushSyncIntegration(t *testing.T) {
 		serverSigner,
 		nil,
 		stabilizationmock.NewSubscriber(true),
-		0,
 	)
 	t.Cleanup(func() { serverPushSync.Close() })
 
@@ -106,7 +105,6 @@ func TestPushSyncIntegration(t *testing.T) {
 		clientSigner,
 		nil,
 		stabilizationmock.NewSubscriber(true),
-		0,
 	)
 	t.Cleanup(func() { clientPushSync.Close() })
 
diff --git a/pkg/pushsync/pushsync_test.go b/pkg/pushsync/pushsync_test.go
index debc6554f94..09a4b6c329f 100644
--- a/pkg/pushsync/pushsync_test.go
+++ b/pkg/pushsync/pushsync_test.go
@@ -209,48 +209,11 @@ func TestSocListener(t *testing.T) {
 	waitOnRecordAndTest(t, closestPeer, recorder, sch2.Address(), nil)
 }
 
-// TestShallowReceipt forces the peer to send back a shallow receipt to a pushsync request. In return, the origin node returns the error along with the received receipt.
+// TestShallowReceipt verifies that when a storer node stores a chunk legitimately
+// within its own AOR but the origin node has a stricter radius, the origin
+// correctly identifies and returns ErrShallowReceipt together with the receipt.
 func TestShallowReceipt(t *testing.T) {
 	t.Parallel()
 
-	// chunk data to upload
-	chunk := testingc.FixtureChunk("7000")
-
-	var highPO uint8 = 31
-
-	// create a pivot node and a mocked closest node
-	pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")   // base is 0000
-	closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000") // binary 0110 -> po 1
-
-	// peer is the node responding to the chunk receipt message
-	// mock should return ErrWantSelf since there's no one to forward to
-	psPeer, _ := createPushSyncNodeWithRadius(t, closestPeer, defaultPrices, nil, nil, defaultSigner(chunk), highPO, 0, mock.WithClosestPeerErr(topology.ErrWantSelf))
-
-	recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode))
-
-	// pivot node needs the streamer since the chunk is intercepted by
-	// the chunk worker, then gets sent by opening a new stream
-	psPivot, _ := createPushSyncNodeWithRadius(t, pivotNode, defaultPrices, recorder, nil, defaultSigner(chunk), highPO, 0, mock.WithClosestPeer(closestPeer))
-
-	// Trigger the sending of chunk to the closest node
-	receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
-	if !errors.Is(err, pushsync.ErrShallowReceipt) {
-		t.Fatalf("got %v, want %v", err, pushsync.ErrShallowReceipt)
-	}
-
-	if !chunk.Address().Equal(receipt.Address) {
-		t.Fatal("invalid receipt")
-	}
-
-	// this intercepts the outgoing delivery message
-	waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), chunk.Data())
-
-	// this intercepts the incoming receipt message
-	waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil)
-}
-
-// TestShallowReceiptTolerance sends back a shallow receipt but because of the tolerance level, the origin node accepts the receipts.
-func TestShallowReceiptTolerance(t *testing.T) {
-	t.Parallel()
-
 	key, err := crypto.GenerateSecp256k1Key()
 	if err != nil {
@@ -269,38 +232,32 @@ func TestShallowReceiptTolerance(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	storerRadius := 2
-	chunkProximity := 2
-
-	pivotRadius := 4
-	pivotTolerance := uint8(2)
+	// Storer stores within its own AOR (proximity > storerRadius → qualifies).
+	// The origin has a much higher radius, so it always considers the receipt shallow.
+	storerRadius := 1
+	chunkProximity := 0
+	pivotRadius := 31
 
-	// create a pivot node and a mocked closest node
 	pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")
 	chunk := testingc.GenerateValidRandomChunkAt(t, closestPeer, chunkProximity)
 
-	// peer is the node responding to the chunk receipt message
-	// mock should return ErrWantSelf since there's no one to forward to
-	psPeer, _ := createPushSyncNodeWithRadius(t, closestPeer, defaultPrices, nil, nil, signer, uint8(storerRadius), 0, mock.WithClosestPeerErr(topology.ErrWantSelf))
+	// storer: proximity > storerRadius → within AOR → stores and sends receipt
+	psPeer, _ := createPushSyncNodeWithRadius(t, closestPeer, defaultPrices, nil, nil, signer, uint8(storerRadius), mock.WithClosestPeerErr(topology.ErrWantSelf))
 
 	recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode))
 
-	// pivot node needs the streamer since the chunk is intercepted by
-	// the chunk worker, then gets sent by opening a new stream
-	psPivot, _ := createPushSyncNodeWithRadius(t, pivotNode, defaultPrices, recorder, nil, nil, uint8(pivotRadius), pivotTolerance, mock.WithClosestPeer(closestPeer))
+	// pivot: stricter radius → origin considers the receipt shallow
+	psPivot, _ := createPushSyncNodeWithRadius(t, pivotNode, defaultPrices, recorder, nil, nil, uint8(pivotRadius), mock.WithClosestPeer(closestPeer))
 
-	// Trigger the sending of chunk to the closest node
 	receipt, err := psPivot.PushChunkToClosest(context.Background(), chunk)
+	if !errors.Is(err, pushsync.ErrShallowReceipt) {
+		t.Fatalf("got %v, want %v", err, pushsync.ErrShallowReceipt)
+	}
+
 	if !chunk.Address().Equal(receipt.Address) {
 		t.Fatal("invalid receipt")
 	}
-	if err != nil {
-		t.Fatalf("got %v, want %v", err, nil)
-	}
-	if got := swarm.Proximity(receipt.Address.Bytes(), closestPeer.Bytes()); got < uint8(chunkProximity) {
-		t.Fatalf("got %v, want at least %v", got, chunkProximity)
-	}
 
 	// this intercepts the outgoing delivery message
 	waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), chunk.Data())
@@ -309,6 +266,37 @@
 	// this intercepts the incoming receipt message
 	waitOnRecordAndTest(t, closestPeer, recorder, chunk.Address(), nil)
 }
 
+// TestOutOfDepthStoring verifies that when a storer is forced (ErrWantSelf) but
+// the chunk is outside its AOR, it refuses to store and returns an error rather
+// than storing a chunk it will immediately evict.
+func TestOutOfDepthStoring(t *testing.T) {
+	t.Parallel()
+
+	chunk := testingc.FixtureChunk("7000")
+
+	var highPO uint8 = 31
+
+	// Storer address has very low proximity to the chunk; its radius is highPO.
+	// It has no closer peers (ErrWantSelf) but MUST refuse to store because
+	// the chunk is far outside its AOR.
+	pivotNode := swarm.MustParseHexAddress("0000000000000000000000000000000000000000000000000000000000000000")
+	closestPeer := swarm.MustParseHexAddress("6000000000000000000000000000000000000000000000000000000000000000")
+
+	psPeer, _ := createPushSyncNodeWithRadius(t, closestPeer, defaultPrices, nil, nil, defaultSigner(chunk), highPO, mock.WithClosestPeerErr(topology.ErrWantSelf))
+
+	recorder := streamtest.New(streamtest.WithProtocols(psPeer.Protocol()), streamtest.WithBaseAddr(pivotNode))
+
+	psPivot, _ := createPushSyncNodeWithRadius(t, pivotNode, defaultPrices, recorder, nil, defaultSigner(chunk), highPO, mock.WithClosestPeer(closestPeer))
+
+	_, err := psPivot.PushChunkToClosest(context.Background(), chunk)
+
+	// The storer correctly refused to store, so the origin exhausted its peers
+	// and falls back to ErrWantSelf (full node with no remaining peers).
+	if !errors.Is(err, topology.ErrWantSelf) {
+		t.Fatalf("got %v, want %v", err, topology.ErrWantSelf)
+	}
+}
+
 // TestPushChunkToClosest tests the sending of chunk to closest peer from the origination source perspective.
 // it also checks whether the tags are incremented properly if they are present
 func TestPushChunkToClosest(t *testing.T) {
@@ -1010,7 +998,6 @@ func createPushSyncNodeWithRadius(
 	unwrap func(swarm.Chunk),
 	signer crypto.Signer,
 	radius uint8,
-	shallowReceiptTolerance uint8,
 	mockOpts ...mock.Option,
 ) (*pushsync.PushSync, *testStorer) {
 	t.Helper()
@@ -1033,7 +1020,7 @@ func createPushSyncNodeWithRadius(
 	radiusFunc := func() (uint8, error) { return radius, nil }
 
-	ps := pushsync.New(addr, 1, blockHash.Bytes(), recorderDisconnecter, storer, radiusFunc, mockTopology, true, unwrap, func(*soc.SOC) {}, validStamp, log.Noop, accountingmock.NewAccounting(), mockPricer, signer, nil, stabilmock.NewSubscriber(true), shallowReceiptTolerance)
+	ps := pushsync.New(addr, 1, blockHash.Bytes(), recorderDisconnecter, storer, radiusFunc, mockTopology, true, unwrap, func(*soc.SOC) {}, validStamp, log.Noop, accountingmock.NewAccounting(), mockPricer, signer, nil, stabilmock.NewSubscriber(true))
 	t.Cleanup(func() { ps.Close() })
 
 	return ps, storer
@@ -1074,7 +1061,7 @@ func createPushSyncNodeWithAccounting(
 	radiusFunc := func() (uint8, error) { return 0, nil }
 
-	ps := pushsync.New(addr, 1, blockHash.Bytes(), recorderDisconnecter, storer, radiusFunc, mockTopology, true, unwrap, gsocListener, validStamp, logger, acct, mockPricer, signer, nil, stabilmock.NewSubscriber(true), 0)
+	ps := pushsync.New(addr, 1, blockHash.Bytes(), recorderDisconnecter, storer, radiusFunc, mockTopology, true, unwrap, gsocListener, validStamp, logger, acct, mockPricer, signer, nil, stabilmock.NewSubscriber(true))
 	t.Cleanup(func() { ps.Close() })
 
 	return ps, storer
diff --git a/pkg/storer/internal/upload/uploadstore.go b/pkg/storer/internal/upload/uploadstore.go
index 51e99fa16d3..38d53d97460 100644
--- a/pkg/storer/internal/upload/uploadstore.go
+++ b/pkg/storer/internal/upload/uploadstore.go
@@ -611,6 +611,9 @@ func Report(ctx context.Context, st transaction.Store, chunk swarm.Chunk, state
 		ti.Synced++
 	case storage.ChunkSynced:
 		ti.Synced++
+	case storage.ChunkCouldNotSync:
+		// no Synced bump: failure is observable via Split-Synced and the
+		// pusher's total_could_not_sync metric
 	}
 
 	err = indexStore.Put(ti)