diff --git a/.changeset/six-adapter-initial.md b/.changeset/six-adapter-initial.md new file mode 100644 index 00000000000..1ac27105127 --- /dev/null +++ b/.changeset/six-adapter-initial.md @@ -0,0 +1,5 @@ +--- +'@chainlink/six-adapter': major +--- + +New adapter for SIX Exchange and BME equity market data via WebSocket streaming with mTLS authentication diff --git a/.pnp.cjs b/.pnp.cjs index a51709666f1..e7648ac6595 100644 --- a/.pnp.cjs +++ b/.pnp.cjs @@ -618,6 +618,10 @@ const RAW_RUNTIME_STATE = "name": "@chainlink/securitize-adapter",\ "reference": "workspace:packages/sources/securitize"\ },\ + {\ + "name": "@chainlink/six-adapter",\ + "reference": "workspace:packages/sources/six"\ + },\ {\ "name": "@chainlink/snowflake-adapter",\ "reference": "workspace:packages/sources/snowflake"\ @@ -931,6 +935,7 @@ const RAW_RUNTIME_STATE = ["@chainlink/secure-mint-adapter", ["workspace:packages/composites/secure-mint"]],\ ["@chainlink/securitize-adapter", ["workspace:packages/sources/securitize"]],\ ["@chainlink/set-token-index-adapter", ["workspace:packages/composites/set-token-index"]],\ + ["@chainlink/six-adapter", ["workspace:packages/sources/six"]],\ ["@chainlink/snowflake-adapter", ["workspace:packages/sources/snowflake"]],\ ["@chainlink/sochain-adapter", ["workspace:packages/sources/sochain"]],\ ["@chainlink/solactive-adapter", ["workspace:packages/sources/solactive"]],\ @@ -7583,6 +7588,26 @@ const RAW_RUNTIME_STATE = "linkType": "SOFT"\ }]\ ]],\ + ["@chainlink/six-adapter", [\ + ["workspace:packages/sources/six", {\ + "packageLocation": "./packages/sources/six/",\ + "packageDependencies": [\ + ["@chainlink/six-adapter", "workspace:packages/sources/six"],\ + ["@chainlink/external-adapter-framework", "npm:2.13.1"],\ + ["@sinonjs/fake-timers", "npm:9.1.2"],\ + ["@types/jest", "npm:29.5.14"],\ + ["@types/node", "npm:22.14.1"],\ + ["@types/sinonjs__fake-timers", "npm:8.1.5"],\ + ["@types/ws", "npm:8.18.1"],\ + ["axios", "npm:1.13.4"],\ + ["nock", "npm:13.5.6"],\ + ["tslib", "npm:2.4.1"],\ + ["typescript", "patch:typescript@npm%3A5.8.3#optional!builtin::version=5.8.3&hash=5786d5"],\ + ["ws", "virtual:76798ef4297c06624e6a890a042a8db65fa32cc5d1d7d8828006e188fc8f070b35b00d18d56a03211a6f7ef7c28f126042b0f4ef3fc99489849fd25a37aa15d0#npm:8.18.3"]\ + ],\ + "linkType": "SOFT"\ + }]\ + ]],\ ["@chainlink/snowflake-adapter", [\ ["workspace:packages/sources/snowflake", {\ "packageLocation": "./packages/sources/snowflake/",\ diff --git a/packages/sources/six/package.json b/packages/sources/six/package.json new file mode 100644 index 00000000000..88b8eeec86b --- /dev/null +++ b/packages/sources/six/package.json @@ -0,0 +1,45 @@ +{ + "name": "@chainlink/six-adapter", + "version": "0.1.0", + "description": "Chainlink six adapter.", + "keywords": [ + "Chainlink", + "LINK", + "blockchain", + "oracle", + "six" + ], + "main": "dist/index.js", + "types": "dist/index.d.ts", + "files": [ + "dist" + ], + "repository": { + "url": "https://github.com/smartcontractkit/external-adapters-js", + "type": "git" + }, + "license": "MIT", + "scripts": { + "clean": "rm -rf dist && rm -f tsconfig.tsbuildinfo", + "prepack": "yarn build", + "build": "tsc -b", + "server": "node -e 'require(\"./index.js\").server()'", + "server:dist": "node -e 'require(\"./dist/index.js\").server()'", + "start": "yarn server:dist" + }, + "devDependencies": { + "@sinonjs/fake-timers": "9.1.2", + "@types/jest": "^29.5.14", + "@types/node": "22.14.1", + "@types/sinonjs__fake-timers": "8.1.5", + "@types/ws": "^8", + "nock": "13.5.6", + "typescript": "5.8.3" + }, + "dependencies": { + "@chainlink/external-adapter-framework": "2.13.1", + "axios": "1.13.4", + "tslib": "2.4.1", + "ws": "^8.18.3" + } +} diff --git a/packages/sources/six/src/config/index.ts b/packages/sources/six/src/config/index.ts new file mode 100644 index 00000000000..fed5e61a69d --- /dev/null +++ b/packages/sources/six/src/config/index.ts @@ -0,0 +1,32 @@ +import { AdapterConfig } from '@chainlink/external-adapter-framework/config' + +export const config = new AdapterConfig( + { + WS_API_ENDPOINT: { + description: 'SIX WebSocket API endpoint', + type: 'string', + default: 'wss://api.six-group.com/web/v2/websocket', + required: true, + sensitive: false, + }, + REST_API_ENDPOINT: { + description: 'SIX REST API base URL (used to fetch Market Base reference data)', + type: 'string', + default: 'https://api.six-group.com', + required: true, + sensitive: false, + }, + CONFLATION_PERIOD: { + description: 'Conflation period in ISO 8601 duration format (e.g. PT1S for 1 second)', + type: 'string', + default: 'PT1S', + required: false, + sensitive: false, + }, + }, + { + envDefaultOverrides: { + CACHE_MAX_AGE: 10_000, // 10s - data updates every 1s, stale after 10s + }, + }, +) diff --git a/packages/sources/six/src/endpoint/price.ts b/packages/sources/six/src/endpoint/price.ts new file mode 100644 index 00000000000..127c194a5ac --- /dev/null +++ b/packages/sources/six/src/endpoint/price.ts @@ -0,0 +1,58 @@ +import { AdapterEndpoint } from '@chainlink/external-adapter-framework/adapter/endpoint' +import { InputParameters } from '@chainlink/external-adapter-framework/validation' +import { config } from '../config' +import { generateTransport } from '../transport/price' + +const inputParameters = new InputParameters( + { + ticker: { + aliases: ['base', 'symbol', 'asset'], + required: true, + type: 'string', + description: 'Instrument ticker (e.g. ABBN, ALC, ANA, ANE)', + }, + bc: { + aliases: ['market', 'bourseCode', 'exchange'], + required: true, + type: 'string', + description: 'SIX Bourse Code (e.g. 4 for SIX Swiss Exchange, 1058 for BME)', + }, + }, + [ + { + ticker: 'ABBN', + bc: '4', + }, + ], +) + +interface PriceResponse { + Result: number | null + Data: { + mid?: number + bid?: number + bidSize?: number + ask?: number + askSize?: number + lastTradedPrice?: number + volume?: number + marketStatus: number + ripcord: boolean + ripcordAsInt: number + } +} + +export type BaseEndpointTypes = { + Parameters: typeof inputParameters.definition + Response: PriceResponse + Settings: typeof config.settings +} + +const transport = generateTransport() + +export const endpoint = new AdapterEndpoint({ + name: 'price', + aliases: ['equity', 'stock'], + transport, + inputParameters, +}) diff --git a/packages/sources/six/src/index.ts b/packages/sources/six/src/index.ts new file mode 100644 index 00000000000..5f7d7d4b06d --- /dev/null +++ b/packages/sources/six/src/index.ts @@ -0,0 +1,13 @@ +import { expose, ServerInstance } from '@chainlink/external-adapter-framework' +import { Adapter } from '@chainlink/external-adapter-framework/adapter' +import { config } from './config' +import { endpoint } from './endpoint/price' + +export const adapter = new Adapter({ + defaultEndpoint: endpoint.name, + name: 'SIX', + config, + endpoints: [endpoint], +}) + +export const server = (): Promise => expose(adapter) diff --git a/packages/sources/six/src/transport/price.ts b/packages/sources/six/src/transport/price.ts new file mode 100644 index 00000000000..37fecc1f9c3 --- /dev/null +++ b/packages/sources/six/src/transport/price.ts @@ -0,0 +1,273 @@ +import { WebSocketTransport } from '@chainlink/external-adapter-framework/transports/websocket' +import { makeLogger, ProviderResult } from '@chainlink/external-adapter-framework/util' +import axios from 'axios' +import https from 'https' +import { BaseEndpointTypes } from '../endpoint/price' +import { + buildCloseStreamQuery, + buildSubscriptionQuery, + mapMarketBaseStatusToV11, + MARKET_STATUS_UNKNOWN, + toMilliseconds, +} from './utils' + +const logger = makeLogger('SixPriceTransport') + +const STALE_DATA_THRESHOLD_SECONDS = 300 +const MARKET_BASE_REQUEST_TIMEOUT_MS = 10_000 + +type SixPriceField = { + value?: number + size?: number + unixTimestamp?: number +} + +type SixStreamMessage = { + type: 'START' | 'UPDATE' | 'STOP' | 'ERROR' + requestedId: string + streamId: string + requestedScheme: string + last?: SixPriceField + bestBid?: SixPriceField + bestAsk?: SixPriceField + mid?: SixPriceField + volume?: SixPriceField +} + +type WsMessage = { + data?: { + startStream?: SixStreamMessage[] + } + errors?: { message: string; category?: string; type?: string; messageCode?: number }[] +} + +type MarketBaseMarket = { + lookupStatus?: string + referenceData?: { + marketBase?: { + marketStatus?: string + } + } +} + +type MarketBaseResponse = { + data?: { + markets?: MarketBaseMarket[] + } +} + +export type WsTransportTypes = BaseEndpointTypes & { + Provider: { + WsMessage: WsMessage + } +} + +// Market Base `marketStatus` is quasi-static (exchange-level flag, not session +// state). Cache per BC for the process lifetime; on the first message for a +// new BC we kick off an async fetch and return MARKET_STATUS_UNKNOWN until the +// cache is populated. +const marketStatusByBc = new Map() +const pendingMarketStatusFetches = new Set() + +let cachedCredentials: { cert: Buffer; key: Buffer } | undefined +let cachedHttpsAgent: https.Agent | undefined + +const getCredentials = (certBase64: string, keyBase64: string) => { + if (!cachedCredentials) { + cachedCredentials = { + cert: Buffer.from(certBase64, 'base64'), + key: Buffer.from(keyBase64, 'base64'), + } + } + return cachedCredentials +} + +const getHttpsAgent = (): https.Agent | undefined => { + if (!cachedCredentials) return undefined + if (!cachedHttpsAgent) { + cachedHttpsAgent = new https.Agent({ + cert: cachedCredentials.cert, + key: cachedCredentials.key, + }) + } + return cachedHttpsAgent +} + +const fetchMarketBaseStatus = async (bc: string, restEndpoint: string): Promise => { + const httpsAgent = getHttpsAgent() + if (!httpsAgent) throw new Error('mTLS credentials not initialized') + const response = await axios.get( + `${restEndpoint}/web/v2/markets/referenceData/marketBase`, + { + params: { scheme: 'BC', ids: bc }, + headers: { accept: 'application/json' }, + httpsAgent, + timeout: MARKET_BASE_REQUEST_TIMEOUT_MS, + }, + ) + const status = response.data.data?.markets?.[0]?.referenceData?.marketBase?.marketStatus + return mapMarketBaseStatusToV11(status) +} + +const ensureMarketStatusFetched = (bc: string, restEndpoint: string): void => { + if (marketStatusByBc.has(bc) || pendingMarketStatusFetches.has(bc)) return + if (!cachedCredentials) return + pendingMarketStatusFetches.add(bc) + fetchMarketBaseStatus(bc, restEndpoint) + .then((status) => { + marketStatusByBc.set(bc, status) + logger.info({ bc, status }, 'Market base status fetched') + }) + .catch((error) => { + logger.error({ bc, error: String(error) }, 'Failed to fetch market base status') + }) + .finally(() => { + pendingMarketStatusFetches.delete(bc) + }) +} + +const makeRipcordResult = ( + ticker: string, + bc: string, + details: string, +): ProviderResult => ({ + params: { ticker, bc }, + response: { + statusCode: 502, + errorMessage: `Ripcord activated for ${ticker}_${bc}. Details: ${details}`, + }, +}) + +export const generateTransport = () => { + const transport = new WebSocketTransport({ + url: ({ adapterSettings: { WS_API_ENDPOINT } }) => WS_API_ENDPOINT, + + options: ({ adapterSettings: { TLS_PUBLIC_KEY, TLS_PRIVATE_KEY } }) => { + if (!TLS_PUBLIC_KEY || !TLS_PRIVATE_KEY) { + throw new Error('TLS_PUBLIC_KEY and TLS_PRIVATE_KEY must be set (Base64-encoded)') + } + const { cert, key } = getCredentials(TLS_PUBLIC_KEY, TLS_PRIVATE_KEY) + return { cert, key } + }, + + handlers: { + open: async () => { + logger.info('Connected to SIX WebSocket API') + }, + + message: (message, context): ProviderResult[] | undefined => { + if (message.errors) { + logger.error({ errors: message.errors }, 'SIX API returned errors') + return [] + } + + const streams = message.data?.startStream + if (!streams) return [] + + const results: ProviderResult[] = [] + + for (const stream of streams) { + const parts = stream.streamId.split('_') + if (parts.length !== 2) { + logger.warn({ streamId: stream.streamId }, 'Unexpected streamId format') + continue + } + + const [ticker, bc] = parts + + if (stream.type === 'ERROR') { + logger.error( + { streamId: stream.streamId, requestedId: stream.requestedId }, + 'SIX stream error', + ) + results.push(makeRipcordResult(ticker, bc, 'SIX stream error (check entitlements)')) + continue + } + + if (stream.type !== 'START' && stream.type !== 'UPDATE') continue + + const lastPrice = stream.last?.value + + if (lastPrice == null && stream.bestBid?.value == null && stream.bestAsk?.value == null) { + logger.debug({ streamId: stream.streamId }, 'No price data in message') + continue + } + + const latestTimestamp = + stream.last?.unixTimestamp ?? + stream.bestBid?.unixTimestamp ?? + stream.bestAsk?.unixTimestamp + if (latestTimestamp != null) { + const ageS = Date.now() / 1000 - latestTimestamp + if (ageS > STALE_DATA_THRESHOLD_SECONDS) { + logger.warn( + { streamId: stream.streamId, ageS: Math.round(ageS) }, + 'Stale data detected', + ) + results.push(makeRipcordResult(ticker, bc, `Stale data (${Math.round(ageS)}s old)`)) + continue + } + } + + ensureMarketStatusFetched(bc, context.adapterSettings.REST_API_ENDPOINT) + const marketStatus = marketStatusByBc.get(bc) ?? MARKET_STATUS_UNKNOWN + + const mid = + stream.mid?.value ?? + (stream.bestBid?.value != null && stream.bestAsk?.value != null + ? (stream.bestBid.value + stream.bestAsk.value) / 2 + : undefined) + + const providerIndicatedTimeUnixMs = toMilliseconds(latestTimestamp) + + results.push({ + params: { ticker, bc }, + response: { + result: lastPrice ?? mid ?? null, + data: { + mid, + bid: stream.bestBid?.value, + bidSize: stream.bestBid?.size, + ask: stream.bestAsk?.value, + askSize: stream.bestAsk?.size, + lastTradedPrice: lastPrice, + volume: stream.volume?.value, + marketStatus, + ripcord: false, + ripcordAsInt: 0, + }, + timestamps: { + providerIndicatedTimeUnixMs, + }, + }, + }) + } + + return results + }, + + error: (errorEvent) => { + logger.error({ errorEvent }, 'SIX WebSocket error') + }, + + close: (closeEvent) => { + const code = (closeEvent as any)?.code + const reason = (closeEvent as any)?.reason + const wasClean = (closeEvent as any)?.wasClean + logger.info({ code, reason, wasClean }, 'SIX WebSocket closed') + }, + }, + + builders: { + subscribeMessage: (params, { adapterSettings: { CONFLATION_PERIOD } }) => ({ + query: buildSubscriptionQuery(params.ticker, params.bc, CONFLATION_PERIOD), + }), + + unsubscribeMessage: (params) => ({ + query: buildCloseStreamQuery(params.ticker, params.bc), + }), + }, + }) + + return transport +} diff --git a/packages/sources/six/src/transport/utils.ts b/packages/sources/six/src/transport/utils.ts new file mode 100644 index 00000000000..06d54740d1a --- /dev/null +++ b/packages/sources/six/src/transport/utils.ts @@ -0,0 +1,76 @@ +/** + * Build a GraphQL subscription query for SIX streaming market data. + */ +export const buildSubscriptionQuery = ( + ticker: string, + bc: string, + conflationPeriod: string, +): string => { + const streamId = `${ticker}_${bc}` + return `subscription { + startStream( + scheme: TICKER_BC, + ids: ["${streamId}"], + streamId: "${streamId}", + conflationType: INTERVAL, + conflationPeriod: "${conflationPeriod}" + ) { + type + requestedId + streamId + requestedScheme + last { value size unixTimestamp } + bestBid { value size unixTimestamp } + bestAsk { value size unixTimestamp } + mid { value unixTimestamp } + volume { value unixTimestamp } + } + }` +} + +/** + * Build a GraphQL mutation to close a stream. + */ +export const buildCloseStreamQuery = (ticker: string, bc: string): string => { + const streamId = `${ticker}_${bc}` + return `mutation { closeStream(streamId: "${streamId}") { type streamId } }` +} + +/** + * v11 marketStatus uint32 enum values. + * + * 0 = unknown + * 1 = pre-market + * 2 = open + * 3 = post-market + * 4 = closed + */ +export const MARKET_STATUS_UNKNOWN = 0 +export const MARKET_STATUS_PRE_MARKET = 1 +export const MARKET_STATUS_OPEN = 2 +export const MARKET_STATUS_POST_MARKET = 3 +export const MARKET_STATUS_CLOSED = 4 + +/** + * Map SIX Market Base `marketStatus` enum (ACTIVE/INACTIVE/REFERENCE_ONLY/OTHER) + * to the Chainlink Data Streams v11 RWA Advanced `marketStatus` uint32. + */ +const MARKET_BASE_STATUS_MAP: Record = { + ACTIVE: MARKET_STATUS_OPEN, + INACTIVE: MARKET_STATUS_CLOSED, + REFERENCE_ONLY: MARKET_STATUS_UNKNOWN, + OTHER: MARKET_STATUS_UNKNOWN, +} + +export const mapMarketBaseStatusToV11 = (status: string | undefined): number => { + if (!status) return MARKET_STATUS_UNKNOWN + return MARKET_BASE_STATUS_MAP[status.toUpperCase()] ?? MARKET_STATUS_UNKNOWN +} + +/** + * Convert SIX unix timestamp (seconds with microsecond precision) to milliseconds. + */ +export const toMilliseconds = (unixTimestamp: number | undefined): number | undefined => { + if (unixTimestamp == null) return undefined + return Math.round(unixTimestamp * 1e3) +} diff --git a/packages/sources/six/test-payload.json b/packages/sources/six/test-payload.json new file mode 100644 index 00000000000..b914d7f1d54 --- /dev/null +++ b/packages/sources/six/test-payload.json @@ -0,0 +1,6 @@ +{ + "requests": [{ + "ticker": "ANA", + "bc": "1058" + }] +} diff --git a/packages/sources/six/test/integration/adapter.test.ts b/packages/sources/six/test/integration/adapter.test.ts new file mode 100644 index 00000000000..0de6e9a72b6 --- /dev/null +++ b/packages/sources/six/test/integration/adapter.test.ts @@ -0,0 +1,139 @@ +import { WebSocketClassProvider } from '@chainlink/external-adapter-framework/transports' +import { + mockWebSocketProvider, + MockWebsocketServer, + setEnvVariables, + TestAdapter, +} from '@chainlink/external-adapter-framework/util/testing-utils' +import FakeTimers from '@sinonjs/fake-timers' +import nock from 'nock' +import { mockWebSocketServer } from './fixtures' + +describe('SIX Adapter', () => { + let oldEnv: NodeJS.ProcessEnv + let mockWsServer: MockWebsocketServer + let testAdapter: TestAdapter + let marketBaseScope: nock.Scope + const wsEndpoint = 'ws://localhost:9090' + const restEndpoint = 'https://api.six-group.test' + + beforeAll(async () => { + oldEnv = JSON.parse(JSON.stringify(process.env)) + + process.env['WS_API_ENDPOINT'] = wsEndpoint + process.env['REST_API_ENDPOINT'] = restEndpoint + process.env['TLS_PUBLIC_KEY'] = Buffer.from('mock-cert').toString('base64') + process.env['TLS_PRIVATE_KEY'] = Buffer.from('mock-key').toString('base64') + process.env['WS_SUBSCRIPTION_TTL'] = '5000' + process.env['CACHE_MAX_AGE'] = '5000' + process.env['CACHE_POLLING_MAX_RETRIES'] = '0' + process.env['METRICS_ENABLED'] = 'false' + process.env['WS_SUBSCRIPTION_UNRESPONSIVE_TTL'] = '30000' + + // Mock Market Base REST endpoint for any BC - always returns ACTIVE + marketBaseScope = nock(restEndpoint) + .get('/web/v2/markets/referenceData/marketBase') + .query(true) + .reply(200, { + data: { + markets: [ + { + lookupStatus: 'FOUND', + referenceData: { marketBase: { marketStatus: 'ACTIVE' } }, + }, + ], + }, + }) + .persist() + + mockWebSocketProvider(WebSocketClassProvider) + mockWsServer = mockWebSocketServer(wsEndpoint) + + const adapter = (await import('../../src')).adapter + testAdapter = await TestAdapter.startWithMockedCache(adapter, { + clock: FakeTimers.install(), + testAdapter: {} as TestAdapter, + }) + + // Prime subscriptions so the background loop establishes WS and populates cache + await testAdapter.request({ ticker: 'ANA', bc: '1058' }) + await testAdapter.request({ ticker: 'ABBN', bc: '4' }) + await testAdapter.request({ ticker: 'STALE', bc: '999' }) + await testAdapter.waitForCache(3) + }) + + afterAll(async () => { + // Verify the Market Base REST endpoint was actually called during tests + expect(marketBaseScope.isDone()).toBe(true) + nock.cleanAll() + setEnvVariables(oldEnv) + mockWsServer?.close() + testAdapter.clock?.uninstall() + await testAdapter.api.close() + }) + + describe('price endpoint', () => { + it('should return success for entitled BME instrument', async () => { + const response = await testAdapter.request({ + ticker: 'ANA', + bc: '1058', + }) + + expect(response.statusCode).toBe(200) + + const body = response.json() + expect(body.statusCode).toBe(200) + expect(body).toHaveProperty('data') + expect(body).toHaveProperty('result') + + const d = body.data + expect(typeof d.bid).toBe('number') + expect(typeof d.ask).toBe('number') + expect(typeof d.lastTradedPrice).toBe('number') + expect(typeof d.volume).toBe('number') + expect(d.ripcord).toBe(false) + expect(d.ripcordAsInt).toBe(0) + + expect(d.bid).toBe(230.8) + expect(d.ask).toBe(231.6) + expect(d.lastTradedPrice).toBe(231.6) + expect(d.mid).toBe(231.2) + expect(d.volume).toBe(53750.0) + }) + + it('should return ripcord error for non-entitled instrument', async () => { + const response = await testAdapter.request({ + ticker: 'ABBN', + bc: '4', + }) + + const body = response.json() + expect(body.statusCode).toBe(502) + expect(body.errorMessage).toContain('Ripcord activated for ABBN_4') + expect(body.errorMessage).toContain('SIX stream error') + }) + + it('should return ripcord for stale data', async () => { + const response = await testAdapter.request({ + ticker: 'STALE', + bc: '999', + }) + + const body = response.json() + expect(body.statusCode).toBe(502) + expect(body.errorMessage).toContain('Ripcord activated for STALE_999') + expect(body.errorMessage).toContain('Stale data') + }) + + it('should work with alias parameters', async () => { + const response = await testAdapter.request({ + symbol: 'ANA', + market: '1058', + }) + + expect(response.statusCode).toBe(200) + const body = response.json() + expect(body.data.lastTradedPrice).toBe(231.6) + }) + }) +}) diff --git a/packages/sources/six/test/integration/fixtures.ts b/packages/sources/six/test/integration/fixtures.ts new file mode 100644 index 00000000000..da9f3a90ba5 --- /dev/null +++ b/packages/sources/six/test/integration/fixtures.ts @@ -0,0 +1,179 @@ +import { MockWebsocketServer } from '@chainlink/external-adapter-framework/util/testing-utils' + +export const mockStartResponse = { + data: { + startStream: [ + { + type: 'START', + requestedId: 'ANA_1058', + streamId: 'ANA_1058', + requestedScheme: 'TICKER_BC', + last: { + value: 231.6, + size: 40.0, + unixTimestamp: Date.now() / 1000, // Fresh timestamp + }, + bestBid: { + value: 230.8, + size: 20.0, + unixTimestamp: Date.now() / 1000, + }, + bestAsk: { + value: 231.6, + size: 165.0, + unixTimestamp: Date.now() / 1000, + }, + mid: { + value: 231.2, + unixTimestamp: Date.now() / 1000, + }, + volume: { + value: 53750.0, + unixTimestamp: Date.now() / 1000, + }, + }, + ], + }, +} + +export const mockUpdateResponse = { + data: { + startStream: [ + { + type: 'UPDATE', + requestedId: 'ANA_1058', + streamId: 'ANA_1058', + requestedScheme: 'TICKER_BC', + bestBid: { + value: 231.0, + size: 25.0, + unixTimestamp: Date.now() / 1000, + }, + bestAsk: { + value: 231.8, + size: 150.0, + unixTimestamp: Date.now() / 1000, + }, + }, + ], + }, +} + +export const mockErrorResponse = { + data: { + startStream: [ + { + type: 'ERROR', + requestedId: 'ABBN_4', + streamId: 'ABBN_4', + requestedScheme: 'TICKER_BC', + }, + ], + }, +} + +// Stale timestamps must be relative to the fake clock (epoch 0). +// 600s in the past from epoch 0 = negative timestamp, so we use a small positive +// timestamp that will be >300s old once the fake clock advances past ~302s. +// Since fake clock starts at 0 and the test runs quickly, a timestamp of 0 minus +// STALE_DATA_THRESHOLD_SECONDS (300) in the transport means any timestamp older than +// (currentTime - 300s) is stale. At fake time ~1s, anything < -299s is stale. +// We can't use negative timestamps, so we set timestamp to 0 and rely on the +// fake clock not advancing much — but the transport compares Date.now()/1000 - timestamp. +// With fake timers Date.now() returns ~1 (1 second), so age = 1 - 0 = 1s — NOT stale. +// Solution: use a very old real-world timestamp that will always be stale. +const VERY_OLD_TIMESTAMP = 1000000 +export const mockStaleDataResponse = { + data: { + startStream: [ + { + type: 'START', + requestedId: 'STALE_999', + streamId: 'STALE_999', + requestedScheme: 'TICKER_BC', + last: { + value: 100.0, + size: 10.0, + unixTimestamp: VERY_OLD_TIMESTAMP, + }, + bestBid: { + value: 99.5, + size: 50.0, + unixTimestamp: VERY_OLD_TIMESTAMP, + }, + bestAsk: { + value: 100.5, + size: 50.0, + unixTimestamp: VERY_OLD_TIMESTAMP, + }, + }, + ], + }, +} + +export const mockApiErrorResponse = { + errors: [ + { + message: 'User does not have access to the requested quality of service', + category: 'ENTITLEMENT_ERROR', + type: 'ACCESS_DENIED', + messageCode: 3001, + }, + ], +} + +const makeStaleResponse = () => { + // Use Date.now() at call time so it captures fake clock value, + // then subtract enough to exceed STALE_DATA_THRESHOLD_SECONDS (300s) + const staleTimestamp = Date.now() / 1000 - 600 + return { + data: { + startStream: [ + { + type: 'START', + requestedId: 'STALE_999', + streamId: 'STALE_999', + requestedScheme: 'TICKER_BC', + last: { value: 100.0, size: 10.0, unixTimestamp: staleTimestamp }, + bestBid: { value: 99.5, size: 50.0, unixTimestamp: staleTimestamp }, + bestAsk: { value: 100.5, size: 50.0, unixTimestamp: staleTimestamp }, + }, + ], + }, + } +} + +export const mockWebSocketServer = (URL: string): MockWebsocketServer => { + const mockWsServer = new MockWebsocketServer(URL, { mock: false }) + mockWsServer.on('connection', (socket) => { + socket.on('message', (msg) => { + const parsed = JSON.parse(msg.toString()) + const query = parsed.query || '' + + if (query.includes('ABBN_4')) { + socket.send(JSON.stringify(mockErrorResponse)) + } else if (query.includes('STALE_999')) { + socket.send(JSON.stringify(makeStaleResponse())) + } else if (query.includes('closeStream')) { + socket.send( + JSON.stringify({ + data: { closeStream: [{ type: 'STOP', streamId: 'ANA_1058' }] }, + }), + ) + } else { + // Fresh timestamps based on current (fake) clock + const now = Date.now() / 1000 + const fresh = JSON.parse(JSON.stringify(mockStartResponse)) + for (const s of fresh.data.startStream) { + if (s.last) s.last.unixTimestamp = now + if (s.bestBid) s.bestBid.unixTimestamp = now + if (s.bestAsk) s.bestAsk.unixTimestamp = now + if (s.mid) s.mid.unixTimestamp = now + if (s.volume) s.volume.unixTimestamp = now + } + socket.send(JSON.stringify(fresh)) + } + }) + }) + return mockWsServer +} diff --git a/packages/sources/six/test/unit/utils.test.ts b/packages/sources/six/test/unit/utils.test.ts new file mode 100644 index 00000000000..c5df9619a3f --- /dev/null +++ b/packages/sources/six/test/unit/utils.test.ts @@ -0,0 +1,73 @@ +import { + buildCloseStreamQuery, + buildSubscriptionQuery, + mapMarketBaseStatusToV11, + MARKET_STATUS_CLOSED, + MARKET_STATUS_OPEN, + MARKET_STATUS_UNKNOWN, +} from '../../src/transport/utils' + +describe('buildSubscriptionQuery', () => { + it('builds a valid GraphQL subscription with conflation', () => { + const query = buildSubscriptionQuery('ABBN', '4', 'PT1S') + expect(query).toContain('startStream') + expect(query).toContain('TICKER_BC') + expect(query).toContain('"ABBN_4"') + expect(query).toContain('streamId: "ABBN_4"') + expect(query).toContain('conflationType: INTERVAL') + expect(query).toContain('conflationPeriod: "PT1S"') + expect(query).toContain('last { value size unixTimestamp }') + expect(query).toContain('bestBid { value size unixTimestamp }') + expect(query).toContain('bestAsk { value size unixTimestamp }') + expect(query).toContain('mid { value unixTimestamp }') + expect(query).toContain('volume { value unixTimestamp }') + }) + + it('uses the provided conflation period', () => { + const query = buildSubscriptionQuery('ANA', '1058', 'PT0.5S') + expect(query).toContain('"ANA_1058"') + expect(query).toContain('conflationPeriod: "PT0.5S"') + }) +}) + +describe('buildCloseStreamQuery', () => { + it('builds a valid closeStream mutation', () => { + const query = buildCloseStreamQuery('ABBN', '4') + expect(query).toContain('closeStream') + expect(query).toContain('streamId: "ABBN_4"') + expect(query).toContain('type') + expect(query).toContain('streamId') + }) +}) + +describe('mapMarketBaseStatusToV11', () => { + it('maps ACTIVE to open (2)', () => { + expect(mapMarketBaseStatusToV11('ACTIVE')).toBe(MARKET_STATUS_OPEN) + }) + + it('maps INACTIVE to closed (4)', () => { + expect(mapMarketBaseStatusToV11('INACTIVE')).toBe(MARKET_STATUS_CLOSED) + }) + + it('maps REFERENCE_ONLY to unknown (0)', () => { + expect(mapMarketBaseStatusToV11('REFERENCE_ONLY')).toBe(MARKET_STATUS_UNKNOWN) + }) + + it('maps OTHER to unknown (0)', () => { + expect(mapMarketBaseStatusToV11('OTHER')).toBe(MARKET_STATUS_UNKNOWN) + }) + + it('is case-insensitive', () => { + expect(mapMarketBaseStatusToV11('active')).toBe(MARKET_STATUS_OPEN) + expect(mapMarketBaseStatusToV11('Inactive')).toBe(MARKET_STATUS_CLOSED) + }) + + it('returns unknown (0) for undefined', () => { + expect(mapMarketBaseStatusToV11(undefined)).toBe(MARKET_STATUS_UNKNOWN) + }) + + it('returns unknown (0) for unrecognized statuses', () => { + expect(mapMarketBaseStatusToV11('SOMETHING_ELSE')).toBe(MARKET_STATUS_UNKNOWN) + expect(mapMarketBaseStatusToV11('')).toBe(MARKET_STATUS_UNKNOWN) + }) +}) diff --git a/packages/sources/six/tsconfig.json b/packages/sources/six/tsconfig.json new file mode 100644 index 00000000000..f59363fd76c --- /dev/null +++ b/packages/sources/six/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "dist", + "rootDir": "src" + }, + "include": ["src/**/*", "src/**/*.json"], + "exclude": ["dist", "**/*.spec.ts", "**/*.test.ts"] +} diff --git a/packages/sources/six/tsconfig.test.json b/packages/sources/six/tsconfig.test.json new file mode 100644 index 00000000000..e3de28cb5c0 --- /dev/null +++ b/packages/sources/six/tsconfig.test.json @@ -0,0 +1,7 @@ +{ + "extends": "../../tsconfig.base.json", + "include": ["src/**/*", "**/test", "src/**/*.json"], + "compilerOptions": { + "noEmit": true + } +} diff --git a/packages/tsconfig.json b/packages/tsconfig.json index d4ffaadaa42..012d5cf382d 100644 --- a/packages/tsconfig.json +++ b/packages/tsconfig.json @@ -446,6 +446,9 @@ { "path": "./sources/securitize" }, + { + "path": "./sources/six" + }, { "path": "./sources/snowflake" }, diff --git a/packages/tsconfig.test.json b/packages/tsconfig.test.json index 07cd9c69557..f480439e041 100644 --- a/packages/tsconfig.test.json +++ b/packages/tsconfig.test.json @@ -446,6 +446,9 @@ { "path": "./sources/securitize/tsconfig.test.json" }, + { + "path": "./sources/six/tsconfig.test.json" + }, { "path": "./sources/snowflake/tsconfig.test.json" }, diff --git a/yarn.lock b/yarn.lock index d025ea6bee8..0286e8bf63a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4806,6 +4806,24 @@ __metadata: languageName: unknown linkType: soft +"@chainlink/six-adapter@workspace:packages/sources/six": + version: 0.0.0-use.local + resolution: "@chainlink/six-adapter@workspace:packages/sources/six" + dependencies: + "@chainlink/external-adapter-framework": "npm:2.13.1" + "@sinonjs/fake-timers": "npm:9.1.2" + "@types/jest": "npm:^29.5.14" + "@types/node": "npm:22.14.1" + "@types/sinonjs__fake-timers": "npm:8.1.5" + "@types/ws": "npm:^8" + axios: "npm:1.13.4" + nock: "npm:13.5.6" + tslib: "npm:2.4.1" + typescript: "npm:5.8.3" + ws: "npm:^8.18.3" + languageName: unknown + linkType: soft + "@chainlink/snowflake-adapter@workspace:packages/sources/snowflake": version: 0.0.0-use.local resolution: "@chainlink/snowflake-adapter@workspace:packages/sources/snowflake"