Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/do.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,24 @@ export class StarbaseDBDurableObject extends DurableObject {
"operator" TEXT DEFAULT '='
)`

const exportJobsStatement = `
CREATE TABLE IF NOT EXISTS tmp_export_jobs (
id TEXT PRIMARY KEY,
status TEXT NOT NULL,
started_at INTEGER NOT NULL,
completed_at INTEGER,
current_table TEXT,
current_row INTEGER,
total_tables INTEGER,
file_name TEXT NOT NULL,
error TEXT
)`

this.executeQuery({ sql: cacheStatement })
this.executeQuery({ sql: allowlistStatement })
this.executeQuery({ sql: allowlistRejectedStatement })
this.executeQuery({ sql: rlsStatement })
this.executeQuery({ sql: exportJobsStatement })
}

init() {
Expand Down
189 changes: 189 additions & 0 deletions src/export/dump-streaming.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
import { executeOperation } from '.'
import { StarbaseDBConfiguration } from '../handler'
import { DataSource } from '../types'
import { createResponse } from '../utils'
import {
createExportJob,
completeExportJob,
failExportJob,
updateExportJobProgress,
} from './job'

// Rows fetched per SELECT while dumping a single table.
const CHUNK_SIZE = 1000
// Time budget in milliseconds: 25 seconds, leaving a buffer before the 30s timeout.
const MAX_EXECUTION_TIME = 25 * 1000

/**
 * Wrap an identifier in double quotes, doubling any embedded double quote so
 * the name cannot terminate the quoted identifier early.
 */
function quoteIdentifier(name: string): string {
    return `"${name.replace(/"/g, '""')}"`
}

/**
 * Render one column value as a SQL literal for an INSERT statement.
 *
 * - strings are single-quoted with embedded quotes doubled
 * - `null` / `undefined` become `NULL`
 * - binary values become `X'..'` hex literals
 *   (assumes blobs arrive as ArrayBuffer or typed-array views — TODO confirm
 *   against what `executeOperation` returns)
 * - everything else is stringified as-is
 */
function toSqlLiteral(value: unknown): string {
    if (value === null || value === undefined) {
        return 'NULL'
    }
    if (typeof value === 'string') {
        return `'${value.replace(/'/g, "''")}'`
    }
    if (value instanceof ArrayBuffer || ArrayBuffer.isView(value)) {
        const bytes =
            value instanceof ArrayBuffer
                ? new Uint8Array(value)
                : new Uint8Array(
                      value.buffer,
                      value.byteOffset,
                      value.byteLength
                  )
        const hex = Array.from(bytes, (byte) =>
            byte.toString(16).padStart(2, '0')
        ).join('')
        return `X'${hex}'`
    }
    return String(value)
}

/**
 * Build the 202 response returned when the time budget runs out before the
 * export has finished.
 */
function partialExportResponse(
    jobId: string,
    fileName: string,
    currentTable: number,
    totalTables: number
): Response {
    return createResponse(
        {
            jobId,
            fileName,
            status: 'in_progress',
            message:
                'Export in progress. Use /export/job/:jobId to check status.',
            progress: {
                currentTable,
                totalTables,
            },
        },
        undefined,
        202
    )
}

/**
 * Export the database as a SQL text dump and store it in the R2 export bucket.
 *
 * Flow:
 *  1. Return a 500 response when `dataSource.exportBucket` is not configured.
 *  2. List tables, excluding those whose name starts with the literal prefix
 *     `tmp_`.
 *  3. With no tables, upload a minimal placeholder file and return 200 with
 *     `jobId: null` (no job row is created).
 *  4. Otherwise create a job row, then for every table append its schema and
 *     its rows as INSERT statements, reading CHUNK_SIZE rows per query and
 *     recording progress after each chunk.
 *  5. Upload the dump, mark the job completed and return 200.
 *
 * When more than MAX_EXECUTION_TIME milliseconds have elapsed (checked before
 * each table and between chunks), the content gathered so far is uploaded and
 * a 202 `in_progress` response is returned. Nothing in this function resumes
 * the export afterwards.
 *
 * NOTE(review): rows are read in chunks, but the dump text is accumulated in a
 * single in-memory string and uploaded with one `put`, so memory use still
 * grows with the size of the database.
 *
 * @param dataSource Data source providing the `exportBucket` binding; also
 *   passed through to the query and job helpers.
 * @param config Configuration forwarded to `executeOperation`.
 * @returns A response describing the finished (200), partial (202) or failed
 *   (500) export. Errors are logged and reported as 500, never rethrown.
 */
export async function streamDatabaseDumpToR2(
    dataSource: DataSource,
    config: StarbaseDBConfiguration
): Promise<Response> {
    // Kept outside the try block so the catch handler can mark the job failed.
    let activeJobId: string | undefined

    try {
        // Check if R2 bucket is available
        const bucket = dataSource.exportBucket
        if (!bucket) {
            return createResponse(
                undefined,
                'R2 bucket not configured. Please configure EXPORT_BUCKET binding in wrangler.toml',
                500
            )
        }

        // ISO timestamp made file-name friendly, e.g. 2024-01-31_12-34-56-789
        const timestamp = new Date()
            .toISOString()
            .replace(/[:.]/g, '-')
            .replace('T', '_')
            .split('Z')[0]
        const fileName = `dump_${timestamp}.sql`

        // Get all table names (excluding temp tables). `_` is a single-character
        // wildcard in LIKE, so it is escaped; otherwise a table such as
        // `tmpdata` would be dropped from the dump as well.
        const tablesResult = await executeOperation(
            [
                {
                    sql: "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'tmp\\_%' ESCAPE '\\';",
                },
            ],
            dataSource,
            config
        )

        const tables: string[] = tablesResult.map((row: any) => row.name)

        if (tables.length === 0) {
            // Empty database - create minimal dump file
            const emptyContent = 'SQLite format 3\0\n-- Empty database\n'
            await bucket.put(fileName, emptyContent)
            return createResponse(
                { fileName, status: 'completed', jobId: null },
                undefined,
                200
            )
        }

        const jobId = await createExportJob(dataSource, fileName)
        activeJobId = jobId
        const startTime = Date.now()
        const outOfTime = (): boolean =>
            Date.now() - startTime > MAX_EXECUTION_TIME

        // Build the dump content in memory with chunked processing
        let dumpContent = 'SQLite format 3\0\n'
        let currentTableIndex = 0

        for (const tableName of tables) {
            currentTableIndex++

            // Check if we're approaching timeout
            if (outOfTime()) {
                // For now, write what we have and indicate partial completion
                // Future enhancement: implement alarm-based continuation
                await bucket.put(fileName, dumpContent)
                await updateExportJobProgress(
                    dataSource,
                    jobId,
                    tableName,
                    0,
                    tables.length
                )
                return partialExportResponse(
                    jobId,
                    fileName,
                    currentTableIndex,
                    tables.length
                )
            }

            // Get table schema
            const schemaResult = await executeOperation(
                [
                    {
                        sql: `SELECT sql FROM sqlite_master WHERE type='table' AND name=?;`,
                        params: [tableName],
                    },
                ],
                dataSource,
                config
            )

            if (schemaResult.length > 0) {
                const schema = schemaResult[0].sql
                dumpContent += `\n-- Table: ${tableName}\n${schema};\n\n`
            }

            const quotedTable = quoteIdentifier(tableName)

            // Get table data in chunks to avoid loading all rows at once
            let offset = 0
            while (true) {
                const dataResult = await executeOperation(
                    [
                        {
                            sql: `SELECT * FROM ${quotedTable} LIMIT ? OFFSET ?;`,
                            params: [CHUNK_SIZE, offset],
                        },
                    ],
                    dataSource,
                    config
                )

                if (dataResult.length === 0) {
                    break
                }

                // Generate INSERT statements for this chunk
                for (const row of dataResult) {
                    const values = Object.values(row).map(toSqlLiteral)
                    dumpContent += `INSERT INTO ${quotedTable} VALUES (${values.join(', ')});\n`
                }

                // Update progress
                await updateExportJobProgress(
                    dataSource,
                    jobId,
                    tableName,
                    offset + dataResult.length,
                    tables.length
                )

                // If we got fewer rows than CHUNK_SIZE, we've reached the end
                if (dataResult.length < CHUNK_SIZE) {
                    break
                }

                offset += CHUNK_SIZE

                // A single large table can outlast the time budget on its own,
                // so the budget is re-checked between chunks as well. Progress
                // for the rows written so far was recorded just above.
                if (outOfTime()) {
                    await bucket.put(fileName, dumpContent)
                    return partialExportResponse(
                        jobId,
                        fileName,
                        currentTableIndex,
                        tables.length
                    )
                }
            }

            dumpContent += '\n'
        }

        // Write final content to R2
        await bucket.put(fileName, dumpContent)

        // Mark job as completed
        await completeExportJob(dataSource, jobId)

        return createResponse(
            {
                jobId,
                fileName,
                status: 'completed',
                message: `Database exported successfully. Download at /export/download/${fileName}`,
            },
            undefined,
            200
        )
    } catch (error: unknown) {
        console.error('Database Dump Error:', error)

        // Record the failure so the job does not stay "in_progress" forever.
        // Best effort: a failure to record it must not mask the 500 response.
        if (activeJobId !== undefined) {
            try {
                await failExportJob(
                    dataSource,
                    activeJobId,
                    error instanceof Error ? error.message : String(error)
                )
            } catch (recordError: unknown) {
                console.error(
                    'Failed to record export job failure:',
                    recordError
                )
            }
        }

        return createResponse(undefined, 'Failed to create database dump', 500)
    }
}
5 changes: 5 additions & 0 deletions src/export/dump.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ import { executeOperation } from '.'
import { StarbaseDBConfiguration } from '../handler'
import { DataSource } from '../types'
import { createResponse } from '../utils'
import { streamDatabaseDumpToR2 } from './dump-streaming'

export async function dumpDatabaseRoute(
dataSource: DataSource,
config: StarbaseDBConfiguration
): Promise<Response> {
try {
// Use streaming export to R2 if bucket is configured
if (dataSource.exportBucket) {
return streamDatabaseDumpToR2(dataSource, config)
}
// Get all table names
const tablesResult = await executeOperation(
[{ sql: "SELECT name FROM sqlite_master WHERE type='table';" }],
Expand Down
142 changes: 142 additions & 0 deletions src/export/job.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import { DataSource } from '../types'

/**
 * In-memory view of one row of the `tmp_export_jobs` table.
 */
export interface ExportJob {
    /** Primary key of the job row. */
    id: string
    /** Lifecycle state of the export. */
    status: 'in_progress' | 'completed' | 'failed'
    /** `Date.now()` value captured when the job row was created. */
    startedAt: number
    /** Name of the dump file this job writes. */
    fileName: string

    /** `Date.now()` value captured when the job was completed or failed. */
    completedAt?: number
    /** Name of the table most recently reported as in progress. */
    currentTable?: string
    /** Row position most recently reported for `currentTable`. */
    currentRow?: number
    /** Number of tables the export covers. */
    totalTables?: number
    /** Failure message, set when the job is marked as failed. */
    error?: string
}

/**
 * Ensure the `tmp_export_jobs` bookkeeping table exists.
 *
 * Issues a single `CREATE TABLE IF NOT EXISTS` through
 * `dataSource.rpc.executeQuery`, so calling it repeatedly is harmless.
 *
 * @param dataSource Data source whose `rpc.executeQuery` runs the statement.
 */
export async function initializeExportJobsTable(
    dataSource: DataSource
): Promise<void> {
    // One entry per column, in declaration order.
    const columnDefinitions = [
        'id TEXT PRIMARY KEY',
        'status TEXT NOT NULL',
        'started_at INTEGER NOT NULL',
        'completed_at INTEGER',
        'current_table TEXT',
        'current_row INTEGER',
        'total_tables INTEGER',
        'file_name TEXT NOT NULL',
        'error TEXT',
    ]

    const ddl = [
        '',
        'CREATE TABLE IF NOT EXISTS tmp_export_jobs (',
        columnDefinitions.join(',\n'),
        ');',
        '',
    ].join('\n')

    await dataSource.rpc.executeQuery({ sql: ddl })
}

/**
 * Insert a new `in_progress` row into `tmp_export_jobs` and return its id.
 *
 * The id has the form `export_<timestamp>_<suffix>`. The same timestamp is
 * stored in `started_at`, and the suffix is always eight base-36 characters
 * so it can never be empty.
 *
 * @param dataSource Data source whose `rpc.executeQuery` runs the insert.
 * @param fileName Name of the dump file the job will produce.
 * @returns The generated job id.
 */
export async function createExportJob(
    dataSource: DataSource,
    fileName: string
): Promise<string> {
    // Read the clock once so the id and started_at always agree.
    const startedAt = Date.now()

    // Drop the leading "0." and pad: short random fractions would otherwise
    // give a short or even empty suffix.
    const suffix = Math.random().toString(36).slice(2, 10).padEnd(8, '0')
    const jobId = `export_${startedAt}_${suffix}`

    const sql = `
INSERT INTO tmp_export_jobs (id, status, started_at, file_name)
VALUES (?, ?, ?, ?);
`
    await dataSource.rpc.executeQuery({
        sql,
        params: [jobId, 'in_progress', startedAt, fileName],
    })
    return jobId
}

/**
 * Record how far an export job has progressed.
 *
 * Overwrites `current_table`, `current_row` and `total_tables` on the row
 * whose id equals `jobId`.
 *
 * @param dataSource Data source whose `rpc.executeQuery` runs the update.
 * @param jobId Id of the job row to update.
 * @param currentTable Name of the table currently being exported.
 * @param currentRow Row position reached within `currentTable`.
 * @param totalTables Number of tables the export covers.
 */
export async function updateExportJobProgress(
    dataSource: DataSource,
    jobId: string,
    currentTable: string,
    currentRow: number,
    totalTables: number
): Promise<void> {
    const statement = [
        '',
        'UPDATE tmp_export_jobs',
        'SET current_table = ?, current_row = ?, total_tables = ?',
        'WHERE id = ?;',
        '',
    ].join('\n')

    // Bind order follows the placeholders: the three SET values, then the id.
    const bindings = [currentTable, currentRow, totalTables, jobId]

    await dataSource.rpc.executeQuery({ sql: statement, params: bindings })
}

/**
 * Flag an export job as finished successfully.
 *
 * Sets `status` to `'completed'` and stamps `completed_at` with the current
 * `Date.now()` value on the row whose id equals `jobId`.
 *
 * @param dataSource Data source whose `rpc.executeQuery` runs the update.
 * @param jobId Id of the job row to update.
 */
export async function completeExportJob(
    dataSource: DataSource,
    jobId: string
): Promise<void> {
    const finishedAt = Date.now()
    const markCompleted = {
        sql: `
UPDATE tmp_export_jobs
SET status = 'completed', completed_at = ?
WHERE id = ?;
`,
        params: [finishedAt, jobId],
    }

    await dataSource.rpc.executeQuery(markCompleted)
}

/**
 * Flag an export job as failed and store the reason.
 *
 * Sets `status` to `'failed'`, stamps `completed_at` with the current
 * `Date.now()` value and writes `error` on the row whose id equals `jobId`.
 *
 * @param dataSource Data source whose `rpc.executeQuery` runs the update.
 * @param jobId Id of the job row to update.
 * @param error Failure message to persist with the job.
 */
export async function failExportJob(
    dataSource: DataSource,
    jobId: string,
    error: string
): Promise<void> {
    const failedAt = Date.now()
    const markFailed = {
        sql: `
UPDATE tmp_export_jobs
SET status = 'failed', completed_at = ?, error = ?
WHERE id = ?;
`,
        params: [failedAt, error, jobId],
    }

    await dataSource.rpc.executeQuery(markFailed)
}

/**
 * Look up one export job by id.
 *
 * Reads the matching row from `tmp_export_jobs` and maps its snake_case
 * columns onto the camelCase `ExportJob` shape. Nullable columns that hold SQL
 * NULL are returned as `undefined`, as the optional `ExportJob` fields
 * declare, rather than leaking `null` behind a type cast.
 *
 * @param dataSource Data source whose `rpc.executeQuery` runs the lookup.
 * @param jobId Id of the job to fetch.
 * @returns The job, or `null` when the query yields no row.
 */
export async function getExportJob(
    dataSource: DataSource,
    jobId: string
): Promise<ExportJob | null> {
    // Column shape of a tmp_export_jobs row; nullable columns may hold null.
    type ExportJobRow = {
        id: string
        status: 'in_progress' | 'completed' | 'failed'
        started_at: number
        completed_at?: number | null
        current_table?: string | null
        current_row?: number | null
        total_tables?: number | null
        file_name: string
        error?: string | null
    }

    const sql = `SELECT * FROM tmp_export_jobs WHERE id = ?;`
    const result = (await dataSource.rpc.executeQuery({
        sql,
        params: [jobId],
        isRaw: false,
    })) as any[]

    // Treat anything that is not a non-empty array of rows as "not found".
    if (!Array.isArray(result) || result.length === 0) {
        return null
    }

    const row: ExportJobRow = result[0]
    return {
        id: row.id,
        status: row.status,
        startedAt: row.started_at,
        completedAt: row.completed_at ?? undefined,
        currentTable: row.current_table ?? undefined,
        currentRow: row.current_row ?? undefined,
        totalTables: row.total_tables ?? undefined,
        fileName: row.file_name,
        error: row.error ?? undefined,
    }
}
Loading