diff --git a/src/do.ts b/src/do.ts index b6bb2b6..d6aecf0 100644 --- a/src/do.ts +++ b/src/do.ts @@ -59,10 +59,24 @@ export class StarbaseDBDurableObject extends DurableObject { "operator" TEXT DEFAULT '=' )` + const exportJobsStatement = ` + CREATE TABLE IF NOT EXISTS tmp_export_jobs ( + id TEXT PRIMARY KEY, + status TEXT NOT NULL, + started_at INTEGER NOT NULL, + completed_at INTEGER, + current_table TEXT, + current_row INTEGER, + total_tables INTEGER, + file_name TEXT NOT NULL, + error TEXT + )` + this.executeQuery({ sql: cacheStatement }) this.executeQuery({ sql: allowlistStatement }) this.executeQuery({ sql: allowlistRejectedStatement }) this.executeQuery({ sql: rlsStatement }) + this.executeQuery({ sql: exportJobsStatement }) } init() { diff --git a/src/export/dump-streaming.ts b/src/export/dump-streaming.ts new file mode 100644 index 0000000..6be1199 --- /dev/null +++ b/src/export/dump-streaming.ts @@ -0,0 +1,189 @@ +import { executeOperation } from '.' +import { StarbaseDBConfiguration } from '../handler' +import { DataSource } from '../types' +import { createResponse } from '../utils' +import { + createExportJob, + completeExportJob, + failExportJob, + updateExportJobProgress, +} from './job' + +const CHUNK_SIZE = 1000 // Number of rows to process per chunk +const MAX_EXECUTION_TIME = 25000 // 25 seconds, leaving buffer before 30s timeout + +/** + * Stream database dump to R2 using chunked writes to support large databases + */ +export async function streamDatabaseDumpToR2( + dataSource: DataSource, + config: StarbaseDBConfiguration +): Promise { + try { + // Check if R2 bucket is available + if (!dataSource.exportBucket) { + return createResponse( + undefined, + 'R2 bucket not configured. Please configure EXPORT_BUCKET binding in wrangler.toml', + 500 + ) + } + + // Start new export + const timestamp = new Date() + .toISOString() + .replace(/[:.]/g, '-') + .replace('T', '_') + .split('Z')[0] + const fileName = `dump_${timestamp}.sql` + + // Get all table names (excluding temp tables) + const tablesResult = await executeOperation( + [{ sql: "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'tmp_%';" }], + dataSource, + config + ) + + const tables = tablesResult.map((row: any) => row.name) + + if (tables.length === 0) { + // Empty database - create minimal dump file + const emptyContent = 'SQLite format 3\0\n-- Empty database\n' + await dataSource.exportBucket.put(fileName, emptyContent) + return createResponse( + { fileName, status: 'completed', jobId: null }, + undefined, + 200 + ) + } + + const jobId = await createExportJob(dataSource, fileName) + const startTime = Date.now() + + // Build the dump content in memory with chunked processing + let dumpContent = 'SQLite format 3\0\n' + let currentTableIndex = 0 + + for (const tableName of tables) { + currentTableIndex++ + + // Check if we're approaching timeout + const elapsedTime = Date.now() - startTime + if (elapsedTime > MAX_EXECUTION_TIME) { + // For now, write what we have and indicate partial completion + // Future enhancement: implement alarm-based continuation + await dataSource.exportBucket.put(fileName, dumpContent) + await updateExportJobProgress( + dataSource, + jobId, + tableName, + 0, + tables.length + ) + + return createResponse( + { + jobId, + fileName, + status: 'in_progress', + message: 'Export in progress. Use /export/job/:jobId to check status.', + progress: { + currentTable: currentTableIndex, + totalTables: tables.length, + }, + }, + undefined, + 202 + ) + } + + // Get table schema + const schemaResult = await executeOperation( + [ + { + sql: `SELECT sql FROM sqlite_master WHERE type='table' AND name=?;`, + params: [tableName], + }, + ], + dataSource, + config + ) + + if (schemaResult.length > 0) { + const schema = schemaResult[0].sql + dumpContent += `\n-- Table: ${tableName}\n${schema};\n\n` + } + + // Get table data in chunks to avoid loading all rows at once + let offset = 0 + let hasMoreRows = true + + while (hasMoreRows) { + const dataResult = await executeOperation( + [ + { + sql: `SELECT * FROM "${tableName}" LIMIT ? OFFSET ?;`, + params: [CHUNK_SIZE, offset], + }, + ], + dataSource, + config + ) + + if (dataResult.length > 0) { + // Generate INSERT statements for this chunk + for (const row of dataResult) { + const values = Object.values(row).map((value) => + typeof value === 'string' + ? `'${value.replace(/'/g, "''")}'` + : value === null + ? 'NULL' + : value + ) + dumpContent += `INSERT INTO "${tableName}" VALUES (${values.join(', ')});\n` + } + + // Update progress + await updateExportJobProgress( + dataSource, + jobId, + tableName, + offset + dataResult.length, + tables.length + ) + + offset += CHUNK_SIZE + + // If we got fewer rows than CHUNK_SIZE, we've reached the end + if (dataResult.length < CHUNK_SIZE) { + hasMoreRows = false + } + } else { + hasMoreRows = false + } + } + + dumpContent += '\n' + } + + // Write final content to R2 + await dataSource.exportBucket.put(fileName, dumpContent) + + // Mark job as completed + await completeExportJob(dataSource, jobId) + + return createResponse( + { + jobId, + fileName, + status: 'completed', + message: `Database exported successfully. Download at /export/download/${fileName}`, + }, + undefined, + 200 + ) + } catch (error: any) { + console.error('Database Dump Error:', error) + return createResponse(undefined, 'Failed to create database dump', 500) + } +} diff --git a/src/export/dump.ts b/src/export/dump.ts index 91a2e89..e73b2ec 100644 --- a/src/export/dump.ts +++ b/src/export/dump.ts @@ -2,12 +2,17 @@ import { executeOperation } from '.' import { StarbaseDBConfiguration } from '../handler' import { DataSource } from '../types' import { createResponse } from '../utils' +import { streamDatabaseDumpToR2 } from './dump-streaming' export async function dumpDatabaseRoute( dataSource: DataSource, config: StarbaseDBConfiguration ): Promise { try { + // Use streaming export to R2 if bucket is configured + if (dataSource.exportBucket) { + return streamDatabaseDumpToR2(dataSource, config) + } // Get all table names const tablesResult = await executeOperation( [{ sql: "SELECT name FROM sqlite_master WHERE type='table';" }], diff --git a/src/export/job.ts b/src/export/job.ts new file mode 100644 index 0000000..6ec1c88 --- /dev/null +++ b/src/export/job.ts @@ -0,0 +1,142 @@ +import { DataSource } from '../types' + +export interface ExportJob { + id: string + status: 'in_progress' | 'completed' | 'failed' + startedAt: number + completedAt?: number + currentTable?: string + currentRow?: number + totalTables?: number + fileName: string + error?: string +} + +/** + * Create a table to track export jobs in the Durable Object storage + */ +export async function initializeExportJobsTable(dataSource: DataSource): Promise { + const createTableSql = ` + CREATE TABLE IF NOT EXISTS tmp_export_jobs ( + id TEXT PRIMARY KEY, + status TEXT NOT NULL, + started_at INTEGER NOT NULL, + completed_at INTEGER, + current_table TEXT, + current_row INTEGER, + total_tables INTEGER, + file_name TEXT NOT NULL, + error TEXT + ); + ` + await dataSource.rpc.executeQuery({ sql: createTableSql }) +} + +/** + * Create a new export job + */ +export async function createExportJob( + dataSource: DataSource, + fileName: string +): Promise { + const jobId = `export_${Date.now()}_${Math.random().toString(36).substring(7)}` + const sql = ` + INSERT INTO tmp_export_jobs (id, status, started_at, file_name) + VALUES (?, ?, ?, ?); + ` + await dataSource.rpc.executeQuery({ + sql, + params: [jobId, 'in_progress', Date.now(), fileName], + }) + return jobId +} + +/** + * Update export job progress + */ +export async function updateExportJobProgress( + dataSource: DataSource, + jobId: string, + currentTable: string, + currentRow: number, + totalTables: number +): Promise { + const sql = ` + UPDATE tmp_export_jobs + SET current_table = ?, current_row = ?, total_tables = ? + WHERE id = ?; + ` + await dataSource.rpc.executeQuery({ + sql, + params: [currentTable, currentRow, totalTables, jobId], + }) +} + +/** + * Mark export job as completed + */ +export async function completeExportJob( + dataSource: DataSource, + jobId: string +): Promise { + const sql = ` + UPDATE tmp_export_jobs + SET status = 'completed', completed_at = ? + WHERE id = ?; + ` + await dataSource.rpc.executeQuery({ + sql, + params: [Date.now(), jobId], + }) +} + +/** + * Mark export job as failed + */ +export async function failExportJob( + dataSource: DataSource, + jobId: string, + error: string +): Promise { + const sql = ` + UPDATE tmp_export_jobs + SET status = 'failed', completed_at = ?, error = ? + WHERE id = ?; + ` + await dataSource.rpc.executeQuery({ + sql, + params: [Date.now(), error, jobId], + }) +} + +/** + * Get export job status + */ +export async function getExportJob( + dataSource: DataSource, + jobId: string +): Promise { + const sql = `SELECT * FROM tmp_export_jobs WHERE id = ?;` + const result = (await dataSource.rpc.executeQuery({ + sql, + params: [jobId], + isRaw: false, + })) as any[] + + if (!result || result.length === 0) { + return null + } + + const row = result[0] + return { + id: row.id as string, + status: row.status as 'in_progress' | 'completed' | 'failed', + startedAt: row.started_at as number, + completedAt: row.completed_at as number | undefined, + currentTable: row.current_table as string | undefined, + currentRow: row.current_row as number | undefined, + totalTables: row.total_tables as number | undefined, + fileName: row.file_name as string, + error: row.error as string | undefined, + } +} diff --git a/src/handler.ts b/src/handler.ts index 3fa0085..4ea9d87 100644 --- a/src/handler.ts +++ b/src/handler.ts @@ -9,6 +9,7 @@ import { createResponse, QueryRequest, QueryTransactionRequest } from './utils' import { dumpDatabaseRoute } from './export/dump' import { exportTableToJsonRoute } from './export/json' import { exportTableToCsvRoute } from './export/csv' +import { getExportJob } from './export/job' import { importDumpRoute } from './import/dump' import { importTableFromJsonRoute } from './import/json' import { importTableFromCsvRoute } from './import/csv' @@ -151,6 +152,54 @@ export class StarbaseDB { ) } ) + + // Get export job status + this.app.get('/export/job/:jobId', this.isInternalSource, async (c) => { + try { + const jobId = c.req.param('jobId') + const job = await getExportJob(this.dataSource, jobId) + + if (!job) { + return createResponse(undefined, 'Export job not found', 404) + } + + return createResponse(job, undefined, 200) + } catch (error: any) { + console.error('Get export job error:', error) + return createResponse(undefined, 'Failed to get export job status', 500) + } + }) + + // Download export file from R2 + this.app.get('/export/download/:fileName', this.isInternalSource, async (c) => { + try { + const fileName = c.req.param('fileName') + + if (!this.dataSource.exportBucket) { + return createResponse( + undefined, + 'R2 bucket not configured', + 500 + ) + } + + const file = await this.dataSource.exportBucket.get(fileName) + + if (!file) { + return createResponse(undefined, 'Export file not found', 404) + } + + const headers = new Headers({ + 'Content-Type': 'application/x-sqlite3', + 'Content-Disposition': `attachment; filename="${fileName}"`, + }) + + return new Response(file.body, { headers }) + } catch (error: any) { + console.error('Download export file error:', error) + return createResponse(undefined, 'Failed to download export file', 500) + } + }) } if (this.getFeature('import')) { diff --git a/src/index.ts b/src/index.ts index 4d08932..9919315 100644 --- a/src/index.ts +++ b/src/index.ts @@ -55,6 +55,7 @@ export interface Env { AUTH_JWKS_ENDPOINT?: string HYPERDRIVE: Hyperdrive + EXPORT_BUCKET?: R2Bucket // ## DO NOT REMOVE: TEMPLATE INTERFACE ## } @@ -121,6 +122,7 @@ export default { ...context, }, executionContext: ctx, + exportBucket: env.EXPORT_BUCKET, } if (env.EXTERNAL_DB_TYPE === 'postgresql') { diff --git a/src/types.ts b/src/types.ts index b0a59f4..d91ea90 100644 --- a/src/types.ts +++ b/src/types.ts @@ -64,6 +64,7 @@ export type DataSource = { cacheTTL?: number registry?: StarbasePluginRegistry executionContext?: ExecutionContext + exportBucket?: R2Bucket } export enum RegionLocationHint { diff --git a/worker-configuration.d.ts b/worker-configuration.d.ts index 6c35c6f..a562b5b 100644 --- a/worker-configuration.d.ts +++ b/worker-configuration.d.ts @@ -13,4 +13,5 @@ interface Env { DATABASE_DURABLE_OBJECT: DurableObjectNamespace< import('./src/index').StarbaseDBDurableObject > + EXPORT_BUCKET: R2Bucket } diff --git a/wrangler.toml b/wrangler.toml index 395c4ac..27acc7b 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -14,6 +14,12 @@ rules = [ # Service Bindings ## DO NOT REMOVE: TEMPLATE SERVICES ## +# R2 Bucket for large database exports +# Docs: https://developers.cloudflare.com/r2/ +[[r2_buckets]] +binding = "EXPORT_BUCKET" +bucket_name = "starbasedb-exports" + # Workers Logs # Docs: https://developers.cloudflare.com/workers/observability/logs/workers-logs/ # Configuration: https://developers.cloudflare.com/workers/observability/logs/workers-logs/#enable-workers-logs