ingey-eager/src/app/duckdb.service.ts

import { Injectable } from '@angular/core';
import * as duckdb from '@duckdb/duckdb-wasm';
import { z } from 'zod';
export const Column = z.object({
  name: z.string(),
  type: z.string(),
  enabled: z.boolean().default(true),
});
export const SortColumn = z.object({
  name: z.string(),
  sortType: z.enum(['asc', 'desc']),
});
export const FilterOperator = z.enum([
  'startsWith',
  'contains',
  'notContains',
  'endsWith',
  'equals',
  'notEquals',
]);
export const FilterValue = z.object({
  value: z.string().trim(),
  matchType: FilterOperator,
});
export const Filter = z.object({
  column: z.string(),
  value: FilterValue.optional().array(),
  operator: z.enum(['and', 'or']),
});
export const aggregateTypes = ['avg', 'sum', 'min', 'max'] as const;
export const AggregateType = z.enum(aggregateTypes);
export const Aggregate = z.object({ column: z.string(), type: AggregateType });
export const AggregateValue = z.object({
  column: z.string(),
  value: z.number(),
});
export const RowsResponse = z.object({
  rows: z.any(),
  totalRows: z.bigint().nonnegative(),
  aggregateValues: AggregateValue.array(),
});
export type Column = z.infer<typeof Column>;
export type SortColumn = z.infer<typeof SortColumn>;
export type FilterValue = z.infer<typeof FilterValue>;
export type FilterOperator = z.infer<typeof FilterOperator>;
export type Filter = z.infer<typeof Filter>;
export type AggregateType = z.infer<typeof AggregateType>;
export type Aggregate = z.infer<typeof Aggregate>;
export type AggregateValue = z.infer<typeof AggregateValue>;
export type RowsResponse = z.infer<typeof RowsResponse>;
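// Illustrative shapes these schemas accept (column names and values are made up):
//   const filter: Filter = {
//     column: 'city',
//     operator: 'or',
//     value: [
//       { value: 'berlin', matchType: 'contains' },
//       { value: 'sydney', matchType: 'equals' },
//     ],
//   };
//   const aggregate: Aggregate = { column: 'price', type: 'avg' };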
const sanitisedFileName = (file: File) =>
  file.name.toLowerCase().replaceAll("'", '').replaceAll(/\s+/g, '');
const sqlOperator = (operator: FilterOperator) => {
  switch (operator) {
    case 'startsWith':
    case 'endsWith':
    case 'contains':
    case 'equals':
      return 'ILIKE';
    case 'notContains':
    case 'notEquals':
      return 'NOT ILIKE';
  }
};
const prefix = (operator: FilterOperator) => {
  switch (operator) {
    case 'endsWith':
    case 'contains':
    case 'notContains':
      return '%';
    default:
      return '';
  }
};
const suffix = (operator: FilterOperator) => {
  switch (operator) {
    case 'startsWith':
    case 'contains':
    case 'notContains':
      return '%';
    default:
      return '';
  }
};
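// Taken together, sqlOperator/prefix/suffix turn a FilterValue into an ILIKE pattern that is
// bound as a prepared-statement parameter. For a hypothetical value 'foo':
//   startsWith  -> ILIKE 'foo%'
//   endsWith    -> ILIKE '%foo'
//   contains    -> ILIKE '%foo%'
//   notContains -> NOT ILIKE '%foo%'
//   equals      -> ILIKE 'foo'
//   notEquals   -> NOT ILIKE 'foo'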
// https://www.npmjs.com/package/@duckdb/duckdb-wasm
@Injectable({
  providedIn: 'root',
})
export class DuckdbService {
  private db!: duckdb.AsyncDuckDB;
  async init() {
    const JSDELIVR_BUNDLES = duckdb.getJsDelivrBundles();
    // Select a bundle based on browser checks
    const bundle = await duckdb.selectBundle(JSDELIVR_BUNDLES);
    const worker_url = URL.createObjectURL(
      new Blob([`importScripts("${bundle.mainWorker!}");`], {
        type: 'text/javascript',
      }),
    );
    // Instantiate the asynchronous version of DuckDB-wasm
    const worker = new Worker(worker_url);
    const logger = new duckdb.ConsoleLogger();
    this.db = new duckdb.AsyncDuckDB(logger, worker);
    await this.db.instantiate(bundle.mainModule, bundle.pthreadWorker);
    URL.revokeObjectURL(worker_url);
  }
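  // init() must resolve before any other method touches this.db. One way to guarantee that
  // (a sketch assuming Angular's provideAppInitializer API; adapt to your bootstrap setup):
  //   provideAppInitializer(() => inject(DuckdbService).init())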
  // TODO: Consider adding this as a table in the db for performance improvements
  async addFile(file: File) {
    // Strictly, only queries require lowercase file names, but we keep registration consistent
    await this.db.registerFileText(sanitisedFileName(file), await file.text());
  }
  // We ignore SQL injection: everything runs on the user's machine, applies only to this
  // session, and can't touch the filesystem
  async getColumns(file: File): Promise<Column[]> {
    const conn = await this.db.connect();
    try {
      const response = await conn.query(
        `DESCRIBE SELECT * FROM '${sanitisedFileName(file)}'`,
      );
      const cols: Column[] = [];
      // DESCRIBE yields one result row per column
      const numCols = response.numRows;
      for (let i = 0; i < numCols; i++) {
        const jsonData = response.get(i)?.toJSON()!;
        cols.push({
          name: jsonData['column_name'],
          type: jsonData['column_type'],
          enabled: true,
        });
      }
      return cols;
    } finally {
      conn.close();
    }
  }
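  // For a hypothetical CSV registered with headers "name,age", this would resolve to:
  //   [{ name: 'name', type: 'VARCHAR', enabled: true },
  //    { name: 'age', type: 'BIGINT', enabled: true }]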
  async getDistinctValuesForColumn(
    file: File,
    column: string,
    limit = 2000,
  ): Promise<string[]> {
    if (!limit || limit < 0) {
      throw new Error('A positive limit must be provided');
    }
    const conn = await this.db.connect();
    try {
      const response = await conn.query(
        `SELECT DISTINCT "${column}" FROM '${sanitisedFileName(file)}' ORDER BY "${column}" LIMIT ${limit}`,
      );
      // Pull the single result column out of the Arrow result as strings
      const values = response.getChild(column)?.toArray() ?? [];
      return Array.from(values, (value) => String(value));
    } finally {
      conn.close();
    }
  }
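  // e.g. (hypothetical data) getDistinctValuesForColumn(file, 'country') might resolve to
  // ['Australia', 'Germany', 'Japan'], capped at the first 2000 distinct values by default.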
  async getRows(
    file: File,
    start: number,
    numRows: number,
    columns: Column[],
    sorts: SortColumn[],
    filters: Filter[],
    aggregates: Aggregate[],
  ): Promise<RowsResponse> {
    const conn = await this.db.connect();
    try {
      const whereClause = this.getWhereClause(filters);
      // Bind values in the same order getWhereClause emits its ? placeholders
      const mappedFilterValues = filters.flatMap((filter) =>
        filter.value
          .filter((value): value is FilterValue => !!value?.value)
          .map(
            (value) =>
              `${prefix(value.matchType)}${value.value}${suffix(value.matchType)}`,
          ),
      );
      let aggregatesQuery = 'SELECT COUNT(1) "totalRows"';
      for (const aggregate of aggregates) {
        aggregatesQuery += `, ${aggregate.type}("${aggregate.column}") "${aggregate.column}"`;
      }
      aggregatesQuery += ` FROM '${sanitisedFileName(file)}' ${whereClause}`;
      const totalRowStmt = await conn.prepare(aggregatesQuery);
      const totalRowResponse = await totalRowStmt.query(...mappedFilterValues);
      const aggregatesJson = totalRowResponse.get(0)?.toJSON()!;
      const totalRows = aggregatesJson['totalRows'];
      const aggregateValues: AggregateValue[] = Object.entries(aggregatesJson)
        .filter(([key]) => key !== 'totalRows')
        .map(([key, value]) => AggregateValue.parse({ column: key, value }));
      let query = `SELECT ${columns.map((column) => `"${column.name}"`).join(', ')} FROM '${sanitisedFileName(file)}' ${whereClause}`;
      if (sorts.length > 0) {
        query += ` ORDER BY ${sorts.map((sort) => `"${sort.name}" ${sort.sortType}`).join(', ')}`;
      }
      query += ` LIMIT ${numRows} OFFSET ${start}`;
      const stmt = await conn.prepare(query);
      const rows = [];
      for await (const batch of await stmt.send(...mappedFilterValues)) {
        for (const row of batch) {
          rows.push(row.toJSON());
        }
      }
      return { rows, totalRows, aggregateValues };
    } catch (err) {
      console.error(err);
      return { rows: [], totalRows: 0n, aggregateValues: [] };
    } finally {
      conn.close();
    }
  }
  // Where clause that gets attached to the main and rowCount selects.
  private getWhereClause(filters: Filter[]) {
    let query = '';
    if (filters.length > 0) {
      let and = 'WHERE';
      for (const filter of filters) {
        if (filter.value.find((value) => value?.value)) {
          query += ` ${and} (`;
          let or = '';
          for (const value of filter.value) {
            if (value?.value) {
              query += ` ${or} "${filter.column}" ${sqlOperator(value.matchType)} ? `;
              or = filter.operator;
            }
          }
          query += ') ';
          and = 'and';
        }
      }
    }
    return query;
  }
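  // For the illustrative 'city' filter shown above the helpers, this emits:
  //   WHERE ( "city" ILIKE ?  or "city" ILIKE ? )
  // and the callers bind '%berlin%' and 'sydney' in the same order.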
  async getAggregateValue(file: File, filters: Filter[], aggregate: Aggregate) {
    const conn = await this.db.connect();
    try {
      const whereClause = this.getWhereClause(filters);
      const mappedFilterValues = filters.flatMap((filter) =>
        filter.value
          .filter((value): value is FilterValue => !!value?.value)
          .map(
            (value) =>
              `${prefix(value.matchType)}${value.value}${suffix(value.matchType)}`,
          ),
      );
      const aggregatesQuery = `SELECT ${aggregate.type}("${aggregate.column}") "${aggregate.column}" FROM '${sanitisedFileName(file)}' ${whereClause}`;
      const totalRowStmt = await conn.prepare(aggregatesQuery);
      const totalRowResponse = await totalRowStmt.query(...mappedFilterValues);
      const aggregatesJson = totalRowResponse.get(0)?.toJSON()!;
      return aggregatesJson[aggregate.column];
    } catch (err) {
      console.error(err);
      // This method returns a single value, so fail with undefined rather than a RowsResponse shape
      return undefined;
    } finally {
      conn.close();
    }
  }
}
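
// A minimal usage sketch (illustrative; assumes the caller already has a File, e.g. from an
// <input type="file"> change event):
//   const svc = inject(DuckdbService);
//   await svc.init();
//   await svc.addFile(file);
//   const columns = await svc.getColumns(file);
//   const page = await svc.getRows(file, 0, 50, columns, [], [], []);
//   console.log(page.totalRows, page.rows);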