From bcf0a07471c4c7a0aa10992dfd8fec7ff2f3eec8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20Qu=C3=A8ze?= Date: Fri, 20 Mar 2026 12:48:21 +0100 Subject: [PATCH] Bug 2024449 - Add importer for streamed profile format (JSON Lines) Add support for importing the "streamed profile" format, a JSON Lines format where each line is a JSON object: a meta line, a thread declaration line, then marker lines. This format is produced by tools like resourcemonitor.py for streaming resource-usage profiles. The importer passes through meta and thread objects as-is and respects the input's preprocessedProfileVersion, so the standard profile upgraders handle all format migrations. stringArray placement is version-aware: per-thread for versions < 56, in profile.shared for versions >= 56. Also adds early detection in _extractJsonFromArrayBuffer so that JSON Lines content reaches the string-based format detection instead of failing JSON.parse. --- src/actions/receive-profile.ts | 15 +- src/profile-logic/import/streamed-profile.ts | 154 +++++++++ src/profile-logic/process-profile.ts | 6 + src/test/unit/profile-conversion.test.ts | 326 +++++++++++++++++++ 4 files changed, 499 insertions(+), 2 deletions(-) create mode 100644 src/profile-logic/import/streamed-profile.ts diff --git a/src/actions/receive-profile.ts b/src/actions/receive-profile.ts index 15a7903dea..41b62174ce 100644 --- a/src/actions/receive-profile.ts +++ b/src/actions/receive-profile.ts @@ -1174,7 +1174,9 @@ async function _extractZipFromResponse( } /** - * Parse JSON from an optionally gzipped array buffer. + * Decode an optionally gzipped array buffer into a profile-shaped value. + * Returns parsed JSON for normal profiles, or the raw text string for + * streamed profiles (JSON Lines) which are not valid single-object JSON. 
*/ async function _extractJsonFromArrayBuffer( arrayBuffer: ArrayBuffer @@ -1186,7 +1188,16 @@ async function _extractJsonFromArrayBuffer( } const textDecoder = new TextDecoder(); - return JSON.parse(textDecoder.decode(profileBytes)); + const text = textDecoder.decode(profileBytes); + + // Streamed profiles (JSON Lines) start with {"type":"meta" and are not + // valid single-object JSON. Return the text directly so that the string + // format detection in unserializeProfileOfArbitraryFormat can handle it. + if (text.startsWith('{"type":"meta"')) { + return text; + } + + return JSON.parse(text); } /** diff --git a/src/profile-logic/import/streamed-profile.ts b/src/profile-logic/import/streamed-profile.ts new file mode 100644 index 0000000000..e0e3c26a9d --- /dev/null +++ b/src/profile-logic/import/streamed-profile.ts @@ -0,0 +1,154 @@ +/* This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ + +/** + * Importer for the "streamed profile" format (JSON Lines / .jsonl). + * + * This format is produced by tools like resourcemonitor.py and streams one + * JSON object per line: + * + * Line 1: {"type":"meta", ...} — profile metadata (markerSchema, categories, etc.) + * Line 2: {"type":"thread", ...} — thread declaration and structure + * Line 3+: {"type":"marker", ...} — one marker per line + * + * Threads must be declared before any markers that belong to them. The + * importer passes through meta and thread objects from the input, only + * adding the parsed marker columnar arrays and the stringArray populated + * from marker names. The producing tool is responsible for emitting a + * structure that matches its declared preprocessedProfileVersion, including + * any required tables (stackTable, frameTable, etc.). The standard profile + * upgraders then migrate the result to the current version. 
+ * + * ## Future extensibility (comments only — nothing is implemented yet): + * + * - In the future there may be one JSON Lines file streamed *per process*, + * not just one global file. The importer would then need to merge multiple + * files or accept a list of streams. + * + * - "type": "counter" and "type": "sample" lines are expected to be added + * when streaming profiles that contain more than just markers (e.g. CPU + * sampling data, performance counters). + * + * - A `tid` (thread ID) attribute is expected to be included in each line + * in the future to route markers to different threads. When `tid` is + * absent, the marker belongs to the first declared thread. + * + * - The current resource-usage profiles are a simple case: single process, + * single thread. But the format is designed to support multi-process, + * multi-thread profiles in the future. + */ + +import type { MarkerPhase } from 'firefox-profiler/types/gecko-profile'; +import { INSTANT, INTERVAL } from 'firefox-profiler/app-logic/constants'; +import { StringTable } from 'firefox-profiler/utils/string-table'; + +/** + * Detect whether the input string is a streamed profile in JSON Lines format. + * The first line always starts with {"type":"meta" (the "type" key is + * guaranteed to be the first key), so we can detect the format by checking + * for this prefix without parsing the entire line. + */ +export function isStreamedProfileFormat(profile: string): boolean { + return profile.startsWith('{"type":"meta"'); +} + +/** + * Convert a streamed profile (JSON Lines) string into a profile object + * that the standard profile upgraders can process. The meta and thread + * objects are passed through from the input; the importer only builds + * the marker columnar arrays and the stringArray. 
+ */
+export function convertStreamedProfile(profileText: string): any {
+  const lines = profileText.split('\n').filter((line) => line.trim() !== '');
+
+  if (lines.length === 0) {
+    throw new Error('Streamed profile is empty.');
+  }
+
+  // --- Parse meta line ---
+  const metaObj = JSON.parse(lines[0]);
+  if (metaObj.type !== 'meta') {
+    throw new Error('First line of streamed profile must be a "meta" object.');
+  }
+
+  const { type: _metaType, ...meta } = metaObj;
+
+  // --- Parse remaining lines ---
+  // Threads must be declared (via type=thread lines) before markers can
+  // reference them. Currently there is only one thread per file; in the
+  // future, markers will use a `tid` field to target a specific thread.
+  const version = meta.preprocessedProfileVersion ?? 0;
+
+  // Marker names in the streamed format are human-readable strings. The
+  // importer interns them into a stringArray with numeric indices, as
+  // expected by the processed profile format. Before version 56 the
+  // stringArray lives on each thread; from version 56 onward it is shared
+  // across all threads in profile.shared.stringArray.
+  const useSharedStringArray = version >= 56;
+  const stringArray: string[] = [];
+  const stringTable = StringTable.withBackingArray(stringArray);
+
+  let thread: Record<string, any> | null = null;
+
+  for (let i = 1; i < lines.length; i++) {
+    const lineObj = JSON.parse(lines[i]);
+
+    switch (lineObj.type) {
+      case 'thread': {
+        const { type: _type, ...threadObj } = lineObj;
+        if (!useSharedStringArray) {
+          threadObj.stringArray = stringArray;
+        }
+        threadObj.markers = {
+          name: [] as number[],
+          startTime: [] as Array<number | null>,
+          endTime: [] as Array<number | null>,
+          phase: [] as MarkerPhase[],
+          category: [] as number[],
+          data: [] as Array<unknown>,
+          length: 0,
+        };
+        thread = threadObj; // NOTE(review): a repeated thread line replaces the earlier thread and its markers.
+        break;
+      }
+      case 'marker': {
+        if (thread === null) {
+          throw new Error(
+            'Streamed profile contains a marker before any thread declaration.'
+          );
+        }
+        // Future: use lineObj.tid to look up the target thread.
+        const { markers } = thread;
+        markers.name.push(stringTable.indexForString(lineObj.name));
+        markers.startTime.push(lineObj.startTime ?? null);
+        const endTime: number | null = lineObj.endTime ?? null;
+        markers.endTime.push(endTime);
+        markers.phase.push(endTime === null ? INSTANT : INTERVAL);
+        markers.category.push(lineObj.category ?? 0);
+        markers.data.push(lineObj.data ?? null);
+        markers.length++;
+        break;
+      }
+      default:
+        // Future: handle "counter", "sample", and other line types here.
+        break;
+    }
+  }
+
+  if (thread === null) {
+    throw new Error('Streamed profile contains no thread declaration.');
+  }
+
+  const profile: any = {
+    meta,
+    libs: [],
+    threads: [thread],
+  };
+
+  if (useSharedStringArray) {
+    profile.shared = { stringArray };
+  }
+
+  return profile;
+}
diff --git a/src/profile-logic/process-profile.ts b/src/profile-logic/process-profile.ts
index fbf4c99cb5..886498eb30 100644
--- a/src/profile-logic/process-profile.ts
+++ b/src/profile-logic/process-profile.ts
@@ -25,6 +25,10 @@ import {
   convertFlameGraphProfile,
 } from './import/flame-graph';
 import { isArtTraceFormat, convertArtTraceProfile } from './import/art-trace';
+import {
+  isStreamedProfileFormat,
+  convertStreamedProfile,
+} from './import/streamed-profile';
 import {
   PROCESSED_PROFILE_VERSION,
   INTERVAL,
@@ -1993,6 +1997,8 @@ export async function unserializeProfileOfArbitraryFormat(
         arbitraryFormat = convertPerfScriptProfile(arbitraryFormat);
       } else if (isFlameGraphFormat(arbitraryFormat)) {
         arbitraryFormat = convertFlameGraphProfile(arbitraryFormat);
+      } else if (isStreamedProfileFormat(arbitraryFormat)) {
+        arbitraryFormat = convertStreamedProfile(arbitraryFormat);
       } else {
         // Try parsing as JSON.
arbitraryFormat = JSON.parse(arbitraryFormat); diff --git a/src/test/unit/profile-conversion.test.ts b/src/test/unit/profile-conversion.test.ts index 02dcd33642..96fbbfcc2f 100644 --- a/src/test/unit/profile-conversion.test.ts +++ b/src/test/unit/profile-conversion.test.ts @@ -4,6 +4,10 @@ import { unserializeProfileOfArbitraryFormat } from '../../profile-logic/process-profile'; import { isPerfScriptFormat } from '../../profile-logic/import/linux-perf'; import { isFlameGraphFormat } from '../../profile-logic/import/flame-graph'; +import { + isStreamedProfileFormat, + convertStreamedProfile, +} from '../../profile-logic/import/streamed-profile'; import { GECKO_PROFILE_VERSION } from '../../app-logic/constants'; import { storeWithProfile } from '../fixtures/stores'; @@ -581,3 +585,325 @@ describe('converting flamegraph profile', function () { expect(profile).toMatchSnapshot(); }); }); + +describe('converting streamed profile', function () { + const metaLine = JSON.stringify({ + type: 'meta', + product: 'mach', + interval: 500, + startTime: 1000000, + logicalCPUs: 4, + physicalCPUs: 2, + version: 27, + preprocessedProfileVersion: 47, + categories: [{ name: 'Other', color: 'grey', subcategories: ['Other'] }], + markerSchema: [ + { + name: 'Text', + tooltipLabel: '{marker.name}', + display: ['marker-chart', 'marker-table'], + data: [{ key: 'text', label: 'Description', format: 'string' }], + }, + ], + }); + + const threadLine = '{"type":"thread"}'; + + it('should detect streamed profile format', function () { + const input = + metaLine + + '\n' + + '{"type":"marker","name":"test","startTime":0,"endTime":null,"data":null}'; + expect(isStreamedProfileFormat(input)).toBe(true); + }); + + it('should not detect object without type:meta first key', function () { + const input = JSON.stringify({ markerSchema: [], categories: [] }); + expect(isStreamedProfileFormat(input)).toBe(false); + }); + + it('should not detect regular JSON object', function () { + 
expect(isStreamedProfileFormat('{"meta": {"product": "Firefox"}}')).toBe( + false + ); + }); + + it('should not detect non-JSON', function () { + expect(isStreamedProfileFormat('hello world')).toBe(false); + }); + + it('should not detect empty string', function () { + expect(isStreamedProfileFormat('')).toBe(false); + }); + + it('should apply thread configuration from type=thread line', function () { + const lines = [ + metaLine, + JSON.stringify({ + type: 'thread', + name: '', + processName: 'mach', + isMainThread: false, + showMarkersInTimeline: true, + pid: '0', + tid: 0, + }), + JSON.stringify({ + type: 'marker', + name: 'test', + startTime: 0, + endTime: 100, + data: null, + }), + ]; + const profile = convertStreamedProfile(lines.join('\n')); + const thread = profile.threads[0]; + + expect(thread.name).toBe(''); + expect(thread.processName).toBe('mach'); + expect(thread.isMainThread).toBe(false); + expect(thread.showMarkersInTimeline).toBe(true); + expect(thread.pid).toBe('0'); + expect(thread.tid).toBe(0); + expect(thread.markers.length).toBe(1); + }); + + it('should error if no thread is declared', function () { + const lines = [metaLine]; + expect(() => convertStreamedProfile(lines.join('\n'))).toThrow( + 'no thread declaration' + ); + }); + + it('should error if a marker appears before any thread', function () { + const lines = [ + metaLine, + JSON.stringify({ + type: 'marker', + name: 'test', + startTime: 0, + endTime: null, + data: null, + }), + ]; + expect(() => convertStreamedProfile(lines.join('\n'))).toThrow( + 'before any thread declaration' + ); + }); + + it('should convert a simple streamed profile', function () { + const lines = [ + metaLine, + threadLine, + JSON.stringify({ + type: 'marker', + name: 'test_start', + startTime: 100, + endTime: null, + data: { type: 'Text', text: 'Starting test' }, + }), + JSON.stringify({ + type: 'marker', + name: 'test', + startTime: 100, + endTime: 200, + data: { + type: 'Test', + test: 'test.js', + name: 
'test.js', + status: 'PASS', + color: 'green', + }, + }), + ]; + const profile = convertStreamedProfile(lines.join('\n')); + + expect(profile.meta.product).toBe('mach'); + expect(profile.meta.interval).toBe(500); + expect(profile.meta.startTime).toBe(1000000); + expect(profile.threads).toHaveLength(1); + + const thread = profile.threads[0]; + expect(thread.markers.length).toBe(2); + + // First marker: instant (endTime is null) + expect(thread.markers.startTime[0]).toBe(100); + expect(thread.markers.endTime[0]).toBe(null); + expect(thread.markers.phase[0]).toBe(0); // INSTANT + + // Second marker: interval + expect(thread.markers.startTime[1]).toBe(100); + expect(thread.markers.endTime[1]).toBe(200); + expect(thread.markers.phase[1]).toBe(1); // INTERVAL + + // All markers have category 0 + expect(thread.markers.category[0]).toBe(0); + expect(thread.markers.category[1]).toBe(0); + + // Marker data is preserved + expect(thread.markers.data[0]).toEqual({ + type: 'Text', + text: 'Starting test', + }); + }); + + it('should import via unserializeProfileOfArbitraryFormat', async function () { + // The full thread structure required at version 47 so that upgraders + // can process it. The producing tool emits this at the version it declares. 
+ const fullThreadLine = JSON.stringify({ + type: 'thread', + name: '', + processType: 'default', + processStartupTime: 0, + processShutdownTime: null, + registerTime: 0, + unregisterTime: null, + pausedRanges: [], + isMainThread: false, + pid: '0', + tid: 0, + samples: { + weightType: 'samples', + weight: null, + eventDelay: [], + stack: [], + time: [], + length: 0, + }, + stackTable: { frame: [], prefix: [], length: 0 }, + frameTable: { + address: [], + inlineDepth: [], + category: [], + subcategory: [], + func: [], + nativeSymbol: [], + innerWindowID: [], + line: [], + column: [], + length: 0, + }, + funcTable: { + isJS: [], + relevantForJS: [], + name: [], + resource: [], + fileName: [], + lineNumber: [], + columnNumber: [], + length: 0, + }, + resourceTable: { lib: [], name: [], host: [], type: [], length: 0 }, + nativeSymbols: { + libIndex: [], + address: [], + name: [], + functionSize: [], + length: 0, + }, + }); + const lines = [ + metaLine, + fullThreadLine, + JSON.stringify({ + type: 'marker', + name: 'CPU Use', + startTime: 0, + endTime: 500, + data: { type: 'CPU', cpuPercent: '4.2%' }, + }), + ]; + const profile = await unserializeProfileOfArbitraryFormat(lines.join('\n')); + + expect(profile.threads).toHaveLength(1); + expect(profile.threads[0].markers.length).toBe(1); + }); + + it('should skip unknown line types', function () { + const lines = [ + metaLine, + threadLine, + JSON.stringify({ type: 'unknown_future_type', data: {} }), + JSON.stringify({ + type: 'marker', + name: 'test', + startTime: 0, + endTime: 100, + data: null, + }), + ]; + const profile = convertStreamedProfile(lines.join('\n')); + expect(profile.threads[0].markers.length).toBe(1); + }); + + it('should use per-marker category when provided', function () { + const lines = [ + metaLine, + threadLine, + JSON.stringify({ + type: 'marker', + name: 'Phase', + startTime: 0, + endTime: 100, + data: null, + category: 2, + }), + JSON.stringify({ + type: 'marker', + name: 'test', + startTime: 
0, + endTime: null, + data: null, + }), + JSON.stringify({ + type: 'marker', + name: 'Phase', + startTime: 100, + endTime: 200, + data: null, + category: 1, + }), + ]; + const profile = convertStreamedProfile(lines.join('\n')); + const { markers } = profile.threads[0]; + + expect(markers.category[0]).toBe(2); + expect(markers.category[1]).toBe(0); // default when absent + expect(markers.category[2]).toBe(1); + }); + + it('should deduplicate marker name strings', function () { + const lines = [ + metaLine, + threadLine, + JSON.stringify({ + type: 'marker', + name: 'CPU Use', + startTime: 0, + endTime: 500, + data: null, + }), + JSON.stringify({ + type: 'marker', + name: 'CPU Use', + startTime: 500, + endTime: 1000, + data: null, + }), + JSON.stringify({ + type: 'marker', + name: 'Memory', + startTime: 0, + endTime: 500, + data: null, + }), + ]; + const profile = convertStreamedProfile(lines.join('\n')); + const { markers } = profile.threads[0]; + + // Both "CPU Use" markers should share the same name index + expect(markers.name[0]).toBe(markers.name[1]); + // "Memory" should have a different index + expect(markers.name[2]).not.toBe(markers.name[0]); + }); +});