diff --git a/genkit-tools/common/src/types/apis.ts b/genkit-tools/common/src/types/apis.ts index e9e6830266..841184c107 100644 --- a/genkit-tools/common/src/types/apis.ts +++ b/genkit-tools/common/src/types/apis.ts @@ -33,9 +33,16 @@ import { TraceDataSchema } from './trace'; * It's used directly in the generation of the Reflection API OpenAPI spec. */ +const PrimitiveSchema = z.union([z.string(), z.number()]); +const FilterValueSchema = z.union([PrimitiveSchema, z.array(PrimitiveSchema)]); + export const TraceQueryFilterSchema = z.object({ - eq: z.record(z.string(), z.union([z.string(), z.number()])).optional(), - neq: z.record(z.string(), z.union([z.string(), z.number()])).optional(), + eq: z.record(z.string(), FilterValueSchema).optional(), + neq: z.record(z.string(), FilterValueSchema).optional(), + gt: z.record(z.string(), z.number()).optional(), + gte: z.record(z.string(), z.number()).optional(), + lt: z.record(z.string(), z.number()).optional(), + lte: z.record(z.string(), z.number()).optional(), }); export type TraceQueryFilter = z.infer; diff --git a/genkit-tools/telemetry-server/src/file-trace-store.ts b/genkit-tools/telemetry-server/src/file-trace-store.ts index 49e3690289..590cfd67a3 100644 --- a/genkit-tools/telemetry-server/src/file-trace-store.ts +++ b/genkit-tools/telemetry-server/src/file-trace-store.ts @@ -417,27 +417,7 @@ export class Index { return undefined; } }) - .filter((d) => { - if (!d) return false; - if (!query?.filter) return true; - if ( - query.filter.eq && - Object.keys(query.filter.eq).find( - (k) => d[k] !== query.filter!.eq![k] - ) - ) { - return false; - } - if ( - query.filter.neq && - Object.keys(query.filter.neq).find( - (k) => d[k] === query.filter!.neq![k] - ) - ) { - return false; - } - return true; - }) + .filter((d) => (!d ? false : isFilterMatch(d, query?.filter))) .reverse() as Record[]; fullData.push(...fileData); @@ -473,6 +453,63 @@ export class Index { } } +function isFilterMatch( + d: Record, + filter?: TraceQueryFilter +): boolean { + if (!filter) return true; + const { eq, neq, gt, gte, lt, lte } = filter; + if (eq) { + for (const k of Object.keys(eq)) { + const filterVal = eq[k]; + const val = d[k]; + if ( + Array.isArray(filterVal) + ? !filterVal.includes(val as any) + : val !== filterVal + ) + return false; + } + } + if (neq) { + for (const k of Object.keys(neq)) { + const filterVal = neq[k]; + const val = d[k]; + if ( + Array.isArray(filterVal) + ? filterVal.includes(val as any) + : val === filterVal + ) + return false; + } + } + if (gt) { + for (const k of Object.keys(gt)) { + const val = d[k]; + if (typeof val !== 'number' || val <= gt[k]) return false; + } + } + if (gte) { + for (const k of Object.keys(gte)) { + const val = d[k]; + if (typeof val !== 'number' || val < gte[k]) return false; + } + } + if (lt) { + for (const k of Object.keys(lt)) { + const val = d[k]; + if (typeof val !== 'number' || val >= lt[k]) return false; + } + } + if (lte) { + for (const k of Object.keys(lte)) { + const val = d[k]; + if (typeof val !== 'number' || val > lte[k]) return false; + } + } + return true; +} + function lockFile(file: string) { return `${file}.lock`; } diff --git a/genkit-tools/telemetry-server/tests/file_store_test.ts b/genkit-tools/telemetry-server/tests/file_store_test.ts index 6089705e01..8e6defc756 100644 --- a/genkit-tools/telemetry-server/tests/file_store_test.ts +++ b/genkit-tools/telemetry-server/tests/file_store_test.ts @@ -520,6 +520,217 @@ describe('index', () => { ); }); + it('should support array filters (IN/NOT IN)', () => { + const spanA = span(TRACE_ID_1, SPAN_A, 100, 100); + spanA.displayName = 'flowA'; + spanA.attributes['genkit:type'] = 'banana'; + + const spanB = span(TRACE_ID_2, SPAN_B, 200, 200); + spanB.displayName = 'flowB'; + spanB.attributes['genkit:type'] = undefined; + + const spanC = span(TRACE_ID_3, SPAN_C, 200, 200); + spanC.displayName = 'flowC'; + spanC.attributes['genkit:type'] = undefined; + + index.add({ + traceId: TRACE_ID_1, + spans: { + [SPAN_A]: spanA, + }, + } as TraceData); + index.add({ + traceId: TRACE_ID_2, + spans: { + [SPAN_B]: spanB, + }, + } as TraceData); + index.add({ + traceId: TRACE_ID_3, + spans: { + [SPAN_C]: spanC, + }, + } as TraceData); + + // Test array filters (IN) + assert.deepStrictEqual( + index.search({ + limit: 5, + filter: { + eq: { name: ['flowA', 'flowC'] }, + }, + }).data, + [ + { + id: TRACE_ID_3, + type: 'UNKNOWN', + name: 'flowC', + start: 1, + end: 2, + status: 0, + }, + { + id: TRACE_ID_1, + type: 'banana', + name: 'flowA', + start: 1, + end: 2, + status: 0, + }, + ] + ); + + // Test array filters (NOT IN) + assert.deepStrictEqual( + index.search({ + limit: 5, + filter: { + neq: { name: ['flowB', 'flowC'] }, + }, + }).data, + [ + { + id: TRACE_ID_1, + type: 'banana', + name: 'flowA', + start: 1, + end: 2, + status: 0, + }, + ] + ); + }); + + it('should support mixed type array filters (number and string)', () => { + const spanA = span(TRACE_ID_1, SPAN_A, 100, 100); + spanA.displayName = 'flowA'; + spanA.attributes['genkit:type'] = 'banana'; + + const spanB = span(TRACE_ID_2, SPAN_B, 200, 200); + spanB.displayName = 'flowB'; + spanB.attributes['genkit:type'] = undefined; + // Set status to undefined so it indexes as 'UNKNOWN' + spanB.status = undefined; + + const spanC = span(TRACE_ID_3, SPAN_C, 200, 200); + spanC.displayName = 'flowC'; + spanC.attributes['genkit:type'] = 'flow'; + spanC.status = { code: 1 }; // Status 1 + + index.add({ + traceId: TRACE_ID_1, + spans: { + [SPAN_A]: spanA, + }, + } as TraceData); + index.add({ + traceId: TRACE_ID_2, + spans: { + [SPAN_B]: spanB, + }, + } as TraceData); + index.add({ + traceId: TRACE_ID_3, + spans: { + [SPAN_C]: spanC, + }, + } as TraceData); + + assert.deepStrictEqual( + index + .search({ + limit: 5, + filter: { + eq: { status: [1, 'UNKNOWN'] }, + }, + }) + .data.map((d) => ({ id: d.id, status: d.status })), + [ + { id: TRACE_ID_3, status: 1 }, + { id: TRACE_ID_2, status: 'UNKNOWN' }, + ] + ); + }); + + it('should support numeric comparison filters (gt/gte/lt/lte)', () => { + // Traces with different start times + const span1 = span('t1', 's1', 10, 10); + span1.startTime = 100; + span1.endTime = 200; + + const span2 = span('t2', 's2', 10, 10); + span2.startTime = 200; + span2.endTime = 300; + + const span3 = span('t3', 's3', 10, 10); + span3.startTime = 300; + span3.endTime = 400; + + const span4 = span('t4', 's4', 10, 10); + span4.startTime = 400; + span4.endTime = 500; + + index.add({ traceId: 't1', spans: { s1: span1 } } as TraceData); + index.add({ traceId: 't2', spans: { s2: span2 } } as TraceData); + index.add({ traceId: 't3', spans: { s3: span3 } } as TraceData); + index.add({ traceId: 't4', spans: { s4: span4 } } as TraceData); + + // gt: start > 200 -> t3 (300), t4 (400) + assert.deepStrictEqual( + index + .search({ + limit: 10, + filter: { gt: { start: 200 } }, + }) + .data.map((d) => d.id), + ['t4', 't3'] + ); + + // gte: start >= 200 -> t2 (200), t3 (300), t4 (400) + assert.deepStrictEqual( + index + .search({ + limit: 10, + filter: { gte: { start: 200 } }, + }) + .data.map((d) => d.id), + ['t4', 't3', 't2'] + ); + + // lt: start < 300 -> t1 (100), t2 (200) + assert.deepStrictEqual( + index + .search({ + limit: 10, + filter: { lt: { start: 300 } }, + }) + .data.map((d) => d.id), + ['t2', 't1'] + ); + + // lte: start <= 300 -> t1 (100), t2 (200), t3 (300) + assert.deepStrictEqual( + index + .search({ + limit: 10, + filter: { lte: { start: 300 } }, + }) + .data.map((d) => d.id), + ['t3', 't2', 't1'] + ); + + // Combined: start > 100 AND start < 400 -> t2 (200), t3 (300) + assert.deepStrictEqual( + index + .search({ + limit: 10, + filter: { gt: { start: 100 }, lt: { start: 400 } }, + }) + .data.map((d) => d.id), + ['t3', 't2'] + ); + }); + it('should paginate search', () => { for (let i = 0; i < 20; i++) { const traceId = 'trace_' + i;