Skip to content

Commit 4ae67bf

Browse files
JiT compilation stage implementation
Implement JiT compilation stage in Core: - a new entry point jitCompiler(rpcCallback), which instantiates a single compile method, performing a compilation against a supplied RPC implementation. - table/operations/incremental table context(s) construction and invocation of the JiT code.
1 parent 649fd9e commit 4ae67bf

5 files changed

Lines changed: 276 additions & 2 deletions

File tree

core/BUILD

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ ts_library(
3333
"contextables.ts",
3434
"extension.ts",
3535
"index.ts",
36+
"jit_compiler.ts",
3637
"jit_context.ts",
3738
"main.ts",
3839
"path.ts",
@@ -77,6 +78,7 @@ ts_test_suite(
7778
"actions/table_test.ts",
7879
"actions/test_test.ts",
7980
"actions/view_test.ts",
81+
"jit_compiler_test.ts",
8082
"jit_context_test.ts",
8183
],
8284
data = [

core/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { compile as compiler } from "df/core/compilers";
22
import { IDataformExtension } from "df/core/extension";
3+
import { IJitCompiler, jitCompiler } from "df/core/jit_compiler";
34
import { main } from "df/core/main";
45
import { Session } from "df/core/session";
56
import { version } from "df/core/version";
@@ -29,4 +30,4 @@ function indexFileGenerator() {
2930
// These exports constitute the public API of @dataform/core.
3031
// They must also be listed in packages/@dataform/core/index.ts.
3132
// Changes to these will break @dataform/cli, so take care!
32-
export { compiler, IDataformExtension, indexFileGenerator, main, session, supportedFeatures, version };
33+
export { compiler, IDataformExtension, indexFileGenerator, IJitCompiler, jitCompiler, main, session, supportedFeatures, version };

core/jit_compiler.ts

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
import * as $protobuf from "protobufjs";
2+
3+
import { JitOperationResult } from "df/core/actions/operation";
4+
import { JitTableResult } from "df/core/actions/table";
5+
import { IActionContext, ITableContext, JitContext } from "df/core/contextables";
6+
import { IncrementalTableJitContext, SqlActionJitContext, TableJitContext } from "df/core/jit_context";
7+
import { dataform } from "df/protos/ts";
8+
9+
function makeMainBody<Context, T>(code: string): (jctx: JitContext<Context>) => Promise<T> {
10+
return (
11+
jctx => {
12+
const mainAsync = async () => {
13+
// tslint:disable-next-line: tsr-detect-eval-with-expression
14+
const body = new Function(
15+
"jctx", `const f = ${code};\nreturn f(jctx);`
16+
) as (jctx: JitContext<Context>) => Promise<T>;
17+
return body(jctx);
18+
};
19+
return mainAsync();
20+
});
21+
}
22+
23+
function makeJitTableResult(result: JitTableResult): dataform.IJitTableResult {
24+
let jitResult: dataform.IJitTableResult = {};
25+
if (typeof result === "string") {
26+
jitResult.query = result;
27+
} else {
28+
jitResult = result;
29+
}
30+
31+
return dataform.JitTableResult.create(jitResult);
32+
}
33+
34+
function jitCompileOperation(
35+
request: dataform.IJitCompilationRequest,
36+
adapter: dataform.DbAdapter,
37+
): Promise<dataform.IJitOperationResult> {
38+
const mainBody = makeMainBody<IActionContext, JitOperationResult>(request.jitCode);
39+
40+
const jctx: JitContext<IActionContext> = new SqlActionJitContext(
41+
adapter, request,
42+
);
43+
return mainBody(jctx).then(mainResult => {
44+
let queries: string[] | null = [];
45+
if (typeof mainResult === "string") {
46+
queries.push(mainResult);
47+
} else if (Array.isArray(mainResult)) {
48+
queries.push(...mainResult);
49+
} else {
50+
queries = mainResult.queries;
51+
}
52+
53+
return dataform.JitOperationResult.create({ queries });
54+
});
55+
}
56+
57+
function jitCompileTable(
58+
request: dataform.IJitCompilationRequest,
59+
adapter: dataform.DbAdapter,
60+
): Promise<dataform.IJitTableResult> {
61+
const mainBody = makeMainBody<ITableContext, JitTableResult>(request.jitCode);
62+
63+
const jctx: JitContext<ITableContext> = new TableJitContext(
64+
adapter, request,
65+
);
66+
return mainBody(jctx).then(makeJitTableResult);
67+
}
68+
69+
function jitCompileIncrementalTable(
70+
request: dataform.IJitCompilationRequest,
71+
adapter: dataform.DbAdapter,
72+
): Promise<dataform.IJitIncrementalTableResult> {
73+
const mainBody = makeMainBody<ITableContext, JitTableResult>(request.jitCode);
74+
75+
const incrementalJctx = new IncrementalTableJitContext(
76+
adapter, request, true,
77+
);
78+
const regularJctx = new IncrementalTableJitContext(
79+
adapter, request, false,
80+
);
81+
82+
return Promise.all([
83+
mainBody(incrementalJctx),
84+
mainBody(regularJctx),
85+
]).then(([incrementalResult, regularResult]) => {
86+
return dataform.JitIncrementalTableResult.create({
87+
incremental: makeJitTableResult(incrementalResult),
88+
regular: makeJitTableResult(regularResult),
89+
});
90+
});
91+
}
92+
93+
export interface IJitCompiler {
94+
compile: (request: Uint8Array) => Promise<Uint8Array>;
95+
}
96+
97+
/** RPC callback, implementing DbAdapter. */
98+
export type RpcCallback = (method: string, request: Uint8Array, callback: (error: Error | null, response: Uint8Array) => void) => void;
99+
100+
export function jitCompile(request: dataform.IJitCompilationRequest, rpcCallback: RpcCallback): Promise<dataform.IJitCompilationResponse> {
101+
const rpcImpl: $protobuf.RPCImpl = (method, internalRequest, callback) => {
102+
rpcCallback(method.name, internalRequest, callback);
103+
};
104+
const dbAdapter = dataform.DbAdapter.create(rpcImpl);
105+
106+
switch (request.compilationTargetType) {
107+
case dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_OPERATION:
108+
return jitCompileOperation(request, dbAdapter).then(
109+
operation => dataform.JitCompilationResponse.create({ operation }));
110+
case dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_TABLE:
111+
return jitCompileTable(request, dbAdapter).then(
112+
table => dataform.JitCompilationResponse.create({ table }));
113+
case dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_INCREMENTAL_TABLE:
114+
return jitCompileIncrementalTable(request, dbAdapter).then(
115+
incrementalTable => dataform.JitCompilationResponse.create({ incrementalTable }));
116+
default:
117+
throw new Error(`Unrecognized compilation target type: ${request.compilationTargetType}`);
118+
}
119+
}
120+
121+
/** Main entry point for the JiT compiler. */
122+
export function jitCompiler(rpcCallback: RpcCallback): IJitCompiler {
123+
return {
124+
compile: (request: Uint8Array) => {
125+
const requestMessage = dataform.JitCompilationRequest.decode(request);
126+
return jitCompile(requestMessage, rpcCallback).then(
127+
response => dataform.JitCompilationResponse.encode(response).finish()
128+
);
129+
}
130+
};
131+
}

core/jit_compiler_test.ts

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
import { expect } from "chai";
2+
3+
import { jitCompile } from "df/core/jit_compiler";
4+
import { dataform } from "df/protos/ts";
5+
import { suite, test } from "df/testing";
6+
7+
suite("jit_compiler", () => {
8+
const rpcCallback: (method: string, request: Uint8Array, callback: (error: Error | null, response: Uint8Array) => void) => void =
9+
(method, request, callback) => { callback(null, new Uint8Array()); };
10+
11+
const target = dataform.Target.create({
12+
database: "db",
13+
schema: "schema",
14+
name: "name"
15+
});
16+
17+
suite("jitCompileOperation", () => {
18+
test("compiles operation returning string", async () => {
19+
const request = dataform.JitCompilationRequest.create({
20+
jitCode: `async (ctx) => "SELECT 1"`,
21+
target,
22+
jitData: {},
23+
compilationTargetType: dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_OPERATION,
24+
});
25+
const result = await jitCompile(request, rpcCallback);
26+
expect(result.operation.queries).to.deep.equal(["SELECT 1"]);
27+
});
28+
29+
test("compiles operation returning array", async () => {
30+
const request = dataform.JitCompilationRequest.create({
31+
jitCode: `async (ctx) => ["SELECT 1", "SELECT 2"]`,
32+
target,
33+
jitData: {},
34+
compilationTargetType: dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_OPERATION,
35+
});
36+
const result = await jitCompile(request, rpcCallback);
37+
expect(result.operation.queries).to.deep.equal(["SELECT 1", "SELECT 2"]);
38+
});
39+
40+
test("compiles operation returning object", async () => {
41+
const request = dataform.JitCompilationRequest.create({
42+
jitCode: `async (ctx) => ({ queries: ["SELECT 1"] })`,
43+
target,
44+
jitData: {},
45+
compilationTargetType: dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_OPERATION,
46+
});
47+
const result = await jitCompile(request, rpcCallback);
48+
expect(result.operation.queries).to.deep.equal(["SELECT 1"]);
49+
});
50+
51+
test("compiles operation using context", async () => {
52+
const request = dataform.JitCompilationRequest.create({
53+
jitCode: `async (ctx) => ({ queries: [\`SELECT "\${ctx.name()}"\`] })`,
54+
target,
55+
jitData: {},
56+
compilationTargetType: dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_OPERATION,
57+
});
58+
const result = await jitCompile(request, rpcCallback);
59+
expect(result.operation.queries).to.deep.equal(['SELECT "name"']);
60+
});
61+
62+
test("compiles operation with arrow function returning promise", async () => {
63+
const request = dataform.JitCompilationRequest.create({
64+
jitCode: `(ctx) => Promise.resolve("SELECT 1")`,
65+
target,
66+
jitData: {},
67+
compilationTargetType: dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_OPERATION,
68+
});
69+
const result = await jitCompile(request, rpcCallback);
70+
expect(result.operation.queries).to.deep.equal(["SELECT 1"]);
71+
});
72+
73+
test("compiles operation with async function", async () => {
74+
const request = dataform.JitCompilationRequest.create({
75+
jitCode: `async function(ctx) { return "SELECT 1"; }`,
76+
target,
77+
jitData: {},
78+
compilationTargetType: dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_OPERATION,
79+
});
80+
const result = await jitCompile(request, rpcCallback);
81+
expect(result.operation.queries).to.deep.equal(["SELECT 1"]);
82+
});
83+
84+
test("compiles operation with regular function returning promise", async () => {
85+
const request = dataform.JitCompilationRequest.create({
86+
jitCode: `function(ctx) { return Promise.resolve("SELECT 1"); }`,
87+
target,
88+
jitData: {},
89+
compilationTargetType: dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_OPERATION,
90+
});
91+
const result = await jitCompile(request, rpcCallback);
92+
expect(result.operation.queries).to.deep.equal(["SELECT 1"]);
93+
});
94+
});
95+
96+
suite("jitCompileTable", () => {
97+
test("compiles table returning string", async () => {
98+
const request = dataform.JitCompilationRequest.create({
99+
jitCode: `async (ctx) => "SELECT 1"`,
100+
target,
101+
jitData: {},
102+
compilationTargetType: dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_TABLE,
103+
});
104+
const result = await jitCompile(request, rpcCallback);
105+
expect(result.table.query).to.equal("SELECT 1");
106+
});
107+
108+
test("compiles table returning object", async () => {
109+
const request = dataform.JitCompilationRequest.create({
110+
jitCode: `async (ctx) => ({ query: "SELECT 1", preOps: ["PRE"], postOps: ["POST"] })`,
111+
target,
112+
jitData: {},
113+
compilationTargetType: dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_TABLE,
114+
});
115+
const result = await jitCompile(request, rpcCallback);
116+
expect(result.table.query).to.equal("SELECT 1");
117+
expect(result.table.preOps).to.deep.equal(["PRE"]);
118+
expect(result.table.postOps).to.deep.equal(["POST"]);
119+
});
120+
});
121+
122+
suite("jitCompileIncrementalTable", () => {
123+
test("compiles incremental table", async () => {
124+
const request = dataform.JitCompilationRequest.create({
125+
jitCode: `async (ctx) => {
126+
if (ctx.incremental()) {
127+
return { query: "SELECT INC" };
128+
}
129+
return { query: "SELECT REG" };
130+
}`,
131+
target,
132+
jitData: {},
133+
compilationTargetType: dataform.JitCompilationTargetType.JIT_COMPILATION_TARGET_TYPE_INCREMENTAL_TABLE,
134+
});
135+
const result = await jitCompile(request, rpcCallback);
136+
expect(result.incrementalTable.incremental?.query).to.equal("SELECT INC");
137+
expect(result.incrementalTable.regular?.query).to.equal("SELECT REG");
138+
});
139+
});
140+
});

packages/@dataform/core/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
export { compiler, IDataformExtension, indexFileGenerator, main, session, supportedFeatures, version } from "df/core";
1+
export { compiler, IDataformExtension, indexFileGenerator, IJitCompiler, jitCompiler, main, session, supportedFeatures, version } from "df/core";

0 commit comments

Comments
 (0)