Adds Pipeline error/warning log indicator to list page #2216
eblairmckee wants to merge 4 commits into master
Conversation
I have an action item for myself to upstream this (as well as a few other components that are missing ! bangs for border colors)
<Popover testId={testId}>
  <PopoverTrigger asChild>
-   <Button variant="secondary-outline" size="sm" icon={<FilterIcon />}>
+   <Button variant="outline" size="sm" icon={<ChevronDown size={12} className="text-muted-foreground" />}>
I've been noodling on this for a while, and even when we used a FilterIcon here, I would still forget that these were filters. I changed it to a chevron/dropdown icon, and I really like the way it looks in both the new Pipelines list table and the other tables that use it. Eventually I want to come up with a better pattern for filtered tables, but that is getting punted until phase 1 of this project is done.
To me, the most intuitive approach is https://ui.bazza.dev/docs/data-table-filter
<span
  className={cn(
    'absolute -top-1.5 -right-1.5 flex h-4 min-w-4 items-center justify-center rounded-full px-1',
    variant === 'destructive-inverted' && 'bg-background-error-strong',
    variant === 'warning-inverted' && 'bg-background-warning-strong'
  )}
>
  <Text className="text-inverse" variant="captionStrongMedium">{formatCount(count)}</Text>
</span>
Eventually I want to upstream this into the ui-registry (Dot, and a docs example of Badge with Dot).
const PulsingStatusIcon = ({ variant }: PulsingStatusIconProps) => {
  return (
    <div className="relative flex items-center justify-center size-3">
      {variant === 'disabled' ? null : <div className={cn(pulsingIconVariants({ variant }), 'size-3 opacity-75 animate-ping')} />}
      <div className={cn(pulsingIconVariants({ variant }), 'absolute top-1/2 left-1/2 transform -translate-x-1/2 -translate-y-1/2 size-2.5')} />
    </div>
  );
};

export const PipelineStatusBadge = ({ state }: { state?: Pipeline_State }) => {
  const statusConfig = useMemo(() => {
    switch (state) {
      case Pipeline_State.RUNNING:
        return {
          icon: <PulsingStatusIcon variant="success" />,
          text: 'Running',
I also want to upstream this into the design system as Status or StatusBadge.
},
{
  accessorKey: 'state',
  header: 'Status',
Should it be State, or should the filter be Status? I see in the screenshot that the filter label is State but the column name is Status.
/**
 * Hook for streaming messages from a Kafka topic using Connect RPC server streaming.
 * Modernized version of createMessageSearch
 */
export const useListMessagesStream = (options: ListMessagesStreamOptions): ListMessagesStreamResult => {
  const [state, setState] = useState<ListMessagesStreamState>(createInitialState);
  const abortControllerRef = useRef<AbortController | null>(null);
  const prevOptionsKeyRef = useRef<string | null>(null);
  const retryTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);

  const {
    topic,
    startOffset,
    startTimestamp,
    partitionId = -1,
    maxResults = 500,
    filterInterpreterCode = '',
    troubleshoot = false,
    includeOriginalRawPayload = false,
    keyDeserializer,
    valueDeserializer,
    ignoreMaxSizeLimit = false,
    enabled = true,
    retryCount = 0,
    retryDelay = 1000,
  } = options;

  // Create stable options key for change detection
  const optionsKey = createOptionsKey(options);

  const cancel = useCallback(() => {
    // Clear any pending retry
    if (retryTimeoutRef.current) {
      clearTimeout(retryTimeoutRef.current);
      retryTimeoutRef.current = null;
    }
    // Abort current stream
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
      abortControllerRef.current = null;
    }
  }, []);

  const handleStreamError = useCallback(
    (err: unknown, attempt: number, scheduleRetry: (nextAttempt: number) => void) => {
      // Handle abort error silently
      if (err instanceof Error && err.name === 'AbortError') {
        setState((prev) => ({ ...prev, isStreaming: false, isComplete: true }));
        return;
      }

      // Check if we should retry
      if (attempt < retryCount) {
        setState((prev) => ({
          ...prev,
          phase: `Retrying (${attempt + 1}/${retryCount})...`,
          retryAttempt: attempt + 1,
        }));
        scheduleRetry(attempt + 1);
        return;
      }

      // No more retries, set error state
      const message = err instanceof Error ? err.message : 'Unknown error occurred';
      setState((prev) => ({ ...prev, ...createErrorState(message) }));
    },
    [retryCount]
  );

  const startWithRetry = useCallback(
    async (attempt = 0) => {
      const consoleClient = config.consoleClient;
      if (!consoleClient) {
        setState((prev) => ({ ...prev, ...createErrorState('Console client not available') }));
        return;
      }

      // Cancel any existing stream
      if (abortControllerRef.current) {
        abortControllerRef.current.abort();
        abortControllerRef.current = null;
      }

      // Reset state with retry attempt info
      setState({ ...createInitialState(), isStreaming: true, retryAttempt: attempt });

      // Create abort controller for this stream
      const abortController = new AbortController();
      abortControllerRef.current = abortController;

      const request = create(ListMessagesRequestSchema, {
        topic,
        startOffset,
        startTimestamp: startTimestamp ?? BigInt(0),
        partitionId,
        maxResults,
        filterInterpreterCode,
        troubleshoot,
        includeOriginalRawPayload,
        keyDeserializer,
        valueDeserializer,
        ignoreMaxSizeLimit,
      });

      try {
        const stream = consoleClient.listMessages(request, { signal: abortController.signal });

        for await (const response of stream) {
          if (abortController.signal.aborted) {
            break;
          }
          processResponse(response, setState);
        }

        setState((prev) => ({ ...prev, isStreaming: false, isComplete: true }));
      } catch (err) {
        handleStreamError(err, attempt, (nextAttempt) => {
          retryTimeoutRef.current = setTimeout(() => startWithRetry(nextAttempt), retryDelay);
        });
      }
    },
    [
      topic,
      startOffset,
      startTimestamp,
      partitionId,
      maxResults,
      filterInterpreterCode,
      troubleshoot,
      includeOriginalRawPayload,
      keyDeserializer,
      valueDeserializer,
      ignoreMaxSizeLimit,
      retryDelay,
      handleStreamError,
    ]
  );

  const start = useCallback(() => {
    cancel();
    startWithRetry(0);
  }, [cancel, startWithRetry]);

  const reset = useCallback(() => {
    // Just call start() - it already handles canceling the previous stream
    // and resetting state before starting a new one
    start();
  }, [start]);

  // Auto-start when enabled or when options change
  useEffect(() => {
    if (!enabled) {
      // If disabled, cancel any running stream
      cancel();
      prevOptionsKeyRef.current = null;
      return;
    }

    // Check if options changed
    const optionsChanged = prevOptionsKeyRef.current !== optionsKey;

    if (optionsChanged) {
      prevOptionsKeyRef.current = optionsKey;
      start();
    }
  }, [enabled, optionsKey, start, cancel]);

  // Cleanup on unmount
  useEffect(() => cancel, [cancel]);

  return {
    ...state,
    start,
    cancel,
    reset,
  };
};
Let's be careful with this. It needs QA effort to ensure it works, because streaming with RPC requires special handling.
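For QA context, here is a minimal consumer sketch of the hook above. The topic name comes from the PR description; the startOffset sentinel, the retry settings, and the wrapper name are placeholders, not code from this PR:

```ts
// Hypothetical wrapper around useListMessagesStream, for illustration only.
const usePipelineLogsProbe = () => {
  const { isStreaming, isComplete, cancel } = useListMessagesStream({
    topic: '__redpanda.connect.logs',
    startOffset: BigInt(-1), // assumed "newest" sentinel; use the codebase's actual constant
    maxResults: 100,
    enabled: true, // the hook auto-starts and restarts whenever its options change
    retryCount: 2, // exercises the retry path that needs QA with RPC streaming
  });

  return { isStreaming, isComplete, cancel };
};
```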
Added pipeline log indicators to the pipelines list page. Note: the data layer implementation is a temporary solution until we have an Oxla integration that provides proper database queries for logs filtered by pipelineId, log level, etc., and/or we get additional backend functionality from Cloud. Pipeline log indicators and the related react-query calls are gated behind the enableNewPipelineLogs feature flag.

Why This Is Temporary

Right now, we just have the Cloud API messages backend (the same API we use for the Topics messages viewer). We're using it because all Connect pipelines on a cluster produce logs to the __redpanda.connect.logs topic, where the key is the pipelineId and the value is the log. To get any meaningful value out of pipeline logs we need to do a lot of filtering; however, the only filtering this API offers is a custom JavaScript function you can execute server-side, and the rest has to happen at run-time. Ideally, we want to store logs in a database so we can use Oxla to make complex queries, e.g. filtering by pipelineId and log level.
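For illustration, this is roughly what that server-side JavaScript filter could look like when built on the frontend and passed as filterInterpreterCode. The variables available inside the interpreter (key, value) are assumptions here and should be checked against the actual messages API contract:

```ts
// Hypothetical helper: builds a server-side filter snippet for the messages API.
// Assumes the interpreter exposes the record's key and parsed value.
const buildPipelineLogFilter = (pipelineId: string) =>
  `return key == ${JSON.stringify(pipelineId)} && (value.level == "error" || value.level == "warn");`;

// Would be passed through the streaming hook's options, e.g.:
// useListMessagesStream({ topic: '__redpanda.connect.logs', filterInterpreterCode: buildPipelineLogFilter(id), ... })
```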
Current Implementation
Architecture
Log Scope Categorization
Logs are categorized by their path field:
- root.input.* (input connector)
- root.output.* (output connector)

This allows users to see at a glance whether issues are occurring in a pipeline's input connector, output connector, or elsewhere.
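A sketch of what this categorization could look like, assuming the LOG_PATH_INPUT / LOG_PATH_OUTPUT constants from logs/constants.ts hold the path prefixes and that uncategorized paths fall back to a generic pipeline scope (the real implementation in this PR may differ):

```ts
// Hypothetical categorizer; the constant values and the 'pipeline' fallback label are assumptions.
const LOG_PATH_INPUT = 'root.input';
const LOG_PATH_OUTPUT = 'root.output';

type LogScope = 'input' | 'output' | 'pipeline';

const categorizeLogScope = (path: string): LogScope => {
  if (path.startsWith(LOG_PATH_INPUT)) return 'input'; // e.g. root.input.kafka
  if (path.startsWith(LOG_PATH_OUTPUT)) return 'output'; // e.g. root.output.http_client
  return 'pipeline'; // processors, resources, etc.
};
```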
Files Changed
- list.tsx
- pipeline-messages.tsx: useStreamingPipelineLogCounts hook
- messages.tsx: useListMessagesStream hook for Kafka streaming
- pipeline-log-indicator.tsx
- logs/constants.ts: LOG_PATH_INPUT, LOG_PATH_OUTPUT constants

The "Noisy Pipeline" Problem
Critical Caveat: All pipeline logs are written to a single Kafka topic (__redpanda.connect.logs). When streaming logs:
- maxResults = pipelineCount × 100 messages (the max on the backend is 500, but that's currently being increased by Julin)

Example: if Pipeline A generates 1000 logs/minute and Pipeline B generates 10 logs/minute, then with maxResults = 200, Pipeline A's logs may entirely fill the sample.
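To make the sampling issue concrete, here is a sketch of the per-pipeline counting a hook like useStreamingPipelineLogCounts has to do over that shared, capped sample. The message shape and level names are assumptions, not the actual implementation:

```ts
// Hypothetical aggregation over a shared sample of __redpanda.connect.logs messages.
// Assumes each message exposes a `key` (pipelineId) and a parsed value with a `level` field.
type LogCounts = { errors: number; warnings: number };

const countLogsByPipeline = (
  messages: Array<{ key: string; value: { level: string } }>,
  pipelineIds: string[]
): Map<string, LogCounts> => {
  const counts = new Map(pipelineIds.map((id): [string, LogCounts] => [id, { errors: 0, warnings: 0 }]));
  for (const { key, value } of messages) {
    const entry = counts.get(key);
    if (!entry) continue; // log belongs to a pipeline not shown on this page
    if (value.level === 'error') entry.errors += 1;
    else if (value.level === 'warn') entry.warnings += 1;
  }
  return counts;
};

// The sample itself is capped (maxResults = pipelineCount × 100, backend max applies),
// so a noisy pipeline can crowd quieter pipelines out of `messages` entirely.
```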
Alternative Approaches Considered

1. Increase Sample Size (Current Direction)
Approach: Increase the maxResults multiplier from 100 to 500+ per pipeline.

2. Parallel Queries Per Pipeline
Approach: Run one query per pipeline, fetching the 100 most recent logs each.

3. Time-Windowed Sampling
Approach: Fetch logs from multiple time windows and merge.

4. Server-Side Aggregation (Ideal)
Approach: A backend endpoint that returns aggregated counts (a hypothetical response shape is sketched below).
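For reference, a hypothetical response shape for that aggregation endpoint, purely to illustrate what the frontend would want to consume (no such endpoint exists today):

```ts
// Hypothetical future API shape, not a real endpoint.
interface PipelineLogCountsResponse {
  pipelines: Array<{
    pipelineId: string;
    errorCount: number;
    warningCount: number;
    // optional breakdown matching the path-based scope categorization above
    byScope?: Record<'input' | 'output' | 'pipeline', { errors: number; warnings: number }>;
    windowStart: string; // ISO timestamp of the aggregation window
    windowEnd: string;
  }>;
}
```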
Smart Polling for Status Updates
The list view implements smart polling to keep pipeline states current:
How It Works
- If any pipeline is in a transitional state (STARTING or STOPPING), the list polls at a shorter interval (SHORT_POLLING_INTERVAL).
- This ensures users see timely state transitions without unnecessary polling when all pipelines are stable.
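A minimal sketch of how this conditional polling is typically wired with react-query's refetchInterval; the interval value, the fetcher, and the exact signature (shown here for TanStack Query v5) are assumptions, so see list.tsx for the real wiring:

```ts
import { useQuery } from '@tanstack/react-query';
// Pipeline and Pipeline_State are the buf-generated types already used in this PR.

// Assumed value; the real SHORT_POLLING_INTERVAL constant lives in the codebase.
const SHORT_POLLING_INTERVAL = 5_000;

const isTransitional = (state?: Pipeline_State) =>
  state === Pipeline_State.STARTING || state === Pipeline_State.STOPPING;

const usePipelinesWithSmartPolling = () =>
  useQuery({
    queryKey: ['pipelines'],
    queryFn: listPipelines, // hypothetical fetcher returning { pipelines: Pipeline[] }
    refetchInterval: (query) => {
      const pipelines = query.state.data?.pipelines ?? [];
      // Poll at the short interval only while something is starting or stopping.
      return pipelines.some((p) => isTransitional(p.state)) ? SHORT_POLLING_INTERVAL : false;
    },
  });
```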
Transitional States
The actions dropdown also shows "Retry start" / "Retry stop" options for pipelines stuck in transitional states.