diff --git a/pkg/api/api.go b/pkg/api/api.go
index 3f17a83b100..ac403fd4c11 100644
--- a/pkg/api/api.go
+++ b/pkg/api/api.go
@@ -94,6 +94,8 @@ const (
 	SwarmActTimestampHeader           = "Swarm-Act-Timestamp"
 	SwarmActPublisherHeader           = "Swarm-Act-Publisher"
 	SwarmActHistoryAddressHeader      = "Swarm-Act-History-Address"
+	// SwarmChunkTypeHeader tells POST /chunks how to read the body: "soc"/"1", "cac"/"0"; when absent the type is auto-detected.
+	SwarmChunkTypeHeader              = "Swarm-Chunk-Type"
 
 	ImmutableHeader = "Immutable"
 	GasPriceHeader  = "Gas-Price"
diff --git a/pkg/api/chunk.go b/pkg/api/chunk.go
index 6ddb2a50b10..39558183fdf 100644
--- a/pkg/api/chunk.go
+++ b/pkg/api/chunk.go
@@ -11,6 +11,7 @@ import (
 	"io"
 	"net/http"
 	"strconv"
+	"strings"
 
 	"github.com/ethersphere/bee/v2/pkg/accesscontrol"
 	"github.com/ethersphere/bee/v2/pkg/cac"
@@ -37,6 +38,7 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
 		SwarmTag       uint64        `map:"Swarm-Tag"`
 		Act            bool          `map:"Swarm-Act"`
 		HistoryAddress swarm.Address `map:"Swarm-Act-History-Address"`
+		ChunkType      string        `map:"Swarm-Chunk-Type"`
 	}{}
 	if response := s.mapStructure(r.Header, &headers); response != nil {
 		response("invalid header params", logger, w)
@@ -138,33 +140,57 @@ func (s *Service) chunkUploadHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	chunk, err := cac.NewWithDataSpan(data)
-	if err != nil {
-		// not a valid cac chunk. Check if it's a replica soc chunk.
-		logger.Debug("chunk upload: create chunk failed", "error", err)
-
-		// FromChunk only uses the chunk data to recreate the soc chunk. So the address is irrelevant.
+	// Build the chunk according to the optional Swarm-Chunk-Type header (case-insensitive).
+	var chunk swarm.Chunk
+	switch strings.ToLower(headers.ChunkType) {
+	case "soc", "1":
+		// FromChunk only uses the chunk data to recreate the soc chunk. So the address is irrelevant.
 		sch, err := soc.FromChunk(swarm.NewChunk(swarm.EmptyAddress, data))
 		if err != nil {
-			logger.Debug("chunk upload: create soc chunk from data failed", "error", err)
-			logger.Error(nil, "chunk upload: create chunk error")
-			jsonhttp.InternalServerError(ow, "create chunk error")
+			logger.Debug("chunk upload: soc parse failed", "error", err)
+			logger.Error(nil, "chunk upload: invalid soc chunk")
+			jsonhttp.BadRequest(ow, "invalid soc chunk")
 			return
 		}
 		chunk, err = sch.Chunk()
-		if err != nil {
-			logger.Debug("chunk upload: create chunk from soc failed", "error", err)
-			logger.Error(nil, "chunk upload: create chunk error")
-			jsonhttp.InternalServerError(ow, "create chunk error")
+		if err != nil || !soc.Valid(chunk) {
+			logger.Debug("chunk upload: soc validation failed", "error", err)
+			logger.Error(nil, "chunk upload: invalid soc chunk")
+			jsonhttp.BadRequest(ow, "invalid soc chunk")
 			return
 		}
-
-		if !soc.Valid(chunk) {
-			logger.Debug("chunk upload: invalid soc chunk")
-			logger.Error(nil, "chunk upload: create chunk error")
-			jsonhttp.InternalServerError(ow, "create chunk error")
+	case "cac", "0":
+		chunk, err = cac.NewWithDataSpan(data)
+		if err != nil {
+			logger.Debug("chunk upload: cac parse failed", "error", err)
+			logger.Error(nil, "chunk upload: invalid cac chunk")
+			jsonhttp.BadRequest(ow, "invalid cac chunk")
 			return
 		}
+	case "":
+		// No chunk type specified: auto-detect.
+		// Try SOC first because CAC is too permissive (accepts any 8-4104 byte data).
+		// Anything that does not yield a valid SOC is stored as a CAC, so clients
+		// that know the chunk type should send the header instead of relying on this.
+		if sch, socErr := soc.FromChunk(swarm.NewChunk(swarm.EmptyAddress, data)); socErr == nil {
+			if socChunk, chunkErr := sch.Chunk(); chunkErr == nil && soc.Valid(socChunk) {
+				chunk = socChunk
+			}
+		}
+		if chunk == nil {
+			chunk, err = cac.NewWithDataSpan(data)
+			if err != nil {
+				logger.Debug("chunk upload: create chunk failed", "error", err)
+				logger.Error(nil, "chunk upload: create chunk error")
+				jsonhttp.InternalServerError(ow, "create chunk error")
+				return
+			}
+		}
+	default:
+		logger.Debug("chunk upload: invalid chunk type", "type", headers.ChunkType)
+		logger.Error(nil, "chunk upload: invalid chunk type")
+		jsonhttp.BadRequest(ow, "invalid chunk type; expected 'soc' or 'cac'")
+		return
 	}
 
 	err = putter.Put(r.Context(), chunk)
diff --git a/pkg/api/chunk_test.go b/pkg/api/chunk_test.go
index b2512aa2ae8..fc929972159 100644
--- a/pkg/api/chunk_test.go
+++ b/pkg/api/chunk_test.go
@@ -23,9 +23,11 @@ import (
 	mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock"
 
 	"github.com/ethersphere/bee/v2/pkg/api"
+	"github.com/ethersphere/bee/v2/pkg/cac"
 	"github.com/ethersphere/bee/v2/pkg/jsonhttp"
 	"github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest"
 	testingpostage "github.com/ethersphere/bee/v2/pkg/postage/testing"
+	testingsoc "github.com/ethersphere/bee/v2/pkg/soc/testing"
 	testingc "github.com/ethersphere/bee/v2/pkg/storage/testing"
 	"github.com/ethersphere/bee/v2/pkg/swarm"
 )
@@ -282,3 +284,240 @@ func TestPreSignedUpload(t *testing.T) {
 		jsonhttptest.WithRequestBody(bytes.NewReader(chunk.Data())),
 	)
 }
+
+// TestChunkUploadSOC tests that SOC chunks uploaded via POST /chunks are correctly
+// identified and stored at their SOC address, not misclassified as CAC chunks.
+//
+//nolint:paralleltest,tparallel
+func TestChunkUploadSOC(t *testing.T) {
+	t.Parallel()
+
+	var (
+		chunksEndpoint = "/chunks"
+		chunksResource = func(a swarm.Address) string { return "/chunks/" + a.String() }
+	)
+
+	// chunkSource builds a request body together with the reference the API must return for it.
+	type chunkSource func(t *testing.T) (data []byte, want swarm.Address)
+
+	socSource := func(payload string) chunkSource {
+		return func(t *testing.T) ([]byte, swarm.Address) {
+			t.Helper()
+			mockSOC := testingsoc.GenerateMockSOC(t, []byte(payload))
+			return mockSOC.Chunk().Data(), mockSOC.Address()
+		}
+	}
+	cacSource := func(payload []byte) chunkSource {
+		return func(t *testing.T) ([]byte, swarm.Address) {
+			t.Helper()
+			ch, err := cac.New(payload)
+			if err != nil {
+				t.Fatal(err)
+			}
+			return ch.Data(), ch.Address()
+		}
+	}
+	randomCACSource := func(t *testing.T) ([]byte, swarm.Address) {
+		t.Helper()
+		ch := testingc.GenerateTestRandomChunk()
+		return ch.Data(), ch.Address()
+	}
+	// socAsCACSource yields SOC bytes with the address they get when hashed as a plain CAC.
+	socAsCACSource := func(t *testing.T) ([]byte, swarm.Address) {
+		t.Helper()
+		data := testingsoc.GenerateMockSOC(t, []byte("wrong type")).Chunk().Data()
+		ch, err := cac.NewWithDataSpan(data)
+		if err != nil {
+			t.Fatal(err)
+		}
+		return data, ch.Address()
+	}
+
+	// Uploads that must succeed and answer with the expected chunk reference.
+	for _, tc := range []struct {
+		name      string
+		chunkType string // Swarm-Chunk-Type header value; empty leaves the header unset
+		source    chunkSource
+	}{
+		{name: "soc upload returns soc address", source: socSource("test payload")},
+		{name: "cac upload still works", source: randomCACSource},
+		// Minimal CAC: span (8 bytes) + 1 byte payload = 9 bytes total
+		{name: "cac upload small payload", source: cacSource([]byte{0x01})},
+		// Maximum CAC: span (8 bytes) + 4096 byte payload = 4104 bytes total
+		{name: "cac upload max payload", source: cacSource(make([]byte, swarm.ChunkSize))},
+		// Data that is >= SocMinChunkSize (105 bytes) but not a valid SOC
+		// (zeroed data carries no valid ECDSA signature). Should fall back to CAC.
+		{name: "invalid soc falls back to cac", source: cacSource(make([]byte, swarm.SocMinChunkSize-swarm.SpanSize))},
+		{name: "soc upload with chunk type header", chunkType: "soc", source: socSource("header soc")},
+		{name: "soc upload with chunk type header numeric", chunkType: "1", source: socSource("header soc 1")},
+		{name: "cac upload with chunk type header", chunkType: "cac", source: randomCACSource},
+		{name: "cac upload with chunk type header numeric", chunkType: "0", source: randomCACSource},
+		// With Swarm-Chunk-Type: cac no SOC detection happens, so SOC bytes get the content address.
+		{name: "soc data with cac header returns cac address", chunkType: "cac", source: socAsCACSource},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			var (
+				data, want      = tc.source(t)
+				storerMock      = mockstorer.New()
+				client, _, _, _ = newTestServer(t, testServerOptions{
+					Storer: storerMock,
+					Post:   mockpost.New(mockpost.WithAcceptAll()),
+				})
+			)
+
+			opts := []jsonhttptest.Option{
+				jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr),
+				jsonhttptest.WithRequestBody(bytes.NewReader(data)),
+				jsonhttptest.WithExpectedJSONResponse(api.ChunkAddressResponse{Reference: want}),
+			}
+			if tc.chunkType != "" {
+				opts = append(opts, jsonhttptest.WithRequestHeader(api.SwarmChunkTypeHeader, tc.chunkType))
+			}
+
+			// drain pusher feed
+			go func() { <-storerMock.PusherFeed() }()
+
+			jsonhttptest.Request(t, client, http.MethodPost, chunksEndpoint, http.StatusCreated, opts...)
+		})
+	}
+
+	t.Run("soc upload chunk is retrievable", func(t *testing.T) {
+		var (
+			mockSOC         = testingsoc.GenerateMockSOC(t, []byte("retrievable"))
+			socChunk        = mockSOC.Chunk()
+			storerMock      = mockstorer.New()
+			client, _, _, _ = newTestServer(t, testServerOptions{
+				Storer: storerMock,
+				Post:   mockpost.New(mockpost.WithAcceptAll()),
+			})
+		)
+
+		tag, err := storerMock.NewSession()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		// upload with tag so chunk is stored in ChunkStore (deferred upload path)
+		jsonhttptest.Request(t, client, http.MethodPost, chunksEndpoint, http.StatusCreated,
+			jsonhttptest.WithRequestHeader(api.SwarmTagHeader, fmt.Sprintf("%d", tag.TagID)),
+			jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr),
+			jsonhttptest.WithRequestBody(bytes.NewReader(socChunk.Data())),
+			jsonhttptest.WithExpectedJSONResponse(api.ChunkAddressResponse{Reference: mockSOC.Address()}),
+		)
+
+		// verify chunk exists in store at SOC address
+		has, err := storerMock.ChunkStore().Has(context.Background(), mockSOC.Address())
+		if err != nil {
+			t.Fatal(err)
+		}
+		if !has {
+			t.Fatal("soc chunk not found in store at soc address")
+		}
+
+		// retrieve by SOC address
+		jsonhttptest.Request(t, client, http.MethodGet, chunksResource(mockSOC.Address()), http.StatusOK,
+			jsonhttptest.WithExpectedResponse(socChunk.Data()),
+			jsonhttptest.WithExpectedContentLength(len(socChunk.Data())),
+		)
+	})
+
+	t.Run("soc direct upload", func(t *testing.T) {
+		var (
+			mockSOC                  = testingsoc.GenerateMockSOC(t, []byte("direct"))
+			socChunk                 = mockSOC.Chunk()
+			storerMock               = mockstorer.New()
+			client, _, _, chanStorer = newTestServer(t, testServerOptions{
+				Storer:       storerMock,
+				Post:         mockpost.New(mockpost.WithAcceptAll()),
+				DirectUpload: true,
+			})
+		)
+
+		jsonhttptest.Request(t, client, http.MethodPost, chunksEndpoint, http.StatusCreated,
+			jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr),
+			jsonhttptest.WithRequestBody(bytes.NewReader(socChunk.Data())),
+			jsonhttptest.WithExpectedJSONResponse(api.ChunkAddressResponse{Reference: mockSOC.Address()}),
+		)
+
+		// Wait up to a second for the chunk to reach the direct upload channel; no fixed sleep needed.
+		err := spinlock.Wait(time.Second, func() bool { return chanStorer.Has(mockSOC.Address()) })
+		if err != nil {
+			t.Fatal("soc chunk not found at soc address in direct upload channel")
+		}
+	})
+
+	t.Run("data too short", func(t *testing.T) {
+		client, _, _, _ := newTestServer(t, testServerOptions{
+			Storer: mockstorer.New(),
+			Post:   mockpost.New(mockpost.WithAcceptAll()),
+		})
+
+		jsonhttptest.Request(t, client, http.MethodPost, chunksEndpoint, http.StatusBadRequest,
+			jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr),
+			jsonhttptest.WithRequestBody(bytes.NewReader([]byte{0x01, 0x02, 0x03})),
+			jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
+				Message: "insufficient data length",
+				Code:    http.StatusBadRequest,
+			}),
+		)
+	})
+
+	t.Run("invalid chunk type header", func(t *testing.T) {
+		client, _, _, _ := newTestServer(t, testServerOptions{
+			Storer: mockstorer.New(),
+			Post:   mockpost.New(mockpost.WithAcceptAll()),
+		})
+
+		jsonhttptest.Request(t, client, http.MethodPost, chunksEndpoint, http.StatusBadRequest,
+			jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr),
+			jsonhttptest.WithRequestHeader(api.SwarmChunkTypeHeader, "invalid"),
+			jsonhttptest.WithRequestBody(bytes.NewReader([]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0xff})),
+			jsonhttptest.WithExpectedJSONResponse(jsonhttp.StatusResponse{
+				Message: "invalid chunk type; expected 'soc' or 'cac'",
+				Code:    http.StatusBadRequest,
+			}),
+		)
+	})
+
+	t.Run("soc with pre-signed stamp", func(t *testing.T) {
+		var (
+			mockSOC         = testingsoc.GenerateMockSOC(t, []byte("stamped"))
+			socChunk        = mockSOC.Chunk()
+			storerMock      = mockstorer.New()
+			batchStore      = mockbatchstore.New()
+			client, _, _, _ = newTestServer(t, testServerOptions{
+				Storer:     storerMock,
+				BatchStore: batchStore,
+			})
+		)
+
+		key, err := crypto.GenerateSecp256k1Key()
+		if err != nil {
+			t.Fatal(err)
+		}
+		signer := crypto.NewDefaultSigner(key)
+		owner, err := signer.EthereumAddress()
+		if err != nil {
+			t.Fatal(err)
+		}
+		stamp := testingpostage.MustNewValidStamp(signer, mockSOC.Address())
+		if err := batchStore.Save(&postage.Batch{
+			ID:    stamp.BatchID(),
+			Owner: owner.Bytes(),
+		}); err != nil {
+			t.Fatal(err)
+		}
+		stampBytes, err := stamp.MarshalBinary()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		go func() { <-storerMock.PusherFeed() }()
+
+		jsonhttptest.Request(t, client, http.MethodPost, chunksEndpoint, http.StatusCreated,
+			jsonhttptest.WithRequestHeader(api.SwarmPostageStampHeader, hex.EncodeToString(stampBytes)),
+			jsonhttptest.WithRequestBody(bytes.NewReader(socChunk.Data())),
+			jsonhttptest.WithExpectedJSONResponse(api.ChunkAddressResponse{Reference: mockSOC.Address()}),
+		)
+	})
+}