diff --git a/packages/dappmanager/src/calls/ipfsClientTargetSet.ts b/packages/dappmanager/src/calls/ipfsClientTargetSet.ts
index f900ce0bb0..af0991e667 100644
--- a/packages/dappmanager/src/calls/ipfsClientTargetSet.ts
+++ b/packages/dappmanager/src/calls/ipfsClientTargetSet.ts
@@ -35,7 +35,7 @@ async function changeIpfsClient(nextTarget: IpfsClientTarget, nextGateway?: stri
     dappnodeInstaller.changeIpfsGatewayUrl(params.IPFS_LOCAL);
   } else {
     // Set new values in db
-    db.ipfsGateway.set(nextGateway || params.IPFS_GATEWAY);
+    db.ipfsGateway.set(nextGateway || params.IPFS_REMOTE);
     db.ipfsClientTarget.set(IpfsClientTarget.remote);
 
     // Change IPFS host
diff --git a/packages/installer/src/installer/downloadImages.ts b/packages/installer/src/installer/downloadImages.ts
index f3197f34e5..ec98c20942 100644
--- a/packages/installer/src/installer/downloadImages.ts
+++ b/packages/installer/src/installer/downloadImages.ts
@@ -16,37 +16,35 @@ export async function downloadImages(
   packagesData: InstallPackageData[],
   log: Log
 ): Promise<void> {
-  await Promise.all(
-    packagesData.map(async function (pkg) {
-      const { dnpName, semVersion, isCore, imageFile, imagePath } = pkg;
-      log(dnpName, "Starting download...");
-
-      function onProgress(progress: number): void {
-        let message = `Downloading ${progress}%`;
-        if (progress > 100) message += ` (expected ${imageFile.size} bytes)`;
-        log(dnpName, message);
-      }
-
-      try {
-        await getImage(dappnodeInstaller, imageFile, imagePath, onProgress);
-      } catch (e) {
-        e.message = `Can't download ${dnpName} image: ${e.message}`;
-        throw e; // Use this format to keep the stack trace
-      }
-
-      // Do not throw for core packages
-      log(dnpName, "Verifying download...");
-      try {
-        await verifyDockerImage({ imagePath, dnpName, version: semVersion });
-      } catch (e) {
-        const errorMessage = `Error verifying image: ${e.message}`;
-        if (isCore) logs.error(errorMessage);
-        else throw Error(errorMessage);
-      }
-
-      log(dnpName, "Package downloaded");
-    })
-  );
+  for (const pkg of packagesData) {
+    const { dnpName, semVersion, isCore, imageFile, imagePath } = pkg;
+    log(dnpName, "Starting download...");
+
+    function onProgress(progress: number): void {
+      let message = `Downloading ${progress}%`;
+      if (progress > 100) message += ` (expected ${imageFile.size} bytes)`;
+      log(dnpName, message);
+    }
+
+    try {
+      await getImage(dappnodeInstaller, imageFile, imagePath, onProgress);
+    } catch (e) {
+      e.message = `Can't download ${dnpName} image: ${e.message}`;
+      throw e; // Use this format to keep the stack trace
+    }
+
+    // Do not throw for core packages
+    log(dnpName, "Verifying download...");
+    try {
+      await verifyDockerImage({ imagePath, dnpName, version: semVersion });
+    } catch (e) {
+      const errorMessage = `Error verifying image: ${e.message}`;
+      if (isCore) logs.error(errorMessage);
+      else throw Error(errorMessage);
+    }
+
+    log(dnpName, "Package downloaded");
+  }
 }
 
 /**
@@ -84,7 +82,7 @@ export async function getImage(
   progress: (n: number) => void
 ): Promise<void> {
   // Validate parameters
-  if (!path || path.startsWith("/ipfs/") || !isAbsolute("/")) throw Error(`Invalid path: "${path}"`);
+  if (!path || path.startsWith("/ipfs/") || !isAbsolute(path)) throw Error(`Invalid path: "${path}"`);
   validatePath(path);
 
   // Check if cache exist and validate it
diff --git a/packages/toolkit/src/repository/repository.ts b/packages/toolkit/src/repository/repository.ts
index b0fe9a2bed..1942151a6f 100644
--- a/packages/toolkit/src/repository/repository.ts
+++ b/packages/toolkit/src/repository/repository.ts
@@ -41,6 +41,10 @@ const source = "ipfs" as const;
 export class DappnodeRepository extends ApmRepository {
   protected gatewayUrl: string;
   protected localIpfsUrl = "http://ipfs.dappnode:5001";
+  protected localIpfsGatewayUrl = "http://ipfs.dappnode:8080";
+  protected downloadAttempts = 3;
+  protected downloadRetryDelayMs = 1500;
+  protected downloadTimeoutMs = 2 * 60 * 1000;
 
   /**
    * Constructs an instance of DappnodeRepository
@@ -258,27 +262,28 @@ export class DappnodeRepository extends ApmRepository {
    * @see catCarReaderToMemory
    */
   public async writeFileToMemory(hash: string, maxLength?: number): Promise<string> {
-    const chunks: Uint8Array[] = [];
-    const { carReader, root } = await this.getAndVerifyContentFromGateway(hash);
-    const content = await this.unpackCarReader(carReader, root);
-    for await (const chunk of content) chunks.push(chunk);
-
-    // Concatenate the chunks into a single Uint8Array
-    let totalLength = 0;
-    chunks.forEach((chunk) => (totalLength += chunk.length));
-    const buffer = new Uint8Array(totalLength);
-    let offset = 0;
-    chunks.forEach((chunk) => {
-      buffer.set(chunk, offset);
-      offset += chunk.length;
+    return this.withGatewayRetries(hash, async ({ carReader, root }) => {
+      const chunks: Uint8Array[] = [];
+      const content = await this.unpackCarReader(carReader, root);
+      for await (const chunk of content) chunks.push(chunk);
+
+      // Concatenate the chunks into a single Uint8Array
+      let totalLength = 0;
+      chunks.forEach((chunk) => (totalLength += chunk.length));
+      const buffer = new Uint8Array(totalLength);
+      let offset = 0;
+      chunks.forEach((chunk) => {
+        buffer.set(chunk, offset);
+        offset += chunk.length;
+      });
+
+      if (maxLength && buffer.length >= maxLength) throw Error(`Maximum size ${maxLength} bytes exceeded`);
+
+      // Convert the Uint8Array to a string
+      // TODO: This assumes the data is UTF-8 encoded. If it's not, you will need a more complex conversion. Research which encoding is used by IPFS.
+      const decoder = new TextDecoder("utf-8");
+      return decoder.decode(buffer);
     });
-
-    if (maxLength && buffer.length >= maxLength) throw Error(`Maximum size ${maxLength} bytes exceeded`);
-
-    // Convert the Uint8Array to a string
-    // TODO: This assumes the data is UTF-8 encoded. If it's not, you will need a more complex conversion. Research which encoding is used by IPFS.
-    const decoder = new TextDecoder("utf-8");
-    return decoder.decode(buffer);
   }
 
   /**
@@ -309,66 +314,25 @@ export class DappnodeRepository extends ApmRepository {
     fileSize?: number;
     progress?: (n: number) => void;
   }): Promise<void> {
-    const { carReader, root } = await this.getAndVerifyContentFromGateway(hash);
-    const readable = await this.unpackCarReader(carReader, root);
-
-    return new Promise((resolve, reject) => {
-      async function handleDownload(): Promise<void> {
-        if (!_path || _path.startsWith("/ipfs/") || !path.isAbsolute("/")) reject(Error(`Invalid path: "${path}"`));
-
-        const asyncIterableArray: Uint8Array[] = [];
-
-        // Timeout cancel mechanism
-        const timeoutToCancel = setTimeout(
-          () => {
-            reject(Error(`Timeout downloading ${hash}`));
-          },
-          timeout || 30 * 1000
-        );
-
-        let totalData = 0;
-        let previousProgress = -1;
-        const resolution = 1;
-        const round = (n: number): number => resolution * Math.round((100 * n) / resolution);
-
-        const onData = (chunk: Uint8Array): void => {
-          clearTimeout(timeoutToCancel);
-          totalData += chunk.length;
-          asyncIterableArray.push(chunk);
-          if (progress && fileSize) {
-            const currentProgress = round(totalData / fileSize);
-            if (currentProgress !== previousProgress) {
-              progress(currentProgress);
-              previousProgress = currentProgress;
-            }
-          }
-        };
-
-        const onFinish = (): void => {
-          clearTimeout(timeoutToCancel);
-          resolve();
-        };
+    if (!_path || _path.startsWith("/ipfs/") || !path.isAbsolute(_path)) throw Error(`Invalid path: "${_path}"`);
+    await fs.promises.mkdir(path.dirname(_path), { recursive: true });
 
-        const onError =
-          (streamId: string) =>
-          (err: Error): void => {
-            clearTimeout(timeoutToCancel);
-            reject(Error(streamId + ": " + err));
-          };
-
-        try {
-          for await (const chunk of readable) onData(chunk);
-
-          const writable = fs.createWriteStream(_path);
-          await util.promisify(stream.pipeline)(stream.Readable.from(asyncIterableArray), writable);
-          onFinish();
-        } catch (e) {
-          onError("Error writing to fs")(e as Error);
-        }
-      }
-
-      handleDownload().catch((error) => reject(error));
-    });
+    try {
+      await this.withGatewayRetries(hash, async ({ carReader, root }) => {
+        const readable = await this.unpackCarReader(carReader, root);
+        await this.writeReadableToFs({
+          hash,
+          readable,
+          path: _path,
+          timeout: timeout ?? this.downloadTimeoutMs,
+          fileSize,
+          progress
+        });
+      });
+    } catch (e) {
+      await fs.promises.rm(_path, { force: true });
+      throw e;
+    }
   }
 
   /**
@@ -420,20 +384,20 @@ export class DappnodeRepository extends ApmRepository {
    * @returns The content as a CAR reader and the root CID.
    * @throws Error when the root CID does not match the provided hash (content is untrusted).
    */
-  private async getAndVerifyContentFromGateway(hash: string): Promise<{
+  private async getAndVerifyContentFromGateway(hash: string, gatewayUrl: string): Promise<{
     carReader: CarReader;
     root: CID;
   }> {
     // 1. Download the CAR
-    const url = `${this.gatewayUrl}/ipfs/${hash}?format=car`;
+    const url = `${gatewayUrl}/ipfs/${hash}?format=car`;
     const res = await fetch(url, {
       headers: { Accept: "application/vnd.ipld.car" }
    });
     if (!res.ok) throw new Error(`Gateway error: ${res.status} ${res.statusText}`);
 
     // 2. Parse into a CarReader
-    const bytes = new Uint8Array(await res.arrayBuffer());
-    const carReader = await CarReader.fromBytes(bytes);
+    if (!res.body) throw Error("Gateway response body is empty");
+    const carReader = await CarReader.fromIterable(res.body as unknown as AsyncIterable<Uint8Array>);
 
     // 3. Verify the root CID
     const roots = await carReader.getRoots();
@@ -445,6 +409,120 @@ export class DappnodeRepository extends ApmRepository {
     return { carReader, root };
   }
 
+  private async withGatewayRetries<T>(
+    hash: string,
+    fn: (content: { carReader: CarReader; root: CID }) => Promise<T>
+  ): Promise<T> {
+    const gatewayCandidates = this.getGatewayCandidates();
+    let lastError: Error | undefined;
+
+    for (const gatewayUrl of gatewayCandidates) {
+      for (let attempt = 1; attempt <= this.downloadAttempts; attempt++) {
+        try {
+          const content = await this.getAndVerifyContentFromGateway(hash, gatewayUrl);
+          return await fn(content);
+        } catch (e) {
+          const error = this.normalizeError(e);
+          lastError = new Error(`[${gatewayUrl}] ${error.message}`);
+          if (!this.isRetryableDownloadError(error) || attempt >= this.downloadAttempts) break;
+          await this.sleep(this.downloadRetryDelayMs * attempt);
+        }
+      }
+    }
+
+    throw lastError || new Error(`Unable to download ${hash}`);
+  }
+
+  private getGatewayCandidates(): string[] {
+    return [...new Set([this.gatewayUrl, this.localIpfsGatewayUrl])];
+  }
+
+  private normalizeError(error: unknown): Error {
+    if (error instanceof Error) return error;
+    return new Error(String(error));
+  }
+
+  private isRetryableDownloadError(error: Error): boolean {
+    const message = error.message || "";
+    if (
+      message.includes("UNTRUSTED CONTENT") ||
+      message.includes("Invalid path") ||
+      message.includes("Maximum size")
+    ) {
+      return false;
+    }
+
+    const gatewayStatusMatch = message.match(/Gateway error:\s(\d{3})/);
+    if (gatewayStatusMatch) {
+      const status = Number(gatewayStatusMatch[1]);
+      return status === 408 || status === 425 || status === 429 || status >= 500;
+    }
+
+    return true;
+  }
+
+  private sleep(ms: number): Promise<void> {
+    return new Promise((resolve) => setTimeout(resolve, ms));
+  }
+
+  private async writeReadableToFs({
+    hash,
+    readable,
+    path: filePath,
+    timeout,
+    fileSize,
+    progress
+  }: {
+    hash: string;
+    readable: AsyncIterable<Uint8Array>;
+    path: string;
+    timeout: number;
+    fileSize?: number;
+    progress?: (n: number) => void;
+  }): Promise<void> {
+    const writable = fs.createWriteStream(filePath, { flags: "w" });
+    let timeoutToCancel: NodeJS.Timeout | undefined;
+    let totalData = 0;
+    let previousProgress = -1;
+    const resolution = 1;
+    const round = (n: number): number => resolution * Math.round((100 * n) / resolution);
+
+    const onTimeout = (): void => {
+      const timeoutError = Error(`Timeout downloading ${hash}`);
+      writable.destroy(timeoutError);
+      progressTracker.destroy(timeoutError);
+    };
+
+    const resetTimeout = (): void => {
+      clearTimeout(timeoutToCancel);
+      timeoutToCancel = setTimeout(onTimeout, timeout);
+    };
+
+    const progressTracker = new stream.Transform({
+      transform(chunk: Uint8Array, _encoding: BufferEncoding, callback: stream.TransformCallback): void {
+        resetTimeout();
+        totalData += chunk.length;
+        if (progress && fileSize) {
+          const currentProgress = round(totalData / fileSize);
+          if (currentProgress !== previousProgress) {
+            progress(currentProgress);
+            previousProgress = currentProgress;
+          }
+        }
+        callback(null, chunk);
+      }
+    });
+
+    try {
+      resetTimeout();
+      await util.promisify(stream.pipeline)(stream.Readable.from(readable), progressTracker, writable);
+      clearTimeout(timeoutToCancel);
+    } catch (e) {
+      clearTimeout(timeoutToCancel);
+      throw Error(`Error writing to fs: ${this.normalizeError(e).message}`);
+    }
+  }
+
   /**
    * Unpacks a CAR reader and returns an async iterable of uint8arrays.
    *