Skip to content

Commit 993e658

Browse files
committed
Support providers with in-memory I/O buffers
1 parent bc1a102 commit 993e658

File tree

18 files changed

+486
-78
lines changed

18 files changed

+486
-78
lines changed

Cargo.lock

Lines changed: 48 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ members = [
88
"tests/fixtures/messagepack-valid",
99
"tests/fixtures/messagepack-invalid",
1010
"tests/fixtures/wasm_api_v1",
11+
"tests/fixtures/wasm_api_v2",
1112
]
1213

1314
[package]

providers/shopify_function_v2.wasm

73.4 KB
Binary file not shown.
1.11 MB
Binary file not shown.

src/engine.rs

Lines changed: 48 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
1-
use anyhow::{anyhow, Result};
2-
use rust_embed::RustEmbed;
1+
use anyhow::{anyhow, bail, Result};
2+
use std::path::PathBuf;
33
use std::string::String;
4-
use std::{collections::HashSet, path::PathBuf};
54
use wasmtime::{AsContextMut, Config, Engine, Linker, Module, ResourceLimiter, Store};
6-
use wasmtime_wasi::pipe::{MemoryInputPipe, MemoryOutputPipe};
75
use wasmtime_wasi::preview1::WasiP1Ctx;
8-
use wasmtime_wasi::{I32Exit, WasiCtxBuilder};
6+
use wasmtime_wasi::I32Exit;
97

108
use crate::function_run_result::FunctionRunResult;
9+
use crate::io::{IOHandler, OutputAndLogs};
1110
use crate::{BytesContainer, BytesContainerType};
1211

1312
#[derive(Clone)]
@@ -16,44 +15,15 @@ pub struct ProfileOpts {
1615
pub out: PathBuf,
1716
}
1817

19-
#[derive(RustEmbed)]
20-
#[folder = "providers/"]
21-
struct StandardProviders;
22-
2318
pub fn uses_msgpack_provider(module: &Module) -> bool {
2419
module.imports().map(|i| i.module()).any(|module| {
25-
module.starts_with("shopify_function_v") || module == "shopify_functions_javy_v2"
20+
module.starts_with("shopify_function_v")
21+
|| module
22+
.strip_prefix("shopify_functions_javy_v")
23+
.is_some_and(|v| v.parse::<usize>().is_ok_and(|v| v >= 2))
2624
})
2725
}
2826

29-
fn import_modules<T>(
30-
module: &Module,
31-
engine: &Engine,
32-
linker: &mut Linker<T>,
33-
mut store: &mut Store<T>,
34-
) {
35-
let imported_modules: HashSet<String> =
36-
module.imports().map(|i| i.module().to_string()).collect();
37-
38-
imported_modules.iter().for_each(|module_name| {
39-
let provider_path = format!("{module_name}.wasm");
40-
let imported_module_bytes = StandardProviders::get(&provider_path);
41-
42-
if let Some(bytes) = imported_module_bytes {
43-
let imported_module = Module::from_binary(engine, &bytes.data)
44-
.unwrap_or_else(|_| panic!("Failed to load module {module_name}"));
45-
46-
let imported_module_instance = linker
47-
.instantiate(&mut store, &imported_module)
48-
.expect("Failed to instantiate imported instance");
49-
50-
linker
51-
.instance(&mut store, module_name, imported_module_instance)
52-
.expect("Failed to import module");
53-
}
54-
});
55-
}
56-
5727
pub struct FunctionRunParams<'a> {
5828
pub function_path: PathBuf,
5929
pub input: BytesContainer,
@@ -68,12 +38,12 @@ const STARTING_FUEL: u64 = u64::MAX;
6838
const MAXIMUM_MEMORIES: usize = 2; // 1 for the module, 1 for Javy's provider
6939

7040
struct FunctionContext {
71-
wasi: WasiP1Ctx,
41+
wasi: Option<WasiP1Ctx>,
7242
limiter: MemoryLimiter,
7343
}
7444

7545
impl FunctionContext {
76-
fn new(wasi: WasiP1Ctx) -> Self {
46+
fn new(wasi: Option<WasiP1Ctx>) -> Self {
7747
Self {
7848
wasi,
7949
limiter: Default::default(),
@@ -128,33 +98,31 @@ pub fn run(params: FunctionRunParams) -> Result<FunctionRunResult> {
12898
module,
12999
} = params;
130100

131-
let input_stream = MemoryInputPipe::new(input.raw.clone());
132-
let output_stream = MemoryOutputPipe::new(usize::MAX);
133-
let error_stream = MemoryOutputPipe::new(usize::MAX);
101+
validate_module(&module)?;
102+
103+
let mut io_handler = IOHandler::new(module, input.clone());
134104

135105
let mut error_logs: String = String::new();
136106

137107
let mut linker = Linker::new(&engine);
138-
wasmtime_wasi::preview1::add_to_linker_sync(&mut linker, |ctx: &mut FunctionContext| {
139-
&mut ctx.wasi
140-
})?;
141-
deterministic_wasi_ctx::replace_scheduling_functions(&mut linker)?;
142-
let mut wasi_builder = WasiCtxBuilder::new();
143-
wasi_builder.stdin(input_stream);
144-
wasi_builder.stdout(output_stream.clone());
145-
wasi_builder.stderr(error_stream.clone());
146-
deterministic_wasi_ctx::add_determinism_to_wasi_ctx_builder(&mut wasi_builder);
147-
let wasi = wasi_builder.build_p1();
108+
let wasi = io_handler.wasi();
109+
if wasi.is_some() {
110+
wasmtime_wasi::preview1::add_to_linker_sync(&mut linker, |ctx: &mut FunctionContext| {
111+
ctx.wasi.as_mut().expect("Should have WASI context")
112+
})?;
113+
deterministic_wasi_ctx::replace_scheduling_functions(&mut linker)?;
114+
}
115+
148116
let function_context = FunctionContext::new(wasi);
149117
let mut store = Store::new(&engine, function_context);
150118
store.limiter(|s| &mut s.limiter);
119+
120+
io_handler.initialize(&engine, &mut linker, &mut store)?;
121+
151122
store.set_fuel(STARTING_FUEL)?;
152123
store.set_epoch_deadline(1);
153124

154-
import_modules(&module, &engine, &mut linker, &mut store);
155-
156-
linker.module(&mut store, "Function", &module)?;
157-
let instance = linker.instantiate(&mut store, &module)?;
125+
let instance = linker.instantiate(&mut store, io_handler.module())?;
158126

159127
let func = instance.get_typed_func::<(), ()>(store.as_context_mut(), export)?;
160128

@@ -163,7 +131,6 @@ pub fn run(params: FunctionRunParams) -> Result<FunctionRunResult> {
163131
.frequency(profile_opts.interval)
164132
.weight_unit(wasmprof::WeightUnit::Fuel)
165133
.profile(|store| func.call(store.as_context_mut(), ()));
166-
167134
(
168135
result,
169136
Some(profile_data.into_collapsed_stacks().to_string()),
@@ -191,18 +158,14 @@ pub fn run(params: FunctionRunParams) -> Result<FunctionRunResult> {
191158
}
192159
}
193160

194-
drop(store);
195-
196-
let mut logs = error_stream
197-
.try_into_inner()
198-
.expect("Log stream reference still exists");
161+
let OutputAndLogs {
162+
output: raw_output,
163+
mut logs,
164+
} = io_handler.finalize(store)?;
199165

200166
logs.extend_from_slice(error_logs.as_bytes());
201167

202168
let output_codec = input.codec;
203-
let raw_output = output_stream
204-
.try_into_inner()
205-
.expect("Output stream reference still exists");
206169
let output = BytesContainer::new(
207170
BytesContainerType::Output,
208171
output_codec,
@@ -244,6 +207,25 @@ pub fn new_engine() -> Result<Engine> {
244207
Engine::new(&config)
245208
}
246209

210+
fn validate_module(module: &Module) -> Result<()> {
211+
// Need to track with deterministic order so don't use a hash
212+
let mut imports = vec![];
213+
for import in module.imports().map(|i| i.module().to_string()) {
214+
if !imports.contains(&import) {
215+
imports.push(import);
216+
}
217+
}
218+
219+
let uses_wasi = imports.contains(&"wasi_snapshot_preview1".to_string());
220+
for import in imports {
221+
if crate::io::is_mem_io_provider(&import) && uses_wasi {
222+
bail!("Invalid Function, cannot use `{import}` and import WASI. If using Rust, change the build target to `wasm32-unknown-unknown.");
223+
}
224+
}
225+
226+
Ok(())
227+
}
228+
247229
#[cfg(test)]
248230
mod tests {
249231
use colored::Colorize;

src/function_run_result.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use colored::Colorize;
33
use serde::{Deserialize, Serialize};
44
use std::fmt;
55

6-
const FUNCTION_LOG_LIMIT: usize = 1_000;
6+
pub(crate) const FUNCTION_LOG_LIMIT: usize = 1_000;
77

88
#[derive(Serialize, Deserialize, Clone, Debug)]
99
pub struct InvalidOutput {

0 commit comments

Comments
 (0)