diff --git a/packages/pipeline/README.md b/packages/pipeline/README.md index a8c37da..1514255 100644 --- a/packages/pipeline/README.md +++ b/packages/pipeline/README.md @@ -229,21 +229,47 @@ Writes generated quads to a destination: Plugins hook into the pipeline lifecycle via the `PipelinePlugin` interface. Register them in the `plugins` array when constructing a `Pipeline`. +#### `namespaceNormalizationPlugin(options)` + +Generic plugin that rewrites namespace prefixes in `void:class` and `void:property` quad objects. Accepts `from` and `to` options specifying the source and target namespace URI prefixes. `void:vocabulary` quads are left unchanged so consumers can see which namespace the source dataset actually uses. + +```typescript +import { namespaceNormalizationPlugin } from '@lde/pipeline'; + +new Pipeline({ + // ... + plugins: [ + namespaceNormalizationPlugin({ + from: 'http://example.org/', + to: 'https://example.org/', + }), + ], +}); +``` + #### `provenancePlugin()` Appends [PROV-O](https://www.w3.org/TR/prov-o/) provenance quads (`prov:Entity`, `prov:Activity`, `prov:startedAtTime`, `prov:endedAtTime`) to every stage’s output. -#### `schemaOrgNormalizationPlugin()` +#### `schemaOrgNormalizationPlugin(options?)` + +Normalizes Schema.org namespace prefixes in `void:class` and `void:property` quad objects. By default, rewrites `http://schema.org/` to `https://schema.org/`. Pass `{ reverse: true }` to normalize in the opposite direction (`https://` to `http://`). `void:vocabulary` quads are left unchanged so consumers can see which namespace the source dataset actually uses. -Normalizes `http://schema.org/` to `https://schema.org/` in `void:class` and `void:property` quad objects, so downstream consumers can rely on a single canonical namespace. `void:vocabulary` quads are left unchanged so consumers can see which namespace the source dataset actually uses. +This is a convenience wrapper around `namespaceNormalizationPlugin`. 
```typescript -import { schemaOrgNormalizationPlugin, provenancePlugin } from '@lde/pipeline'; +import { schemaOrgNormalizationPlugin, provenancePlugin } from '@lde/pipeline'; new Pipeline({ // ... plugins: [schemaOrgNormalizationPlugin(), provenancePlugin()], }); + +// Or reverse: normalize https to http +new Pipeline({ + // ... + plugins: [schemaOrgNormalizationPlugin({reverse: true})], +}); ``` ## Usage diff --git a/packages/pipeline/src/index.ts b/packages/pipeline/src/index.ts index b7aa806..6a77d68 100644 --- a/packages/pipeline/src/index.ts +++ b/packages/pipeline/src/index.ts @@ -9,5 +9,6 @@ export * from './stageOutputResolver.js'; export * from './sparql/index.js'; export * from './distribution/index.js'; export * from './writer/index.js'; +export * from './plugin/namespaceNormalization.js'; export * from './plugin/provenance.js'; export * from './plugin/schemaOrgNormalization.js'; diff --git a/packages/pipeline/src/plugin/namespaceNormalization.ts b/packages/pipeline/src/plugin/namespaceNormalization.ts new file mode 100644 index 0000000..f065ba7 --- /dev/null +++ b/packages/pipeline/src/plugin/namespaceNormalization.ts @@ -0,0 +1,68 @@ +import type {QuadTransform} from '../stage.js'; +import type {PipelinePlugin} from '../pipeline.js'; +import type {Quad} from '@rdfjs/types'; +import {DataFactory} from 'n3'; + +const {namedNode, quad} = DataFactory; + +const VOID_CLASS = namedNode('http://rdfs.org/ns/void#class'); +const VOID_PROPERTY = namedNode('http://rdfs.org/ns/void#property'); + +export interface NamespaceNormalizationOptions { + /** Namespace URI prefix to match (e.g. `http://schema.org/`). */ + from: string; + /** Namespace URI prefix to replace with (e.g. `https://schema.org/`). */ + to: string; +} + +/** + * Creates a QuadTransform that rewrites namespace prefixes in `void:class` and + * `void:property` quad objects from {@link NamespaceNormalizationOptions.from} + * to {@link NamespaceNormalizationOptions.to}. 
+ * + * `void:vocabulary` quads are left unchanged so consumers can see which + * namespace the source dataset actually uses. + */ +export function namespaceNormalizationTransform( + options: NamespaceNormalizationOptions, +): QuadTransform { + return (quads) => normalizeNamespace(quads, options); +} + +/** + * Pipeline plugin that normalizes namespace prefixes in `void:class` and + * `void:property` quad objects. + * + * `void:vocabulary` quads are left unchanged so consumers can see which + * namespace the source dataset actually uses. + */ +export function namespaceNormalizationPlugin( + options: NamespaceNormalizationOptions, +): PipelinePlugin { + return { + name: 'namespace-normalization', + beforeStageWrite: namespaceNormalizationTransform(options), + }; +} + +async function* normalizeNamespace( + quads: AsyncIterable, + {from, to}: NamespaceNormalizationOptions, +): AsyncIterable { + for await (const q of quads) { + if ( + (q.predicate.equals(VOID_CLASS) || q.predicate.equals(VOID_PROPERTY)) && + q.object.termType === 'NamedNode' && + q.object.value.startsWith(from) + ) { + yield quad( + q.subject, + q.predicate, + namedNode(to + q.object.value.slice(from.length)), + q.graph, + ); + } else { + yield q; + } + } +} diff --git a/packages/pipeline/src/plugin/schemaOrgNormalization.ts b/packages/pipeline/src/plugin/schemaOrgNormalization.ts index d98f0e4..5a69e7e 100644 --- a/packages/pipeline/src/plugin/schemaOrgNormalization.ts +++ b/packages/pipeline/src/plugin/schemaOrgNormalization.ts @@ -1,53 +1,42 @@ -import type { QuadTransform } from '../stage.js'; -import type { PipelinePlugin } from '../pipeline.js'; -import type { Quad } from '@rdfjs/types'; -import { DataFactory } from 'n3'; - -const { namedNode, quad } = DataFactory; - -const VOID_CLASS = namedNode('http://rdfs.org/ns/void#class'); -const VOID_PROPERTY = namedNode('http://rdfs.org/ns/void#property'); +import type {QuadTransform} from '../stage.js'; +import type {PipelinePlugin} from 
'../pipeline.js'; +import { + namespaceNormalizationPlugin, + namespaceNormalizationTransform, +} from './namespaceNormalization.js'; const HTTP_SCHEMA_ORG = 'http://schema.org/'; const HTTPS_SCHEMA_ORG = 'https://schema.org/'; +export interface SchemaOrgNormalizationOptions { + /** When true, normalizes `https://schema.org/` to `http://schema.org/` instead. */ + reverse?: boolean; +} + /** QuadTransform that normalizes `http://schema.org/` to `https://schema.org/` in `void:class` and `void:property` objects. */ -export const schemaOrgNormalizationTransform: QuadTransform = (quads) => - normalizeSchemaOrg(quads); +export const schemaOrgNormalizationTransform: QuadTransform = + namespaceNormalizationTransform({ + from: HTTP_SCHEMA_ORG, + to: HTTPS_SCHEMA_ORG, + }); /** - * Pipeline plugin that normalizes `http://schema.org/` to `https://schema.org/` - * in `void:class` and `void:property` quad objects. + * Pipeline plugin that normalizes Schema.org namespace prefixes in `void:class` + * and `void:property` quad objects. + * + * By default, rewrites `http://schema.org/` to `https://schema.org/`. Pass + * `{ reverse: true }` to normalize in the opposite direction. * * `void:vocabulary` quads are left unchanged so consumers can see which * namespace the source dataset actually uses. */ -export function schemaOrgNormalizationPlugin(): PipelinePlugin { +export function schemaOrgNormalizationPlugin( + options?: SchemaOrgNormalizationOptions, +): PipelinePlugin { + const from = options?.reverse ? HTTPS_SCHEMA_ORG : HTTP_SCHEMA_ORG; + const to = options?.reverse ? 
HTTP_SCHEMA_ORG : HTTPS_SCHEMA_ORG; return { + ...namespaceNormalizationPlugin({from, to}), name: 'schema-org-normalization', - beforeStageWrite: schemaOrgNormalizationTransform, }; } - -async function* normalizeSchemaOrg( - quads: AsyncIterable, -): AsyncIterable { - for await (const q of quads) { - if ( - (q.predicate.equals(VOID_CLASS) || q.predicate.equals(VOID_PROPERTY)) && - q.object.termType === 'NamedNode' && - q.object.value.startsWith(HTTP_SCHEMA_ORG) - ) { - yield quad( - q.subject, - q.predicate, - namedNode( - HTTPS_SCHEMA_ORG + q.object.value.slice(HTTP_SCHEMA_ORG.length), - ), - q.graph, - ); - } else { - yield q; - } - } -} diff --git a/packages/pipeline/test/plugin/namespaceNormalization.test.ts b/packages/pipeline/test/plugin/namespaceNormalization.test.ts new file mode 100644 index 0000000..bf77ead --- /dev/null +++ b/packages/pipeline/test/plugin/namespaceNormalization.test.ts @@ -0,0 +1,130 @@ +import { + namespaceNormalizationTransform, + namespaceNormalizationPlugin, +} from '../../src/index.js'; +import {Dataset} from '@lde/dataset'; +import {describe, it, expect} from 'vitest'; +import {DataFactory} from 'n3'; +import type {Quad} from '@rdfjs/types'; + +const {namedNode, quad} = DataFactory; + +const VOID = 'http://rdfs.org/ns/void#'; + +const dataset = new Dataset({ + iri: new URL('http://example.com/dataset/1'), + distributions: [], +}); + +const options = {from: 'http://example.org/', to: 'https://example.org/'}; + +async function collect(iter: AsyncIterable): Promise { + const result: Quad[] = []; + for await (const q of iter) { + result.push(q); + } + return result; +} + +function quadStream(quads: Quad[]): AsyncIterable { + return (async function* () { + yield* quads; + })(); +} + +describe('namespaceNormalizationTransform', () => { + const transform = namespaceNormalizationTransform(options); + + it('rewrites void:class objects matching the source namespace', async () => { + const input = quad( + namedNode(dataset.iri.toString()), + 
namedNode(`${VOID}class`), + namedNode('http://example.org/Person'), + ); + + const quads = await collect(transform(quadStream([input]), dataset)); + + expect(quads).toHaveLength(1); + expect(quads[0].object.value).toBe('https://example.org/Person'); + }); + + it('rewrites void:property objects matching the source namespace', async () => { + const input = quad( + namedNode(dataset.iri.toString()), + namedNode(`${VOID}property`), + namedNode('http://example.org/name'), + ); + + const quads = await collect(transform(quadStream([input]), dataset)); + + expect(quads).toHaveLength(1); + expect(quads[0].object.value).toBe('https://example.org/name'); + }); + + it('does not rewrite void:vocabulary', async () => { + const input = quad( + namedNode(dataset.iri.toString()), + namedNode(`${VOID}vocabulary`), + namedNode('http://example.org/'), + ); + + const quads = await collect(transform(quadStream([input]), dataset)); + + expect(quads).toHaveLength(1); + expect(quads[0].object.value).toBe('http://example.org/'); + }); + + it('does not rewrite non-matching URIs', async () => { + const input = quad( + namedNode(dataset.iri.toString()), + namedNode(`${VOID}class`), + namedNode('http://xmlns.com/foaf/0.1/Person'), + ); + + const quads = await collect(transform(quadStream([input]), dataset)); + + expect(quads).toHaveLength(1); + expect(quads[0].object.value).toBe('http://xmlns.com/foaf/0.1/Person'); + }); + + it('does not rewrite URIs already using the target namespace', async () => { + const input = quad( + namedNode(dataset.iri.toString()), + namedNode(`${VOID}class`), + namedNode('https://example.org/Person'), + ); + + const quads = await collect(transform(quadStream([input]), dataset)); + + expect(quads).toHaveLength(1); + expect(quads[0].object.value).toBe('https://example.org/Person'); + }); + + it('preserves subject and graph when rewriting', async () => { + const graphNode = namedNode('http://example.com/graph'); + const input = quad( + 
namedNode(dataset.iri.toString()), + namedNode(`${VOID}class`), + namedNode('http://example.org/Event'), + graphNode, + ); + + const quads = await collect(transform(quadStream([input]), dataset)); + + expect(quads[0].subject.value).toBe(dataset.iri.toString()); + expect(quads[0].object.value).toBe('https://example.org/Event'); + expect(quads[0].graph.value).toBe('http://example.com/graph'); + }); +}); + +describe('namespaceNormalizationPlugin', () => { + it('returns a plugin with the correct name', () => { + const plugin = namespaceNormalizationPlugin(options); + expect(plugin.name).toBe('namespace-normalization'); + }); + + it('has a beforeStageWrite hook', () => { + const plugin = namespaceNormalizationPlugin(options); + expect(plugin.beforeStageWrite).toBeTypeOf('function'); + }); +}); diff --git a/packages/pipeline/test/plugin/schemaOrgNormalization.test.ts b/packages/pipeline/test/plugin/schemaOrgNormalization.test.ts index 415e11e..dafb0cb 100644 --- a/packages/pipeline/test/plugin/schemaOrgNormalization.test.ts +++ b/packages/pipeline/test/plugin/schemaOrgNormalization.test.ts @@ -133,6 +133,57 @@ describe('schemaOrgNormalizationPlugin', () => { it('has a beforeStageWrite hook', () => { const plugin = schemaOrgNormalizationPlugin(); - expect(plugin.beforeStageWrite).toBe(schemaOrgNormalizationTransform); + expect(plugin.beforeStageWrite).toBeTypeOf('function'); + }); + + it('normalizes http to https by default', async () => { + const input = quad( + namedNode(dataset.iri.toString()), + namedNode(`${VOID}class`), + namedNode('http://schema.org/Person'), + ); + + const quads = await collect( + schemaOrgNormalizationPlugin().beforeStageWrite!( + quadStream([input]), + dataset, + ), + ); + + expect(quads[0].object.value).toBe('https://schema.org/Person'); + }); + + it('normalizes https to http when reverse is true', async () => { + const input = quad( + namedNode(dataset.iri.toString()), + namedNode(`${VOID}class`), + namedNode('https://schema.org/Person'), + ); 
+ + const quads = await collect( + schemaOrgNormalizationPlugin({reverse: true}).beforeStageWrite!( + quadStream([input]), + dataset, + ), + ); + + expect(quads[0].object.value).toBe('http://schema.org/Person'); + }); + + it('does not rewrite http URIs when reverse is true', async () => { + const input = quad( + namedNode(dataset.iri.toString()), + namedNode(`${VOID}class`), + namedNode('http://schema.org/Person'), + ); + + const quads = await collect( + schemaOrgNormalizationPlugin({reverse: true}).beforeStageWrite!( + quadStream([input]), + dataset, + ), + ); + + expect(quads[0].object.value).toBe('http://schema.org/Person'); }); }); diff --git a/packages/pipeline/vite.config.ts b/packages/pipeline/vite.config.ts index fd921c6..2e9fcdd 100644 --- a/packages/pipeline/vite.config.ts +++ b/packages/pipeline/vite.config.ts @@ -11,10 +11,10 @@ export default mergeConfig( coverage: { thresholds: { autoUpdate: true, - functions: 93.57, - lines: 93.74, - branches: 88.01, - statements: 93.15, + functions: 93.66, + lines: 93.76, + branches: 88.14, + statements: 93.18, }, }, },