11use anyhow:: { anyhow, Result } ;
2- use rust_embed :: RustEmbed ;
2+ use std :: path :: PathBuf ;
33use std:: string:: String ;
4- use std:: { collections:: HashSet , path:: PathBuf } ;
54use wasmtime:: { AsContextMut , Config , Engine , Linker , Module , ResourceLimiter , Store } ;
6- use wasmtime_wasi:: pipe:: { MemoryInputPipe , MemoryOutputPipe } ;
75use wasmtime_wasi:: preview1:: WasiP1Ctx ;
8- use wasmtime_wasi:: { I32Exit , WasiCtxBuilder } ;
6+ use wasmtime_wasi:: I32Exit ;
97
108use crate :: function_run_result:: FunctionRunResult ;
9+ use crate :: io:: { IOHandler , OutputAndLogs } ;
1110use crate :: { BytesContainer , BytesContainerType } ;
1211
1312#[ derive( Clone ) ]
@@ -16,44 +15,12 @@ pub struct ProfileOpts {
1615 pub out : PathBuf ,
1716}
1817
19- #[ derive( RustEmbed ) ]
20- #[ folder = "providers/" ]
21- struct StandardProviders ;
22-
2318pub fn uses_msgpack_provider ( module : & Module ) -> bool {
2419 module. imports ( ) . map ( |i| i. module ( ) ) . any ( |module| {
2520 module. starts_with ( "shopify_function_v" ) || module == "shopify_functions_javy_v2"
2621 } )
2722}
2823
29- fn import_modules < T > (
30- module : & Module ,
31- engine : & Engine ,
32- linker : & mut Linker < T > ,
33- mut store : & mut Store < T > ,
34- ) {
35- let imported_modules: HashSet < String > =
36- module. imports ( ) . map ( |i| i. module ( ) . to_string ( ) ) . collect ( ) ;
37-
38- imported_modules. iter ( ) . for_each ( |module_name| {
39- let provider_path = format ! ( "{module_name}.wasm" ) ;
40- let imported_module_bytes = StandardProviders :: get ( & provider_path) ;
41-
42- if let Some ( bytes) = imported_module_bytes {
43- let imported_module = Module :: from_binary ( engine, & bytes. data )
44- . unwrap_or_else ( |_| panic ! ( "Failed to load module {module_name}" ) ) ;
45-
46- let imported_module_instance = linker
47- . instantiate ( & mut store, & imported_module)
48- . expect ( "Failed to instantiate imported instance" ) ;
49-
50- linker
51- . instance ( & mut store, module_name, imported_module_instance)
52- . expect ( "Failed to import module" ) ;
53- }
54- } ) ;
55- }
56-
5724pub struct FunctionRunParams < ' a > {
5825 pub function_path : PathBuf ,
5926 pub input : BytesContainer ,
@@ -68,12 +35,12 @@ const STARTING_FUEL: u64 = u64::MAX;
6835const MAXIMUM_MEMORIES : usize = 2 ; // 1 for the module, 1 for Javy's provider
6936
7037struct FunctionContext {
71- wasi : WasiP1Ctx ,
38+ wasi : Option < WasiP1Ctx > ,
7239 limiter : MemoryLimiter ,
7340}
7441
7542impl FunctionContext {
76- fn new ( wasi : WasiP1Ctx ) -> Self {
43+ fn new ( wasi : Option < WasiP1Ctx > ) -> Self {
7744 Self {
7845 wasi,
7946 limiter : Default :: default ( ) ,
@@ -128,33 +95,29 @@ pub fn run(params: FunctionRunParams) -> Result<FunctionRunResult> {
12895 module,
12996 } = params;
13097
131- let input_stream = MemoryInputPipe :: new ( input. raw . clone ( ) ) ;
132- let output_stream = MemoryOutputPipe :: new ( usize:: MAX ) ;
133- let error_stream = MemoryOutputPipe :: new ( usize:: MAX ) ;
98+ let mut io_handler = IOHandler :: new ( module, input. clone ( ) ) ;
13499
135100 let mut error_logs: String = String :: new ( ) ;
136101
137102 let mut linker = Linker :: new ( & engine) ;
138- wasmtime_wasi:: preview1:: add_to_linker_sync ( & mut linker, |ctx : & mut FunctionContext | {
139- & mut ctx. wasi
140- } ) ?;
141- deterministic_wasi_ctx:: replace_scheduling_functions ( & mut linker) ?;
142- let mut wasi_builder = WasiCtxBuilder :: new ( ) ;
143- wasi_builder. stdin ( input_stream) ;
144- wasi_builder. stdout ( output_stream. clone ( ) ) ;
145- wasi_builder. stderr ( error_stream. clone ( ) ) ;
146- deterministic_wasi_ctx:: add_determinism_to_wasi_ctx_builder ( & mut wasi_builder) ;
147- let wasi = wasi_builder. build_p1 ( ) ;
103+ let wasi = io_handler. wasi ( ) ;
104+ if wasi. is_some ( ) {
105+ wasmtime_wasi:: preview1:: add_to_linker_sync ( & mut linker, |ctx : & mut FunctionContext | {
106+ ctx. wasi . as_mut ( ) . expect ( "Should have WASI context" )
107+ } ) ?;
108+ deterministic_wasi_ctx:: replace_scheduling_functions ( & mut linker) ?;
109+ }
110+
148111 let function_context = FunctionContext :: new ( wasi) ;
149112 let mut store = Store :: new ( & engine, function_context) ;
150113 store. limiter ( |s| & mut s. limiter ) ;
114+
115+ io_handler. initialize ( & engine, & mut linker, & mut store) ?;
116+
151117 store. set_fuel ( STARTING_FUEL ) ?;
152118 store. set_epoch_deadline ( 1 ) ;
153119
154- import_modules ( & module, & engine, & mut linker, & mut store) ;
155-
156- linker. module ( & mut store, "Function" , & module) ?;
157- let instance = linker. instantiate ( & mut store, & module) ?;
120+ let instance = linker. instantiate ( & mut store, io_handler. module ( ) ) ?;
158121
159122 let func = instance. get_typed_func :: < ( ) , ( ) > ( store. as_context_mut ( ) , export) ?;
160123
@@ -163,7 +126,6 @@ pub fn run(params: FunctionRunParams) -> Result<FunctionRunResult> {
163126 . frequency ( profile_opts. interval )
164127 . weight_unit ( wasmprof:: WeightUnit :: Fuel )
165128 . profile ( |store| func. call ( store. as_context_mut ( ) , ( ) ) ) ;
166-
167129 (
168130 result,
169131 Some ( profile_data. into_collapsed_stacks ( ) . to_string ( ) ) ,
@@ -191,18 +153,14 @@ pub fn run(params: FunctionRunParams) -> Result<FunctionRunResult> {
191153 }
192154 }
193155
194- drop ( store) ;
195-
196- let mut logs = error_stream
197- . try_into_inner ( )
198- . expect ( "Log stream reference still exists" ) ;
156+ let OutputAndLogs {
157+ output : raw_output,
158+ mut logs,
159+ } = io_handler. finalize ( store) ?;
199160
200161 logs. extend_from_slice ( error_logs. as_bytes ( ) ) ;
201162
202163 let output_codec = input. codec ;
203- let raw_output = output_stream
204- . try_into_inner ( )
205- . expect ( "Output stream reference still exists" ) ;
206164 let output = BytesContainer :: new (
207165 BytesContainerType :: Output ,
208166 output_codec,
0 commit comments