diff --git a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts index 08f0bb5d38..c5889af82b 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.spec.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.spec.ts @@ -15,8 +15,8 @@ class TestDynamicDsService extends DynamicDsService { let datasourceParams: DatasourceParams[] = initData; @@ -40,7 +41,7 @@ const mockMetadata = (initData: DatasourceParams[] = []) => { describe('DynamicDsService', () => { let service: TestDynamicDsService; const project = { - templates: [{name: 'Test'}], + templates: [{name: 'Test'}, {name: 'Other'}], } as any as ISubqueryProject; beforeEach(() => { @@ -70,6 +71,69 @@ describe('DynamicDsService', () => { ]); }); + it('can destroy a dynamic datasource', async () => { + const meta = mockMetadata([testParam1, testParam2]); + await service.init(meta); + + // Destroy specific datasource by index + await service.destroyDynamicDatasource('Test', 50, 0); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual({...testParam1, endBlock: 50}); + expect(updatedParams[1]).toEqual(testParam2); + + const datasources = (service as any)._datasources; + expect(datasources[0].endBlock).toBe(50); + }); + + it('throws error when destroying non-existent datasource', async () => { + const meta = mockMetadata([testParam1]); + await service.init(meta); + + await expect(service.destroyDynamicDatasource('NonExistent', 50, 0)).rejects.toThrow( + 'Datasource at index 0 has template name "Test", not "NonExistent"' + ); + }); + + it('throws error when destroying already destroyed datasource', async () => { + const destroyedParam = {...testParam1, endBlock: 30}; + const meta = mockMetadata([destroyedParam]); + await service.init(meta); + + await expect(service.destroyDynamicDatasource('Test', 50, 0)).rejects.toThrow( + 'Dynamic datasource at index 0 is already destroyed' + ); + }); + + it('allows creating new datasource after destroying existing one', async () => { + const meta = mockMetadata([testParam1]); + await service.init(meta); + + expect((service as any)._datasourceParams).toEqual([testParam1]); + + // Destroy by index + await service.destroyDynamicDatasource('Test', 50, 0); + + const paramsAfterDestroy = (service as any)._datasourceParams; + expect(paramsAfterDestroy[0]).toEqual({...testParam1, endBlock: 50}); + + const newParam = {templateName: 'Test', startBlock: 60}; + await service.createDynamicDatasource(newParam); + + const finalParams: DatasourceParams[] = (service as any)._datasourceParams; + const destroyedCount = finalParams.filter((p: DatasourceParams) => p.endBlock !== undefined).length; + const activeCount = finalParams.filter((p: DatasourceParams) => p.endBlock === undefined).length; + + expect(destroyedCount).toBeGreaterThanOrEqual(1); + expect(activeCount).toBeGreaterThanOrEqual(1); + + const destroyedParam = finalParams.find((p: DatasourceParams) => p.startBlock === 1 && p.endBlock === 50); + expect(destroyedParam).toBeDefined(); + + const newParamFound = finalParams.find((p: DatasourceParams) => p.startBlock === 60 && !p.endBlock); + expect(newParamFound).toBeDefined(); + }); + it('resets dynamic datasources', async () => { const meta = mockMetadata([testParam1, testParam2, testParam3, testParam4]); await service.init(meta); @@ -83,6 +147,26 @@ describe('DynamicDsService', () => { ]); }); + it('handles reset after datasource destruction correctly', async () => { + const params = [testParam1, testParam2, testParam3, testParam4]; + const meta = mockMetadata(params); + await service.init(meta); + + // Destroy only the first datasource by index + await service.destroyDynamicDatasource('Test', 25, 0); + + const paramsAfterDestroy = (service as any)._datasourceParams; + expect(paramsAfterDestroy[0]).toEqual({...testParam1, endBlock: 25}); + + // Reset to block 2 (should keep testParam1 and testParam2) + await service.resetDynamicDatasource(2, null as any); + + const paramsAfterReset = (service as any)._datasourceParams; + expect(paramsAfterReset).toHaveLength(2); + expect(paramsAfterReset[0]).toEqual({...testParam1, endBlock: 25}); + expect(paramsAfterReset[1]).toEqual(testParam2); + }); + it('getDynamicDatasources with force reloads from metadata', async () => { const meta = mockMetadata([testParam1, testParam2]); await service.init(meta); @@ -107,6 +191,30 @@ describe('DynamicDsService', () => { ]); }); + it('loads destroyed datasources with endBlock correctly', async () => { + const destroyedParam = {...testParam1, endBlock: 100}; + const meta = mockMetadata([destroyedParam, testParam2]); + await service.init(meta); + + const datasources = await service.getDynamicDatasources(); + expect(datasources).toHaveLength(2); + expect(datasources[0].endBlock).toBe(100); + expect(datasources[1].endBlock).toBeUndefined(); + }); + + it('updates metadata correctly when destroying datasource', async () => { + const meta = mockMetadata([testParam1, testParam2]); + await service.init(meta); + + // Destroy first datasource by index + await service.destroyDynamicDatasource('Test', 75, 0); + + const metadataParams = await meta.find('dynamicDatasources'); + expect(metadataParams).toBeDefined(); + expect(metadataParams![0]).toEqual({...testParam1, endBlock: 75}); + expect(metadataParams![1]).toEqual(testParam2); + }); + it('can find a template and cannot mutate the template', () => { const template1 = service.getTemplate('Test', 1); const template2 = service.getTemplate('Test', 2); @@ -119,4 +227,306 @@ describe('DynamicDsService', () => { expect(project.templates![0]).toEqual({name: 'Test'}); }); + + it('can create template with endBlock', () => { + const template = service.getTemplate('Test', 1, 100); + + expect(template.startBlock).toBe(1); + expect(template.endBlock).toBe(100); + expect((template as any).name).toBeUndefined(); + }); + + it('handles multiple templates with same name during destruction', async () => { + const param1 = {templateName: 'Test', startBlock: 1}; + const param2 = {templateName: 'Test', startBlock: 5}; + const param3 = {templateName: 'Other', startBlock: 3}; + + const meta = mockMetadata([param1, param2, param3]); + await service.init(meta); + + // Should destroy the first matching one by index + await service.destroyDynamicDatasource('Test', 10, 0); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual({...param1, endBlock: 10}); + expect(updatedParams[1]).toEqual(param2); // Not destroyed + expect(updatedParams[2]).toEqual(param3); // Not destroyed + }); + + it('throws error when service not initialized for destruction', async () => { + await expect(service.destroyDynamicDatasource('Test', 50, 0)).rejects.toThrow( + 'DynamicDsService has not been initialized' + ); + }); + + describe('getDynamicDatasourcesByTemplate', () => { + it('returns list of active datasources for a template', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3, testParamOther]); + await service.init(meta); + + const testDatasources = service.getDynamicDatasourcesByTemplate('Test'); + + expect(testDatasources).toHaveLength(3); + expect(testDatasources[0]).toEqual({ + index: 0, + templateName: 'Test', + startBlock: 1, + endBlock: undefined, + args: undefined, + }); + expect(testDatasources[1]).toEqual({ + index: 1, + templateName: 'Test', + startBlock: 2, + endBlock: undefined, + args: undefined, + }); + expect(testDatasources[2]).toEqual({ + index: 2, + templateName: 'Test', + startBlock: 3, + endBlock: undefined, + args: undefined, + }); + }); + + it('excludes destroyed datasources from list', async () => { + const destroyedParam = {...testParam1, endBlock: 50}; + const meta = mockMetadata([destroyedParam, testParam2, testParam3]); + await service.init(meta); + + const datasources = service.getDynamicDatasourcesByTemplate('Test'); + + expect(datasources).toHaveLength(2); + expect(datasources[0].index).toBe(1); // Global index + expect(datasources[0].startBlock).toBe(2); + expect(datasources[1].index).toBe(2); // Global index + expect(datasources[1].startBlock).toBe(3); + }); + + it('returns empty array when no datasources match template', async () => { + const meta = mockMetadata([testParamOther]); + await service.init(meta); + + const datasources = service.getDynamicDatasourcesByTemplate('Test'); + + expect(datasources).toEqual([]); + }); + + it('includes args in datasource info when present', async () => { + const paramWithArgs = {...testParam1, args: {address: '0x123', tokenId: 1}}; + const meta = mockMetadata([paramWithArgs]); + await service.init(meta); + + const datasources = service.getDynamicDatasourcesByTemplate('Test'); + + expect(datasources).toHaveLength(1); + expect(datasources[0].args).toEqual({address: '0x123', tokenId: 1}); + }); + + it('throws error when service not initialized', () => { + expect(() => service.getDynamicDatasourcesByTemplate('Test')).toThrow( + 'DynamicDsService has not been initialized' + ); + }); + }); + + describe('destroyDynamicDatasource with index', () => { + it('destroys specific datasource by index', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3, testParamOther]); + await service.init(meta); + + await service.destroyDynamicDatasource('Test', 50, 1); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual(testParam1); // Not destroyed + expect(updatedParams[1]).toEqual({...testParam2, endBlock: 50}); // Destroyed + expect(updatedParams[2]).toEqual(testParam3); // Not destroyed + expect(updatedParams[3]).toEqual(testParamOther); // Not destroyed + }); + + it('throws error when index is out of bounds', async () => { + const meta = mockMetadata([testParam1, testParam2]); + await service.init(meta); + + await expect(service.destroyDynamicDatasource('Test', 50, 5)).rejects.toThrow( + 'Index 5 is out of bounds. There are 2 datasource(s) in total' + ); + }); + + it('throws error when index is negative', async () => { + const meta = mockMetadata([testParam1, testParam2]); + await service.init(meta); + + await expect(service.destroyDynamicDatasource('Test', 50, -1)).rejects.toThrow( + 'Index -1 is out of bounds. There are 2 datasource(s) in total' + ); + }); + + it('throws error when trying to destroy already destroyed datasource', async () => { + const destroyedParam = {...testParam1, endBlock: 30}; + const meta = mockMetadata([destroyedParam]); + await service.init(meta); + + await expect(service.destroyDynamicDatasource('Test', 50, 0)).rejects.toThrow( + 'Dynamic datasource at index 0 is already destroyed' + ); + }); + + it('correctly handles global index after some datasources are destroyed', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3, testParam4]); + await service.init(meta); + + // Destroy the first one using global index 0 + await service.destroyDynamicDatasource('Test', 40, 0); + + // Now only 3 active datasources for 'Test' template, with global indices 1, 2, 3 + const activeDatasources = service.getDynamicDatasourcesByTemplate('Test'); + expect(activeDatasources).toHaveLength(3); + expect(activeDatasources[0].index).toBe(1); // Global index + expect(activeDatasources[0].startBlock).toBe(2); + expect(activeDatasources[1].index).toBe(2); // Global index + expect(activeDatasources[1].startBlock).toBe(3); + expect(activeDatasources[2].index).toBe(3); // Global index + expect(activeDatasources[2].startBlock).toBe(4); + + // Destroy using global index 2 (testParam3) + await service.destroyDynamicDatasource('Test', 60, 2); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual({...testParam1, endBlock: 40}); + expect(updatedParams[1]).toEqual(testParam2); // Still active + expect(updatedParams[2]).toEqual({...testParam3, endBlock: 60}); + expect(updatedParams[3]).toEqual(testParam4); // Still active + }); + + it('updates datasources in memory correctly when destroying by index', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3]); + await service.init(meta); + + await service.destroyDynamicDatasource('Test', 100, 1); + + const datasources = (service as any)._datasources; + expect(datasources[0].endBlock).toBeUndefined(); + expect(datasources[1].endBlock).toBe(100); + expect(datasources[2].endBlock).toBeUndefined(); + }); + + it('allows destroying datasources from different templates independently', async () => { + const meta = mockMetadata([testParam1, testParam2, testParamOther]); + await service.init(meta); + + await service.destroyDynamicDatasource('Test', 50, 0); + await service.destroyDynamicDatasource('Other', 60, 2); + + const updatedParams = (service as any)._datasourceParams; + expect(updatedParams[0]).toEqual({...testParam1, endBlock: 50}); + expect(updatedParams[1]).toEqual(testParam2); // Not destroyed + expect(updatedParams[2]).toEqual({...testParamOther, endBlock: 60}); + }); + + it('throws error when template name does not match global index', async () => { + const meta = mockMetadata([testParam1, testParam2, testParamOther]); + await service.init(meta); + + // Try to destroy 'Test' template with index 2, which is 'Other' template + await expect(service.destroyDynamicDatasource('Test', 50, 2)).rejects.toThrow( + 'Datasource at index 2 has template name "Other", not "Test"' + ); + }); + + it('sets endBlock correctly allowing in-place removal during block processing', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3]); + await service.init(meta); + + // Destroy datasource at index 1 at block 50 + await service.destroyDynamicDatasource('Test', 50, 1); + + // Verify the datasource has endBlock set + const dsParam = service.getDatasourceParamByIndex(1); + expect(dsParam).toBeDefined(); + expect(dsParam?.endBlock).toBe(50); + expect(dsParam?.startBlock).toBe(2); + expect(dsParam?.templateName).toBe('Test'); + + // Verify the internal _datasources array also has endBlock set + const datasources = (service as any)._datasources; + expect(datasources[1]).toBeDefined(); + expect(datasources[1].endBlock).toBe(50); + }); + + it('destroyed datasource is filtered out in subsequent block processing', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3]); + await service.init(meta); + + const blockHeight = 100; + const datasources = (service as any)._datasources; + + // Simulate filtering datasources for block 100 (all should be included initially) + const filterDataSources = (blockHeight: number, dataSources: BaseDataSource[]) => { + return dataSources.filter( + (ds) => + ds.startBlock !== undefined && + ds.startBlock <= blockHeight && + (ds.endBlock ?? Number.MAX_SAFE_INTEGER) > blockHeight + ); + }; + + // Initial state: all 3 datasources should be active + let filteredDs = filterDataSources(blockHeight, datasources); + expect(filteredDs.length).toBe(3); + + // Simulate processing: DS2 destroys DS3 during block 100 + await service.destroyDynamicDatasource('Test', blockHeight, 2); + + // Re-filter datasources + filteredDs = filterDataSources(blockHeight, datasources); + + // After destruction, only DS1 and DS2 should remain + expect(filteredDs.length).toBe(2); + expect(filteredDs[0].startBlock).toBe(1); // DS1 + expect(filteredDs[1].startBlock).toBe(2); // DS2 + + // Verify DS3 was destroyed + expect(datasources[2].endBlock).toBe(blockHeight); + }); + + it('demonstrates traditional for loop with in-place mutation stops processing destroyed datasources', async () => { + const meta = mockMetadata([testParam1, testParam2, testParam3]); + await service.init(meta); + + const blockHeight = 100; + // Work on a filtered copy, same as internalIndexBlock does (filterDataSources returns a new array) + const allDs: BaseDataSource[] = (service as any)._datasources; + const filtered = [...allDs]; + + const processed: number[] = []; + + for (let i = 0; i < filtered.length; i++) { + const ds = filtered[i]; + processed.push(ds.startBlock!); + + // When processing DS2, destroy DS3 and re-filter in-place + if (ds.startBlock === 2) { + await service.destroyDynamicDatasource('Test', blockHeight, 2); + + // Mutate the filtered array in-place (same pattern as internalIndexBlock) + const refiltered = filtered.filter( + (d) => + d.startBlock !== undefined && + d.startBlock <= blockHeight && + (d.endBlock ?? Number.MAX_SAFE_INTEGER) > blockHeight + ); + filtered.length = 0; + filtered.push(...refiltered); + } + } + // DS3 is never processed because the array shrank before the loop reached it + expect(processed).toEqual([1, 2]); + + // The services internal state still has all 3 entries, but DS3 has endBlock set + expect(allDs).toHaveLength(3); + expect(allDs[2].endBlock).toBe(blockHeight); + }); + }); }); diff --git a/packages/node-core/src/indexer/dynamic-ds.service.ts b/packages/node-core/src/indexer/dynamic-ds.service.ts index 708ab5f3bb..8d468cb545 100644 --- a/packages/node-core/src/indexer/dynamic-ds.service.ts +++ b/packages/node-core/src/indexer/dynamic-ds.service.ts @@ -2,7 +2,7 @@ // SPDX-License-Identifier: GPL-3.0 import {Inject, Injectable} from '@nestjs/common'; -import {BaseCustomDataSource, BaseDataSource, BaseTemplateDataSource} from '@subql/types-core'; +import {BaseCustomDataSource, BaseDataSource, BaseTemplateDataSource, DynamicDatasourceInfo} from '@subql/types-core'; import {Transaction} from '@subql/x-sequelize'; import {cloneDeep} from 'lodash'; import {IBlockchainService} from '../blockchain.service'; @@ -19,12 +19,16 @@ export interface DatasourceParams { templateName: string; args?: Record; startBlock: number; + endBlock?: number; } export interface IDynamicDsService { dynamicDatasources: DS[]; createDynamicDatasource(params: DatasourceParams): Promise; + destroyDynamicDatasource(templateName: string, currentBlockHeight: number, index: number): Promise; getDynamicDatasources(forceReload?: boolean): Promise; + getDynamicDatasourcesByTemplate(templateName: string): DynamicDatasourceInfo[]; + getDatasourceParamByIndex(index: number): DatasourceParams | undefined; } @Injectable() @@ -91,6 +95,92 @@ export class DynamicDsService ({params, globalIndex})) + .filter(({params}) => params.templateName === templateName && params.endBlock === undefined); + + return matchingDatasources.map(({globalIndex, params}) => ({ + index: globalIndex, + templateName: params.templateName, + startBlock: params.startBlock, + endBlock: params.endBlock, + args: params.args, + })); + } + + /** + * Get datasource parameters by global index. + * + * @param index - Global index in the internal datasource parameters array + * @returns DatasourceParams if found, undefined otherwise + */ + getDatasourceParamByIndex(index: number): DatasourceParams | undefined { + return this._datasourceParams?.[index]; + } + + async destroyDynamicDatasource( + templateName: string, + currentBlockHeight: number, + index: number, + tx?: Transaction + ): Promise { + if (!this._datasources || !this._datasourceParams) { + throw new Error('DynamicDsService has not been initialized'); + } + + // Get the datasource at the global index + const dsParam = this._datasourceParams[index]; + + // Validate datasource exists + if (!dsParam) { + throw new Error( + `Index ${index} is out of bounds. There are ${this._datasourceParams.length} datasource(s) in total` + ); + } + + // Validate it matches the template name and is not already destroyed + if (dsParam.templateName !== templateName) { + throw new Error( + `Datasource at index ${index} has template name "${dsParam.templateName}", not "${templateName}"` + ); + } + + if (dsParam.endBlock !== undefined) { + throw new Error(`Dynamic datasource at index ${index} is already destroyed`); + } + + // Update the datasource params + const updatedParams = {...dsParam, endBlock: currentBlockHeight}; + this._datasourceParams[index] = updatedParams; + + // Update the datasource object if it exists + // Note: _datasources and _datasourceParams arrays should always be in sync. + // If the index is valid for params, it must also be valid for datasources. + const datasource = this._datasources[index]; + if (!datasource) { + throw new Error(`Datasources array out of sync with params at index ${index}`); + } + // Set endBlock on the datasource object + datasource.endBlock = currentBlockHeight; + + await this.metadata.set(METADATA_KEY, this._datasourceParams, tx); + + logger.info(`Destroyed dynamic datasource "${templateName}" at block ${currentBlockHeight}`); + } + // Not force only seems to be used for project changes async getDynamicDatasources(forceReload?: boolean): Promise { // Workers should not cache this result in order to keep in sync @@ -117,19 +207,19 @@ export class DynamicDsService t.name === templateName); if (!t) { throw new Error(`Unable to find matching template in project for name: "${templateName}"`); } const {name, ...template} = cloneDeep(t); - return {...template, startBlock} as DS; + return {...template, startBlock, endBlock} as DS; } private async getDatasource(params: DatasourceParams): Promise { - const dsObj = this.getTemplate(params.templateName, params.startBlock); + const dsObj = this.getTemplate(params.templateName, params.startBlock, params.endBlock); try { await this.blockchainService.updateDynamicDs(params, dsObj); diff --git a/packages/node-core/src/indexer/indexer.manager.spec.ts b/packages/node-core/src/indexer/indexer.manager.spec.ts new file mode 100644 index 0000000000..c35d3a1c70 --- /dev/null +++ b/packages/node-core/src/indexer/indexer.manager.spec.ts @@ -0,0 +1,195 @@ +// Copyright 2020-2025 SubQuery Pte Ltd authors & contributors +// SPDX-License-Identifier: GPL-3.0 + +import {BaseCustomDataSource, BaseDataSource} from '@subql/types-core'; +import {IApi} from '../api.service'; +import {IBlockchainService} from '../blockchain.service'; +import {NodeConfig} from '../configure'; +import {ProcessBlockResponse} from './blockDispatcher'; +import {DsProcessorService} from './ds-processor.service'; +import {DatasourceParams, DynamicDsService} from './dynamic-ds.service'; +import {BaseIndexerManager, FilterTypeMap, HandlerInputTypeMap, ProcessorTypeMap} from './indexer.manager'; +import {IndexerSandbox} from './sandbox'; +import {CacheMetadataModel} from './storeModelProvider'; +import {IBlock, ISubqueryProject} from './types'; +import {IUnfinalizedBlocksService} from './unfinalizedBlocks.service'; + +type FM = FilterTypeMap; +type PM = ProcessorTypeMap; +type HIM = HandlerInputTypeMap; + +// Minimal sandbox mock that lets us grab the frozen destroy callback +class MockSandbox { + // eslint-disable-next-line @typescript-eslint/ban-types + frozenFns: Record = {}; + + freeze(value: any, name: string): void { + if (typeof value === 'function') { + this.frozenFns[name] = value; + } + } + + // eslint-disable-next-line @typescript-eslint/no-empty-function + async securedExec(): Promise {} +} + +class TestIndexerManager extends BaseIndexerManager< + any, + any, + any, + IApi, + BaseDataSource, + BaseDataSource & BaseCustomDataSource, + FM, + PM, + HIM +> { + processedStartBlocks: number[] = []; + destroyConfig?: {triggerStartBlock: number; targetTemplate: string; targetIndex: number}; + + async indexBlock(block: IBlock, datasources: BaseDataSource[]): Promise { + return this.internalIndexBlock(block, datasources, () => Promise.resolve({} as any)); + } + + // Simulates what chain-specific indexBlockData does: iterate dataSources, call getVM, run handlers + protected async indexBlockData( + _block: any, + dataSources: BaseDataSource[], + getVM: (d: BaseDataSource) => Promise + ): Promise { + for (let i = 0; i < dataSources.length; i++) { + const ds = dataSources[i]; + this.processedStartBlocks.push(ds.startBlock!); + + const vm = (await getVM(ds)) as unknown as MockSandbox; + + // Trigger destroy if this ds matches the config + if (this.destroyConfig && ds.startBlock === this.destroyConfig.triggerStartBlock) { + const destroyFn = vm.frozenFns.destroyDynamicDatasource; + if (destroyFn) { + await destroyFn(this.destroyConfig.targetTemplate, this.destroyConfig.targetIndex); + } + } + } + } + + // eslint-disable-next-line @typescript-eslint/require-await + protected async prepareFilteredData(_kind: any, data: T): Promise { + return data; + } +} + +class TestDynamicDsService extends DynamicDsService { + constructor(project: ISubqueryProject) { + super(project, { + updateDynamicDs: () => Promise.resolve(undefined), + } as unknown as IBlockchainService); + } +} + +const mockMetadata = (initData: DatasourceParams[] = []) => { + let datasourceParams: DatasourceParams[] = initData; + + return { + set: (_key: string, value: any) => { + datasourceParams = value; + }, + find: (_key: string) => Promise.resolve([...datasourceParams]), + setNewDynamicDatasource: (params: DatasourceParams) => datasourceParams.push(params), + } as unknown as CacheMetadataModel; +}; + +const mockBlock = (height: number): IBlock => ({ + getHeader: () => ({blockHeight: height, blockHash: `hash-${height}`, parentHash: undefined, timestamp: new Date()}), + block: {}, +}); + +describe('BaseIndexerManager', () => { + let dynamicDsService: TestDynamicDsService; + let manager: TestIndexerManager; + + const project = { + templates: [{name: 'Test'}, {name: 'Other'}], + } as any as ISubqueryProject; + + beforeEach(() => { + dynamicDsService = new TestDynamicDsService(project); + + manager = new TestIndexerManager( + {unsafeApi: {}} as unknown as IApi, + {unfinalizedBlocks: false, profiler: false} as unknown as NodeConfig, + {getDsProcessor: () => new MockSandbox()} as any, + {} as DsProcessorService, + dynamicDsService as any, + {processUnfinalizedBlocks: () => Promise.resolve(undefined)} as unknown as IUnfinalizedBlocksService, + {} as FM, + {} as PM, + {isRuntimeDs: () => true, isCustomDs: () => false} as unknown as IBlockchainService + ); + }); + + describe('destroy dynamic datasource mid-block', () => { + it('removes destroyed ds from the iteration within the same block', async () => { + const meta = mockMetadata([ + {templateName: 'Test', startBlock: 1}, + {templateName: 'Test', startBlock: 5}, + {templateName: 'Test', startBlock: 10}, + ]); + await dynamicDsService.init(meta); + const datasources = await dynamicDsService.getDynamicDatasources(); + + // When processing startBlock=5, destroy ds at index 2 (startBlock=10) + manager.destroyConfig = {triggerStartBlock: 5, targetTemplate: 'Test', targetIndex: 2}; + + await manager.indexBlock(mockBlock(100), datasources); + + // startBlock=10 should never be reached + expect(manager.processedStartBlocks).toEqual([1, 5]); + }); + + it('sets endBlock on the destroyed datasource', async () => { + const meta = mockMetadata([ + {templateName: 'Test', startBlock: 1}, + {templateName: 'Test', startBlock: 5}, + ]); + await dynamicDsService.init(meta); + const datasources = await dynamicDsService.getDynamicDatasources(); + + manager.destroyConfig = {triggerStartBlock: 1, targetTemplate: 'Test', targetIndex: 1}; + + await manager.indexBlock(mockBlock(50), datasources); + + const param = dynamicDsService.getDatasourceParamByIndex(1); + expect(param?.endBlock).toBe(50); + }); + + it('processes all datasources when nothing is destroyed', async () => { + const meta = mockMetadata([ + {templateName: 'Test', startBlock: 1}, + {templateName: 'Test', startBlock: 5}, + {templateName: 'Test', startBlock: 10}, + ]); + await dynamicDsService.init(meta); + const datasources = await dynamicDsService.getDynamicDatasources(); + + await manager.indexBlock(mockBlock(100), datasources); + + expect(manager.processedStartBlocks).toEqual([1, 5, 10]); + }); + + it('filters out already-destroyed datasources before processing starts', async () => { + const meta = mockMetadata([ + {templateName: 'Test', startBlock: 1, endBlock: 50}, + {templateName: 'Test', startBlock: 5}, + {templateName: 'Test', startBlock: 10}, + ]); + await dynamicDsService.init(meta); + const datasources = await dynamicDsService.getDynamicDatasources(); + + await manager.indexBlock(mockBlock(100), datasources); + + // startBlock=1 was destroyed at block 50, should be excluded + expect(manager.processedStartBlocks).toEqual([5, 10]); + }); + }); +}); diff --git a/packages/node-core/src/indexer/indexer.manager.ts b/packages/node-core/src/indexer/indexer.manager.ts index c484eeab0f..663a807cc3 100644 --- a/packages/node-core/src/indexer/indexer.manager.ts +++ b/packages/node-core/src/indexer/indexer.manager.ts @@ -116,6 +116,24 @@ export abstract class BaseIndexerManager< dynamicDsCreated = true; }, 'createDynamicDatasource'); + // Inject function to get dynamic datasources by template into vm + vm.freeze((templateName: string) => { + return this.dynamicDsService.getDynamicDatasourcesByTemplate(templateName); + }, 'getDynamicDatasources'); + + // Inject function to destroy ds into vm + vm.freeze(async (templateName: string, index: number) => { + await this.dynamicDsService.destroyDynamicDatasource(templateName, blockHeight, index); + + // Re-filter and mutate the array in-place so that indexBlockData's reference + // sees the change. A simple reassignment (filteredDataSources = ...) would only + // update this closure's variable, leaving the dataSources parameter in + // indexBlockData pointing at the stale array. + const refiltered = this.filterDataSources(blockHeight, filteredDataSources); + filteredDataSources.length = 0; + filteredDataSources.push(...refiltered); + }, 'destroyDynamicDatasource'); + return vm; }); } @@ -135,11 +153,15 @@ export abstract class BaseIndexerManager< private filterDataSources(nextProcessingHeight: number, dataSources: DS[]): DS[] { let filteredDs: DS[]; + // Strict `>` (not `>=`) is intentional: destroyDynamicDatasource sets + // endBlock = currentBlockHeight, and the destroyed DS must be excluded + // when re-filtering within the same block. With `>=` the DS would pass + // the filter and remain active for the rest of the block. filteredDs = dataSources.filter( (ds) => ds.startBlock !== undefined && ds.startBlock <= nextProcessingHeight && - (ds.endBlock ?? Number.MAX_SAFE_INTEGER) >= nextProcessingHeight + (ds.endBlock ?? Number.MAX_SAFE_INTEGER) > nextProcessingHeight ); // perform filter for custom ds diff --git a/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts b/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts index 3c6e456a08..a8c33206fb 100644 --- a/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts +++ b/packages/node-core/src/indexer/worker/worker.dynamic-ds.service.ts @@ -3,16 +3,23 @@ import {isMainThread} from 'node:worker_threads'; import {Injectable} from '@nestjs/common'; +import {DynamicDatasourceInfo} from '@subql/types-core'; import {DatasourceParams, IDynamicDsService} from '../dynamic-ds.service'; export type HostDynamicDS = { dynamicDsCreateDynamicDatasource: (params: DatasourceParams) => Promise; + dynamicDsDestroyDynamicDatasource: (templateName: string, currentBlockHeight: number, index: number) => Promise; dynamicDsGetDynamicDatasources: () => Promise; + dynamicDsGetDynamicDatasourcesByTemplate: (templateName: string) => DynamicDatasourceInfo[]; + dynamicDsGetDatasourceParamByIndex: (index: number) => DatasourceParams | undefined; }; export const hostDynamicDsKeys: (keyof HostDynamicDS)[] = [ 'dynamicDsCreateDynamicDatasource', + 'dynamicDsDestroyDynamicDatasource', 'dynamicDsGetDynamicDatasources', + 'dynamicDsGetDynamicDatasourcesByTemplate', + 'dynamicDsGetDatasourceParamByIndex', ]; @Injectable() @@ -32,14 +39,29 @@ export class WorkerDynamicDsService implements IDynamicDsService { return this.host.dynamicDsCreateDynamicDatasource(JSON.parse(JSON.stringify(params))); } + async destroyDynamicDatasource(templateName: string, currentBlockHeight: number, index: number): Promise { + return this.host.dynamicDsDestroyDynamicDatasource(templateName, currentBlockHeight, index); + } + async getDynamicDatasources(): Promise { return this.host.dynamicDsGetDynamicDatasources(); } + + getDynamicDatasourcesByTemplate(templateName: string): DynamicDatasourceInfo[] { + return this.host.dynamicDsGetDynamicDatasourcesByTemplate(templateName); + } + + getDatasourceParamByIndex(index: number): DatasourceParams | undefined { + return this.host.dynamicDsGetDatasourceParamByIndex(index); + } } export function dynamicDsHostFunctions(dynamicDsService: IDynamicDsService): HostDynamicDS { return { dynamicDsCreateDynamicDatasource: dynamicDsService.createDynamicDatasource.bind(dynamicDsService), + dynamicDsDestroyDynamicDatasource: dynamicDsService.destroyDynamicDatasource.bind(dynamicDsService), dynamicDsGetDynamicDatasources: dynamicDsService.getDynamicDatasources.bind(dynamicDsService), + dynamicDsGetDynamicDatasourcesByTemplate: dynamicDsService.getDynamicDatasourcesByTemplate.bind(dynamicDsService), + dynamicDsGetDatasourceParamByIndex: dynamicDsService.getDatasourceParamByIndex.bind(dynamicDsService), }; } diff --git a/packages/node/src/indexer/indexer.manager.ts b/packages/node/src/indexer/indexer.manager.ts index f505fb3ec0..d67fdd74d0 100644 --- a/packages/node/src/indexer/indexer.manager.ts +++ b/packages/node/src/indexer/indexer.manager.ts @@ -190,7 +190,11 @@ export class IndexerManager extends BaseIndexerManager< getVM: (d: SubstrateProjectDs) => Promise, ) => Promise { return async (content, dataSources, getVM) => { - for (const ds of dataSources) { + // Traditional for-loop (not for...of) because dataSources may be mutated + // in-place when a handler destroys a dynamic datasource. The length check + // on each iteration picks up the shorter array. + for (let i = 0; i < dataSources.length; i++) { + const ds = dataSources[i]; await this.indexData(kind, content, ds, getVM); } }; diff --git a/packages/types-core/src/global.ts b/packages/types-core/src/global.ts index cbaf1be173..654999c971 100644 --- a/packages/types-core/src/global.ts +++ b/packages/types-core/src/global.ts @@ -2,7 +2,7 @@ // SPDX-License-Identifier: GPL-3.0 import type Pino from 'pino'; -import {Cache, DynamicDatasourceCreator} from './interfaces'; +import {Cache, DynamicDatasourceCreator, DynamicDatasourceDestructor, DynamicDatasourceGetter} from './interfaces'; import {Store} from './store'; // base global @@ -12,4 +12,6 @@ declare global { const cache: Cache; const chainId: string; const createDynamicDatasource: DynamicDatasourceCreator; + const destroyDynamicDatasource: DynamicDatasourceDestructor; + const getDynamicDatasources: DynamicDatasourceGetter; } diff --git a/packages/types-core/src/interfaces.ts b/packages/types-core/src/interfaces.ts index bde78e77ac..7930692ad8 100644 --- a/packages/types-core/src/interfaces.ts +++ b/packages/types-core/src/interfaces.ts @@ -2,6 +2,28 @@ // SPDX-License-Identifier: GPL-3.0 export type DynamicDatasourceCreator = (name: string, args: Record) => Promise; +export type DynamicDatasourceDestructor = (name: string, index: number) => Promise; + +/** + * Information about a dynamic datasource instance. + */ +export interface DynamicDatasourceInfo { + /** + * Global index of the datasource in the internal storage array. + * Use this value when calling destroyDynamicDatasource(). + */ + index: number; + /** Template name this datasource was created from */ + templateName: string; + /** Block height where this datasource starts processing */ + startBlock: number; + /** Block height where this datasource stops processing (if destroyed) */ + endBlock?: number; + /** Arguments passed when creating this datasource */ + args?: Record; +} + +export type DynamicDatasourceGetter = (templateName: string) => DynamicDatasourceInfo[]; export interface Cache = Record> { set(key: keyof T, value: T[keyof T]): Promise;