Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
8de8fac
feat/pubsub
nugaon Apr 14, 2026
e3de025
refactor: mode id enum type
nugaon Apr 16, 2026
ad500fc
refactor: rename participant to publisher
nugaon Apr 16, 2026
c3d1fe7
docs: openapi
nugaon Apr 16, 2026
ecf9be6
fix: eth address instead of public key on api
nugaon Apr 17, 2026
a5dd875
fix: pubsub header parsing
bosi95 Apr 20, 2026
a613d1f
fix: config cli option for broker mode
nugaon Apr 20, 2026
d82bcfd
fix: http hijacked write error
nugaon Apr 21, 2026
dd4fe24
chore: debugging
nugaon Apr 21, 2026
6a69782
fix: sharky corruption (#5409)
martinconic Apr 15, 2026
43377ed
feat: optimize rpc calls (#5394)
sbackend123 Apr 15, 2026
3de7983
chore(deps): update GitHub Actions versions (#5433)
akrem-chabchoub Apr 17, 2026
95e448f
chore: apply gofumpt formatting across codebase (#5439)
gacevicljubisa Apr 17, 2026
80a17c7
fix: libp2p backoff clear
bosi95 Apr 21, 2026
9075234
fix: http hijack handler
bosi95 Apr 21, 2026
392f1e4
fix: hijacked websocket error
nugaon Apr 21, 2026
f4f1bf0
chore: logs debugging
nugaon Apr 21, 2026
4ea5afb
fix: pingpong with subscriber
nugaon Apr 21, 2026
ed51991
fix: upgrade ws no compress
bosi95 Apr 21, 2026
3986034
fix: readdeadline on pong
nugaon Apr 21, 2026
5ea506a
fix(broker): eof return
nugaon Apr 21, 2026
6d4197a
chore: logs
nugaon Apr 21, 2026
cbca6de
fix: wrong connection mapping and many refactors
nugaon Apr 22, 2026
292d7dc
fix: EOF in websocket
nugaon Apr 22, 2026
fb5b8dd
fix: p2p ping
nugaon Apr 22, 2026
6014b39
chore: debugging on broker side
nugaon Apr 22, 2026
64d1199
chore: debug
nugaon Apr 22, 2026
55aab09
chore: add logs for debugging
bosi95 Apr 22, 2026
ebfb786
chore: debug instead of info log
nugaon Apr 22, 2026
aecd516
fix: soc sig validation
bosi95 Apr 22, 2026
b1a529e
chore: message check
nugaon Apr 22, 2026
d8c0d22
fix: chunk span calc
bosi95 Apr 22, 2026
6277692
fix: span size 8
bosi95 Apr 22, 2026
dfd136a
Merge branch 'fix/pubsub-debug' into feat/pubsub
nugaon Apr 22, 2026
5ab0e17
fix: swap pubsub ws upgrade and connect order
bosi95 Apr 23, 2026
7ac99ef
fix: revert ws and svc connect order and cancel subscriberConn if pre…
bosi95 Apr 23, 2026
32a94c6
fix: unregister delete overlay
bosi95 Apr 23, 2026
fa0e4e1
feat: allow light node mode
nugaon Apr 23, 2026
a529e8c
fix: remove and create sub conn race conditions
bosi95 Apr 23, 2026
03dfbd4
Merge pull request #1 from Apiary-Suite/fix/pubsub-debug
nugaon Apr 23, 2026
1491bbb
fix: multiple ws on the same topic
nugaon Apr 23, 2026
5ab544d
refactor: remove unnecessary param in runMux
nugaon Apr 29, 2026
e474e5e
refactor: move connection related struct defs and methods to service
nugaon Apr 29, 2026
542e442
refactor: subtract the pinging service from mode
nugaon Apr 29, 2026
e6c7bb5
docs: correction of size in the span
nugaon Apr 29, 2026
db51c83
fix: removing duplicated stream close, leave only cancel
nugaon Apr 29, 2026
3b58640
refactor: log out dropping messages when ws buffer is full
nugaon Apr 29, 2026
c603e82
fix: clear subscriberConn in runMux defer to prevent stale reference …
nugaon Apr 29, 2026
0cb0202
fix: msg type in mode
nugaon Apr 30, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions .github/workflows/beekeeper.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ jobs:
cache: false
go-version-file: go.mod
- name: Cache Go Modules
uses: actions/cache@v4
uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5
with:
path: |
~/.cache/go-build
Expand Down Expand Up @@ -84,7 +84,7 @@ jobs:
beekeeper version --log-verbosity 0
mv ~/.beekeeper.yaml .beekeeper.yaml
mv ~/.beekeeper/local.yaml local.yaml
- uses: actions/upload-artifact@v4
- uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6
with:
name: temp-artifacts
include-hidden-files: true
Expand All @@ -105,13 +105,13 @@ jobs:
needs: [init]
steps:
- name: Cache
uses: actions/cache@v4
uses: actions/cache@27d5ce7f107fe9357f9df03efb73ab90386fccae # v5
with:
path: |
/tmp/k3s-${{ env.K3S_VERSION }}
key: k3s-${{ env.K3S_VERSION }}
- name: "Download Artifact"
uses: actions/download-artifact@v4
uses: actions/download-artifact@3e5f45b2cfb9172054b4087a40e8e0b5a5461e7c # v8
with:
name: temp-artifacts
- name: Unpack artifacts
Expand Down Expand Up @@ -220,7 +220,7 @@ jobs:
echo "Connect to github actions node using"
echo "sh <(curl -sSf https://lets.tunshell.com/init.sh) L $(echo $KEYS | jq -r .peer2_key) \${TUNSHELL_SECRET} eu.relay.tunshell.com"
curl -sSf https://lets.tunshell.com/init.sh | sh /dev/stdin T $(echo $KEYS | jq -r .peer1_key) ${{ secrets.TUNSHELL_SECRET }} eu.relay.tunshell.com
- uses: actions/upload-artifact@v4
- uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7
if: failure()
with:
name: debug-dump
Expand All @@ -233,7 +233,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: "Download Artifact"
uses: actions/download-artifact@v4
uses: actions/download-artifact@3e5f45b2cfb9172054b4087a40e8e0b5a5461e7c # v8
with:
name: temp-artifacts
- name: Unpack artifacts
Expand All @@ -255,7 +255,7 @@ jobs:
echo RUN_TYPE="MERGE RUN" >> $GITHUB_ENV
- name: Trigger Bee Factory latest build
if: github.ref == 'refs/heads/master' && github.event.action != 'beekeeper' && success()
uses: peter-evans/repository-dispatch@v2
uses: peter-evans/repository-dispatch@28959ce8df70de7be546dd1250a005dd32156697 # v4
with:
token: ${{ secrets.GHA_PAT_BASIC }}
repository: ethersphere/bee-factory
Expand Down
6 changes: 6 additions & 0 deletions cmd/bee/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ const (
optionNameRedistributionAddress = "redistribution-address"
optionNameStakingAddress = "staking-address"
optionNameBlockTime = "block-time"
optionNameBlockSyncInterval = "block-sync-interval"
optionWarmUpTime = "warmup-time"
optionNameMainNet = "mainnet"
optionNameRetrievalCaching = "cache-retrieval"
Expand Down Expand Up @@ -88,6 +89,8 @@ const (
optionAutoTLSDomain = "autotls-domain"
optionAutoTLSRegistrationEndpoint = "autotls-registration-endpoint"
optionAutoTLSCAEndpoint = "autotls-ca-endpoint"
optionNamePubsubBrokerMode = "pubsub-broker-mode"
optionNamePubsubMaxConnections = "pubsub-max-connections"

// blockchain-rpc
optionNameBlockchainRpcEndpoint = "blockchain-rpc-endpoint"
Expand Down Expand Up @@ -312,6 +315,7 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().String(optionNameRedistributionAddress, "", "redistribution contract address")
cmd.Flags().String(optionNameStakingAddress, "", "staking contract address")
cmd.Flags().Uint64(optionNameBlockTime, 5, "chain block time")
cmd.Flags().Uint64(optionNameBlockSyncInterval, 10, "block number cache sync interval in blocks")
cmd.Flags().Duration(optionWarmUpTime, time.Minute*5, "maximum node warmup duration; proceeds when stable or after this time")
cmd.Flags().Bool(optionNameMainNet, true, "triggers connect to main net bootnodes.")
cmd.Flags().Bool(optionNameRetrievalCaching, true, "enable forwarded content caching")
Expand All @@ -337,6 +341,8 @@ func (c *command) setAllFlags(cmd *cobra.Command) {
cmd.Flags().String(optionAutoTLSDomain, p2pforge.DefaultForgeDomain, "autotls domain")
cmd.Flags().String(optionAutoTLSRegistrationEndpoint, p2pforge.DefaultForgeEndpoint, "autotls registration endpoint")
cmd.Flags().String(optionAutoTLSCAEndpoint, p2pforge.DefaultCAEndpoint, "autotls certificate authority endpoint")
cmd.Flags().Bool(optionNamePubsubBrokerMode, true, "enable pubsub broker mode")
cmd.Flags().Int(optionNamePubsubMaxConnections, 0, "max pubsub connections per topic (0 = unlimited)")
}

// preRun must be called from every command's PreRunE, after which c.logger is
Expand Down
1 change: 0 additions & 1 deletion cmd/bee/cmd/configurateoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
)

func (c *command) initConfigurateOptionsCmd() (err error) {

cmd := &cobra.Command{
Use: "printconfig",
Short: "Print default or provided configuration in yaml format",
Expand Down
7 changes: 4 additions & 3 deletions cmd/bee/cmd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func dbExportReserveCmd(cmd *cobra.Command) {
hdr := &tar.Header{
Name: chunk.Address().String(),
Size: int64(len(b)),
Mode: 0600,
Mode: 0o600,
}
if err := tw.WriteHeader(hdr); err != nil {
return true, fmt.Errorf("writing header: %w", err)
Expand Down Expand Up @@ -533,7 +533,7 @@ func dbExportPinningCmd(cmd *cobra.Command) {
err = tw.WriteHeader(&tar.Header{
Name: root.String() + "/" + addr.String(),
Size: int64(len(b)),
Mode: 0600,
Mode: 0o600,
})
if err != nil {
return true, fmt.Errorf("error writing header: %w", err)
Expand Down Expand Up @@ -867,7 +867,8 @@ func dbNukeCmd(cmd *cobra.Command) {
}

return nil
}}
},
}
c.Flags().String(optionNameDataDir, "", "data directory")
c.Flags().String(optionNameVerbosity, "trace", "verbosity level")
c.Flags().Duration(optionNameSleepAfter, time.Duration(0), "time to sleep after the operation finished")
Expand Down
1 change: 1 addition & 0 deletions cmd/bee/cmd/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func (c *command) initDeployCmd() error {
IdleTimeout: c.config.GetDuration(configKeyBlockchainRpcIdleTimeout),
Keepalive: c.config.GetDuration(configKeyBlockchainRpcKeepalive),
},
c.config.GetUint64(optionNameBlockSyncInterval),
)
if err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions cmd/bee/cmd/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type putter struct {
func (s *putter) Put(_ context.Context, chunk swarm.Chunk) error {
return s.cb(chunk)
}

func newPutter(cb func(ch swarm.Chunk) error) *putter {
return &putter{
cb: cb,
Expand Down Expand Up @@ -103,7 +104,7 @@ func splitRefs(cmd *cobra.Command) {
refs = append(refs, ch.Address().String())
return nil
})
writer, err := os.OpenFile(outputFileName, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
writer, err := os.OpenFile(outputFileName, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o644)
if err != nil {
return fmt.Errorf("open output file: %w", err)
}
Expand Down Expand Up @@ -191,7 +192,7 @@ func splitChunks(cmd *cobra.Command) {
var chunksCount atomic.Int64
store := newPutter(func(chunk swarm.Chunk) error {
filePath := filepath.Join(outputDir, chunk.Address().String())
err := os.WriteFile(filePath, chunk.Data(), 0644)
err := os.WriteFile(filePath, chunk.Data(), 0o644)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/bee/cmd/split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestDBSplitRefs(t *testing.T) {
}

inputFileName := path.Join(t.TempDir(), "input")
err = os.WriteFile(inputFileName, buf, 0644)
err = os.WriteFile(inputFileName, buf, 0o644)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestDBSplitChunks(t *testing.T) {
}

inputFileName := path.Join(t.TempDir(), "input")
err = os.WriteFile(inputFileName, buf, 0644)
err = os.WriteFile(inputFileName, buf, 0o644)
if err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/bee/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ func buildBeeNode(ctx context.Context, c *command, cmd *cobra.Command, logger lo
BlockchainRpcKeepalive: c.config.GetDuration(configKeyBlockchainRpcKeepalive),
BlockProfile: c.config.GetBool(optionNamePProfBlock),
BlockTime: networkConfig.blockTime,
BlockSyncInterval: c.config.GetUint64(optionNameBlockSyncInterval),
BootnodeMode: bootNode,
Bootnodes: networkConfig.bootNodes,
CacheCapacity: c.config.GetUint64(optionNameCacheCapacity),
Expand Down Expand Up @@ -331,6 +332,8 @@ func buildBeeNode(ctx context.Context, c *command, cmd *cobra.Command, logger lo
WarmupTime: c.config.GetDuration(optionWarmUpTime),
WelcomeMessage: c.config.GetString(optionWelcomeMessage),
WhitelistedWithdrawalAddress: c.config.GetStringSlice(optionNameWhitelistedWithdrawalAddress),
PubsubBrokerMode: c.config.GetBool(optionNamePubsubBrokerMode),
PubsubMaxConnections: c.config.GetInt(optionNamePubsubMaxConnections),
})

return b, err
Expand Down
1 change: 0 additions & 1 deletion cmd/bee/cmd/start_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
)

func (c *command) initStartDevCmd() (err error) {

cmd := &cobra.Command{
Use: "dev",
Short: "Start in dev mode. WARNING: This command will be deprecated soon.",
Expand Down
54 changes: 54 additions & 0 deletions openapi/Swarm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2428,3 +2428,57 @@ paths:
$ref: "SwarmCommon.yaml#/components/responses/400"
default:
description: Default response.

"/pubsub/{topic}":
get:
summary: Connect to a pubsub topic via WebSocket
description: |
Opens a WebSocket connection to a pubsub topic. The connection acts as either a publisher (read+write)
or subscriber (read-only) depending on the presence of GSOC headers.

**WebSocket protocol:**
- Inbound (client → node, publisher only): raw SOC payload `[sig:65B][span:8B][payload:N B]`
- Outbound (node → client): raw SOC payload `[sig:65B][span:8B][payload:N B]`
tags:
- Pubsub
parameters:
- in: path
name: topic
schema:
type: string
required: true
description: Topic identifier (hex-encoded address or arbitrary string to be hashed)
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubPeer"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubGsocEthAddress"
- $ref: "SwarmCommon.yaml#/components/parameters/SwarmPubsubGsocTopic"
- in: header
name: swarm-keep-alive
schema:
type: integer
required: false
description: WebSocket ping period in seconds (default: 60)
responses:
"101":
description: WebSocket upgrade successful
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"

"/pubsub/":
get:
summary: List all pubsub topics
description: Returns a list of all active pubsub topics this node is participating in (as broker or subscriber)
tags:
- Pubsub
responses:
"200":
description: List of pubsub topics
content:
application/json:
schema:
$ref: "SwarmCommon.yaml#/components/schemas/PubsubTopicListResponse"
"400":
$ref: "SwarmCommon.yaml#/components/responses/400"
"500":
$ref: "SwarmCommon.yaml#/components/responses/500"
49 changes: 49 additions & 0 deletions openapi/SwarmCommon.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1076,6 +1076,31 @@ components:
required: false
description: "Indicates which feed version was resolved (v1 or v2)"

PubsubTopicInfo:
type: object
properties:
topicAddress:
type: string
description: "Hex-encoded topic address"
mode:
type: integer
description: "Pubsub mode identifier"
role:
type: string
description: "Role of this node: 'broker' or 'subscriber'"
connections:
type: array
items:
type: string
description: "List of connected peer overlays"

PubsubTopicListResponse:
type: object
properties:
topics:
type: array
items:
$ref: "#/components/schemas/PubsubTopicInfo"

parameters:
GasPriceParameter:
Expand Down Expand Up @@ -1279,6 +1304,30 @@ components:
required: false
description: "ACT history Unix timestamp"

SwarmPubsubPeer:
in: header
name: swarm-pubsub-peer
schema:
type: string
required: true
description: "Multiaddress of the broker peer to connect to for pubsub"

SwarmPubsubGsocEthAddress:
in: header
name: swarm-pubsub-gsoc-eth-address
schema:
$ref: "#/components/schemas/HexString"
required: false
description: "GSOC owner Ethereum address (20 bytes, hex-encoded) for publisher role. Required together with swarm-pubsub-gsoc-topic to upgrade to publisher."

SwarmPubsubGsocTopic:
in: header
name: swarm-pubsub-gsoc-topic
schema:
$ref: "#/components/schemas/HexString"
required: false
description: "GSOC topic identifier (hex) for publisher role. Required together with swarm-pubsub-gsoc-eth-address to upgrade to publisher."

responses:
"200":
description: Success
Expand Down
2 changes: 2 additions & 0 deletions packaging/bee.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
# api-addr: 127.0.0.1:1633
## chain block time
# block-time: "5"
## block number cache sync interval in blocks
# block-sync-interval: 10
## blockchain rpc configuration
# blockchain-rpc:
# endpoint: ""
Expand Down
1 change: 1 addition & 0 deletions packaging/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ services:
- BEE_AUTOTLS_DOMAIN
- BEE_AUTOTLS_REGISTRATION_ENDPOINT
- BEE_BLOCK_TIME
- BEE_BLOCK_SYNC_INTERVAL
- BEE_BLOCKCHAIN_RPC_DIAL_TIMEOUT
- BEE_BLOCKCHAIN_RPC_ENDPOINT
- BEE_BLOCKCHAIN_RPC_IDLE_TIMEOUT
Expand Down
2 changes: 2 additions & 0 deletions packaging/docker/env
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# BEE_AUTOTLS_REGISTRATION_ENDPOINT=
## chain block time (default 5)
# BEE_BLOCK_TIME=5
## sets how many estimated blocks Bee can trust local block-number extrapolation before forcing a fresh HeaderByNumber(nil) RPC sync (default 10)
# BEE_BLOCK_SYNC_INTERVAL=10
## blockchain rpc TCP dial timeout (default 30s)
# BEE_BLOCKCHAIN_RPC_DIAL_TIMEOUT=30s
## rpc blockchain endpoint (default empty)
Expand Down
2 changes: 2 additions & 0 deletions packaging/homebrew-amd64/bee.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
# api-addr: 127.0.0.1:1633
## chain block time
# block-time: "5"
## block number cache sync interval in blocks
# block-sync-interval: 10
## blockchain rpc configuration
# blockchain-rpc:
# endpoint: ""
Expand Down
2 changes: 2 additions & 0 deletions packaging/homebrew-arm64/bee.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
# api-addr: 127.0.0.1:1633
## chain block time
# block-time: "5"
## block number cache sync interval in blocks
# block-sync-interval: 10
## blockchain rpc configuration
# blockchain-rpc:
# endpoint: ""
Expand Down
2 changes: 2 additions & 0 deletions packaging/scoop/bee.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
# api-addr: 127.0.0.1:1633
## chain block time
# block-time: "5"
## block number cache sync interval in blocks
# block-sync-interval: 10
## blockchain rpc configuration
# blockchain-rpc:
# endpoint: ""
Expand Down
6 changes: 4 additions & 2 deletions pkg/accesscontrol/kvs/mock/kvs.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import (
"github.com/ethersphere/bee/v2/pkg/swarm"
)

var lock = &sync.Mutex{}
var lockGetPut = &sync.Mutex{}
var (
lock = &sync.Mutex{}
lockGetPut = &sync.Mutex{}
)

type single struct {
memoryMock map[string]map[string][]byte
Expand Down
Loading