Merged

16 commits
617fe33  feat(backend): add node_io table schema for storing node inputs/outputs (betterclever, Jan 6, 2026)
5647763  feat(backend): add NodeIO service and API endpoints for node I/O insp… (betterclever, Jan 6, 2026)
6fa1454  feat(worker): implement node I/O persistence and trace optimization (betterclever, Jan 6, 2026)
9e3e04a  feat(telemetry): implement node I/O persistence and inspection (betterclever, Jan 6, 2026)
447b160  feat(security): integrate AWS credential contract in prowler-scan and… (betterclever, Jan 6, 2026)
d5d05d9  feat(telemetry): implement large payload spilling to object storage f… (betterclever, Jan 6, 2026)
644bf9e  feat(telemetry): add preview support and log chunking for spilled nod… (betterclever, Jan 6, 2026)
70eba08  refactor(runner): replace stdout-based output with file-based output … (betterclever, Jan 6, 2026)
68bfe6b  refactor(telemetry): standardize spill markers and thresholds (betterclever, Jan 6, 2026)
86ce5ce  fix(node-io): add warning logs for preview fetch failures (betterclever, Jan 6, 2026)
c6b189e  feat(runner): add stdout fallback for Docker file-based output (betterclever, Jan 6, 2026)
520f108  feat: implement full view modal for spilled node I/O and refine inspe… (betterclever, Jan 6, 2026)
cf24b3c  fix: resolve race condition in node I/O status recording (betterclever, Jan 6, 2026)
8f5b77f  fix(worker): resolve PTY spawn permissions and fix E2BIG by using fil… (betterclever, Jan 6, 2026)
70892d9  chore: Update `node-pty`, `@ai-sdk`, and `@aws-sdk` dependencies. (betterclever, Jan 6, 2026)
4a796fe  fix(worker): bypass input coercion for spilled data markers (betterclever, Jan 7, 2026)
2 changes: 1 addition & 1 deletion backend/scripts/generate-openapi.ts
@@ -31,7 +31,7 @@ async function generateOpenApi() {
     .build();
 
   const document = SwaggerModule.createDocument(app, config);
-  console.log('Document paths keys:', Object.keys(document.paths).filter(k => k.includes('human')));
+  console.log('Document paths keys:', Object.keys(document.paths));
   const cleaned = cleanupOpenApiDoc(document);
   const repoRootSpecPath = join(__dirname, '..', '..', 'openapi.json');
   const payload = JSON.stringify(cleaned, null, 2);
2 changes: 1 addition & 1 deletion backend/src/database/schema/index.ts
@@ -16,4 +16,4 @@ export * from './webhooks';

 export * from './terminal-records';
 export * from './agent-trace-events';
-
+export * from './node-io';
60 changes: 60 additions & 0 deletions backend/src/database/schema/node-io.ts
@@ -0,0 +1,60 @@
import {
  bigserial,
  boolean,
  index,
  integer,
  jsonb,
  pgTable,
  text,
  timestamp,
  uniqueIndex,
  varchar,
} from 'drizzle-orm/pg-core';

/**
 * Stores the inputs and outputs of each node execution for inspection and debugging.
 *
 * For small payloads (< 100KB), data is stored inline as JSONB.
 * For large payloads, data is spilled to object storage and a reference is stored.
 */
export const nodeIOTable = pgTable(
  'node_io',
  {
    id: bigserial('id', { mode: 'number' }).primaryKey(),
    runId: text('run_id').notNull(),
    nodeRef: text('node_ref').notNull(),
    workflowId: text('workflow_id'),
    organizationId: varchar('organization_id', { length: 191 }),
    componentId: text('component_id').notNull(),

    // Inputs received by this node
    inputs: jsonb('inputs').$type<Record<string, unknown>>(),
    inputsSize: integer('inputs_size').notNull().default(0),
    inputsSpilled: boolean('inputs_spilled').notNull().default(false),
    inputsStorageRef: text('inputs_storage_ref'), // Object storage path if spilled

    // Outputs produced by this node
    outputs: jsonb('outputs').$type<Record<string, unknown>>(),
    outputsSize: integer('outputs_size').notNull().default(0),
    outputsSpilled: boolean('outputs_spilled').notNull().default(false),
    outputsStorageRef: text('outputs_storage_ref'), // Object storage path if spilled

    // Metadata
    startedAt: timestamp('started_at', { withTimezone: true }),
    completedAt: timestamp('completed_at', { withTimezone: true }),
    durationMs: integer('duration_ms'),
    status: text('status').$type<'running' | 'completed' | 'failed' | 'skipped'>().notNull().default('running'),
    errorMessage: text('error_message'),

    createdAt: timestamp('created_at', { withTimezone: true }).defaultNow().notNull(),
    updatedAt: timestamp('updated_at', { withTimezone: true }).defaultNow().notNull(),
  },
  (table) => ({
    runNodeIndex: uniqueIndex('node_io_run_node_idx').on(table.runId, table.nodeRef),
    runIndex: index('node_io_run_idx').on(table.runId),
    workflowIndex: index('node_io_workflow_idx').on(table.workflowId),
  }),
);

export type NodeIORecord = typeof nodeIOTable.$inferSelect;
export type NodeIOInsert = typeof nodeIOTable.$inferInsert;
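
The unique index on (run_id, node_ref) means at most one row per node execution within a run, which is what makes the start/completion upsert pattern used by the ingest service safe. As a read-side illustration only, a minimal lookup sketch, assuming a Drizzle `db` instance; the actual NodeIORepository is part of this PR but is not shown in this section, and the import paths here are assumptions:

// Hypothetical lookup sketch: `db` and its import path are assumptions, not part of this diff.
import { and, eq } from 'drizzle-orm';

import { db } from '../database/client'; // assumed location of the Drizzle client
import { nodeIOTable, NodeIORecord } from '../database/schema/node-io';

export async function findNodeIO(
  runId: string,
  nodeRef: string,
): Promise<NodeIORecord | undefined> {
  // node_io_run_node_idx guarantees at most one matching row.
  const rows = await db
    .select()
    .from(nodeIOTable)
    .where(and(eq(nodeIOTable.runId, runId), eq(nodeIOTable.nodeRef, nodeRef)))
    .limit(1);
  return rows[0];
}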
3 changes: 3 additions & 0 deletions backend/src/node-io/index.ts
@@ -0,0 +1,3 @@
export * from './node-io.module';
export * from './node-io.service';
export * from './node-io.repository';
140 changes: 140 additions & 0 deletions backend/src/node-io/node-io-ingest.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import { Injectable, Logger, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { Consumer, Kafka } from 'kafkajs';

import { NodeIORepository } from './node-io.repository';

interface SerializedNodeIOEvent {
  type: 'NODE_IO_START' | 'NODE_IO_COMPLETION';
  runId: string;
  nodeRef: string;
  workflowId?: string;
  organizationId?: string | null;
  componentId?: string;
  inputs?: Record<string, unknown>;
  inputsSize?: number;
  inputsSpilled?: boolean;
  inputsStorageRef?: string | null;
  outputs?: Record<string, unknown>;
  outputsSize?: number;
  outputsSpilled?: boolean;
  outputsStorageRef?: string | null;
  status?: 'completed' | 'failed' | 'skipped';
  errorMessage?: string;
  timestamp: string;
}

@Injectable()
export class NodeIOIngestService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(NodeIOIngestService.name);
  private readonly kafkaBrokers: string[];
  private readonly kafkaTopic: string;
  private readonly kafkaGroupId: string;
  private readonly kafkaClientId: string;
  private consumer: Consumer | undefined;

  constructor(private readonly nodeIORepository: NodeIORepository) {
    const brokerEnv = process.env.LOG_KAFKA_BROKERS ?? '';
    this.kafkaBrokers = brokerEnv
      .split(',')
      .map((broker) => broker.trim())
      .filter(Boolean);
    if (this.kafkaBrokers.length === 0) {
      throw new Error('LOG_KAFKA_BROKERS must be configured for node I/O ingestion');
    }

    this.kafkaTopic = process.env.NODE_IO_KAFKA_TOPIC ?? 'telemetry.node-io';
    this.kafkaGroupId = process.env.NODE_IO_KAFKA_GROUP_ID ?? 'shipsec-node-io-ingestor';
    this.kafkaClientId = process.env.NODE_IO_KAFKA_CLIENT_ID ?? 'shipsec-backend-node-io';
  }

  async onModuleInit(): Promise<void> {
    if (this.kafkaBrokers.length === 0) {
      this.logger.warn('No Kafka brokers configured, skipping node I/O ingest service initialization');
      return;
    }

    try {
      const kafka = new Kafka({
        clientId: this.kafkaClientId,
        brokers: this.kafkaBrokers,
        requestTimeout: 30000,
        retry: {
          retries: 5,
          initialRetryTime: 100,
          maxRetryTime: 30000,
        },
      });

      this.consumer = kafka.consumer({
        groupId: this.kafkaGroupId,
        sessionTimeout: 30000,
        heartbeatInterval: 3000,
      });
      await this.consumer.connect();
      await this.consumer.subscribe({ topic: this.kafkaTopic, fromBeginning: true });
      await this.consumer.run({
        eachMessage: async ({ message, topic, partition }) => {
          const messageOffset = message.offset;
          if (!message.value) {
            this.logger.warn(`Received empty message from ${topic}[${partition}]@${messageOffset}`);
            return;
          }
          try {
            const payload = JSON.parse(message.value.toString()) as SerializedNodeIOEvent;
            this.logger.debug(
              `Processing node I/O event: runId=${payload.runId}, nodeRef=${payload.nodeRef}, type=${payload.type}, offset=${messageOffset}`,
            );
            await this.persistEvent(payload);
          } catch (error) {
            this.logger.error(
              `Failed to process node I/O event from Kafka (topic=${topic}, partition=${partition}, offset=${messageOffset})`,
              error as Error,
            );
          }
        },
      });
      this.logger.log(
        `Kafka node I/O ingestion connected (${this.kafkaBrokers.join(', ')}) topic=${this.kafkaTopic}`,
      );
    } catch (error) {
      this.logger.error('Failed to initialize Kafka node I/O ingestion', error as Error);
      // Don't throw here to avoid crashing the whole backend if Kafka is just temporarily down
    }
  }

  async onModuleDestroy(): Promise<void> {
    if (this.consumer) {
      await this.consumer.disconnect().catch((error) => {
        this.logger.error('Failed to disconnect Kafka consumer', error as Error);
      });
    }
  }

  private async persistEvent(event: SerializedNodeIOEvent): Promise<void> {
    if (event.type === 'NODE_IO_START') {
      await this.nodeIORepository.recordStart({
        runId: event.runId,
        nodeRef: event.nodeRef,
        workflowId: event.workflowId,
        organizationId: event.organizationId,
        componentId: event.componentId || 'unknown',
        inputs: event.inputs || {},
        inputsSize: event.inputsSize,
        inputsSpilled: event.inputsSpilled,
        inputsStorageRef: event.inputsStorageRef,
      });
    } else if (event.type === 'NODE_IO_COMPLETION') {
      await this.nodeIORepository.recordCompletion({
        runId: event.runId,
        nodeRef: event.nodeRef,
        componentId: event.componentId,
        outputs: event.outputs || {},
        status: event.status || 'completed',
        errorMessage: event.errorMessage,
        outputsSize: event.outputsSize,
        outputsSpilled: event.outputsSpilled,
        outputsStorageRef: event.outputsStorageRef,
      });
    }
  }
}
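
The consumer above expects SerializedNodeIOEvent JSON on the telemetry.node-io topic; the publishing side lives in the worker and is not part of this file. A minimal sketch of what a conforming kafkajs producer could look like, where the client id and component id are illustrative assumptions:

// Hypothetical producer sketch: the worker's real publisher is not shown in this diff.
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'example-node-io-producer', // assumed client id
  brokers: (process.env.LOG_KAFKA_BROKERS ?? 'localhost:9092').split(','),
});

export async function emitNodeIOStart(runId: string, nodeRef: string): Promise<void> {
  const producer = kafka.producer();
  await producer.connect();
  await producer.send({
    topic: process.env.NODE_IO_KAFKA_TOPIC ?? 'telemetry.node-io',
    messages: [
      {
        key: runId, // keying by runId keeps a run's events ordered within one partition
        value: JSON.stringify({
          type: 'NODE_IO_START',
          runId,
          nodeRef,
          componentId: 'example.component', // assumed component id
          inputs: { example: true },
          timestamp: new Date().toISOString(),
        }),
      },
    ],
  });
  await producer.disconnect();
}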
19 changes: 19 additions & 0 deletions backend/src/node-io/node-io.module.ts
@@ -0,0 +1,19 @@
import { Module } from '@nestjs/common';
import { NodeIORepository } from './node-io.repository';
import { NodeIOService } from './node-io.service';
import { NodeIOIngestService } from './node-io-ingest.service';
import { DatabaseModule } from '../database/database.module';
import { StorageModule } from '../storage/storage.module';

const ingestServicesEnabled =
  (process.env.ENABLE_INGEST_SERVICES ?? 'true') === 'true' &&
  process.env.SKIP_INGEST_SERVICES !== 'true';

const ingestServices = ingestServicesEnabled ? [NodeIOIngestService] : [];

@Module({
  imports: [DatabaseModule, StorageModule],
  providers: [NodeIORepository, NodeIOService, ...ingestServices],
  exports: [NodeIOService, NodeIORepository],
})
export class NodeIOModule {}
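
Ingestion is on by default: the Kafka consumer registers unless ENABLE_INGEST_SERVICES is set to something other than 'true', or SKIP_INGEST_SERVICES is set to 'true'. A hypothetical wiring sketch, since the application's root module is not part of this diff:

// Hypothetical root-module wiring, shown only to illustrate how NodeIOModule would be consumed.
import { Module } from '@nestjs/common';

import { NodeIOModule } from './node-io';

@Module({
  imports: [NodeIOModule], // makes NodeIOService and NodeIORepository injectable downstream
})
export class AppModule {}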