diff --git a/src/command/upload.ts b/src/command/upload.ts
index e0c46c86..2c7de315 100644
--- a/src/command/upload.ts
+++ b/src/command/upload.ts
@@ -8,6 +8,12 @@ import { exit } from 'process'
 import { setCurlStore } from '../curl'
 import { pickStamp, printStamp } from '../service/stamp'
 import { fileExists, readStdin } from '../utils'
+import {
+  ChunkedUploadProgress,
+  uploadChunkedData,
+  uploadChunkedFile,
+  uploadChunkedFolder,
+} from '../utils/chunked-upload'
 import { CommandLineError } from '../utils/error'
 import { getMime } from '../utils/mime'
 import { stampProperties } from '../utils/option'
@@ -120,6 +126,21 @@ export class Upload extends RootCommand implements LeafCommand {
   })
   public redundancy!: string
 
+  @Option({
+    key: 'chunked',
+    type: 'boolean',
+    description: 'Upload chunk-by-chunk with per-chunk retry for resilience',
+  })
+  public chunked!: boolean
+
+  @Option({
+    key: 'chunked-retries',
+    type: 'number',
+    default: 3,
+    description: 'Max retries per chunk when --chunked is set',
+  })
+  public chunkedRetries!: number
+
   public stdinData!: Buffer
 
   public historyAddress: Optional<Reference> = Optional.empty()
@@ -132,6 +153,8 @@ export class Upload extends RootCommand implements LeafCommand {
       exit(1)
     }
 
+    this.assertChunkedCompatibility()
+
     await this.maybePrintSyncWarning()
 
     if (!this.stdin && !FS.existsSync(this.path)) {
@@ -204,6 +227,10 @@ export class Upload extends RootCommand implements LeafCommand {
   }
 
   private async uploadAnyWithSpinner(tag: Tag | undefined, isFolder: boolean): Promise<string> {
+    if (this.chunked) {
+      return this.uploadAnyChunked(isFolder)
+    }
+
     const spinner = createSpinner(this.path ? `Uploading ${this.path}...` : 'Uploading data from stdin...')
 
     if (this.verbosity !== VerbosityLevel.Quiet && !this.curl) {
@@ -233,6 +260,94 @@ export class Upload extends RootCommand implements LeafCommand {
     }
   }
 
+  private async uploadAnyChunked(isFolder: boolean): Promise<string> {
+    const progressBar =
+      this.verbosity !== VerbosityLevel.Quiet && !this.curl
+        ? new SingleBar({ clearOnComplete: true }, Presets.rect)
+        : null
+
+    progressBar?.start(1, 0)
+
+    const onProgress = ({ total, processed }: ChunkedUploadProgress) => {
+      if (!progressBar) {
+        return
+      }
+      progressBar.setTotal(Math.max(total, processed, 1))
+      progressBar.update(processed)
+    }
+
+    const onRetry = (attempt: number, error: unknown) => {
+      const message = error instanceof Error ? error.message : String(error)
+      this.console.verbose(`Chunk upload failed (attempt ${attempt}): ${message}. Retrying...`)
+    }
+
+    const commonOptions = {
+      pin: this.pin,
+      deferred: this.deferred,
+      maxRetries: this.chunkedRetries,
+      onProgress,
+      onRetry,
+    }
+
+    try {
+      if (this.stdin) {
+        const reference = await uploadChunkedData(this.bee, this.stamp, this.stdinData, commonOptions)
+        this.result = Optional.of(reference)
+
+        return `${this.bee.url}/bytes/${reference.toHex()}`
+      }
+
+      if (isFolder) {
+        const reference = await uploadChunkedFolder(this.bee, this.stamp, this.path, {
+          ...commonOptions,
+          indexDocument: this.indexDocument,
+          errorDocument: this.errorDocument,
+        })
+        this.result = Optional.of(reference)
+
+        return `${this.bee.url}/bzz/${reference.toHex()}/`
+      }
+
+      const parsedPath = parse(this.path)
+      const name = this.determineFileName(parsedPath.base)
+      const contentType = this.contentType || getMime(this.path) || undefined
+      const reference = await uploadChunkedFile(this.bee, this.stamp, this.path, name, contentType, commonOptions)
+      this.result = Optional.of(reference)
+
+      return name ? `${this.bee.url}/bzz/${reference.toHex()}/` : `${this.bee.url}/bytes/${reference.toHex()}`
+    } finally {
+      progressBar?.stop()
+    }
+  }
+
+  private assertChunkedCompatibility(): void {
+    if (!this.chunked) {
+      return
+    }
+
+    const conflicts: string[] = []
+
+    if (this.encrypt) {
+      conflicts.push('--encrypt')
+    }
+
+    if (this.act) {
+      conflicts.push('--act')
+    }
+
+    if (this.redundancy) {
+      conflicts.push('--redundancy')
+    }
+
+    if (this.sync) {
+      conflicts.push('--sync')
+    }
+
+    if (conflicts.length) {
+      throw new CommandLineError(`--chunked cannot be combined with: ${conflicts.join(', ')}`)
+    }
+  }
+
   private async uploadStdin(tag?: Tag): Promise<string> {
     if (this.fileName) {
       const contentType = this.contentType || getMime(this.fileName) || undefined
diff --git a/src/utils/chunked-upload.ts b/src/utils/chunked-upload.ts
new file mode 100644
index 00000000..d4cb8086
--- /dev/null
+++ b/src/utils/chunked-upload.ts
@@ -0,0 +1,283 @@
+import { Bee, MantarayNode, MerkleTree, Reference } from '@ethersphere/bee-js'
+import { Chunk, System } from 'cafe-utility'
+import * as FS from 'fs'
+import { join, sep } from 'path'
+import { getMime } from './mime'
+
+/** Progress snapshot passed to `ChunkedUploadOptions.onProgress`. */
+export interface ChunkedUploadProgress {
+  /** Estimated number of chunks to upload; raised when more chunks are uploaded than estimated. */
+  total: number
+  /** Number of chunks uploaded so far. */
+  processed: number
+}
+
+/** Options shared by all chunked upload helpers. */
+export interface ChunkedUploadOptions {
+  pin?: boolean
+  deferred?: boolean
+  /** Retries per chunk after the first attempt (default 3); negative values count as 0, NaN as the default. */
+  maxRetries?: number
+  /** Delay before the first retry in milliseconds (default 500); doubled for each further retry, up to 30 s. */
+  retryBaseDelayMs?: number
+  onProgress?: (progress: ChunkedUploadProgress) => void
+  /** Called before each retry with the 1-based number of the attempt that just failed. */
+  onRetry?: (attempt: number, error: unknown) => void
+}
+
+/** Folder upload options: adds the website index and error document names. */
+export interface ChunkedFolderOptions extends ChunkedUploadOptions {
+  indexDocument?: string
+  errorDocument?: string
+}
+
+/** Payload size used to estimate how many leaf chunks an input produces. */
+const CHUNK_PAYLOAD_SIZE = 4096
+const DEFAULT_MAX_RETRIES = 3
+const DEFAULT_RETRY_BASE_DELAY_MS = 500
+/** Upper bound for one backoff sleep so a large retry count cannot stall the upload for hours. */
+const MAX_RETRY_DELAY_MS = 30000
+/** All-zero reference used for the `/` fork, which only carries website metadata. */
+const NULL_REFERENCE_BYTES = new Uint8Array(32)
+
+/**
+ * Uploads one chunk, retrying with exponential backoff when the upload throws.
+ *
+ * The chunk is always attempted at least once and the last error is rethrown
+ * once the retries are used up.
+ */
+async function uploadChunkWithRetry(
+  bee: Bee,
+  stamp: string,
+  chunkData: Uint8Array,
+  options: ChunkedUploadOptions,
+): Promise<void> {
+  const requestedRetries = options.maxRetries ?? DEFAULT_MAX_RETRIES
+  // A negative or NaN count used to skip the loop entirely and throw `undefined`.
+  const maxRetries = Number.isNaN(requestedRetries) ? DEFAULT_MAX_RETRIES : Math.max(0, Math.floor(requestedRetries))
+  const baseDelay = options.retryBaseDelayMs ?? DEFAULT_RETRY_BASE_DELAY_MS
+  let lastError: unknown
+
+  for (let attempt = 0; attempt <= maxRetries; attempt++) {
+    try {
+      await bee.uploadChunk(stamp, chunkData, { pin: options.pin, deferred: options.deferred })
+
+      return
+    } catch (error) {
+      lastError = error
+
+      if (attempt < maxRetries) {
+        options.onRetry?.(attempt + 1, error)
+        await System.sleepMillis(Math.min(baseDelay * 2 ** attempt, MAX_RETRY_DELAY_MS))
+      }
+    }
+  }
+
+  throw lastError
+}
+
+/** Estimates the number of leaf chunks for `size` bytes; empty input still counts as one chunk. */
+function totalLeafChunks(size: number): number {
+  return size === 0 ? 1 : Math.ceil(size / CHUNK_PAYLOAD_SIZE)
+}
+
+/** Chunk callback and progress reporter shared by the data, file and folder uploads. */
+interface ChunkUploader {
+  onChunk: (chunk: Chunk) => Promise<void>
+  reportProgress: () => void
+}
+
+/**
+ * Creates the `MerkleTree` callback that uploads each chunk and reports progress.
+ *
+ * `estimatedTotal` may be lower than the number of chunks actually emitted, so
+ * the reported total is never smaller than the processed count.
+ */
+function createChunkUploader(
+  bee: Bee,
+  stamp: string,
+  estimatedTotal: number,
+  options: ChunkedUploadOptions,
+): ChunkUploader {
+  let processed = 0
+  const reportProgress = () => options.onProgress?.({ total: Math.max(estimatedTotal, processed), processed })
+  const onChunk = async (chunk: Chunk) => {
+    await uploadChunkWithRetry(bee, stamp, chunk.build(), options)
+    processed += 1
+    reportProgress()
+  }
+
+  return { onChunk, reportProgress }
+}
+
+/**
+ * Streams the file at `filePath` into a `MerkleTree` that calls `onChunk` for
+ * the chunks it produces, and resolves with the finalized root chunk.
+ */
+async function streamFileToMerkleTree(filePath: string, onChunk: (chunk: Chunk) => Promise<void>): Promise<Chunk> {
+  const tree = new MerkleTree(onChunk)
+  const readable = FS.createReadStream(filePath)
+
+  for await (const data of readable) {
+    await tree.append(data as Uint8Array)
+  }
+
+  return tree.finalize()
+}
+
+/**
+ * Uploads an in-memory buffer chunk by chunk.
+ *
+ * @returns the reference of the root chunk; no manifest is created.
+ */
+export async function uploadChunkedData(
+  bee: Bee,
+  stamp: string,
+  data: Uint8Array,
+  options: ChunkedUploadOptions = {},
+): Promise<Reference> {
+  const uploader = createChunkUploader(bee, stamp, totalLeafChunks(data.length), options)
+  uploader.reportProgress()
+
+  const tree = new MerkleTree(uploader.onChunk)
+  await tree.append(data)
+  const root = await tree.finalize()
+
+  return new Reference(root.hash())
+}
+
+/**
+ * Uploads a single file chunk by chunk.
+ *
+ * Without `name` the reference of the root chunk is returned. With `name` the
+ * file is wrapped in a manifest that also names it as the index document.
+ */
+export async function uploadChunkedFile(
+  bee: Bee,
+  stamp: string,
+  filePath: string,
+  name: string | undefined,
+  contentType: string | undefined,
+  options: ChunkedUploadOptions = {},
+): Promise<Reference> {
+  const uploader = createChunkUploader(bee, stamp, totalLeafChunks(FS.statSync(filePath).size), options)
+  uploader.reportProgress()
+
+  const rootChunk = await streamFileToMerkleTree(filePath, uploader.onChunk)
+  const rootHash = rootChunk.hash()
+
+  if (!name) {
+    return new Reference(rootHash)
+  }
+
+  const mime = contentType || getMime(name) || 'application/octet-stream'
+  const mantaray = new MantarayNode()
+  mantaray.addFork(name, rootHash, {
+    'Content-Type': mime,
+    Filename: name,
+  })
+  mantaray.addFork('/', NULL_REFERENCE_BYTES, {
+    'website-index-document': name,
+  })
+
+  const result = await mantaray.saveRecursively(bee, stamp, {
+    pin: options.pin,
+    deferred: options.deferred,
+  })
+
+  return result.reference
+}
+
+interface CollectedFile {
+  fsPath: string
+  relPath: string
+  size: number
+}
+
+/** Recursively appends every regular file below `root` to `out`; other entry types are skipped. */
+function collectFiles(root: string, current: string, out: CollectedFile[]): void {
+  const absolute = current ? join(root, current) : root
+  const entries = FS.readdirSync(absolute, { withFileTypes: true })
+
+  for (const entry of entries) {
+    const relPath = current ? join(current, entry.name) : entry.name
+    const fsPath = join(absolute, entry.name)
+
+    if (entry.isDirectory()) {
+      collectFiles(root, relPath, out)
+    } else if (entry.isFile()) {
+      out.push({ fsPath, relPath, size: FS.statSync(fsPath).size })
+    }
+  }
+}
+
+/**
+ * Uploads a directory chunk by chunk and stores its files in a manifest.
+ *
+ * A root-level `index.html` becomes the index document unless
+ * `options.indexDocument` is set.
+ *
+ * @throws Error when the directory contains no files.
+ */
+export async function uploadChunkedFolder(
+  bee: Bee,
+  stamp: string,
+  dirPath: string,
+  options: ChunkedFolderOptions = {},
+): Promise<Reference> {
+  const files: CollectedFile[] = []
+  collectFiles(dirPath, '', files)
+
+  if (files.length === 0) {
+    throw new Error(`No files found in directory: ${dirPath}`)
+  }
+
+  const estimatedTotal = files.reduce((sum, file) => sum + totalLeafChunks(file.size), 0)
+  const uploader = createChunkUploader(bee, stamp, estimatedTotal, options)
+  uploader.reportProgress()
+
+  const mantaray = new MantarayNode()
+  let hasIndexHtml = false
+
+  for (const file of files) {
+    const manifestPath = file.relPath.split(sep).join('/')
+    const rootChunk = await streamFileToMerkleTree(file.fsPath, uploader.onChunk)
+    const mime = getMime(manifestPath) || 'application/octet-stream'
+    // lastIndexOf() is -1 for root-level files, so substring(0) keeps the whole name.
+    const filename = manifestPath.substring(manifestPath.lastIndexOf('/') + 1)
+
+    mantaray.addFork(manifestPath, rootChunk.hash(), {
+      'Content-Type': mime,
+      Filename: filename,
+    })
+
+    if (manifestPath === 'index.html') {
+      hasIndexHtml = true
+    }
+  }
+
+  if (hasIndexHtml || options.indexDocument || options.errorDocument) {
+    const metadata: Record<string, string> = {}
+
+    if (options.indexDocument) {
+      metadata['website-index-document'] = options.indexDocument
+    } else if (hasIndexHtml) {
+      metadata['website-index-document'] = 'index.html'
+    }
+
+    if (options.errorDocument) {
+      metadata['website-error-document'] = options.errorDocument
+    }
+    mantaray.addFork('/', NULL_REFERENCE_BYTES, metadata)
+  }
+
+  const result = await mantaray.saveRecursively(bee, stamp, {
+    pin: options.pin,
+    deferred: options.deferred,
+  })
+
+  // Emit a final progress report once the manifest has been stored.
+  uploader.reportProgress()
+
+  return result.reference
+}
diff --git a/test/command/upload.spec.ts b/test/command/upload.spec.ts
index 78273b74..38c5a102 100644
--- a/test/command/upload.spec.ts
+++ b/test/command/upload.spec.ts
@@ -174,4 +174,43 @@ describeCommand('Test Upload command', ({ consoleMessages, hasMessageContaining
     await invokeTestCli(['upload', 'test/message.txt', ...getStampOption()])
     expect(consoleMessages[0]).toContain('Swarm hash')
   })
+
+  it('should upload file with --chunked', async () => {
+    const commandBuilder = await invokeTestCli(['upload', 'README.md', '--chunked', ...getStampOption()])
+    const uploadCommand = commandBuilder.runnable as Upload
+    expect(uploadCommand.result.getOrThrow().toHex()).toHaveLength(64)
+  })
+
+  it('should upload file with --chunked and --drop-name', async () => {
+    const commandBuilder = await invokeTestCli(['upload', 'README.md', '--chunked', '--drop-name', ...getStampOption()])
+    const uploadCommand = commandBuilder.runnable as Upload
+    expect(uploadCommand.result.getOrThrow().toHex()).toHaveLength(64)
+    expect(hasMessageContaining('/bytes/')).toBeTruthy()
+  })
+
+  it('should upload folder with --chunked', async () => {
+    const commandBuilder = await invokeTestCli(['upload', 'test/testpage', '--chunked', ...getStampOption()])
+    const uploadCommand = commandBuilder.runnable as Upload
+    expect(uploadCommand.result.getOrThrow().toHex()).toHaveLength(64)
+  })
+
+  it('should reject --chunked with --encrypt', async () => {
+    await invokeTestCli(['upload', 'README.md', '--chunked', '--encrypt', ...getStampOption()])
+    expect(hasMessageContaining('--chunked cannot be combined with: --encrypt')).toBeTruthy()
+  })
+
+  it('should reject --chunked with --act', async () => {
+    await invokeTestCli(['upload', 'README.md', '--chunked', '--act', ...getStampOption()])
+    expect(hasMessageContaining('--chunked cannot be combined with: --act')).toBeTruthy()
+  })
+
+  it('should reject --chunked with --sync', async () => {
+    await invokeTestCli(['upload', 'README.md', '--chunked', '--sync', ...getStampOption()])
+    expect(hasMessageContaining('--chunked cannot be combined with: --sync')).toBeTruthy()
+  })
+
+  it('should reject --chunked with --redundancy', async () => {
+    await invokeTestCli(['upload', 'README.md', '--chunked', '--redundancy', 'MEDIUM', ...getStampOption()])
+    expect(hasMessageContaining('--chunked cannot be combined with: --redundancy')).toBeTruthy()
+  })
 })