Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions packages/pipeline/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -229,21 +229,47 @@ Writes generated quads to a destination:

Plugins hook into the pipeline lifecycle via the `PipelinePlugin` interface. Register them in the `plugins` array when constructing a `Pipeline`.

#### `namespaceNormalizationPlugin(options)`

Generic plugin that rewrites namespace prefixes in `void:class` and `void:property` quad objects. Accepts `from` and `to` options specifying the source and target namespace URI prefixes. `void:vocabulary` quads are left unchanged so consumers can see which namespace the source dataset actually uses.

```typescript
import { namespaceNormalizationPlugin } from ‘@lde/pipeline’;

new Pipeline({
// ...
plugins: [
namespaceNormalizationPlugin({
from: ‘http://example.org/’,
to: ‘https://example.org/’,
}),
],
});
```

#### `provenancePlugin()`

Appends [PROV-O](https://www.w3.org/TR/prov-o/) provenance quads (`prov:Entity`, `prov:Activity`, `prov:startedAtTime`, `prov:endedAtTime`) to every stage’s output.

#### `schemaOrgNormalizationPlugin()`
#### `schemaOrgNormalizationPlugin(options?)`

Normalizes Schema.org namespace prefixes in `void:class` and `void:property` quad objects. By default, rewrites `http://schema.org/` to `https://schema.org/`. Pass `{ reverse: true }` to normalize in the opposite direction (`https://` to `http://`). `void:vocabulary` quads are left unchanged so consumers can see which namespace the source dataset actually uses.

Normalizes `http://schema.org/` to `https://schema.org/` in `void:class` and `void:property` quad objects, so downstream consumers can rely on a single canonical namespace. `void:vocabulary` quads are left unchanged so consumers can see which namespace the source dataset actually uses.
This is a convenience wrapper around `namespaceNormalizationPlugin`.

```typescript
import { schemaOrgNormalizationPlugin, provenancePlugin } from '@lde/pipeline';
import { schemaOrgNormalizationPlugin, provenancePlugin } from @lde/pipeline;

new Pipeline({
// ...
plugins: [schemaOrgNormalizationPlugin(), provenancePlugin()],
});

// Or reverse: normalize https to http
new Pipeline({
// ...
plugins: [schemaOrgNormalizationPlugin({reverse: true})],
});
```

## Usage
Expand Down
1 change: 1 addition & 0 deletions packages/pipeline/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ export * from './stageOutputResolver.js';
export * from './sparql/index.js';
export * from './distribution/index.js';
export * from './writer/index.js';
export * from './plugin/namespaceNormalization.js';
export * from './plugin/provenance.js';
export * from './plugin/schemaOrgNormalization.js';
68 changes: 68 additions & 0 deletions packages/pipeline/src/plugin/namespaceNormalization.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import type {QuadTransform} from '../stage.js';
import type {PipelinePlugin} from '../pipeline.js';
import type {Quad} from '@rdfjs/types';
import {DataFactory} from 'n3';

const {namedNode, quad} = DataFactory;

const VOID_CLASS = namedNode('http://rdfs.org/ns/void#class');
const VOID_PROPERTY = namedNode('http://rdfs.org/ns/void#property');

export interface NamespaceNormalizationOptions {
/** Namespace URI prefix to match (e.g. `http://schema.org/`). */
from: string;
/** Namespace URI prefix to replace with (e.g. `https://schema.org/`). */
to: string;
}

/**
* Creates a QuadTransform that rewrites namespace prefixes in `void:class` and
* `void:property` quad objects from {@link NamespaceNormalizationOptions.from}
* to {@link NamespaceNormalizationOptions.to}.
*
* `void:vocabulary` quads are left unchanged so consumers can see which
* namespace the source dataset actually uses.
*/
export function namespaceNormalizationTransform(
options: NamespaceNormalizationOptions,
): QuadTransform {
return (quads) => normalizeNamespace(quads, options);
}

/**
* Pipeline plugin that normalizes namespace prefixes in `void:class` and
* `void:property` quad objects.
*
* `void:vocabulary` quads are left unchanged so consumers can see which
* namespace the source dataset actually uses.
*/
export function namespaceNormalizationPlugin(
options: NamespaceNormalizationOptions,
): PipelinePlugin {
return {
name: 'namespace-normalization',
beforeStageWrite: namespaceNormalizationTransform(options),
};
}

async function* normalizeNamespace(
quads: AsyncIterable<Quad>,
{from, to}: NamespaceNormalizationOptions,
): AsyncIterable<Quad> {
for await (const q of quads) {
if (
(q.predicate.equals(VOID_CLASS) || q.predicate.equals(VOID_PROPERTY)) &&
q.object.termType === 'NamedNode' &&
q.object.value.startsWith(from)
) {
yield quad(
q.subject,
q.predicate,
namedNode(to + q.object.value.slice(from.length)),
q.graph,
);
} else {
yield q;
}
}
}
65 changes: 27 additions & 38 deletions packages/pipeline/src/plugin/schemaOrgNormalization.ts
Original file line number Diff line number Diff line change
@@ -1,53 +1,42 @@
import type { QuadTransform } from '../stage.js';
import type { PipelinePlugin } from '../pipeline.js';
import type { Quad } from '@rdfjs/types';
import { DataFactory } from 'n3';

const { namedNode, quad } = DataFactory;

const VOID_CLASS = namedNode('http://rdfs.org/ns/void#class');
const VOID_PROPERTY = namedNode('http://rdfs.org/ns/void#property');
import type {QuadTransform} from '../stage.js';
import type {PipelinePlugin} from '../pipeline.js';
import {
namespaceNormalizationPlugin,
namespaceNormalizationTransform,
} from './namespaceNormalization.js';

const HTTP_SCHEMA_ORG = 'http://schema.org/';
const HTTPS_SCHEMA_ORG = 'https://schema.org/';

export interface SchemaOrgNormalizationOptions {
/** When true, normalizes `https://schema.org/` to `http://schema.org/` instead. */
reverse?: boolean;
}

/** QuadTransform that normalizes `http://schema.org/` to `https://schema.org/` in `void:class` and `void:property` objects. */
export const schemaOrgNormalizationTransform: QuadTransform = (quads) =>
normalizeSchemaOrg(quads);
export const schemaOrgNormalizationTransform: QuadTransform =
namespaceNormalizationTransform({
from: HTTP_SCHEMA_ORG,
to: HTTPS_SCHEMA_ORG,
});

/**
* Pipeline plugin that normalizes `http://schema.org/` to `https://schema.org/`
* in `void:class` and `void:property` quad objects.
* Pipeline plugin that normalizes Schema.org namespace prefixes in `void:class`
* and `void:property` quad objects.
*
* By default, rewrites `http://schema.org/` to `https://schema.org/`. Pass
* `{ reverse: true }` to normalize in the opposite direction.
*
* `void:vocabulary` quads are left unchanged so consumers can see which
* namespace the source dataset actually uses.
*/
export function schemaOrgNormalizationPlugin(): PipelinePlugin {
export function schemaOrgNormalizationPlugin(
options?: SchemaOrgNormalizationOptions,
): PipelinePlugin {
const from = options?.reverse ? HTTPS_SCHEMA_ORG : HTTP_SCHEMA_ORG;
const to = options?.reverse ? HTTP_SCHEMA_ORG : HTTPS_SCHEMA_ORG;
return {
...namespaceNormalizationPlugin({from, to}),
name: 'schema-org-normalization',
beforeStageWrite: schemaOrgNormalizationTransform,
};
}

async function* normalizeSchemaOrg(
quads: AsyncIterable<Quad>,
): AsyncIterable<Quad> {
for await (const q of quads) {
if (
(q.predicate.equals(VOID_CLASS) || q.predicate.equals(VOID_PROPERTY)) &&
q.object.termType === 'NamedNode' &&
q.object.value.startsWith(HTTP_SCHEMA_ORG)
) {
yield quad(
q.subject,
q.predicate,
namedNode(
HTTPS_SCHEMA_ORG + q.object.value.slice(HTTP_SCHEMA_ORG.length),
),
q.graph,
);
} else {
yield q;
}
}
}
130 changes: 130 additions & 0 deletions packages/pipeline/test/plugin/namespaceNormalization.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import {
namespaceNormalizationTransform,
namespaceNormalizationPlugin,
} from '../../src/index.js';
import {Dataset} from '@lde/dataset';
import {describe, it, expect} from 'vitest';
import {DataFactory} from 'n3';
import type {Quad} from '@rdfjs/types';

const {namedNode, quad} = DataFactory;

const VOID = 'http://rdfs.org/ns/void#';

const dataset = new Dataset({
iri: new URL('http://example.com/dataset/1'),
distributions: [],
});

const options = {from: 'http://example.org/', to: 'https://example.org/'};

async function collect(iter: AsyncIterable<Quad>): Promise<Quad[]> {
const result: Quad[] = [];
for await (const q of iter) {
result.push(q);
}
return result;
}

function quadStream(quads: Quad[]): AsyncIterable<Quad> {
return (async function* () {
yield* quads;
})();
}

describe('namespaceNormalizationTransform', () => {
const transform = namespaceNormalizationTransform(options);

it('rewrites void:class objects matching the source namespace', async () => {
const input = quad(
namedNode(dataset.iri.toString()),
namedNode(`${VOID}class`),
namedNode('http://example.org/Person'),
);

const quads = await collect(transform(quadStream([input]), dataset));

expect(quads).toHaveLength(1);
expect(quads[0].object.value).toBe('https://example.org/Person');
});

it('rewrites void:property objects matching the source namespace', async () => {
const input = quad(
namedNode(dataset.iri.toString()),
namedNode(`${VOID}property`),
namedNode('http://example.org/name'),
);

const quads = await collect(transform(quadStream([input]), dataset));

expect(quads).toHaveLength(1);
expect(quads[0].object.value).toBe('https://example.org/name');
});

it('does not rewrite void:vocabulary', async () => {
const input = quad(
namedNode(dataset.iri.toString()),
namedNode(`${VOID}vocabulary`),
namedNode('http://example.org/'),
);

const quads = await collect(transform(quadStream([input]), dataset));

expect(quads).toHaveLength(1);
expect(quads[0].object.value).toBe('http://example.org/');
});

it('does not rewrite non-matching URIs', async () => {
const input = quad(
namedNode(dataset.iri.toString()),
namedNode(`${VOID}class`),
namedNode('http://xmlns.com/foaf/0.1/Person'),
);

const quads = await collect(transform(quadStream([input]), dataset));

expect(quads).toHaveLength(1);
expect(quads[0].object.value).toBe('http://xmlns.com/foaf/0.1/Person');
});

it('does not rewrite URIs already using the target namespace', async () => {
const input = quad(
namedNode(dataset.iri.toString()),
namedNode(`${VOID}class`),
namedNode('https://example.org/Person'),
);

const quads = await collect(transform(quadStream([input]), dataset));

expect(quads).toHaveLength(1);
expect(quads[0].object.value).toBe('https://example.org/Person');
});

it('preserves subject and graph when rewriting', async () => {
const graphNode = namedNode('http://example.com/graph');
const input = quad(
namedNode(dataset.iri.toString()),
namedNode(`${VOID}class`),
namedNode('http://example.org/Event'),
graphNode,
);

const quads = await collect(transform(quadStream([input]), dataset));

expect(quads[0].subject.value).toBe(dataset.iri.toString());
expect(quads[0].object.value).toBe('https://example.org/Event');
expect(quads[0].graph.value).toBe('http://example.com/graph');
});
});

describe('namespaceNormalizationPlugin', () => {
it('returns a plugin with the correct name', () => {
const plugin = namespaceNormalizationPlugin(options);
expect(plugin.name).toBe('namespace-normalization');
});

it('has a beforeStageWrite hook', () => {
const plugin = namespaceNormalizationPlugin(options);
expect(plugin.beforeStageWrite).toBeTypeOf('function');
});
});
53 changes: 52 additions & 1 deletion packages/pipeline/test/plugin/schemaOrgNormalization.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,57 @@ describe('schemaOrgNormalizationPlugin', () => {

it('has a beforeStageWrite hook', () => {
const plugin = schemaOrgNormalizationPlugin();
expect(plugin.beforeStageWrite).toBe(schemaOrgNormalizationTransform);
expect(plugin.beforeStageWrite).toBeTypeOf('function');
});

it('normalizes http to https by default', async () => {
const input = quad(
namedNode(dataset.iri.toString()),
namedNode(`${VOID}class`),
namedNode('http://schema.org/Person'),
);

const quads = await collect(
schemaOrgNormalizationPlugin().beforeStageWrite!(
quadStream([input]),
dataset,
),
);

expect(quads[0].object.value).toBe('https://schema.org/Person');
});

it('normalizes https to http when reverse is true', async () => {
const input = quad(
namedNode(dataset.iri.toString()),
namedNode(`${VOID}class`),
namedNode('https://schema.org/Person'),
);

const quads = await collect(
schemaOrgNormalizationPlugin({reverse: true}).beforeStageWrite!(
quadStream([input]),
dataset,
),
);

expect(quads[0].object.value).toBe('http://schema.org/Person');
});

it('does not rewrite http URIs when reverse is true', async () => {
const input = quad(
namedNode(dataset.iri.toString()),
namedNode(`${VOID}class`),
namedNode('http://schema.org/Person'),
);

const quads = await collect(
schemaOrgNormalizationPlugin({reverse: true}).beforeStageWrite!(
quadStream([input]),
dataset,
),
);

expect(quads[0].object.value).toBe('http://schema.org/Person');
});
});
Loading