3 changes: 1 addition & 2 deletions pkg/node/node.go
@@ -291,7 +291,6 @@ func NewBee(
if o.ReserveCapacityDoubling < 0 || o.ReserveCapacityDoubling > maxAllowedDoubling {
return nil, fmt.Errorf("config reserve capacity doubling has to be between default: 0 and maximum: %d", maxAllowedDoubling)
}
shallowReceiptTolerance := maxAllowedDoubling - o.ReserveCapacityDoubling

reserveCapacity := (1 << o.ReserveCapacityDoubling) * storer.DefaultReserveCapacity

@@ -1047,7 +1046,7 @@ func NewBee(
}
}

pushSyncProtocol := pushsync.New(swarmAddress, networkID, nonce, p2ps, localStore, waitNetworkRFunc, kad, o.FullNodeMode && !o.BootnodeMode, pssService.TryUnwrap, gsocService.Handle, validStamp, logger, acc, pricer, signer, tracer, detector, uint8(shallowReceiptTolerance))
pushSyncProtocol := pushsync.New(swarmAddress, networkID, nonce, p2ps, localStore, waitNetworkRFunc, kad, o.FullNodeMode && !o.BootnodeMode, pssService.TryUnwrap, gsocService.Handle, validStamp, logger, acc, pricer, signer, tracer, detector)
b.pushSyncCloser = pushSyncProtocol

// set the pushSyncer in the PSS
19 changes: 13 additions & 6 deletions pkg/pusher/metrics.go
@@ -10,12 +10,13 @@ import (
)

type metrics struct {
TotalToPush prometheus.Counter
TotalSynced prometheus.Counter
TotalErrors prometheus.Counter
MarkAndSweepTime prometheus.Histogram
SyncTime prometheus.Histogram
ErrorTime prometheus.Histogram
TotalToPush prometheus.Counter
TotalSynced prometheus.Counter
TotalCouldNotSync prometheus.Counter
TotalErrors prometheus.Counter
MarkAndSweepTime prometheus.Histogram
SyncTime prometheus.Histogram
ErrorTime prometheus.Histogram
}

func newMetrics() metrics {
@@ -34,6 +35,12 @@ func newMetrics() {
Name: "total_synced",
Help: "Total chunks synced successfully with valid receipts.",
}),
TotalCouldNotSync: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "total_could_not_sync",
Help: "Total chunks abandoned after exhausting retries with no valid receipt (shallow receipt or no peer in the correct neighbourhood).",
}),
TotalErrors: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
15 changes: 11 additions & 4 deletions pkg/pusher/pusher.go
@@ -276,11 +276,14 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (
loggerV1.Error(err, "pusher: failed reporting chunk")
return true, err
}
s.attempts.delete(op.identityAddress)
case errors.Is(err, pushsync.ErrShallowReceipt):
if s.shallowReceipt(op.identityAddress) {
return true, err
}
if err := s.storer.Report(ctx, op.Chunk, storage.ChunkSynced); err != nil {
// budget exhausted; report CouldNotSync so the chunk isn't marked delivered
s.metrics.TotalCouldNotSync.Inc()
if err := s.storer.Report(ctx, op.Chunk, storage.ChunkCouldNotSync); err != nil {
Contributor

Changing to ChunkCouldNotSync will cause some false reporting, because the ChunkCouldNotSync state is not checked and updated in the uploadstore.go -> Report function. Maybe you would also need to include ChunkCouldNotSync in that switch statement.
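A rough sketch of the suggested handling (hedged: the tag fields and chunk-state names below are illustrative assumptions, not the actual uploadstore.go types):

```go
// Hypothetical sketch only; Tag fields and state names are assumed,
// not copied from uploadstore.go.
switch state {
case storage.ChunkSent:
	tag.Sent++
case storage.ChunkSynced:
	tag.Synced++ // delivered with a valid receipt
case storage.ChunkCouldNotSync:
	// count the chunk as processed without marking it delivered, so the
	// pusher's CouldNotSync report is neither dropped nor misread as synced
	tag.CouldNotSync++
default:
	return fmt.Errorf("unhandled chunk state: %v", state)
}
```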

loggerV1.Error(err, "pusher: failed to report sync status")
return true, err
}
@@ -289,6 +292,7 @@ func (s *Service) pushDeferred(ctx context.Context, logger log.Logger, op *Op) (
loggerV1.Error(err, "pusher: failed to report sync status")
return true, err
}
s.attempts.delete(op.identityAddress)
default:
loggerV1.Error(err, "pusher: failed PushChunkToClosest")
return true, err
@@ -330,13 +334,16 @@ func (s *Service) pushDirect(ctx context.Context, logger log.Logger, op *Op) err
if err != nil {
loggerV1.Error(err, "pusher: failed to store chunk")
}
s.attempts.delete(op.identityAddress)
case errors.Is(err, pushsync.ErrShallowReceipt):
if s.shallowReceipt(op.identityAddress) {
return err
}
// out of attempts for retry, swallow error
err = nil
case err != nil:
// budget exhausted; propagate err instead of falsely reporting success
s.metrics.TotalCouldNotSync.Inc()
case err == nil:
s.attempts.delete(op.identityAddress)
default:
loggerV1.Error(err, "pusher: failed PushChunkToClosest")
}

7 changes: 7 additions & 0 deletions pkg/pushsync/metrics.go
@@ -29,6 +29,7 @@ type metrics struct {
ShallowReceiptDepth *prometheus.CounterVec
ShallowReceipt prometheus.Counter
OverdraftRefresh prometheus.Counter
OutOfDepthStoring prometheus.Counter
}

func newMetrics() metrics {
@@ -153,6 +154,12 @@ func newMetrics() metrics {
Name: "overdraft_refresh",
Help: "Total number of times peers were skipped due to overdraft, requiring a wait to refresh balance.",
}),
OutOfDepthStoring: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: m.Namespace,
Subsystem: subsystem,
Name: "out_of_depth_storing",
Help: "Total number of times a chunk was refused because it was outside the neighborhood (ErrWantSelf with proximity < radius).",
}),
}
}

152 changes: 88 additions & 64 deletions pkg/pushsync/pushsync.go
@@ -79,27 +79,25 @@ type Storer interface {
}

type PushSync struct {
address swarm.Address
networkID uint64
radius func() (uint8, error)
nonce []byte
streamer p2p.StreamerDisconnecter
store Storer
topologyDriver topology.Driver
unwrap func(swarm.Chunk)
gsocHandler func(*soc.SOC)
logger log.Logger
accounting accounting.Interface
pricer pricer.Interface
metrics metrics
tracer *tracing.Tracer
validStamp postage.ValidStampFn
signer crypto.Signer
fullNode bool
errSkip *skippeers.List
stabilizer stabilization.Subscriber

shallowReceiptTolerance uint8
address swarm.Address
networkID uint64
radius func() (uint8, error)
nonce []byte
streamer p2p.StreamerDisconnecter
store Storer
topologyDriver topology.Driver
unwrap func(swarm.Chunk)
gsocHandler func(*soc.SOC)
logger log.Logger
accounting accounting.Interface
pricer pricer.Interface
metrics metrics
tracer *tracing.Tracer
validStamp postage.ValidStampFn
signer crypto.Signer
fullNode bool
errSkip *skippeers.List
stabilizer stabilization.Subscriber
overDraftRefreshLimiter *rate.Limiter
}

@@ -128,7 +126,6 @@ func New(
signer crypto.Signer,
tracer *tracing.Tracer,
stabilizer stabilization.Subscriber,
shallowReceiptTolerance uint8,
) *PushSync {
ps := &PushSync{
address: address,
Expand All @@ -149,7 +146,6 @@ func New(
signer: signer,
errSkip: skippeers.NewList(time.Minute),
stabilizer: stabilizer,
shallowReceiptTolerance: shallowReceiptTolerance,
overDraftRefreshLimiter: rate.NewLimiter(rate.Every(time.Second), 1),
}

@@ -299,6 +295,12 @@ func (ps *PushSync) handler(ctx context.Context, p p2p.Peer, stream p2p.Stream)

switch receipt, err := ps.pushToClosest(ctx, chunk, false); {
case errors.Is(err, topology.ErrWantSelf):
// Out-of-AOR chunks are unreachable via retrieval even when not
// evicted; let the origin try the next peer instead.
if swarm.Proximity(ps.address.Bytes(), chunkAddress.Bytes()) < rad {
Contributor

What happens if you have doubling set on and the chunk is saved into a sister neighborhood? Should we have a check here based on the committed depth?
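One possible shape for that check (hedged: ps.committedDepth() is a placeholder accessor standing in for the storage radius adjusted by the reserve capacity doubling; it is not part of this PR):

```go
// Hypothetical sketch; committedDepth stands in for the depth a node with
// reserve doubling is committed to store, so chunks that land in a sister
// neighbourhood are still accepted instead of being refused.
if swarm.Proximity(ps.address.Bytes(), chunkAddress.Bytes()) < ps.committedDepth() {
	ps.metrics.OutOfDepthStoring.Inc()
	return ErrOutOfDepthStoring
}
```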

ps.metrics.OutOfDepthStoring.Inc()
return ErrOutOfDepthStoring
}
stored, reason = true, "want self"
return store(ctx)
case err == nil:
@@ -361,10 +363,12 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
ps.metrics.TotalRequests.Inc()

var (
sentErrorsLeft = 1
preemptiveTicker <-chan time.Time
inflight int
parallelForwards = maxMultiplexForwards
sentErrorsLeft = 1
preemptiveTicker <-chan time.Time
inflight int
parallelForwards = maxMultiplexForwards
shallowReceiptResult *pb.Receipt
shallowReceiptPO uint8
)

if origin {
@@ -415,33 +419,45 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
// For non-origin peers, if the chunk is not within depth, they may store the chunk if they are the closest peer to the chunk.
fullSkip := append(skip.ChunkPeers(idAddress), ps.errSkip.ChunkPeers(idAddress)...)
peer, err := ps.closestPeer(ch.Address(), origin, fullSkip)
if errors.Is(err, topology.ErrNotFound) {
if skip.PruneExpiresAfter(idAddress, overDraftRefresh) == 0 { // no overdraft peers, we have depleted ALL peers
if inflight == 0 {
if ps.fullNode {
if cac.Valid(ch) {
go ps.unwrap(ch)
}
return nil, topology.ErrWantSelf
}
ps.logger.Debug("no peers left", "chunk_address", ch.Address(), "error", err)
return nil, err

// ErrWantSelf on a forwarder can mean closer peers exist but are
// overdraft-skipped; wait for refresh before falling back to self.
if errors.Is(err, topology.ErrNotFound) || errors.Is(err, topology.ErrWantSelf) {
if skip.PruneExpiresAfter(idAddress, overDraftRefresh) > 0 {
ps.metrics.OverdraftRefresh.Inc()
if ps.overDraftRefreshLimiter.Allow() {
ps.logger.Debug("sleeping to refresh overdraft balance")
}
continue // there is still an inflight request, wait for its result
}

ps.metrics.OverdraftRefresh.Inc()
if ps.overDraftRefreshLimiter.Allow() {
ps.logger.Debug("sleeping to refresh overdraft balance")
select {
case <-time.After(overDraftRefresh):
retry()
continue
case <-ctx.Done():
return nil, ctx.Err()
}
}
}

select {
case <-time.After(overDraftRefresh):
retry()
continue
case <-ctx.Done():
return nil, ctx.Err()
if errors.Is(err, topology.ErrNotFound) {
if inflight == 0 {
if ps.fullNode {
if cac.Valid(ch) {
go ps.unwrap(ch)
}
// prefer a shallow peer over self-store when one exists
if shallowReceiptResult != nil {
return shallowReceiptResult, ErrShallowReceipt
}
return nil, topology.ErrWantSelf
}
if shallowReceiptResult != nil {
return shallowReceiptResult, ErrShallowReceipt
}
ps.logger.Debug("no peers left", "chunk_address", ch.Address(), "error", err)
return nil, err
}
continue // there is still an inflight request, wait for its result
}

if err != nil {
@@ -486,12 +502,19 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
return result.receipt, nil
}

switch err := ps.checkReceipt(result.receipt); {
// Cache the best (highest-PO) shallow receipt and exhaust the
// budget; surface ErrShallowReceipt only when nothing better lands.
switch po, err := ps.checkReceipt(result.receipt); {
case err == nil:
return result.receipt, nil
case errors.Is(err, ErrShallowReceipt):
ps.errSkip.Add(idAddress, result.peer, skiplistDur)
return result.receipt, err
if shallowReceiptResult == nil || po > shallowReceiptPO {
shallowReceiptResult = result.receipt
shallowReceiptPO = po
}
fallthrough
default:
result.err = err
}
}

@@ -505,6 +528,9 @@ func (ps *PushSync) pushToClosest(ctx context.Context, ch swarm.Chunk, origin bo
}
}

if shallowReceiptResult != nil {
return shallowReceiptResult, ErrShallowReceipt
}
return nil, ErrNoPush
}

@@ -564,42 +590,40 @@ func (ps *PushSync) push(parentCtx context.Context, resultChan chan<- receiptRes
err = action.Apply()
}

func (ps *PushSync) checkReceipt(receipt *pb.Receipt) error {
// checkReceipt validates the receipt and returns the storer-to-chunk PO so
// callers can rank shallow receipts; the PO is zero on signature errors. The
// check is strict (po >= rad): a chunk is not synced until it lands within the AOR.
func (ps *PushSync) checkReceipt(receipt *pb.Receipt) (uint8, error) {
addr := swarm.NewAddress(receipt.Address)

publicKey, err := crypto.Recover(receipt.Signature, addr.Bytes())
if err != nil {
return fmt.Errorf("pushsync: receipt recover: %w", err)
return 0, fmt.Errorf("pushsync: receipt recover: %w", err)
}

peer, err := crypto.NewOverlayAddress(*publicKey, ps.networkID, receipt.Nonce)
if err != nil {
return fmt.Errorf("pushsync: receipt storer address: %w", err)
return 0, fmt.Errorf("pushsync: receipt storer address: %w", err)
}

po := swarm.Proximity(addr.Bytes(), peer.Bytes())

r, err := ps.radius()
if err != nil {
return fmt.Errorf("pushsync: storage radius: %w", err)
}

var tolerance uint8
if r >= ps.shallowReceiptTolerance { // check for underflow of uint8
tolerance = r - ps.shallowReceiptTolerance
return po, fmt.Errorf("pushsync: storage radius: %w", err)
}

if po < tolerance || uint32(po) < receipt.StorageRadius {
if po < r || uint32(po) < receipt.StorageRadius {
ps.metrics.ShallowReceiptDepth.WithLabelValues(strconv.Itoa(int(po))).Inc()
ps.metrics.ShallowReceipt.Inc()
ps.logger.Debug("shallow receipt", "chunk_address", addr, "peer_address", peer, "proximity_order", po, "peer_radius", receipt.StorageRadius, "self_radius", r)
return ErrShallowReceipt
return po, ErrShallowReceipt
}

ps.metrics.ReceiptDepth.WithLabelValues(strconv.Itoa(int(po))).Inc()
ps.logger.Debug("chunk pushed", "chunk_address", addr, "peer_address", peer, "proximity_order", po)

return nil
return po, nil
}

func (ps *PushSync) pushChunkToPeer(ctx context.Context, peer swarm.Address, ch swarm.Chunk) (receipt *pb.Receipt, err error) {
2 changes: 0 additions & 2 deletions pkg/pushsync/pushsync_integration_test.go
@@ -70,7 +70,6 @@ func TestPushSyncIntegration(t *testing.T) {
serverSigner,
nil,
stabilizationmock.NewSubscriber(true),
0,
)
t.Cleanup(func() { serverPushSync.Close() })

@@ -106,7 +105,6 @@ func TestPushSyncIntegration(t *testing.T) {
clientSigner,
nil,
stabilizationmock.NewSubscriber(true),
0,
)
t.Cleanup(func() { clientPushSync.Close() })
