Skip to content
371 changes: 367 additions & 4 deletions packages/server/src/common/actions/action-dispatcher.spec.ts

Large diffs are not rendered by default.

274 changes: 255 additions & 19 deletions packages/server/src/common/actions/action-dispatcher.ts

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions packages/server/src/common/actions/client-action-handler.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (c) 2022-2023 STMicroelectronics and others.
* Copyright (c) 2022-2026 STMicroelectronics and others.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
Expand Down Expand Up @@ -54,6 +54,6 @@ export class ClientActionForwarder {
if (ClientAction.is(action)) {
return false;
}
return this.actionKinds.has(action.kind) || ResponseAction.is(action);
return this.actionKinds.has(action.kind) || ResponseAction.hasValidResponseId(action);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (c) 2022-2025 STMicroelectronics and others.
* Copyright (c) 2022-2026 STMicroelectronics and others.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/********************************************************************************
* Copyright (c) 2023 EclipseSource and others.
* Copyright (c) 2023-2026 EclipseSource and others.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
Expand Down
1 change: 1 addition & 0 deletions packages/server/src/common/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ export * from './session/client-session-factory';
export * from './session/client-session-initializer';
export * from './session/client-session-listener';
export * from './session/client-session-manager';
export * from './utils/action-channel';
export * from './utils/args-util';
export * from './utils/client-options-util';
export * from './utils/console-logger';
Expand Down
36 changes: 36 additions & 0 deletions packages/server/src/common/protocol/glsp-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
InitializeResult,
MaybePromise,
MessageAction,
RejectAction,
RequestAction,
ResponseAction,
ServerActions,
distinctAdd,
remove
Expand Down Expand Up @@ -150,9 +153,42 @@
}
const action = message.action;
ClientAction.mark(action);
if (RequestAction.is(action)) {
this.handleClientRequest(clientSession, action, message.clientId);
return;
}
clientSession.actionDispatcher.dispatch(action).catch(error => this.handleProcessError(message, error));
}

// Fire-and-forget: intentionally not awaited by process()
protected async handleClientRequest(
clientSession: ClientSession,
action: RequestAction<ResponseAction>,
clientId: string
): Promise<void> {
try {
const response =
action.timeout !== undefined

Check failure on line 171 in packages/server/src/common/protocol/glsp-server.ts

View workflow job for this annotation

GitHub Actions / Lint

Property 'timeout' does not exist on type 'RequestAction<ResponseAction>'.

Check failure on line 171 in packages/server/src/common/protocol/glsp-server.ts

View workflow job for this annotation

GitHub Actions / Build

Property 'timeout' does not exist on type 'RequestAction<ResponseAction>'.

Check failure on line 171 in packages/server/src/common/protocol/glsp-server.ts

View workflow job for this annotation

GitHub Actions / Test

Property 'timeout' does not exist on type 'RequestAction<ResponseAction>'.
? await clientSession.actionDispatcher.requestUntil(action, action.timeout, true)

Check failure on line 172 in packages/server/src/common/protocol/glsp-server.ts

View workflow job for this annotation

GitHub Actions / Lint

Property 'timeout' does not exist on type 'RequestAction<ResponseAction>'.

Check failure on line 172 in packages/server/src/common/protocol/glsp-server.ts

View workflow job for this annotation

GitHub Actions / Build

Property 'timeout' does not exist on type 'RequestAction<ResponseAction>'.

Check failure on line 172 in packages/server/src/common/protocol/glsp-server.ts

View workflow job for this annotation

GitHub Actions / Test

Property 'timeout' does not exist on type 'RequestAction<ResponseAction>'.
: await clientSession.actionDispatcher.request(action);
if (response) {
this.sendResponseToClient(clientId, response);
}
} catch (error) {
const detail = error instanceof GLSPServerError ? error.cause?.toString?.() : error?.toString?.();
this.logger.error(`Failed to handle request '${action.kind}' (${action.requestId}):`, detail);
const reject = RejectAction.create(`Failed to handle request '${action.kind}' (${action.requestId})`, {
responseId: action.requestId,
detail
});
this.sendResponseToClient(clientId, reject);
}
}

protected sendResponseToClient(clientId: string, response: ResponseAction): void {
this.sendToClient({ clientId, action: response });
}

protected handleProcessError(message: ActionMessage, reason: any): void | PromiseLike<void> {
let errorMsg = `Could not process action: '${message.action.kind}`;
this.logger.error(errorMsg);
Expand Down
16 changes: 15 additions & 1 deletion packages/server/src/common/test/mock-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ import {
MaybeArray,
MaybePromise,
Point,
RequestAction,
RequestEditValidationAction,
ResponseAction,
ShapeTypeHint,
ValidationStatus
} from '@eclipse-glsp/protocol';
Expand Down Expand Up @@ -95,7 +97,7 @@ export function createClientSession(
export class StubActionHandler implements ActionHandler {
constructor(public actionKinds: string[]) {}

execute(action: Action): Action[] {
execute(action: Action): MaybePromise<Action[]> {
return [];
}
}
Expand Down Expand Up @@ -136,6 +138,18 @@ export class StubActionDispatcher implements ActionDispatcher {
dispatchAll(...actions: MaybeArray<Action>[]): Promise<void> {
return Promise.resolve();
}

request<Res extends ResponseAction>(action: RequestAction<Res>): Promise<Res> {
return Promise.reject(new Error('Not implemented in stub'));
}

requestUntil<Res extends ResponseAction>(
action: RequestAction<Res>,
timeoutMs?: number,
rejectOnTimeout?: boolean
): Promise<Res | undefined> {
return Promise.reject(new Error('Not implemented in stub'));
}
}

export class StubClientSessionFactory implements ClientSessionFactory {
Expand Down
120 changes: 120 additions & 0 deletions packages/server/src/common/utils/action-channel.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/********************************************************************************
* Copyright (c) 2026 EclipseSource and others.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the Eclipse
* Public License v. 2.0 are satisfied: GNU General Public License, version 2
* with the GNU Classpath Exception which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/
import { expect } from 'chai';
import { expectToThrowAsync } from '../test/mock-util';
import { ActionChannel } from './action-channel';

describe('ActionChannel', () => {
it('yields pushed items in FIFO order', async () => {
const channel = new ActionChannel<number>();
const consumed: number[] = [];

const consumer = (async (): Promise<void> => {
for await (const entry of channel.consume()) {
consumed.push(entry.item);
entry.resolve();
}
})();

await Promise.all([channel.push(1), channel.push(2), channel.push(3)]);
channel.stop();
await consumer;

expect(consumed).to.deep.equal([1, 2, 3]);
});

it('resolves the push promise once the consumer resolves the entry', async () => {
const channel = new ActionChannel<string>();
let entryResolver: (() => void) | undefined;

const consumer = (async (): Promise<void> => {
for await (const entry of channel.consume()) {
entryResolver = entry.resolve;
return;
}
})();

const pushed = channel.push('a');
// Give the consumer a turn to pick up the entry.
await Promise.resolve();
await consumer;
expect(entryResolver).to.exist;
entryResolver!();
await pushed;
});

it('propagates reject() from the consumer back to the pushing caller', async () => {
const channel = new ActionChannel<number>();

const consumer = (async (): Promise<void> => {
for await (const entry of channel.consume()) {
entry.reject(new Error('boom'));
return;
}
})();

const pushed = channel.push(1);
await consumer;
await expectToThrowAsync(() => pushed, 'boom');
});

it('rejects push() after stop()', async () => {
const channel = new ActionChannel<number>();
channel.stop();
await expectToThrowAsync(() => channel.push(1), 'ActionChannel is stopped');
});

it('consumer exits after stop() and drain', async () => {
const channel = new ActionChannel<number>();
const consumed: number[] = [];

const consumer = (async (): Promise<void> => {
for await (const entry of channel.consume()) {
consumed.push(entry.item);
entry.resolve();
}
})();

await channel.push(1);
await channel.push(2);
channel.stop();
await consumer;

expect(consumed).to.deep.equal([1, 2]);
expect(channel.isStopped).to.be.true;
});

it('rejectPending() rejects all queued push() promises without stopping', async () => {
const channel = new ActionChannel<number>();
const pushes = [channel.push(1), channel.push(2)];
expect(channel.size).to.equal(2);

channel.rejectPending(new Error('cleared'));

await expectToThrowAsync(() => pushes[0], 'cleared');
await expectToThrowAsync(() => pushes[1], 'cleared');
expect(channel.size).to.equal(0);
expect(channel.isStopped).to.be.false;
});

it('size reflects the number of unconsumed entries', async () => {
const channel = new ActionChannel<number>();
channel.push(1);
channel.push(2);
channel.push(3);
expect(channel.size).to.equal(3);
});
});
101 changes: 101 additions & 0 deletions packages/server/src/common/utils/action-channel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/********************************************************************************
* Copyright (c) 2026 EclipseSource and others.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the Eclipse
* Public License v. 2.0 are satisfied: GNU General Public License, version 2
* with the GNU Classpath Exception which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
********************************************************************************/

/**
* An entry yielded by {@link ActionChannel.consume}. The consumer must call either
* `resolve()` or `reject(error)` exactly once after processing `item`.
*/
export interface ActionChannelEntry<T> {
item: T;
resolve: () => void;
reject: (error: unknown) => void;
}

/**
* Producer/consumer channel with a single async consumer loop. Mirrors the Java
* dispatcher's `BlockingQueue` + consumer thread architecture.
Comment thread
tortmayr marked this conversation as resolved.
Outdated
*
* Items pushed via {@link push} are yielded by {@link consume} in FIFO order.
* The promise returned by `push()` resolves or rejects when the consumer finishes
* processing the item (so producers can propagate errors back to callers).
*/
export class ActionChannel<T> {
protected queue: ActionChannelEntry<T>[] = [];
protected notify: (() => void) | undefined;
protected stopped = false;

/**
* Enqueues an item and returns a promise that settles when the consumer processes it.
* Rejects immediately if the channel has been stopped.
*/
push(item: T): Promise<void> {
if (this.stopped) {
return Promise.reject(new Error('ActionChannel is stopped'));
}
return new Promise((resolve, reject) => {
this.queue.push({ item, resolve, reject });
this.notify?.();
});
}

/**
* Yields pending entries in FIFO order, blocking on a notification promise when empty.
* Exits once the channel is stopped and the queue has been drained.
*/
async *consume(): AsyncGenerator<ActionChannelEntry<T>> {
while (!this.stopped || this.queue.length > 0) {
Comment thread
tortmayr marked this conversation as resolved.
Outdated
while (this.queue.length > 0) {
yield this.queue.shift()!;
}
if (this.stopped) {
return;
}
await new Promise<void>(resolve => {
this.notify = resolve;
});
this.notify = undefined;
}
}

/**
* Stops the channel. Further {@link push} calls reject. The consumer loop exits after
* the remaining queued entries have been yielded (or immediately if the queue is empty).
*/
stop(): void {
this.stopped = true;
this.notify?.();
}

/**
* Rejects all queued entries with the given reason so producers awaiting their
* `push()` promises do not hang. Does not stop the channel.
*/
rejectPending(reason: Error = new Error('ActionChannel cleared')): void {
const pending = this.queue;
this.queue = [];
for (const entry of pending) {
entry.reject(reason);
}
}

get size(): number {
return this.queue.length;
}

get isStopped(): boolean {
return this.stopped;
}
}
4 changes: 4 additions & 0 deletions packages/server/src/common/utils/promise-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ export interface PromiseQueueElement<T = void> {
* of promises. Promises that are put in this queue are processed one by one.
* i.e. After the first promise in the queue is resolved, it will be removed from the queue and the resolving of the
* the next promise (if present) will start. The queue can only resolve one promise at a given time.
*
* @deprecated Since 2.7. The `DefaultActionDispatcher` no longer uses this queue. Kept for
* backwards compatibility; will be removed in a future release. New code should use
* {@link ActionChannel} or native async patterns instead.
*/
export class PromiseQueue<T = void> {
protected queue: PromiseQueueElement<T>[] = [];
Expand Down
Loading