Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/dappmanager/src/calls/ipfsClientTargetSet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async function changeIpfsClient(nextTarget: IpfsClientTarget, nextGateway?: stri
dappnodeInstaller.changeIpfsGatewayUrl(params.IPFS_LOCAL);
} else {
// Set new values in db
db.ipfsGateway.set(nextGateway || params.IPFS_GATEWAY);
db.ipfsGateway.set(nextGateway || params.IPFS_REMOTE);
db.ipfsClientTarget.set(IpfsClientTarget.remote);

// Change IPFS host
Expand Down
62 changes: 30 additions & 32 deletions packages/installer/src/installer/downloadImages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,35 @@ export async function downloadImages(
packagesData: InstallPackageData[],
log: Log
): Promise<void> {
await Promise.all(
packagesData.map(async function (pkg) {
const { dnpName, semVersion, isCore, imageFile, imagePath } = pkg;
log(dnpName, "Starting download...");

function onProgress(progress: number): void {
let message = `Downloading ${progress}%`;
if (progress > 100) message += ` (expected ${imageFile.size} bytes)`;
log(dnpName, message);
}

try {
await getImage(dappnodeInstaller, imageFile, imagePath, onProgress);
} catch (e) {
e.message = `Can't download ${dnpName} image: ${e.message}`;
throw e; // Use this format to keep the stack trace
}

// Do not throw for core packages
log(dnpName, "Verifying download...");
try {
await verifyDockerImage({ imagePath, dnpName, version: semVersion });
} catch (e) {
const errorMessage = `Error verifying image: ${e.message}`;
if (isCore) logs.error(errorMessage);
else throw Error(errorMessage);
}

log(dnpName, "Package downloaded");
})
);
for (const pkg of packagesData) {
const { dnpName, semVersion, isCore, imageFile, imagePath } = pkg;
log(dnpName, "Starting download...");

function onProgress(progress: number): void {
let message = `Downloading ${progress}%`;
if (progress > 100) message += ` (expected ${imageFile.size} bytes)`;
log(dnpName, message);
}

try {
await getImage(dappnodeInstaller, imageFile, imagePath, onProgress);
} catch (e) {
e.message = `Can't download ${dnpName} image: ${e.message}`;
throw e; // Use this format to keep the stack trace
}

// Do not throw for core packages
log(dnpName, "Verifying download...");
try {
await verifyDockerImage({ imagePath, dnpName, version: semVersion });
} catch (e) {
const errorMessage = `Error verifying image: ${e.message}`;
if (isCore) logs.error(errorMessage);
else throw Error(errorMessage);
}

log(dnpName, "Package downloaded");
}
}

/**
Expand Down Expand Up @@ -84,7 +82,7 @@ export async function getImage(
progress: (n: number) => void
): Promise<void> {
// Validate parameters
if (!path || path.startsWith("/ipfs/") || !isAbsolute("/")) throw Error(`Invalid path: "${path}"`);
if (!path || path.startsWith("/ipfs/") || !isAbsolute(path)) throw Error(`Invalid path: "${path}"`);
validatePath(path);

// Check if cache exist and validate it
Expand Down
244 changes: 161 additions & 83 deletions packages/toolkit/src/repository/repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ const source = "ipfs" as const;
export class DappnodeRepository extends ApmRepository {
protected gatewayUrl: string;
protected localIpfsUrl = "http://ipfs.dappnode:5001";
protected localIpfsGatewayUrl = "http://ipfs.dappnode:8080";
protected downloadAttempts = 3;
protected downloadRetryDelayMs = 1500;
protected downloadTimeoutMs = 2 * 60 * 1000;

/**
* Constructs an instance of DappnodeRepository
Expand Down Expand Up @@ -258,27 +262,28 @@ export class DappnodeRepository extends ApmRepository {
* @see catCarReaderToMemory
*/
public async writeFileToMemory(hash: string, maxLength?: number): Promise<string> {
const chunks: Uint8Array[] = [];
const { carReader, root } = await this.getAndVerifyContentFromGateway(hash);
const content = await this.unpackCarReader(carReader, root);
for await (const chunk of content) chunks.push(chunk);

// Concatenate the chunks into a single Uint8Array
let totalLength = 0;
chunks.forEach((chunk) => (totalLength += chunk.length));
const buffer = new Uint8Array(totalLength);
let offset = 0;
chunks.forEach((chunk) => {
buffer.set(chunk, offset);
offset += chunk.length;
return this.withGatewayRetries(hash, async ({ carReader, root }) => {
const chunks: Uint8Array[] = [];
const content = await this.unpackCarReader(carReader, root);
for await (const chunk of content) chunks.push(chunk);

// Concatenate the chunks into a single Uint8Array
let totalLength = 0;
chunks.forEach((chunk) => (totalLength += chunk.length));
const buffer = new Uint8Array(totalLength);
let offset = 0;
chunks.forEach((chunk) => {
buffer.set(chunk, offset);
offset += chunk.length;
});

if (maxLength && buffer.length >= maxLength) throw Error(`Maximum size ${maxLength} bytes exceeded`);

// Convert the Uint8Array to a string
// TODO: This assumes the data is UTF-8 encoded. If it's not, you will need a more complex conversion. Research which encoding is used by IPFS.
const decoder = new TextDecoder("utf-8");
return decoder.decode(buffer);
});

if (maxLength && buffer.length >= maxLength) throw Error(`Maximum size ${maxLength} bytes exceeded`);

// Convert the Uint8Array to a string
// TODO: This assumes the data is UTF-8 encoded. If it's not, you will need a more complex conversion. Research which encoding is used by IPFS.
const decoder = new TextDecoder("utf-8");
return decoder.decode(buffer);
}

/**
Expand Down Expand Up @@ -309,66 +314,25 @@ export class DappnodeRepository extends ApmRepository {
fileSize?: number;
progress?: (n: number) => void;
}): Promise<void> {
const { carReader, root } = await this.getAndVerifyContentFromGateway(hash);
const readable = await this.unpackCarReader(carReader, root);

return new Promise((resolve, reject) => {
async function handleDownload(): Promise<void> {
if (!_path || _path.startsWith("/ipfs/") || !path.isAbsolute("/")) reject(Error(`Invalid path: "${path}"`));

const asyncIterableArray: Uint8Array[] = [];

// Timeout cancel mechanism
const timeoutToCancel = setTimeout(
() => {
reject(Error(`Timeout downloading ${hash}`));
},
timeout || 30 * 1000
);

let totalData = 0;
let previousProgress = -1;
const resolution = 1;
const round = (n: number): number => resolution * Math.round((100 * n) / resolution);

const onData = (chunk: Uint8Array): void => {
clearTimeout(timeoutToCancel);
totalData += chunk.length;
asyncIterableArray.push(chunk);
if (progress && fileSize) {
const currentProgress = round(totalData / fileSize);
if (currentProgress !== previousProgress) {
progress(currentProgress);
previousProgress = currentProgress;
}
}
};

const onFinish = (): void => {
clearTimeout(timeoutToCancel);
resolve();
};
if (!_path || _path.startsWith("/ipfs/") || !path.isAbsolute(_path)) throw Error(`Invalid path: "${_path}"`);
await fs.promises.mkdir(path.dirname(_path), { recursive: true });

const onError =
(streamId: string) =>
(err: Error): void => {
clearTimeout(timeoutToCancel);
reject(Error(streamId + ": " + err));
};

try {
for await (const chunk of readable) onData(chunk);

const writable = fs.createWriteStream(_path);
await util.promisify(stream.pipeline)(stream.Readable.from(asyncIterableArray), writable);
onFinish();
} catch (e) {
onError("Error writing to fs")(e as Error);
}
}

handleDownload().catch((error) => reject(error));
});
try {
await this.withGatewayRetries(hash, async ({ carReader, root }) => {
const readable = await this.unpackCarReader(carReader, root);
await this.writeReadableToFs({
hash,
readable,
path: _path,
timeout: timeout ?? this.downloadTimeoutMs,
fileSize,
progress
});
});
} catch (e) {
await fs.promises.rm(_path, { force: true });
throw e;
}
}

/**
Expand Down Expand Up @@ -420,20 +384,20 @@ export class DappnodeRepository extends ApmRepository {
* @returns The content as a CAR reader and the root CID.
* @throws Error when the root CID does not match the provided hash (content is untrusted).
*/
private async getAndVerifyContentFromGateway(hash: string): Promise<{
private async getAndVerifyContentFromGateway(hash: string, gatewayUrl: string): Promise<{
carReader: CarReader;
root: CID;
}> {
// 1. Download the CAR
const url = `${this.gatewayUrl}/ipfs/${hash}?format=car`;
const url = `${gatewayUrl}/ipfs/${hash}?format=car`;
const res = await fetch(url, {
headers: { Accept: "application/vnd.ipld.car" }
});
if (!res.ok) throw new Error(`Gateway error: ${res.status} ${res.statusText}`);

// 2. Parse into a CarReader
const bytes = new Uint8Array(await res.arrayBuffer());
const carReader = await CarReader.fromBytes(bytes);
if (!res.body) throw Error("Gateway response body is empty");
const carReader = await CarReader.fromIterable(res.body as unknown as AsyncIterable<Uint8Array>);

// 3. Verify the root CID
const roots = await carReader.getRoots();
Expand All @@ -445,6 +409,120 @@ export class DappnodeRepository extends ApmRepository {
return { carReader, root };
}

private async withGatewayRetries<T>(
hash: string,
fn: (content: { carReader: CarReader; root: CID }) => Promise<T>
): Promise<T> {
const gatewayCandidates = this.getGatewayCandidates();
let lastError: Error | undefined;

for (const gatewayUrl of gatewayCandidates) {
for (let attempt = 1; attempt <= this.downloadAttempts; attempt++) {
try {
const content = await this.getAndVerifyContentFromGateway(hash, gatewayUrl);
return await fn(content);
} catch (e) {
const error = this.normalizeError(e);
lastError = new Error(`[${gatewayUrl}] ${error.message}`);
if (!this.isRetryableDownloadError(error) || attempt >= this.downloadAttempts) break;
await this.sleep(this.downloadRetryDelayMs * attempt);
}
}
}

throw lastError || new Error(`Unable to download ${hash}`);
}

private getGatewayCandidates(): string[] {
return [...new Set([this.gatewayUrl, this.localIpfsGatewayUrl])];
}

private normalizeError(error: unknown): Error {
if (error instanceof Error) return error;
return new Error(String(error));
}

private isRetryableDownloadError(error: Error): boolean {
const message = error.message || "";
if (
message.includes("UNTRUSTED CONTENT") ||
message.includes("Invalid path") ||
message.includes("Maximum size")
) {
return false;
}

const gatewayStatusMatch = message.match(/Gateway error:\s(\d{3})/);
if (gatewayStatusMatch) {
const status = Number(gatewayStatusMatch[1]);
return status === 408 || status === 425 || status === 429 || status >= 500;
}

return true;
}

private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

private async writeReadableToFs({
hash,
readable,
path: filePath,
timeout,
fileSize,
progress
}: {
hash: string;
readable: AsyncIterable<Uint8Array>;
path: string;
timeout: number;
fileSize?: number;
progress?: (n: number) => void;
}): Promise<void> {
const writable = fs.createWriteStream(filePath, { flags: "w" });
let timeoutToCancel: NodeJS.Timeout | undefined;
let totalData = 0;
let previousProgress = -1;
const resolution = 1;
const round = (n: number): number => resolution * Math.round((100 * n) / resolution);

const onTimeout = (): void => {
const timeoutError = Error(`Timeout downloading ${hash}`);
writable.destroy(timeoutError);
progressTracker.destroy(timeoutError);
};

const resetTimeout = (): void => {
clearTimeout(timeoutToCancel);
timeoutToCancel = setTimeout(onTimeout, timeout);
};

const progressTracker = new stream.Transform({
transform(chunk: Uint8Array, _encoding: BufferEncoding, callback: stream.TransformCallback): void {
resetTimeout();
totalData += chunk.length;
if (progress && fileSize) {
const currentProgress = round(totalData / fileSize);
if (currentProgress !== previousProgress) {
progress(currentProgress);
previousProgress = currentProgress;
}
}
callback(null, chunk);
}
});

try {
resetTimeout();
await util.promisify(stream.pipeline)(stream.Readable.from(readable), progressTracker, writable);
clearTimeout(timeoutToCancel);
} catch (e) {
clearTimeout(timeoutToCancel);
throw Error(`Error writing to fs: ${this.normalizeError(e).message}`);
}
}

/**
* Unpacks a CAR reader and returns an async iterable of uint8arrays.
*
Expand Down
Loading