Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/workerd/api/unsafe.c++
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,15 @@ jsg::Promise<void> UnsafeModule::abortAllDurableObjects(jsg::Lock& js) {
return js.resolvedPromise();
}

jsg::Promise<void> UnsafeModule::deleteAllDurableObjects(jsg::Lock& js) {
auto& context = IoContext::current();

auto exception = JSG_KJ_EXCEPTION(FAILED, Error, "Application called deleteAllDurableObjects().");
context.deleteAllActors(exception);

return js.resolvedPromise();
}

bool UnsafeModule::isTestAutogateEnabled() {
return util::Autogate::isEnabled(util::AutogateKey::TEST_WORKERD);
}
Expand Down
5 changes: 5 additions & 0 deletions src/workerd/api/unsafe.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,17 @@ class UnsafeModule: public jsg::Object {
UnsafeModule(jsg::Lock&, const jsg::Url&) {}
jsg::Promise<void> abortAllDurableObjects(jsg::Lock& js);

// Like abortAllDurableObjects(), but also deletes storage and cancels alarms so DOs
// restart with clean state. Namespaces with preventEviction are not affected.
jsg::Promise<void> deleteAllDurableObjects(jsg::Lock& js);

// Returns true if the TEST_WORKERD autogate is enabled.
// This is used to verify that the all-autogates test variant is working correctly.
bool isTestAutogateEnabled();

JSG_RESOURCE_TYPE(UnsafeModule) {
JSG_METHOD(abortAllDurableObjects);
JSG_METHOD(deleteAllDurableObjects);
JSG_METHOD(isTestAutogateEnabled);
}
};
Expand Down
6 changes: 6 additions & 0 deletions src/workerd/io/io-channels.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,12 @@ class IoChannelFactory {
KJ_UNIMPLEMENTED("Only implemented by single-tenant workerd runtime");
}

// Aborts all actors, cancels all alarms, and deletes all underlying storage for evictable
// namespaces. After this, DOs can be recreated with clean state. Useful for test isolation.
virtual void deleteAllActors(kj::Maybe<kj::Exception&> reason) {
KJ_UNIMPLEMENTED("Only implemented by single-tenant workerd runtime");
}

// Use a dynamic Worker loader binding to obtain an Worker by name. If name is null, or if the named Worker doesn't already exist, the callback will be called to fetch the source code from which the Worker should be created.
virtual kj::Own<WorkerStubChannel> loadIsolate(uint loaderChannel,
kj::Maybe<kj::String> name,
Expand Down
4 changes: 4 additions & 0 deletions src/workerd/io/io-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,10 @@ class IoContext final: public kj::Refcounted, private kj::TaskSet::ErrorHandler
getIoChannelFactory().abortAllActors(reason);
}

void deleteAllActors(kj::Maybe<kj::Exception&> reason) {
getIoChannelFactory().deleteAllActors(reason);
}

// Get an HttpClient to use for Cache API subrequests.
kj::Own<CacheClient> getCacheClient();

Expand Down
7 changes: 7 additions & 0 deletions src/workerd/server/alarm-scheduler.c++
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,13 @@ bool AlarmScheduler::setAlarm(ActorKey actor, kj::Date scheduledTime) {
return query.changeCount() > 0;
}

void AlarmScheduler::deleteAll() {
// Cancel all in-memory alarm tasks.
alarms.clear();
// Wipe the persistent store.
db->run("DELETE FROM _cf_ALARM;");
}

bool AlarmScheduler::deleteAlarm(ActorKey actor) {
auto query = stmtDeleteAlarm.run(actor.actorId);

Expand Down
3 changes: 3 additions & 0 deletions src/workerd/server/alarm-scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ class AlarmScheduler final: kj::TaskSet::ErrorHandler {
bool setAlarm(ActorKey actor, kj::Date scheduledTime);
bool deleteAlarm(ActorKey actor);

// Cancels all pending alarms and removes them from persistent storage.
void deleteAll();

private:
enum class AlarmStatus { WAITING, STARTED, FINISHED };
const kj::Clock& clock;
Expand Down
67 changes: 61 additions & 6 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1902,6 +1902,7 @@ class Server::WorkerService final: public Service,
using LinkCallback =
kj::Function<LinkedIoChannels(WorkerService&, Worker::ValidationErrorReporter&)>;
using AbortActorsCallback = kj::Function<void(kj::Maybe<const kj::Exception&> reason)>;
using DeleteActorsCallback = kj::Function<void(kj::Maybe<const kj::Exception&> reason)>;

WorkerService(ChannelTokenHandler& channelTokenHandler,
kj::Maybe<kj::StringPtr> serviceName,
Expand All @@ -1913,6 +1914,7 @@ class Server::WorkerService final: public Service,
kj::HashSet<kj::String> actorClassEntrypointsParam,
LinkCallback linkCallback,
AbortActorsCallback abortActorsCallback,
DeleteActorsCallback deleteActorsCallback,
kj::Maybe<kj::String> dockerPathParam,
kj::Maybe<kj::String> containerEgressInterceptorImageParam,
bool isDynamic)
Expand All @@ -1927,6 +1929,7 @@ class Server::WorkerService final: public Service,
actorClassEntrypoints(kj::mv(actorClassEntrypointsParam)),
waitUntilTasks(*this),
abortActorsCallback(kj::mv(abortActorsCallback)),
deleteActorsCallback(kj::mv(deleteActorsCallback)),
dockerPath(kj::mv(dockerPathParam)),
containerEgressInterceptorImage(kj::mv(containerEgressInterceptorImageParam)),
isDynamic(isDynamic) {}
Expand Down Expand Up @@ -2509,6 +2512,18 @@ class Server::WorkerService final: public Service,
}
}

// Resets the actor's SQLite database while the connection is still open,
// avoiding file-locking issues on Windows.
void resetStorage() {
KJ_IF_SOME(a, actor) {
KJ_IF_SOME(cache, a->getPersistent()) {
KJ_IF_SOME(db, cache.getSqliteDatabase()) {
kj::runCatchingExceptions([&]() { db.reset(); });
}
}
}
}

kj::Own<ActorContainer> getFacetContainer(
kj::String childKey, kj::Function<kj::Promise<StartInfo>()> getStartInfo) {
auto makeContainer = [&]() {
Expand Down Expand Up @@ -2973,6 +2988,22 @@ class Server::WorkerService final: public Service,
actors.clear();
}

// Resets all actor databases, aborts all actors, and cancels all alarms so DOs
// can be recreated with clean state.
void deleteAll(kj::Maybe<const kj::Exception&> reason) {
// Reset databases before aborting so connections are still open (avoids
// Windows file-locking issues with deferred handle release).
for (auto& actor: actors) {
actor.value->resetStorage();
}

abortAll(reason);

KJ_IF_SOME(scheduler, ownAlarmScheduler) {
scheduler->deleteAll();
}
}

private:
kj::Own<ActorClass> actorClass;
const ActorConfig& config;
Expand Down Expand Up @@ -3268,6 +3299,7 @@ class Server::WorkerService final: public Service,
kj::HashMap<kj::StringPtr, kj::Own<ActorNamespace>> actorNamespaces;
kj::TaskSet waitUntilTasks;
AbortActorsCallback abortActorsCallback;
DeleteActorsCallback deleteActorsCallback;
kj::Maybe<kj::String> dockerPath;
kj::Maybe<kj::String> containerEgressInterceptorImage;
bool isDynamic;
Expand Down Expand Up @@ -3484,6 +3516,10 @@ class Server::WorkerService final: public Service,
abortActorsCallback(reason);
}

void deleteAllActors(kj::Maybe<kj::Exception&> reason) override {
deleteActorsCallback(reason);
}

kj::Own<WorkerStubChannel> loadIsolate(uint loaderChannel,
kj::Maybe<kj::String> name,
kj::Function<kj::Promise<DynamicWorkerSource>()> fetchSource) override;
Expand Down Expand Up @@ -4017,6 +4053,25 @@ void Server::abortAllActors(kj::Maybe<const kj::Exception&> reason) {
}
}

void Server::deleteAllActors(kj::Maybe<const kj::Exception&> reason) {
for (auto& service: services) {
if (WorkerService* worker = dynamic_cast<WorkerService*>(&*service.value)) {
for (auto& [className, ns]: worker->getActorNamespaces()) {
bool isEvictable = true;
KJ_SWITCH_ONEOF(ns->getConfig()) {
KJ_CASE_ONEOF(c, Durable) {
isEvictable = c.isEvictable;
}
KJ_CASE_ONEOF(c, Ephemeral) {
isEvictable = c.isEvictable;
}
}
if (isEvictable) ns->deleteAll(reason);
}
}
}
}
Comment thread
penalosa marked this conversation as resolved.
Comment thread
penalosa marked this conversation as resolved.
Comment thread
penalosa marked this conversation as resolved.

// WorkerDef is an intermediate representation of everything from `config::Worker::Reader` that
// `Server::makeWorkerImpl()` needs. Similar to `WorkerSource`, we factor out this intermediate
// representation so that we can potentially build it dynamically from input that isn't a
Expand Down Expand Up @@ -4854,12 +4909,12 @@ kj::Promise<kj::Own<Server::WorkerService>> Server::makeWorkerImpl(kj::StringPtr
kj::Maybe<kj::StringPtr> serviceName;
if (!def.isDynamic) serviceName = name;

auto result =
kj::refcounted<WorkerService>(channelTokenHandler, serviceName, globalContext->threadContext,
monotonicClock, kj::mv(worker), kj::mv(errorReporter.defaultEntrypoint),
kj::mv(errorReporter.namedEntrypoints), kj::mv(errorReporter.actorClasses),
kj::mv(linkCallback), KJ_BIND_METHOD(*this, abortAllActors), kj::mv(dockerPath),
kj::mv(containerEgressInterceptorImage), def.isDynamic);
auto result = kj::refcounted<WorkerService>(channelTokenHandler, serviceName,
globalContext->threadContext, monotonicClock, kj::mv(worker),
kj::mv(errorReporter.defaultEntrypoint), kj::mv(errorReporter.namedEntrypoints),
kj::mv(errorReporter.actorClasses), kj::mv(linkCallback),
KJ_BIND_METHOD(*this, abortAllActors), KJ_BIND_METHOD(*this, deleteAllActors),
kj::mv(dockerPath), kj::mv(containerEgressInterceptorImage), def.isDynamic);
result->initActorNamespaces(def.localActorConfigs, network);
co_return result;
}
Expand Down
4 changes: 4 additions & 0 deletions src/workerd/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ class Server final: private kj::TaskSet::ErrorHandler, private ChannelTokenHandl
// Aborts all actors in this server except those in namespaces marked with `preventEviction`.
void abortAllActors(kj::Maybe<const kj::Exception&> reason);

// Aborts all actors, cancels all alarms, and deletes all underlying storage for evictable
// namespaces. After this, DOs can be recreated with clean state. Useful for test isolation.
void deleteAllActors(kj::Maybe<const kj::Exception&> reason);

// Can only be called in the link stage.
//
// May return a new object or may return a fake-own around a long-lived object.
Expand Down
78 changes: 78 additions & 0 deletions src/workerd/server/tests/unsafe-module/unsafe-module-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// https://opensource.org/licenses/Apache-2.0
import assert from 'node:assert';
import unsafe from 'workerd:unsafe';
import { DurableObject } from 'cloudflare:workers';

function createTestObject(type) {
return class {
Expand All @@ -21,6 +22,30 @@ export const TestEphemeralObjectPreventEviction = createTestObject(
'ephemeral-prevent-eviction'
);

// DO with persistent storage for verifying deleteAllDurableObjects() wipes data.
export class StorageObject extends DurableObject {
async getValue() {
return (await this.ctx.storage.get('key')) ?? null;
}
async setValue(value) {
await this.ctx.storage.put('key', value);
}
}

// DO with an alarm for verifying deleteAllDurableObjects() cancels alarms.
let alarmTriggers = 0;
export class AlarmObject extends DurableObject {
get scheduledTime() {
return this.ctx.storage.getAlarm();
}
async scheduleIn(delay) {
await this.ctx.storage.setAlarm(Date.now() + delay);
}
alarm() {
alarmTriggers++;
}
}

export const test_abort_all_durable_objects = {
async test(ctrl, env, ctx) {
const durableId = env.DURABLE.newUniqueId();
Expand Down Expand Up @@ -88,3 +113,56 @@ export const test_abort_all_durable_objects = {
);
},
};

export const test_delete_all_durable_objects = {
async test(ctrl, env, ctx) {
// Write some data to a durable object.
const id = env.STORAGE.idFromName('test-delete');
let stub = env.STORAGE.get(id);
await stub.setValue('hello');
assert.strictEqual(await stub.getValue(), 'hello');

// Delete all durable objects.
await unsafe.deleteAllDurableObjects();

// Old stub should be broken.
await assert.rejects(() => stub.getValue(), {
name: 'Error',
message: 'Application called deleteAllDurableObjects().',
});

// Recreate the stub — storage should be gone (files were deleted from disk).
stub = env.STORAGE.get(id);
assert.strictEqual(await stub.getValue(), null);
},
};

export const test_delete_all_durable_objects_alarms = {
async test(ctrl, env, ctx) {
const id = env.ALARM.newUniqueId();
const stub = env.ALARM.get(id);

const alarmsBefore = alarmTriggers;
await stub.scheduleIn(500);
assert.notStrictEqual(await stub.scheduledTime, null);

// Delete everything — alarms should be cancelled and not fire.
await unsafe.deleteAllDurableObjects();
await scheduler.wait(1000);
assert.strictEqual(alarmTriggers, alarmsBefore); // alarm did not fire
},
};

export const test_delete_all_durable_objects_respects_prevent_eviction = {
async test(ctrl, env, ctx) {
const id = env.DURABLE_PREVENT_EVICTION.newUniqueId();
const stub = env.DURABLE_PREVENT_EVICTION.get(id);
const res1 = await (await stub.fetch('http://x')).text();

await unsafe.deleteAllDurableObjects();

// preventEviction namespace should be untouched — same response (same instance).
const res2 = await (await stub.fetch('http://x')).text();
assert.strictEqual(res1, res2);
},
};
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,29 @@ using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "TEST_TMPDIR", disk = (writable = true) ),
( name = "unsafe-module-test",
worker = (
modules = [
( name = "worker", esModule = embed "unsafe-module-test.js" ),
],
compatibilityFlags = ["nodejs_compat", "unsafe_module"],
compatibilityFlags = ["nodejs_compat", "unsafe_module", "rpc"],
durableObjectNamespaces = [
( className = "TestDurableObject", uniqueKey = "durable" ),
( className = "TestDurableObjectPreventEviction", uniqueKey = "durable-prevent-eviction", preventEviction = true ),
( className = "TestEphemeralObject", ephemeralLocal = void ),
( className = "TestEphemeralObjectPreventEviction", ephemeralLocal = void, preventEviction = true ),
( className = "AlarmObject", uniqueKey = "alarm" ),
( className = "StorageObject", uniqueKey = "storage" ),
],
durableObjectStorage = (inMemory = void),
durableObjectStorage = ( localDisk = "TEST_TMPDIR" ),
bindings = [
( name = "DURABLE", durableObjectNamespace = "TestDurableObject" ),
( name = "DURABLE_PREVENT_EVICTION", durableObjectNamespace = "TestDurableObjectPreventEviction" ),
( name = "EPHEMERAL", durableObjectNamespace = "TestEphemeralObject" ),
( name = "EPHEMERAL_PREVENT_EVICTION", durableObjectNamespace = "TestEphemeralObjectPreventEviction" ),
( name = "ALARM", durableObjectNamespace = "AlarmObject" ),
( name = "STORAGE", durableObjectNamespace = "StorageObject" ),
],
)
),
Expand Down
Loading