Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/bright-trainers-tan.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@evidence-dev/mssql': minor
---

Fix runtime error handling for MSSQL data source
49 changes: 41 additions & 8 deletions packages/datasources/mssql/index.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,51 @@ const runQuery = async (queryString, database = {}, batchSize = 100000) => {

const request = new mssql.Request();
request.stream = true;

// Promise that resolves when recordset is emitted or rejects on request error
const recordsetPromise = new Promise((resolve, reject) => {
request.once('recordset', resolve);
request.once('error', reject);
});

// Start the streaming query
request.query(queryString);

const columns = await new Promise((res) => request.once('recordset', res));
try {
const columns = await recordsetPromise;

const stream = request.toReadableStream();
const results = await asyncIterableToBatchedAsyncGenerator(stream, batchSize, {
closeConnection: () => pool.close()
});
results.columnTypes = mapResultsToEvidenceColumnTypes(columns);
results.expectedRowCount = expected_row_count;
const stream = request.toReadableStream();

// Ensure any stream errors are handled to avoid unhandled 'error' events
stream.on('error', async (streamErr) => {
try {
await pool.close();
} catch (_) {
// ignore close errors
}
// Nothing else to do here; the asyncIterableToBatchedAsyncGenerator
// consumer will observe the error as a rejection when iterating.
});

const results = await asyncIterableToBatchedAsyncGenerator(stream, batchSize, {
closeConnection: () => pool.close()
});
results.columnTypes = mapResultsToEvidenceColumnTypes(columns);
results.expectedRowCount = expected_row_count;

return results;
} catch (err) {
// Close pool when we hit errors from the request/stream and normalize the error
try {
await pool.close();
} catch (_) {}

return results;
if (err && err.message) {
throw err.message.replace(/\n|\r/g, ' ');
} else {
throw ('' + err).replace(/\n|\r/g, ' ');
}
}
} catch (err) {
if (err.message) {
throw err.message.replace(/\n|\r/g, ' ');
Expand Down
71 changes: 69 additions & 2 deletions packages/datasources/mssql/test/test.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { test } from 'uvu';
import * as assert from 'uvu/assert';
import runQuery from '../index.cjs';
import { TypeFidelity, batchedAsyncGeneratorToArray } from '@evidence-dev/db-commons';
import { createRequire } from 'module';
import 'dotenv/config';

const require = createRequire(import.meta.url);

test('query runs', async () => {
if (process.env.MSSQL_DATABASE) {
try {
Expand Down Expand Up @@ -99,4 +100,70 @@ test('query batches results properly', async () => {
}
});

test('runQuery returns normalized error on SQL syntax error', async () => {
// Create a fake mssql module and inject before loading index.cjs
const fakeMssql = {
TYPES: {},
connect: async () => ({
request: () => ({
query: async (q) => {
// If it's the COUNT(*) wrapper, return 0
if (typeof q === 'string' && q.trim().toUpperCase().startsWith('SELECT COUNT(*)')) {
return { recordset: [{ expected_row_count: 0 }] };
}
// otherwise, return empty
return { recordset: [] };
}
}),
close: async () => {}
}),
Request: function () {
const EventEmitter = require('events');
const r = new EventEmitter();
r.stream = false;
r.query = function (q) {
// simulate an async error emitted by the request (syntax error)
process.nextTick(() => {
const err = new Error("Incorrect syntax near the keyword 'select'.");
err.code = 'EREQUEST';
r.emit('error', err);
});
};
r.toReadableStream = function () {
const { Readable } = require('stream');
// A readable that immediately errors when read
const s = new Readable({ objectMode: true, read() {} });
process.nextTick(() => s.emit('error', new Error('stream error')));
return s;
};
return r;
}
};

const path = require('path');
const mssqlModulePath = path.join(process.cwd(), 'node_modules', 'mssql', 'index.js');
require.cache[mssqlModulePath] = {
id: mssqlModulePath,
filename: mssqlModulePath,
loaded: true,
exports: fakeMssql
};

const runQuery = require('../index.cjs');

try {
await runQuery('select * from', {}, 10);
assert.unreachable('Expected runQuery to throw');
} catch (e) {
// The function normalizes errors to strings
assert.ok(
typeof e === 'string' || e instanceof String || e.message,
'error should be string or have message'
);
const msg = typeof e === 'string' ? e : e.message || String(e);
// Message should be non-empty and mention syntax/select or be an error code
assert.ok(msg && msg.length > 0, 'error message should be non-empty');
}
});

test.run();
Loading