Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ build/
lib/
/*.tgz
test/yarn.lock
test/package.json
23 changes: 16 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ registerThread();
Watchdog thread:

```ts
const { captureStackTrace } = require("@sentry-internal/node-native-stacktrace");
const {
captureStackTrace,
} = require("@sentry-internal/node-native-stacktrace");

const stacks = captureStackTrace();
console.log(stacks);
Expand Down Expand Up @@ -87,15 +89,20 @@ In the main or worker threads if you call `registerThread()` regularly, times
are recorded.

```ts
const { registerThread } = require("@sentry-internal/node-native-stacktrace");
const {
registerThread,
threadPoll,
} = require("@sentry-internal/node-native-stacktrace");

registerThread();

setInterval(() => {
registerThread();
threadPoll({ optional_state: "some_value" });
}, 200);
```

In the watchdog thread you can call `getThreadsLastSeen()` to get how long it's
been in milliseconds since each thread registered.
been in milliseconds since each thread polled.

If any thread has exceeded a threshold, you can call `captureStackTrace()` to
get the stack traces for all threads.
Expand All @@ -111,11 +118,13 @@ const THRESHOLD = 1000; // 1 second
setInterval(() => {
for (const [thread, time] in Object.entries(getThreadsLastSeen())) {
if (time > THRESHOLD) {
const stacks = captureStackTrace();
const blockedThread = stacks[thread];
const threads = captureStackTrace();
const blockedThread = threads[thread];
const { frames, state } = blockedThread;
console.log(
`Thread '${thread}' blocked more than ${THRESHOLD}ms`,
blockedThread,
frames,
state,
);
}
}
Expand Down
122 changes: 99 additions & 23 deletions module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ struct ThreadInfo {
std::string thread_name;
// Last time this thread was seen in milliseconds since epoch
milliseconds last_seen;
// Some JSON serialized state for the thread
std::string state;
};

static std::mutex threads_mutex;
Expand All @@ -32,6 +34,12 @@ struct JsStackFrame {
// Type alias for a vector of JsStackFrame
using JsStackTrace = std::vector<JsStackFrame>;

struct ThreadResult {
std::string thread_name;
std::string state;
JsStackTrace stack_frames;
};

// Function to be called when an isolate's execution is interrupted
static void ExecutionInterrupted(Isolate *isolate, void *data) {
auto promise = static_cast<std::promise<JsStackTrace> *>(data);
Expand Down Expand Up @@ -91,7 +99,6 @@ void CaptureStackTraces(const FunctionCallbackInfo<Value> &args) {
auto capture_from_isolate = args.GetIsolate();
auto current_context = capture_from_isolate->GetCurrentContext();

using ThreadResult = std::tuple<std::string, JsStackTrace>;
std::vector<std::future<ThreadResult>> futures;

{
Expand All @@ -100,35 +107,38 @@ void CaptureStackTraces(const FunctionCallbackInfo<Value> &args) {
if (thread_isolate == capture_from_isolate)
continue;
auto thread_name = thread_info.thread_name;
auto state = thread_info.state;

futures.emplace_back(std::async(
std::launch::async,
[thread_name](Isolate *isolate) -> ThreadResult {
return std::make_tuple(thread_name, CaptureStackTrace(isolate));
[thread_name, state](Isolate *isolate) -> ThreadResult {
return ThreadResult{thread_name, state, CaptureStackTrace(isolate)};
},
thread_isolate));
}
}

Local<Object> result = Object::New(capture_from_isolate);
Local<Object> output = Object::New(capture_from_isolate);

for (auto &future : futures) {
auto [thread_name, frames] = future.get();
auto key = String::NewFromUtf8(capture_from_isolate, thread_name.c_str(),
NewStringType::kNormal)
.ToLocalChecked();

Local<Array> jsFrames = Array::New(capture_from_isolate, frames.size());
for (size_t i = 0; i < frames.size(); ++i) {
const auto &f = frames[i];
auto result = future.get();
auto key =
String::NewFromUtf8(capture_from_isolate, result.thread_name.c_str(),
NewStringType::kNormal)
.ToLocalChecked();

Local<Array> jsFrames =
Array::New(capture_from_isolate, result.stack_frames.size());
for (size_t i = 0; i < result.stack_frames.size(); ++i) {
const auto &frame = result.stack_frames[i];
Local<Object> frameObj = Object::New(capture_from_isolate);
frameObj
->Set(current_context,
String::NewFromUtf8(capture_from_isolate, "function",
NewStringType::kInternalized)
.ToLocalChecked(),
String::NewFromUtf8(capture_from_isolate,
f.function_name.c_str(),
frame.function_name.c_str(),
NewStringType::kNormal)
.ToLocalChecked())
.Check();
Expand All @@ -137,7 +147,8 @@ void CaptureStackTraces(const FunctionCallbackInfo<Value> &args) {
String::NewFromUtf8(capture_from_isolate, "filename",
NewStringType::kInternalized)
.ToLocalChecked(),
String::NewFromUtf8(capture_from_isolate, f.filename.c_str(),
String::NewFromUtf8(capture_from_isolate,
frame.filename.c_str(),
NewStringType::kNormal)
.ToLocalChecked())
.Check();
Expand All @@ -146,23 +157,52 @@ void CaptureStackTraces(const FunctionCallbackInfo<Value> &args) {
String::NewFromUtf8(capture_from_isolate, "lineno",
NewStringType::kInternalized)
.ToLocalChecked(),
Integer::New(capture_from_isolate, f.lineno))
Integer::New(capture_from_isolate, frame.lineno))
.Check();
frameObj
->Set(current_context,
String::NewFromUtf8(capture_from_isolate, "colno",
NewStringType::kInternalized)
.ToLocalChecked(),
Integer::New(capture_from_isolate, f.colno))
Integer::New(capture_from_isolate, frame.colno))
.Check();
jsFrames->Set(current_context, static_cast<uint32_t>(i), frameObj)
.Check();
}

result->Set(current_context, key, jsFrames).Check();
// Create a thread object with a 'frames' property and optional 'state'
Local<Object> threadObj = Object::New(capture_from_isolate);
threadObj
->Set(current_context,
String::NewFromUtf8(capture_from_isolate, "frames",
NewStringType::kInternalized)
.ToLocalChecked(),
jsFrames)
.Check();

if (!result.state.empty()) {
v8::MaybeLocal<v8::String> stateStr = v8::String::NewFromUtf8(
capture_from_isolate, result.state.c_str(), NewStringType::kNormal);
if (!stateStr.IsEmpty()) {
v8::MaybeLocal<v8::Value> maybeStateVal =
v8::JSON::Parse(current_context, stateStr.ToLocalChecked());
v8::Local<v8::Value> stateVal;
if (maybeStateVal.ToLocal(&stateVal)) {
threadObj
->Set(current_context,
String::NewFromUtf8(capture_from_isolate, "state",
NewStringType::kInternalized)
.ToLocalChecked(),
stateVal)
.Check();
}
}
}

output->Set(current_context, key, threadObj).Check();
}

args.GetReturnValue().Set(result);
args.GetReturnValue().Set(output);
}

// Cleanup function to remove the thread from the map when the isolate is
Expand All @@ -179,9 +219,9 @@ void RegisterThread(const FunctionCallbackInfo<Value> &args) {

if (args.Length() != 1 || !args[0]->IsString()) {
isolate->ThrowException(Exception::Error(
String::NewFromUtf8(
isolate, "registerThread(name) requires a single name argument",
NewStringType::kInternalized)
String::NewFromUtf8(isolate,
"threadStart(name) requires a single name argument",
Comment thread
timfish marked this conversation as resolved.
Outdated
NewStringType::kInternalized)
.ToLocalChecked()));

return;
Expand All @@ -194,13 +234,39 @@ void RegisterThread(const FunctionCallbackInfo<Value> &args) {
std::lock_guard<std::mutex> lock(threads_mutex);
auto found = threads.find(isolate);
if (found == threads.end()) {
threads.emplace(isolate, ThreadInfo{thread_name, milliseconds::zero()});
threads.emplace(isolate,
ThreadInfo{thread_name, milliseconds::zero(), ""});
// Register a cleanup hook to remove this thread when the isolate is
// destroyed
node::AddEnvironmentCleanupHook(isolate, Cleanup, isolate);
}
}
}

// Function to track a thread and set its state
void ThreadPoll(const FunctionCallbackInfo<Value> &args) {
auto isolate = args.GetIsolate();
auto context = isolate->GetCurrentContext();

std::string state_str;
if (args.Length() == 1 && args[0]->IsValue()) {
MaybeLocal<String> maybe_json = v8::JSON::Stringify(context, args[0]);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if a return a huge json? Should we account for size limits in some way?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to think about this but I think the worse case is too much memory use or OOM.

if (!maybe_json.IsEmpty()) {
v8::String::Utf8Value utf8_state(isolate, maybe_json.ToLocalChecked());
state_str = *utf8_state ? *utf8_state : "";
} else {
state_str = "";
}
} else {
state_str = "";
}

{
std::lock_guard<std::mutex> lock(threads_mutex);
auto found = threads.find(isolate);
if (found != threads.end()) {
auto &thread_info = found->second;
thread_info.thread_name = thread_name;
thread_info.state = state_str;
thread_info.last_seen =
duration_cast<milliseconds>(system_clock::now().time_since_epoch());
}
Expand Down Expand Up @@ -257,6 +323,16 @@ NODE_MODULE_INITIALIZER(Local<Object> exports, Local<Value> module,
.ToLocalChecked())
.Check();

exports
->Set(context,
String::NewFromUtf8(isolate, "threadPoll",
NewStringType::kInternalized)
.ToLocalChecked(),
FunctionTemplate::New(isolate, ThreadPoll)
->GetFunction(context)
.ToLocalChecked())
.Check();

exports
->Set(context,
String::NewFromUtf8(isolate, "getThreadsLastSeen",
Expand Down
25 changes: 22 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ const arch = process.env['BUILD_ARCH'] || _arch();
const abi = getAbi(versions.node, 'node');
const identifier = [platform, arch, stdlib, abi].filter(c => c !== undefined && c !== null).join('-');

type Thread<S = unknown> = {
frames: StackFrame[];
state?: S
}

type StackFrame = {
function: string;
filename: string;
Expand All @@ -20,7 +25,8 @@ type StackFrame = {

interface Native {
registerThread(threadName: string): void;
captureStackTrace(): Record<string, StackFrame[]>;
threadPoll(state?: object): void;
captureStackTrace<S = unknown>(): Record<string, Thread<S>>;
getThreadsLastSeen(): Record<string, number>;
}

Expand Down Expand Up @@ -177,11 +183,24 @@ export function registerThread(threadName: string = String(threadId)): void {
native.registerThread(threadName);
}

/**
* Tells the native module that the thread is still running and updates the state.
*
* @param state Optional state to pass to the native module.
*/
export function threadPoll(state?: object): void {
if (typeof state === 'object') {
native.threadPoll(state);
} else {
native.threadPoll();
}
}

/**
* Captures stack traces for all registered threads.
*/
export function captureStackTrace(): Record<string, StackFrame[]> {
return native.captureStackTrace();
export function captureStackTrace<S = unknown>(): Record<string, Thread<S>> {
return native.captureStackTrace<S>();
}

/**
Expand Down
10 changes: 6 additions & 4 deletions test/e2e.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ describe('e2e Tests', { timeout: 20000 }, () => {

const stacks = JSON.parse(result.stdout.toString());

expect(stacks['0']).toEqual(expect.arrayContaining([
expect(stacks['0'].frames).toEqual(expect.arrayContaining([
{
function: 'pbkdf2Sync',
filename: expect.any(String),
Expand All @@ -34,7 +34,7 @@ describe('e2e Tests', { timeout: 20000 }, () => {
},
]));

expect(stacks['2']).toEqual(expect.arrayContaining([
expect(stacks['2'].frames).toEqual(expect.arrayContaining([
{
function: 'pbkdf2Sync',
filename: expect.any(String),
Expand Down Expand Up @@ -64,7 +64,7 @@ describe('e2e Tests', { timeout: 20000 }, () => {

const stacks = JSON.parse(result.stdout.toString());

expect(stacks['0']).toEqual(expect.arrayContaining([
expect(stacks['0'].frames).toEqual(expect.arrayContaining([
{
function: 'pbkdf2Sync',
filename: expect.any(String),
Expand All @@ -85,6 +85,8 @@ describe('e2e Tests', { timeout: 20000 }, () => {
},
]));

expect(stacks['2'].length).toEqual(1);
expect(stacks['0'].state).toEqual({ some_property: 'some_value' });

expect(stacks['2'].frames.length).toEqual(1);
});
});
7 changes: 0 additions & 7 deletions test/package.json

This file was deleted.

6 changes: 4 additions & 2 deletions test/stalled.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
const { Worker } = require('node:worker_threads');
const { longWork } = require('./long-work.js');
const { registerThread } = require('@sentry-internal/node-native-stacktrace');
const { registerThread, threadPoll } = require('@sentry-internal/node-native-stacktrace');

registerThread();

setInterval(() => {
registerThread();
threadPoll({ some_property: 'some_value' });
}, 200).unref();

const watchdog = new Worker('./test/stalled-watchdog.js');
Expand Down
8 changes: 4 additions & 4 deletions test/worker-do-nothing.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
const { longWork } = require('./long-work');
const { registerThread } = require('@sentry-internal/node-native-stacktrace');
const { registerThread, threadPoll } = require('@sentry-internal/node-native-stacktrace');

registerThread();

setInterval(() => {
registerThread();
threadPoll();
}, 200);