
Adds Pipeline error/warning log indicator to list page #2216

Draft
eblairmckee wants to merge 4 commits into master from feat/pipeline-list-log-indicators

Conversation

Contributor

@eblairmckee eblairmckee commented Feb 4, 2026

Added pipeline log indicators to the pipelines list page. Note: the data-layer implementation is a temporary solution until we have an Oxla integration that provides proper database queries for logs (filtered by pipelineId, log level, etc.) and/or additional backend functionality from Cloud.

Pipeline log indicators and related react-query calls are gated behind the enableNewPipelineLogs feature flag.

[Screenshots: pipeline list page showing error/warning log indicators]

Why This Is Temporary

Right now, we only have the Cloud API messages backend (the same API the Topics messages viewer uses). We're using it because all Connect pipelines on a cluster write logs to the __redpanda.connect.logs topic, where the key is the pipelineId and the value is the log record. Getting meaningful value out of pipeline logs requires a lot of filtering, but the only filtering this API supports is a custom JavaScript function executed server-side; everything else has to happen client-side at runtime.
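To make the "everything else happens at runtime" part concrete, here is a minimal sketch of the client-side filtering this forces on us. The record shape and field names (`key` as pipelineId, `value.level`) are assumptions for illustration, not the actual Cloud API contract:

```typescript
// Hypothetical shape of a decoded record from __redpanda.connect.logs.
type ConnectLogRecord = {
  key: string; // pipelineId
  value: { level: string; msg: string; path?: string };
};

// Anything beyond the server-side JS filter must happen at runtime,
// e.g. keeping only WARN/ERROR entries for a given pipeline.
const filterRelevantLogs = (
  records: ConnectLogRecord[],
  pipelineId: string
): ConnectLogRecord[] =>
  records.filter(
    (r) => r.key === pipelineId && ['WARN', 'ERROR'].includes(r.value.level)
  );
```

With an Oxla-backed store, this whole function collapses into a WHERE clause.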

Ideally, we want to store logs in a database, so we can use Oxla to make complex queries like:

SELECT * FROM logs WHERE pipeline_id = ? AND level IN ('WARN', 'ERROR')

Current Implementation

Architecture

┌─────────────────────────────────────────────────────────────────────┐
│ __redpanda.connect.logs (Kafka Topic)                               │
│ All pipeline logs written here with pipeline_id as message key      │
└───────────────────────────────┬─────────────────────────────────────┘
                                │
                                ▼
┌─────────────────────────────────────────────────────────────────────┐
│ useStreamingPipelineLogCounts Hook                                  │
│ - Streams from topic with server side JS filter                     │
│ - Fetches last N hours of logs                                      │
│ - maxResults = pipelineIds.length × 100                             │
└───────────────────────────────┬─────────────────────────────────────┘
                                │
                                ▼
┌─────────────────────────────────────────────────────────────────────┐
│ aggregateMessages                                                   │
│ - Filters for WARN/ERROR levels (client-side)                       │
│ - Categorizes by scope: input | output | root                       │
│ - Groups counts by pipeline_id                                      │
└───────────────────────────────┬─────────────────────────────────────┘
                                │
                                ▼
┌─────────────────────────────────────────────────────────────────────┐
│ PipelineLogIndicator Component                                      │
│ - Displays error/warning badges with counts                         │
│ - Shown in Name, Input, and Output columns                          │
└─────────────────────────────────────────────────────────────────────┘

Log Scope Categorization

Logs are categorized by their path field:

Path Pattern      Scope    Column
root.input.*      input    Input
root.output.*     output   Output
Everything else   root     Pipeline Name

This allows users to see at a glance whether issues are occurring in a pipeline's input connector, output connector, or elsewhere.
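The categorization above can be sketched as a pure function. The prefix values here are assumptions; the real ones live in logs/constants.ts as LOG_PATH_INPUT and LOG_PATH_OUTPUT:

```typescript
// Assumed values; the real constants live in logs/constants.ts.
const LOG_PATH_INPUT = 'root.input';
const LOG_PATH_OUTPUT = 'root.output';

type LogScope = 'input' | 'output' | 'root';

// Maps a log's `path` field to the column its indicator renders in.
const categorizeLogPath = (path: string | undefined): LogScope => {
  if (path?.startsWith(LOG_PATH_INPUT)) return 'input';
  if (path?.startsWith(LOG_PATH_OUTPUT)) return 'output';
  return 'root';
};
```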

Files Changed

File                          Purpose
list.tsx                      Main list component; enriches pipelines with log counts
pipeline-messages.tsx         useStreamingPipelineLogCounts hook
messages.tsx                  useListMessagesStream hook for Kafka streaming
pipeline-log-indicator.tsx    Badge component with error/warning counts
logs/constants.ts             LOG_PATH_INPUT and LOG_PATH_OUTPUT constants

The "Noisy Pipeline" Problem

Critical Caveat: All pipeline logs are written to a single Kafka topic (__redpanda.connect.logs). When streaming logs:

  1. We request maxResults = pipelineCount × 100 messages (the backend max is currently 500, but Julin is raising that limit)
  2. Messages are returned in order (newest first or by partition)
  3. A single pipeline generating many logs can dominate the sample

Example: If Pipeline A generates 1000 logs/minute and Pipeline B generates 10 logs/minute:

  • With maxResults = 200, Pipeline A's logs may entirely fill the sample
  • Pipeline B's error/warning indicators may show as 0 even if issues exist
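The crowding-out effect above can be illustrated with a small simulation. This is not the actual aggregation code, just a model of a newest-first sample capped at maxResults:

```typescript
// Illustration of the sampling bias: the stream is capped at maxResults
// and returns the newest messages first, so a chatty pipeline can fill
// the entire sample before a quiet pipeline's logs are ever seen.
const sampleNewestFirst = (
  logs: { pipelineId: string; ts: number }[],
  maxResults: number
): Map<string, number> => {
  const counts = new Map<string, number>();
  const sample = [...logs].sort((a, b) => b.ts - a.ts).slice(0, maxResults);
  for (const log of sample) {
    counts.set(log.pipelineId, (counts.get(log.pipelineId) ?? 0) + 1);
  }
  return counts;
};
```

If Pipeline A's 300 recent logs all postdate Pipeline B's 10 logs, a 200-message sample contains only A, and B's indicators read as zero.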

Alternative Approaches Considered

1. Increase Sample Size (Current Direction)

Approach: Increase maxResults multiplier from 100 to 500+ per pipeline.

Pros:
  • Simple change
  • Better representation

Cons:
  • More data transferred
  • Still susceptible to noisy pipelines
  • Memory pressure on client

2. Parallel Queries Per Pipeline

Approach: Run one query per pipeline, fetching 100 most recent logs each.

// Pseudocode
const results = await Promise.all(
  pipelineIds.map((id) => fetchLogs({ pipelineId: id, maxResults: 100 }))
);
Pros:
  • Each pipeline gets fair representation
  • No noisy pipeline problem

Cons:
  • N queries instead of 1
  • Not performant for 20+ pipelines
  • Server load concerns

3. Time-Windowed Sampling

Approach: Fetch logs from multiple time windows and merge.

Pros:
  • Better distribution over time

Cons:
  • Increased complexity
  • Multiple queries

4. Server-Side Aggregation (Ideal)

Approach: Backend endpoint that returns aggregated counts.

GET /api/pipelines/log-counts
// Returns: { pipelineId: { errors: N, warnings: N } }

Pros:
  • Minimal data transfer
  • Accurate counts
  • Scalable

Cons:
  • Requires backend work
  • Still limited without Oxla

Smart Polling for Status Updates

The list view implements smart polling to keep pipeline states current:

useListPipelinesQuery(undefined, {
  enableSmartPolling: true,
});

How It Works

  1. After data fetch, check if any pipeline is in a transitional state (STARTING or STOPPING)
  2. If yes: poll every 2 seconds (SHORT_POLLING_INTERVAL)
  3. If no: disable polling (use React Query's default cache behavior)

This ensures users see timely state transitions without unnecessary polling when all pipelines are stable.
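A sketch of how the interval decision could be computed. React Query's refetchInterval accepts a function of the latest data; the constant and state names here mirror the PR's, but the exact wiring inside enableSmartPolling is an assumption:

```typescript
// Assumed constant; mirrors SHORT_POLLING_INTERVAL in the PR.
const SHORT_POLLING_INTERVAL_MS = 2_000;

type PipelineState = 'RUNNING' | 'STOPPED' | 'STARTING' | 'STOPPING';
const TRANSITIONAL_STATES: PipelineState[] = ['STARTING', 'STOPPING'];

// Poll fast while any pipeline is transitioning; otherwise return
// false, which disables polling in React Query.
const getRefetchInterval = (
  pipelines: { state: PipelineState }[] | undefined
): number | false => {
  if (!pipelines) return false;
  return pipelines.some((p) => TRANSITIONAL_STATES.includes(p.state))
    ? SHORT_POLLING_INTERVAL_MS
    : false;
};
```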

Transitional States

// pipeline/constants.ts
export const TRANSITIONAL_STATES = [
  Pipeline_State.STARTING,
  Pipeline_State.STOPPING,
] as const;

The actions dropdown also shows "Retry start" / "Retry stop" options for pipelines stuck in transitional states.

Contributor Author

I have an action item to upstream this (along with a few other components that are missing ! bangs for border colors).

<Popover testId={testId}>
<PopoverTrigger asChild>
<Button variant="secondary-outline" size="sm" icon={<FilterIcon />}>
<Button variant="outline" size="sm" icon={<ChevronDown size={12} className="text-muted-foreground" />}>
Contributor Author

I've been noodling on this for a while, and even when we used a FilterIcon here, I would still forget that these were filters. Changed to a chevron/dropdown icon, and I really like the way it looks in both the new Pipelines list table and the other tables that use it. Eventually I want to come up with a better pattern for filtered tables, but that's punted until phase 1 of this project is done.

[Screenshot: filter triggers using the chevron/dropdown icon]

Contributor

To me the most intuitive way is https://ui.bazza.dev/docs/data-table-filter

@eblairmckee force-pushed the feat/pipeline-list-log-indicators branch from 856bda0 to b2ce591 on February 4, 2026 at 23:30
Comment on lines +44 to +52
<span
  className={cn(
    'absolute -top-1.5 -right-1.5 flex h-4 min-w-4 items-center justify-center rounded-full px-1',
    variant === 'destructive-inverted' && 'bg-background-error-strong',
    variant === 'warning-inverted' && 'bg-background-warning-strong'
  )}
>
  <Text className="text-inverse" variant="captionStrongMedium">
    {formatCount(count)}
  </Text>
</span>
Contributor Author

eventually I want to upstream this into the ui-registry (Dot and a docs example of Badge with Dot)

Comment on lines +21 to +36
const PulsingStatusIcon = ({ variant }: PulsingStatusIconProps) => {
  return (
    <div className="relative flex items-center justify-center size-3">
      {variant === 'disabled' ? null : (
        <div className={cn(pulsingIconVariants({ variant }), 'size-3 opacity-75 animate-ping')} />
      )}
      <div
        className={cn(
          pulsingIconVariants({ variant }),
          'absolute top-1/2 left-1/2 transform -translate-x-1/2 -translate-y-1/2 size-2.5'
        )}
      />
    </div>
  );
};

export const PipelineStatusBadge = ({ state }: { state?: Pipeline_State }) => {
  const statusConfig = useMemo(() => {
    switch (state) {
      case Pipeline_State.RUNNING:
        return {
          icon: <PulsingStatusIcon variant="success" />,
          text: 'Running',
Contributor Author

I also want to upstream this into the design system as Status or StatusBadge.

},
{
accessorKey: 'state',
header: 'Status',
Contributor

Should the column be State, or the filter be Status? In the screenshot the filter label is State but the column name is Status.

Comment on lines +133 to +309
/**
* Hook for streaming messages from a Kafka topic using Connect RPC server streaming.
* Modernized version of createMessageSearch
*/
export const useListMessagesStream = (options: ListMessagesStreamOptions): ListMessagesStreamResult => {
  const [state, setState] = useState<ListMessagesStreamState>(createInitialState);
  const abortControllerRef = useRef<AbortController | null>(null);
  const prevOptionsKeyRef = useRef<string | null>(null);
  const retryTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);

  const {
    topic,
    startOffset,
    startTimestamp,
    partitionId = -1,
    maxResults = 500,
    filterInterpreterCode = '',
    troubleshoot = false,
    includeOriginalRawPayload = false,
    keyDeserializer,
    valueDeserializer,
    ignoreMaxSizeLimit = false,
    enabled = true,
    retryCount = 0,
    retryDelay = 1000,
  } = options;

  // Create stable options key for change detection
  const optionsKey = createOptionsKey(options);

  const cancel = useCallback(() => {
    // Clear any pending retry
    if (retryTimeoutRef.current) {
      clearTimeout(retryTimeoutRef.current);
      retryTimeoutRef.current = null;
    }
    // Abort current stream
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
      abortControllerRef.current = null;
    }
  }, []);

  const handleStreamError = useCallback(
    (err: unknown, attempt: number, scheduleRetry: (nextAttempt: number) => void) => {
      // Handle abort error silently
      if (err instanceof Error && err.name === 'AbortError') {
        setState((prev) => ({ ...prev, isStreaming: false, isComplete: true }));
        return;
      }

      // Check if we should retry
      if (attempt < retryCount) {
        setState((prev) => ({
          ...prev,
          phase: `Retrying (${attempt + 1}/${retryCount})...`,
          retryAttempt: attempt + 1,
        }));
        scheduleRetry(attempt + 1);
        return;
      }

      // No more retries, set error state
      const message = err instanceof Error ? err.message : 'Unknown error occurred';
      setState((prev) => ({ ...prev, ...createErrorState(message) }));
    },
    [retryCount]
  );

  const startWithRetry = useCallback(
    async (attempt = 0) => {
      const consoleClient = config.consoleClient;
      if (!consoleClient) {
        setState((prev) => ({ ...prev, ...createErrorState('Console client not available') }));
        return;
      }

      // Cancel any existing stream
      if (abortControllerRef.current) {
        abortControllerRef.current.abort();
        abortControllerRef.current = null;
      }

      // Reset state with retry attempt info
      setState({ ...createInitialState(), isStreaming: true, retryAttempt: attempt });

      // Create abort controller for this stream
      const abortController = new AbortController();
      abortControllerRef.current = abortController;

      const request = create(ListMessagesRequestSchema, {
        topic,
        startOffset,
        startTimestamp: startTimestamp ?? BigInt(0),
        partitionId,
        maxResults,
        filterInterpreterCode,
        troubleshoot,
        includeOriginalRawPayload,
        keyDeserializer,
        valueDeserializer,
        ignoreMaxSizeLimit,
      });

      try {
        const stream = consoleClient.listMessages(request, { signal: abortController.signal });

        for await (const response of stream) {
          if (abortController.signal.aborted) {
            break;
          }
          processResponse(response, setState);
        }

        setState((prev) => ({ ...prev, isStreaming: false, isComplete: true }));
      } catch (err) {
        handleStreamError(err, attempt, (nextAttempt) => {
          retryTimeoutRef.current = setTimeout(() => startWithRetry(nextAttempt), retryDelay);
        });
      }
    },
    [
      topic,
      startOffset,
      startTimestamp,
      partitionId,
      maxResults,
      filterInterpreterCode,
      troubleshoot,
      includeOriginalRawPayload,
      keyDeserializer,
      valueDeserializer,
      ignoreMaxSizeLimit,
      retryDelay,
      handleStreamError,
    ]
  );

  const start = useCallback(() => {
    cancel();
    startWithRetry(0);
  }, [cancel, startWithRetry]);

  const reset = useCallback(() => {
    // Just call start() - it already handles canceling the previous stream
    // and resetting state before starting a new one
    start();
  }, [start]);

  // Auto-start when enabled or when options change
  useEffect(() => {
    if (!enabled) {
      // If disabled, cancel any running stream
      cancel();
      prevOptionsKeyRef.current = null;
      return;
    }

    // Check if options changed
    const optionsChanged = prevOptionsKeyRef.current !== optionsKey;

    if (optionsChanged) {
      prevOptionsKeyRef.current = optionsKey;
      start();
    }
  }, [enabled, optionsKey, start, cancel]);

  // Cleanup on unmount
  useEffect(() => cancel, [cancel]);

  return {
    ...state,
    start,
    cancel,
    reset,
  };
};
Contributor

Let's be careful with this. It needs QA effort to ensure it works, because streaming with RPC requires special handling.
