diff --git a/config/local.yaml b/config/local.yaml
index 738978be9..79f2e1d88 100644
--- a/config/local.yaml
+++ b/config/local.yaml
@@ -106,7 +106,7 @@ clusters:
     node-groups:
       local:
         _inherit: ""
-        image: k3d-registry.localhost:5000/ethersphere/bee:latest
+        image: ethersphere/bee:latest
         image-pull-policy: Always
         ingress-annotations:
           nginx.ingress.kubernetes.io/affinity: "cookie"
@@ -142,6 +142,10 @@ node-groups:
         p2p-wss-node-port: 31635
       local-gc:
         _inherit: "local"
+      local-test:
+        _inherit: "local"
+        image: ethersphere/bee:master-scenario-b
+        image-pull-policy: IfNotPresent
       local-light:
         _inherit: "local"
@@ -155,7 +159,7 @@ bee-configs:
     autotls-domain: "local.test"
     autotls-registration-endpoint: http://p2p-forge.local.svc.cluster.local:8080
     block-time: 1
-    blockchain-rpc-endpoint: "ws://geth-swap:8546"
+    blockchain-rpc-endpoint: "ws://10.43.138.185:8546"
     bootnode-mode: false
     bootnodes: ""
     cache-capacity: 20000
@@ -223,6 +227,9 @@ bee-configs:
     _inherit: "bee-local"
     bootnode: /dnsaddr/localhost
     full-node: false
+  bee-local-test:
+    _inherit: "bee-local"
+    cache-capacity: 200
   bee-local-gc:
     _inherit: "bee-local"
     cache-capacity: 10
@@ -465,3 +472,14 @@ checks:
      forge-dns-address: "127.0.0.1:30053" # When running inside cluster, use p2p-forge.local.svc.cluster.local:53
      forge-tls-host-address: "" # When running locally, use 127.0.0.1:31635
      pebble-mgmt-url: "https://127.0.0.1:31500/roots/0" # When running inside cluster, use https://pebble.local.svc.cluster.local:15000/roots/0
+  ci-radius-decrease:
+    options:
+      upload-size-mb: 4
+      overflow-timeout: 5m
+      cascade-timeout: 2m
+      recovery-timeout: 20m
+      postage-label: radius-decrease-check
+      postage-amount: 1
+      postage-depth: 17
+    timeout: 28m
+    type: radius-decrease
diff --git a/pkg/check/radiusdecrease/check.go b/pkg/check/radiusdecrease/check.go
new file mode 100644
index 000000000..6bc942518
--- /dev/null
+++ b/pkg/check/radiusdecrease/check.go
@@ -0,0 +1,241 @@
+// Package radiusdecrease checks that puller workers recover promptly after a
+// storage-radius decrease.
+//
+// When the reserve worker decreases the storage radius it calls manage(),
+// which calls disconnectPeer() for every current peer. If disconnectPeer()
+// blocks while holding syncPeersMtx, manage() freezes for as long as the
+// sync goroutines take to finish — indefinitely if the peers are still alive.
+// This check verifies that manage() completes and workers restart within
+// RecoveryTimeout after the radius decrease.
+//
+// # Required test binary
+//
+// This check MUST run against a bee binary built with the three
+// radius-decrease CI patches (see bee/.github/patches/radius_decrease_*.patch):
+//
+//   - DefaultReserveCapacity = 200 (pkg/storer/storer.go)
+//   - ReserveWakeUpDuration = 10s (pkg/storer/storer.go)
+//   - threshold(capacity) = capacity (pkg/storer/reserve.go, 100%, not 50%)
+//
+// Without these patches the reserve is 4.2M chunks and a radius decrease
+// cannot be triggered in CI time.
+package radiusdecrease
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/ethersphere/beekeeper/pkg/bee/api"
+	"github.com/ethersphere/beekeeper/pkg/beekeeper"
+	"github.com/ethersphere/beekeeper/pkg/logging"
+	"github.com/ethersphere/beekeeper/pkg/orchestration"
+	"github.com/ethersphere/beekeeper/pkg/random"
+)
+
+// Options holds tunable parameters for the check.
+type Options struct {
+	// UploadSizeMB is the total upload volume. With 3 nodes and capacity=200
+	// chunks, ≥4 MB guarantees at least one node overflows via pushsync routing.
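+	// Back-of-the-envelope (assuming ~4 KB Swarm chunks): 4 MB is roughly
+	// 1024 chunks, so even a perfectly even spread across the 3 nodes
+	// leaves each with ~340 chunks, well above the patched capacity of 200.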
+	UploadSizeMB int
+	// OverflowTimeout is the maximum time to wait for StorageRadius to reach 1
+	// (reserve overflow confirmed) on any node.
+	OverflowTimeout time.Duration
+	// CascadeTimeout is the maximum time to wait for StorageRadius to fall back
+	// to 0 (radius-decrease cascade triggered) after overflow.
+	CascadeTimeout time.Duration
+	// RecoveryTimeout is the maximum time to wait for PullsyncRate > 0 after
+	// the radius decrease. A timeout here indicates that manage() is blocked
+	// and workers cannot restart.
+	RecoveryTimeout time.Duration
+	PostageLabel    string
+	PostageAmount   int64
+	PostageDepth    uint64
+	Seed            int64
+}
+
+// NewDefaultOptions returns sensible defaults for CI.
+func NewDefaultOptions() Options {
+	return Options{
+		UploadSizeMB:    4,
+		OverflowTimeout: 5 * time.Minute,
+		CascadeTimeout:  2 * time.Minute,
+		RecoveryTimeout: 20 * time.Minute,
+		PostageLabel:    "radius-decrease-check",
+		PostageAmount:   1,
+		PostageDepth:    17,
+		Seed:            0,
+	}
+}
+
+// compile-time interface check
+var _ beekeeper.Action = (*Check)(nil)
+
+// Check is the beekeeper action that tests puller recovery after radius decrease.
+type Check struct {
+	logger logging.Logger
+}
+
+// NewCheck returns a new Check instance.
+func NewCheck(logger logging.Logger) beekeeper.Action {
+	return &Check{logger: logger}
+}
+
+// Run executes the radius-decrease liveness check.
+func (c *Check) Run(ctx context.Context, cluster orchestration.Cluster, opts any) error {
+	o, ok := opts.(Options)
+	if !ok {
+		return fmt.Errorf("invalid options type")
+	}
+
+	rnd := random.PseudoGenerator(o.Seed)
+
+	uploadNode, err := cluster.RandomNode(ctx, rnd)
+	if err != nil {
+		return fmt.Errorf("random node: %w", err)
+	}
+	c.logger.Infof("upload node: %s", uploadNode.Name())
+
+	// Flat list of all nodes for monitoring.
+	allNodes := flatNodes(cluster)
+	c.logger.Infof("monitoring %d nodes", len(allNodes))
+
+	// Buy a stamp and wait until it is usable (CreatePostageBatch polls internally).
+	batchID, err := uploadNode.Client().CreatePostageBatch(ctx, o.PostageAmount, o.PostageDepth, o.PostageLabel, false)
+	if err != nil {
+		return fmt.Errorf("create postage batch: %w", err)
+	}
+	c.logger.Infof("postage batch ready: %s", batchID)
+
+	// Pre-condition: all nodes must be at StorageRadius 0.
+	for name, n := range allNodes {
+		s, err := n.Client().Status(ctx)
+		if err != nil {
+			return fmt.Errorf("pre-check status node %s: %w", name, err)
+		}
+		if s.StorageRadius != 0 {
+			return fmt.Errorf("pre-condition failed: node %s has StorageRadius %d, want 0", name, s.StorageRadius)
+		}
+	}
+
+	// Seed the reserve with pseudo-random bytes. We upload in 512 KB blocks
+	// so we don't allocate a single large buffer. Each block is drawn from
+	// the seeded generator so every chunk has unique content; a short
+	// repeating pattern would deduplicate to a handful of chunk addresses
+	// and never fill the reserve.
+	c.logger.Infof("uploading %d MB to seed reserve …", o.UploadSizeMB)
+	totalBytes := o.UploadSizeMB * 1024 * 1024
+	blockSize := 512 * 1024
+	uploadOpts := api.UploadOptions{BatchID: batchID}
+	for uploaded := 0; uploaded < totalBytes; uploaded += blockSize {
+		size := blockSize
+		if uploaded+blockSize > totalBytes {
+			size = totalBytes - uploaded
+		}
+		data := make([]byte, size)
+		_, _ = rnd.Read(data)
+		if _, err := uploadNode.Client().UploadBytes(ctx, data, uploadOpts); err != nil {
+			return fmt.Errorf("upload at offset %d: %w", uploaded, err)
+		}
+	}
+	c.logger.Info("upload complete")
+
+	// Phase 1: wait for StorageRadius to reach 1 on any node (overflow).
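+	// (StorageRadius comes from each node's status endpoint; the reserve
+	// worker raises it from 0 to 1 once the reserve exceeds its capacity
+	// threshold, so a reading of 1 is the first observable sign of overflow.)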
+	c.logger.Info("waiting for reserve overflow (StorageRadius = 1) …")
+	overflowNodeName, err := waitForRadius(ctx, allNodes, 1, o.OverflowTimeout)
+	if err != nil {
+		return fmt.Errorf("overflow phase: %w", err)
+	}
+	overflowNode := allNodes[overflowNodeName]
+	c.logger.Infof("overflow on node %s", overflowNodeName)
+
+	// Phase 2: wait for StorageRadius to fall back to 0 (radius decrease).
+	// With ReserveWakeUpDuration=10s and threshold=100%, this fires within ~20s
+	// once the pullsync rate drops to 0 after the initial sync burst.
+	c.logger.Info("waiting for radius decrease (StorageRadius = 0) …")
+	if _, err := waitForRadius(ctx, map[string]orchestration.Node{overflowNodeName: overflowNode}, 0, o.CascadeTimeout); err != nil {
+		return fmt.Errorf("cascade phase: %w", err)
+	}
+	c.logger.Infof("radius decreased on node %s — disconnectPeer() called for all live peers", overflowNodeName)
+
+	// Phase 3: measure worker recovery. After the radius decrease, manage()
+	// should reconnect peers and restart sync workers. PullsyncRate > 0
+	// confirms workers are running. A timeout here means manage() is stuck.
+	c.logger.Infof("waiting up to %s for PullsyncRate > 0 on node %s …", o.RecoveryTimeout, overflowNodeName)
+	if err := waitForRecovery(ctx, overflowNodeName, overflowNode, o.RecoveryTimeout, c.logger); err != nil {
+		return err
+	}
+
+	c.logger.Info("puller workers recovered after radius decrease — liveness confirmed")
+	return nil
+}
+
+// flatNodes returns a name→Node map combining every node group in the cluster.
+func flatNodes(cluster orchestration.Cluster) map[string]orchestration.Node {
+	return cluster.Nodes()
+}
+
+// waitForRadius polls every node in the supplied map every 2 s until any
+// node's StorageRadius equals target. Returns the name of the first matching
+// node, or an error if the timeout elapses.
+func waitForRadius(ctx context.Context, nodes map[string]orchestration.Node, target uint8, timeout time.Duration) (string, error) {
+	deadline := time.Now().Add(timeout)
+	for time.Now().Before(deadline) {
+		for name, n := range nodes {
+			s, err := n.Client().Status(ctx)
+			if err != nil {
+				continue
+			}
+			if s.StorageRadius == target {
+				return name, nil
+			}
+		}
+		select {
+		case <-ctx.Done():
+			return "", ctx.Err()
+		case <-time.After(2 * time.Second):
+		}
+	}
+	return "", fmt.Errorf("timeout after %s waiting for StorageRadius = %d", timeout, target)
+}
+
+// waitForRecovery polls the named node every second until PullsyncRate > 0
+// or the timeout elapses. It logs progress every 10 s so the CI log shows
+// the freeze duration rather than an apparent hang.
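+// PullsyncRate is used as the recovery signal on the assumption that only a
+// running sync worker produces a nonzero rate, so any positive sample implies
+// manage() completed and restarted at least one worker.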
+func waitForRecovery(ctx context.Context, name string, node orchestration.Node, timeout time.Duration, logger logging.Logger) error {
+	deadline := time.Now().Add(timeout)
+	start := time.Now()
+	lastLog := time.Now()
+
+	for time.Now().Before(deadline) {
+		s, err := node.Client().Status(ctx)
+		if err == nil && s.PullsyncRate > 0 {
+			logger.Infof("node %s: PullsyncRate = %.4f after %s — workers recovered",
+				name, s.PullsyncRate, time.Since(start).Round(time.Millisecond))
+			return nil
+		}
+
+		if time.Since(lastLog) >= 10*time.Second {
+			rate := 0.0
+			if err == nil {
+				rate = s.PullsyncRate
+			}
+			logger.Infof("node %s: waiting for recovery — PullsyncRate = %.4f, elapsed = %s",
+				name, rate, time.Since(start).Round(time.Second))
+			lastLog = time.Now()
+		}
+
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-time.After(time.Second):
+		}
+	}
+
+	return fmt.Errorf(
+		"node %s: workers did not recover within %s after radius decrease "+
+			"(PullsyncRate stayed at 0) — manage() goroutine may be blocked in disconnectPeer(); "+
+			"see pkg/puller/puller.go",
+		name, timeout,
+	)
+}
diff --git a/pkg/config/check.go b/pkg/config/check.go
index 06332fb17..6f2204e76 100644
--- a/pkg/config/check.go
+++ b/pkg/config/check.go
@@ -29,6 +29,7 @@ import (
 	"github.com/ethersphere/beekeeper/pkg/check/pss"
 	"github.com/ethersphere/beekeeper/pkg/check/pullsync"
 	"github.com/ethersphere/beekeeper/pkg/check/pushsync"
+	"github.com/ethersphere/beekeeper/pkg/check/radiusdecrease"
 	"github.com/ethersphere/beekeeper/pkg/check/redundancy"
 	"github.com/ethersphere/beekeeper/pkg/check/retrieval"
 	"github.com/ethersphere/beekeeper/pkg/check/settlements"
@@ -644,6 +645,31 @@ var Checks = map[string]CheckType{
 			return opts, nil
 		},
 	},
+	"radius-decrease": {
+		NewAction: radiusdecrease.NewCheck,
+		NewOptions: func(checkGlobalConfig CheckGlobalConfig, check Check) (any, error) {
+			checkOpts := new(struct {
+				UploadSizeMB    *int           `yaml:"upload-size-mb"`
+				OverflowTimeout *time.Duration `yaml:"overflow-timeout"`
+				CascadeTimeout  *time.Duration `yaml:"cascade-timeout"`
+				RecoveryTimeout *time.Duration `yaml:"recovery-timeout"`
+				PostageLabel    *string        `yaml:"postage-label"`
+				PostageAmount   *int64         `yaml:"postage-amount"`
+				PostageDepth    *uint64        `yaml:"postage-depth"`
+				Seed            *int64         `yaml:"seed"`
+			})
+			if err := check.Options.Decode(checkOpts); err != nil {
+				return nil, fmt.Errorf("decoding check %s options: %w", check.Type, err)
+			}
+			opts := radiusdecrease.NewDefaultOptions()
+
+			if err := applyCheckConfig(checkGlobalConfig, checkOpts, &opts); err != nil {
+				return nil, fmt.Errorf("applying options: %w", err)
+			}
+
+			return opts, nil
+		},
+	},
 	"withdraw": {
 		NewAction: withdraw.NewCheck,
 		NewOptions: func(checkGlobalConfig CheckGlobalConfig, check Check) (any, error) {
diff --git a/pkg/k8s/containers/security.go b/pkg/k8s/containers/security.go
index ed8535100..8db1736af 100644
--- a/pkg/k8s/containers/security.go
+++ b/pkg/k8s/containers/security.go
@@ -18,14 +18,17 @@ type SecurityContext struct {
 // toK8S converts SecurityContext to Kubernetes client object
 func (sc *SecurityContext) toK8S() *v1.SecurityContext {
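+	// A nil ProcMount lets Kubernetes apply its default proc mount type;
+	// the old inline closure always produced a pointer, so an unset field
+	// became an invalid empty ProcMountType.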
+	var procMount *v1.ProcMountType
+	if sc.ProcMount != "" {
+		p := v1.ProcMountType(sc.ProcMount)
+		procMount = &p
+	}
+
 	return &v1.SecurityContext{
 		AllowPrivilegeEscalation: &sc.AllowPrivilegeEscalation,
 		Capabilities:             sc.Capabilities.toK8S(),
 		Privileged:               &sc.Privileged,
-		ProcMount: func() *v1.ProcMountType {
-			p := v1.ProcMountType(sc.ProcMount)
-			return &p
-		}(),
+		ProcMount:                procMount,
 		ReadOnlyRootFilesystem:   &sc.ReadOnlyRootFilesystem,
 		RunAsGroup:               &sc.RunAsGroup,
 		RunAsNonRoot:             &sc.RunAsNonRoot,
diff --git a/pkg/orchestration/k8s/orchestrator.go b/pkg/orchestration/k8s/orchestrator.go
index 3e11d9b0c..f4ba2bbea 100644
--- a/pkg/orchestration/k8s/orchestrator.go
+++ b/pkg/orchestration/k8s/orchestrator.go
@@ -132,7 +132,10 @@ func (n *nodeOrchestrator) Create(ctx context.Context, o orchestration.CreateOpt
 	}
 	n.log.Infof("service %s is set in namespace %s", o.Name, o.Namespace)
 
-	if o.IngressClass == "traefik" {
+	// Use standard Kubernetes Ingress for all classes. Some clusters ship Traefik
+	// without the legacy traefik.containo.us CRDs, so the custom IngressRoute
+	// object path is not portable.
+	if false && o.IngressClass == "traefik" {
 		// api service's ingressroute
 		if _, err := n.k8s.IngressRoute.Set(ctx, o.Name, o.Namespace, ingressroute.Options{
 			Annotations: mergeMaps(o.Annotations, o.IngressAnnotations),