Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
8de8fac
feat/pubsub
nugaon Apr 14, 2026
e3de025
refactor: mode id enum type
nugaon Apr 16, 2026
ad500fc
refactor: rename participant to publisher
nugaon Apr 16, 2026
c3d1fe7
docs: openapi
nugaon Apr 16, 2026
ecf9be6
fix: eth address instead of public key on api
nugaon Apr 17, 2026
a5dd875
fix: pubsub header parsing
bosi95 Apr 20, 2026
a613d1f
fix: config cli option for broker mode
nugaon Apr 20, 2026
d82bcfd
fix: http hijacked write error
nugaon Apr 21, 2026
dd4fe24
chore: debugging
nugaon Apr 21, 2026
6a69782
fix: sharky corruption (#5409)
martinconic Apr 15, 2026
43377ed
feat: optimize rpc calls (#5394)
sbackend123 Apr 15, 2026
3de7983
chore(deps): update GitHub Actions versions (#5433)
akrem-chabchoub Apr 17, 2026
95e448f
chore: apply gofumpt formatting across codebase (#5439)
gacevicljubisa Apr 17, 2026
80a17c7
fix: libp2p backoff clear
bosi95 Apr 21, 2026
9075234
fix: http hijack handler
bosi95 Apr 21, 2026
392f1e4
fix: hijacked websocket error
nugaon Apr 21, 2026
f4f1bf0
chore: logs debugging
nugaon Apr 21, 2026
4ea5afb
fix: pingpong with subscriber
nugaon Apr 21, 2026
ed51991
fix: upgrade ws no compress
bosi95 Apr 21, 2026
3986034
fix: readdeadline on pong
nugaon Apr 21, 2026
5ea506a
fix(broker): eof return
nugaon Apr 21, 2026
6d4197a
chore: logs
nugaon Apr 21, 2026
cbca6de
fix: wrong connection mapping and many refactors
nugaon Apr 22, 2026
292d7dc
fix: EOF in websocket
nugaon Apr 22, 2026
fb5b8dd
fix: p2p ping
nugaon Apr 22, 2026
6014b39
chore: debugging on broker side
nugaon Apr 22, 2026
64d1199
chore: debug
nugaon Apr 22, 2026
55aab09
chore: add logs for debugging
bosi95 Apr 22, 2026
ebfb786
chore: debug instead of info log
nugaon Apr 22, 2026
aecd516
fix: soc sig validation
bosi95 Apr 22, 2026
b1a529e
chore: message check
nugaon Apr 22, 2026
d8c0d22
fix: chunk span calc
bosi95 Apr 22, 2026
6277692
fix: span size 8
bosi95 Apr 22, 2026
dfd136a
Merge branch 'fix/pubsub-debug' into feat/pubsub
nugaon Apr 22, 2026
5ab0e17
fix: swap pubsub ws upgrade and connect order
bosi95 Apr 23, 2026
7ac99ef
fix: revert ws and svc connect order and cancel subscriberConn if pre…
bosi95 Apr 23, 2026
32a94c6
fix: unregister delete overlay
bosi95 Apr 23, 2026
fa0e4e1
feat: allow light node mode
nugaon Apr 23, 2026
a529e8c
fix: remove and create sub conn race conditions
bosi95 Apr 23, 2026
03dfbd4
Merge pull request #1 from Apiary-Suite/fix/pubsub-debug
nugaon Apr 23, 2026
1491bbb
fix: multiple ws on the same topic
nugaon Apr 23, 2026
5ab544d
refactor: remove unnecessary param in runMux
nugaon Apr 29, 2026
e474e5e
refactor: move connection related struct defs and methods to service
nugaon Apr 29, 2026
542e442
refactor: subtract the pinging service from mode
nugaon Apr 29, 2026
e6c7bb5
docs: correction of size in the span
nugaon Apr 29, 2026
db51c83
fix: removing duplicated stream close, leave only cancel
nugaon Apr 29, 2026
3b58640
refactor: log out dropping messages when ws buffer is full
nugaon Apr 29, 2026
c603e82
fix: clear subscriberConn in runMux defer to prevent stale reference …
nugaon Apr 29, 2026
0cb0202
fix: msg type in mode
nugaon Apr 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/postage"
"github.com/ethersphere/bee/v2/pkg/postage/postagecontract"
"github.com/ethersphere/bee/v2/pkg/pss"
"github.com/ethersphere/bee/v2/pkg/pubsub"
"github.com/ethersphere/bee/v2/pkg/resolver"
"github.com/ethersphere/bee/v2/pkg/resolver/client/ens"
"github.com/ethersphere/bee/v2/pkg/resolver/multiresolver"
Expand Down Expand Up @@ -94,6 +95,9 @@ const (
SwarmActTimestampHeader = "Swarm-Act-Timestamp"
SwarmActPublisherHeader = "Swarm-Act-Publisher"
SwarmActHistoryAddressHeader = "Swarm-Act-History-Address"
SwarmPubsubPeerHeader = "Swarm-Pubsub-Peer"
SwarmPubsubGsocPublicKeyHeader = "Swarm-Pubsub-Gsoc-Public-Key"
SwarmPubsubGsocTopicHeader = "Swarm-Pubsub-Gsoc-Topic"

ImmutableHeader = "Immutable"
GasPriceHeader = "Gas-Price"
Expand Down Expand Up @@ -187,6 +191,7 @@ type Service struct {

topologyDriver topology.Driver
p2p p2p.DebugService
pubsubSvc *pubsub.Service
accounting accounting.Interface
chequebook chequebook.Service
pseudosettle settlement.Interface
Expand Down Expand Up @@ -270,6 +275,7 @@ type ExtraOptions struct {
SyncStatus func() (bool, error)
NodeStatus *status.Service
PinIntegrity PinIntegrity
PubsubService *pubsub.Service
}

func New(
Expand Down Expand Up @@ -361,6 +367,7 @@ func (s *Service) Configure(signer crypto.Signer, tracer *tracing.Tracer, o Opti
s.lightNodes = e.LightNodes
s.pseudosettle = e.Pseudosettle
s.blockTime = e.BlockTime
s.pubsubSvc = e.PubsubService

s.statusSem = semaphore.NewWeighted(1)
s.postageSem = semaphore.NewWeighted(1)
Expand Down Expand Up @@ -589,6 +596,7 @@ func (s *Service) corsHandler(h http.Handler) http.Handler {
SwarmRedundancyStrategyHeader, SwarmRedundancyFallbackModeHeader, SwarmChunkRetrievalTimeoutHeader, SwarmLookAheadBufferSizeHeader,
SwarmFeedIndexHeader, SwarmFeedIndexNextHeader, SwarmSocSignatureHeader, SwarmOnlyRootChunk, GasPriceHeader, GasLimitHeader, ImmutableHeader,
SwarmActHeader, SwarmActTimestampHeader, SwarmActPublisherHeader, SwarmActHistoryAddressHeader,
SwarmPubsubPeerHeader, SwarmPubsubGsocPublicKeyHeader, SwarmPubsubGsocTopicHeader,
}
allowedHeadersStr := strings.Join(allowedHeaders, ", ")

Expand Down
144 changes: 144 additions & 0 deletions pkg/api/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright 2026 The Swarm Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package api

import (
"context"
"encoding/hex"
"net/http"
"time"

"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/pubsub"
"github.com/ethersphere/bee/v2/pkg/swarm"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
ma "github.com/multiformats/go-multiaddr"
)

func (s *Service) pubsubWsHandler(w http.ResponseWriter, r *http.Request) {
logger := s.logger.WithName("pubsub").Build()

paths := struct {
Topic string `map:"topic" validate:"required"`
}{}
if response := s.mapStructure(mux.Vars(r), &paths); response != nil {
response("invalid path params", logger, w)
return
}

var topicAddr [32]byte
if decoded, err := hex.DecodeString(paths.Topic); err == nil && len(decoded) == swarm.HashSize {
copy(topicAddr[:], decoded)
} else {
h := swarm.NewHasher()
_, _ = h.Write([]byte(paths.Topic))
copy(topicAddr[:], h.Sum(nil))
}

// Required header: underlay multiaddr
peerHeader := r.Header.Get(SwarmPubsubPeerHeader)
if peerHeader == "" {
jsonhttp.BadRequest(w, "missing Swarm-Pubsub-Peer header")
return
}
underlay, err := ma.NewMultiaddr(peerHeader)
if err != nil {
logger.Debug("invalid peer multiaddr", "value", peerHeader, "error", err)
jsonhttp.BadRequest(w, "invalid Swarm-Pubsub-Peer header")
return
}

// Optional headers: GSOC fields for Participant upgrade
var connectOpts pubsub.ConnectOptions

gsocPubKeyHex := r.Header.Get(SwarmPubsubGsocPublicKeyHeader)
gsocTopicHex := r.Header.Get(SwarmPubsubGsocTopicHeader)
if gsocPubKeyHex != "" && gsocTopicHex != "" {
gsocOwner, err := hex.DecodeString(gsocPubKeyHex)
if err != nil {
jsonhttp.BadRequest(w, "invalid Swarm-Pubsub-Gsoc-Public-Key header")
return
}
gsocID, err := hex.DecodeString(gsocTopicHex)
if err != nil {
jsonhttp.BadRequest(w, "invalid Swarm-Pubsub-Gsoc-Topic header")
return
}
connectOpts.GsocOwner = gsocOwner
connectOpts.GsocID = gsocID
connectOpts.ReadWrite = true
}

headers := struct {
KeepAlive time.Duration `map:"Swarm-Keep-Alive"`
}{}
if response := s.mapStructure(r.Header, &headers); response != nil {
response("invalid header params", logger, w)
return
}

if s.beeMode == DevMode {
logger.Warning("pubsub endpoint is disabled in dev mode")
jsonhttp.BadRequest(w, errUnsupportedDevNodeOperation)
return
}

// Connect to broker peer
ctx, cancel := context.WithCancel(context.Background())
subscriberConn, err := s.pubsubSvc.Connect(ctx, underlay, topicAddr, pubsub.ModeGSOCEphemeral, connectOpts)
if err != nil {
cancel()
logger.Debug("pubsub connect failed", "error", err)
jsonhttp.InternalServerError(w, "pubsub connect failed")
return
}

// Upgrade to WebSocket
upgrader := websocket.Upgrader{
ReadBufferSize: swarm.ChunkWithSpanSize,
WriteBufferSize: swarm.ChunkWithSpanSize,
CheckOrigin: s.checkOrigin,
}

conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
cancel()
_ = subscriberConn.Stream.Close()
logger.Debug("websocket upgrade failed", "error", err)
logger.Error(nil, "websocket upgrade failed")
jsonhttp.InternalServerError(w, "upgrade failed")
return
}

pingPeriod := headers.KeepAlive * time.Second
if pingPeriod == 0 {
pingPeriod = time.Minute
}

isParticipant := connectOpts.ReadWrite

s.wsWg.Add(1)
go func() {
pubsub.ListeningWs(ctx, conn, pubsub.WsOptions{PingPeriod: pingPeriod, Cancel: cancel}, logger, subscriberConn, isParticipant)
_ = conn.Close()
subscriberConn.Cancel()
s.wsWg.Done()
}()
}

func (s *Service) pubsubListHandler(w http.ResponseWriter, r *http.Request) {
if s.pubsubSvc == nil {
jsonhttp.NotFound(w, "pubsub service not available")
return
}

topics := s.pubsubSvc.Topics()
jsonhttp.OK(w, struct {
Topics []pubsub.TopicInfo `json:"topics"`
}{
Topics: topics,
})
}
64 changes: 64 additions & 0 deletions pkg/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,70 @@ func (s *Service) mountAPI() {
),
})

handle("/pubsub/{topic}", web.ChainHandlers(
web.FinalHandlerFunc(s.pubsubWsHandler),
))

handle("/pubsub/", web.ChainHandlers(
web.FinalHandler(jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.pubsubListHandler),
}),
))

handle("/pss/subscribe/{topic}", web.ChainHandlers(
web.FinalHandlerFunc(s.pssWsHandler),
))

handle("/tags", web.ChainHandlers(
web.FinalHandler(jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.listTagsHandler),
"POST": web.ChainHandlers(
jsonhttp.NewMaxBodyBytesHandler(1024),
web.FinalHandlerFunc(s.createTagHandler),
),
})),
)

handle("/tags/{id}", web.ChainHandlers(
web.FinalHandler(jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.getTagHandler),
"DELETE": http.HandlerFunc(s.deleteTagHandler),
"PATCH": web.ChainHandlers(
jsonhttp.NewMaxBodyBytesHandler(1024),
web.FinalHandlerFunc(s.doneSplitHandler),
),
})),
)

handle("/pins", web.ChainHandlers(
web.FinalHandler(jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.listPinnedRootHashes),
})),
)

handle("/pins/check", web.ChainHandlers(
web.FinalHandler(jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.pinIntegrityHandler),
}),
))

handle("/pins/{reference}", web.ChainHandlers(
web.FinalHandler(jsonhttp.MethodHandler{
"GET": http.HandlerFunc(s.getPinnedRootHash),
"POST": http.HandlerFunc(s.pinRootHash),
"DELETE": http.HandlerFunc(s.unpinRootHash),
})),
)

handle("/stewardship/{address}", jsonhttp.MethodHandler{
"GET": web.ChainHandlers(
web.FinalHandlerFunc(s.stewardshipGetHandler),
),
"PUT": web.ChainHandlers(
web.FinalHandlerFunc(s.stewardshipPutHandler),
),
})

handle("/pss/subscribe/{topic}", http.HandlerFunc(s.pssWsHandler))

handle("/gsoc/subscribe/{address}", web.ChainHandlers(
Expand Down
10 changes: 10 additions & 0 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/pricer"
"github.com/ethersphere/bee/v2/pkg/pricing"
"github.com/ethersphere/bee/v2/pkg/pss"
"github.com/ethersphere/bee/v2/pkg/pubsub"
"github.com/ethersphere/bee/v2/pkg/puller"
"github.com/ethersphere/bee/v2/pkg/pullsync"
"github.com/ethersphere/bee/v2/pkg/pusher"
Expand Down Expand Up @@ -192,6 +193,8 @@ type Options struct {
WarmupTime time.Duration
WelcomeMessage string
WhitelistedWithdrawalAddress []string
PubsubBrokerMode bool
PubsubMaxConnections int
}

const (
Expand Down Expand Up @@ -665,6 +668,7 @@ func NewBee(
Nonce: nonce,
ValidateOverlay: chainEnabled,
Registry: registry,
PubsubReservedStreamSlots: o.PubsubMaxConnections,
})
if err != nil {
return nil, fmt.Errorf("p2p service: %w", err)
Expand Down Expand Up @@ -737,6 +741,11 @@ func NewBee(
return nil, fmt.Errorf("init batch service: %w", err)
}

pubsubSvc := pubsub.New(p2ps, logger, o.PubsubBrokerMode, o.PubsubMaxConnections)
if err = p2ps.AddProtocol(pubsubSvc.Protocol()); err != nil {
return nil, fmt.Errorf("pubsub protocol: %w", err)
}

// Construct protocols.
pingPong := pingpong.New(p2ps, logger, tracer)

Expand Down Expand Up @@ -1266,6 +1275,7 @@ func NewBee(
SyncStatus: syncStatusFn,
NodeStatus: nodeStatus,
PinIntegrity: localStore.PinIntegrity(),
PubsubService: pubsubSvc,
}

if o.APIAddr != "" {
Expand Down
10 changes: 8 additions & 2 deletions pkg/p2p/libp2p/libp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ type Options struct {
HeadersRWTimeout time.Duration
Registry *prometheus.Registry
autoTLSCertManager autoTLSCertManager
PubsubReservedStreamSlots int
}

func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay swarm.Address, addr string, ab addressbook.Putter, storer storage.StateStorer, lightNodes *lightnode.Container, logger log.Logger, tracer *tracing.Tracer, o Options) (s *Service, returnErr error) {
Expand Down Expand Up @@ -209,11 +210,16 @@ func New(ctx context.Context, signer beecrypto.Signer, networkID uint64, overlay
}

// Tweak certain settings
inboundLimit := rcmgr.LimitVal(IncomingStreamCountLimit - o.PubsubReservedStreamSlots)
if inboundLimit < 0 {
inboundLimit = 0
}

cfg := rcmgr.PartialLimitConfig{
System: rcmgr.ResourceLimits{
Streams: IncomingStreamCountLimit + OutgoingStreamCountLimit,
Streams: inboundLimit + OutgoingStreamCountLimit,
StreamsOutbound: OutgoingStreamCountLimit,
StreamsInbound: IncomingStreamCountLimit,
StreamsInbound: inboundLimit,
},
}

Expand Down
Loading
Loading