Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions genkit-tools/common/src/types/apis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,16 @@ import { TraceDataSchema } from './trace';
* It's used directly in the generation of the Reflection API OpenAPI spec.
*/

const PrimitiveSchema = z.union([z.string(), z.number()]);
const FilterValueSchema = z.union([PrimitiveSchema, z.array(PrimitiveSchema)]);

export const TraceQueryFilterSchema = z.object({
eq: z.record(z.string(), z.union([z.string(), z.number()])).optional(),
neq: z.record(z.string(), z.union([z.string(), z.number()])).optional(),
eq: z.record(z.string(), FilterValueSchema).optional(),
neq: z.record(z.string(), FilterValueSchema).optional(),
gt: z.record(z.string(), z.number()).optional(),
gte: z.record(z.string(), z.number()).optional(),
lt: z.record(z.string(), z.number()).optional(),
lte: z.record(z.string(), z.number()).optional(),
});

export type TraceQueryFilter = z.infer<typeof TraceQueryFilterSchema>;
Expand Down
79 changes: 58 additions & 21 deletions genkit-tools/telemetry-server/src/file-trace-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -417,27 +417,7 @@ export class Index {
return undefined;
}
})
.filter((d) => {
if (!d) return false;
if (!query?.filter) return true;
if (
query.filter.eq &&
Object.keys(query.filter.eq).find(
(k) => d[k] !== query.filter!.eq![k]
)
) {
return false;
}
if (
query.filter.neq &&
Object.keys(query.filter.neq).find(
(k) => d[k] === query.filter!.neq![k]
)
) {
return false;
}
return true;
})
.filter((d) => (!d ? false : isFilterMatch(d, query?.filter)))
.reverse() as Record<string, string | number>[];

fullData.push(...fileData);
Expand Down Expand Up @@ -473,6 +453,63 @@ export class Index {
}
}

function isFilterMatch(
d: Record<string, string | number>,
filter?: TraceQueryFilter
): boolean {
if (!filter) return true;
const { eq, neq, gt, gte, lt, lte } = filter;
if (eq) {
for (const k of Object.keys(eq)) {
const filterVal = eq[k];
const val = d[k];
if (
Array.isArray(filterVal)
? !filterVal.includes(val as any)
: val !== filterVal
)
return false;
}
}
if (neq) {
for (const k of Object.keys(neq)) {
const filterVal = neq[k];
const val = d[k];
if (
Array.isArray(filterVal)
? filterVal.includes(val as any)
: val === filterVal
)
return false;
}
}
if (gt) {
for (const k of Object.keys(gt)) {
const val = d[k];
if (typeof val !== 'number' || val <= gt[k]) return false;
}
}
if (gte) {
for (const k of Object.keys(gte)) {
const val = d[k];
if (typeof val !== 'number' || val < gte[k]) return false;
}
}
if (lt) {
for (const k of Object.keys(lt)) {
const val = d[k];
if (typeof val !== 'number' || val >= lt[k]) return false;
}
}
if (lte) {
for (const k of Object.keys(lte)) {
const val = d[k];
if (typeof val !== 'number' || val > lte[k]) return false;
}
}
return true;
}

function lockFile(file: string) {
return `${file}.lock`;
}
211 changes: 211 additions & 0 deletions genkit-tools/telemetry-server/tests/file_store_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,217 @@ describe('index', () => {
);
});

it('should support array filters (IN/NOT IN)', () => {
const spanA = span(TRACE_ID_1, SPAN_A, 100, 100);
spanA.displayName = 'flowA';
spanA.attributes['genkit:type'] = 'banana';

const spanB = span(TRACE_ID_2, SPAN_B, 200, 200);
spanB.displayName = 'flowB';
spanB.attributes['genkit:type'] = undefined;

const spanC = span(TRACE_ID_3, SPAN_C, 200, 200);
spanC.displayName = 'flowC';
spanC.attributes['genkit:type'] = undefined;

index.add({
traceId: TRACE_ID_1,
spans: {
[SPAN_A]: spanA,
},
} as TraceData);
index.add({
traceId: TRACE_ID_2,
spans: {
[SPAN_B]: spanB,
},
} as TraceData);
index.add({
traceId: TRACE_ID_3,
spans: {
[SPAN_C]: spanC,
},
} as TraceData);

// Test array filters (IN)
assert.deepStrictEqual(
index.search({
limit: 5,
filter: {
eq: { name: ['flowA', 'flowC'] },
},
}).data,
[
{
id: TRACE_ID_3,
type: 'UNKNOWN',
name: 'flowC',
start: 1,
end: 2,
status: 0,
},
{
id: TRACE_ID_1,
type: 'banana',
name: 'flowA',
start: 1,
end: 2,
status: 0,
},
]
);

// Test array filters (NOT IN)
assert.deepStrictEqual(
index.search({
limit: 5,
filter: {
neq: { name: ['flowB', 'flowC'] },
},
}).data,
[
{
id: TRACE_ID_1,
type: 'banana',
name: 'flowA',
start: 1,
end: 2,
status: 0,
},
]
);
});

it('should support mixed type array filters (number and string)', () => {
const spanA = span(TRACE_ID_1, SPAN_A, 100, 100);
spanA.displayName = 'flowA';
spanA.attributes['genkit:type'] = 'banana';

const spanB = span(TRACE_ID_2, SPAN_B, 200, 200);
spanB.displayName = 'flowB';
spanB.attributes['genkit:type'] = undefined;
// Set status to undefined so it indexes as 'UNKNOWN'
spanB.status = undefined;

const spanC = span(TRACE_ID_3, SPAN_C, 200, 200);
spanC.displayName = 'flowC';
spanC.attributes['genkit:type'] = 'flow';
spanC.status = { code: 1 }; // Status 1

index.add({
traceId: TRACE_ID_1,
spans: {
[SPAN_A]: spanA,
},
} as TraceData);
index.add({
traceId: TRACE_ID_2,
spans: {
[SPAN_B]: spanB,
},
} as TraceData);
index.add({
traceId: TRACE_ID_3,
spans: {
[SPAN_C]: spanC,
},
} as TraceData);

assert.deepStrictEqual(
index
.search({
limit: 5,
filter: {
eq: { status: [1, 'UNKNOWN'] },
},
})
.data.map((d) => ({ id: d.id, status: d.status })),
[
{ id: TRACE_ID_3, status: 1 },
{ id: TRACE_ID_2, status: 'UNKNOWN' },
]
);
});

it('should support numeric comparison filters (gt/gte/lt/lte)', () => {
// Traces with different start times
const span1 = span('t1', 's1', 10, 10);
span1.startTime = 100;
span1.endTime = 200;

const span2 = span('t2', 's2', 10, 10);
span2.startTime = 200;
span2.endTime = 300;

const span3 = span('t3', 's3', 10, 10);
span3.startTime = 300;
span3.endTime = 400;

const span4 = span('t4', 's4', 10, 10);
span4.startTime = 400;
span4.endTime = 500;

index.add({ traceId: 't1', spans: { s1: span1 } } as TraceData);
index.add({ traceId: 't2', spans: { s2: span2 } } as TraceData);
index.add({ traceId: 't3', spans: { s3: span3 } } as TraceData);
index.add({ traceId: 't4', spans: { s4: span4 } } as TraceData);

// gt: start > 200 -> t3 (300), t4 (400)
assert.deepStrictEqual(
index
.search({
limit: 10,
filter: { gt: { start: 200 } },
})
.data.map((d) => d.id),
['t4', 't3']
);

// gte: start >= 200 -> t2 (200), t3 (300), t4 (400)
assert.deepStrictEqual(
index
.search({
limit: 10,
filter: { gte: { start: 200 } },
})
.data.map((d) => d.id),
['t4', 't3', 't2']
);

// lt: start < 300 -> t1 (100), t2 (200)
assert.deepStrictEqual(
index
.search({
limit: 10,
filter: { lt: { start: 300 } },
})
.data.map((d) => d.id),
['t2', 't1']
);

// lte: start <= 300 -> t1 (100), t2 (200), t3 (300)
assert.deepStrictEqual(
index
.search({
limit: 10,
filter: { lte: { start: 300 } },
})
.data.map((d) => d.id),
['t3', 't2', 't1']
);

// Combined: start > 100 AND start < 400 -> t2 (200), t3 (300)
assert.deepStrictEqual(
index
.search({
limit: 10,
filter: { gt: { start: 100 }, lt: { start: 400 } },
})
.data.map((d) => d.id),
['t3', 't2']
);
});

it('should paginate search', () => {
for (let i = 0; i < 20; i++) {
const traceId = 'trace_' + i;
Expand Down
Loading