diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
index e7a5b58..c439811 100644
--- a/.github/workflows/main.yml
+++ b/.github/workflows/main.yml
@@ -60,9 +60,37 @@ jobs:
run: ./gradlew --stacktrace --no-problems-report native-cli:distro
shell: bash
+ # Install Python build dependencies (setuptools/wheel may be missing on Windows runners)
+ - name: Install Python build dependencies
+ run: python3 -m pip install --upgrade setuptools wheel
+ shell: bash
+
+ # Generate native-lib python wheel
+ - name: Create Native Lib Python Wheel
+ run: ./gradlew --stacktrace --no-problems-report native-lib:buildPythonWheel
+ shell: bash
+
# Upload the artifact file
- name: Upload generated script
uses: actions/upload-artifact@v4
with:
name: dw-${{env.NATIVE_VERSION}}-${{runner.os}}
path: native-cli/build/distributions/native-cli-${{env.NATIVE_VERSION}}-native-distro-${{ matrix.script_name }}.zip
+
+ # Upload the Python wheel
+ - name: Upload Python wheel
+ uses: actions/upload-artifact@v4
+ with:
+ name: dw-python-wheel-${{env.NATIVE_VERSION}}-${{runner.os}}
+ path: native-lib/python/dist/dataweave_native-0.0.1-py3-*.whl
+
+ # Upload the native shared library + header together per OS
+ - name: Upload native shared library
+ uses: actions/upload-artifact@v4
+ with:
+ name: dwlib-${{env.NATIVE_VERSION}}-${{runner.os}}
+ path: |
+ native-lib/python/src/dataweave/native/dwlib.dylib
+ native-lib/python/src/dataweave/native/dwlib.so
+ native-lib/python/src/dataweave/native/dwlib.dll
+ native-lib/python/src/dataweave/native/dwlib.h
diff --git a/.gitignore b/.gitignore
index 6091d76..011a236 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,4 +19,8 @@ out/
.DS_Store
# GraalVM
-.graalvm
\ No newline at end of file
+.graalvm
+
+grimoires/
+
+.windsurf/
diff --git a/build.gradle b/build.gradle
index 9792b28..944d683 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,3 +1,13 @@
+buildscript {
+ repositories {
+ gradlePluginPortal()
+ mavenCentral()
+ }
+ dependencies {
+ classpath "org.graalvm.buildtools.native:org.graalvm.buildtools.native.gradle.plugin:0.11.2"
+ }
+}
+
plugins {
id "scala"
id "maven-publish"
@@ -11,6 +21,8 @@ subprojects {
apply plugin: 'maven-publish'
apply plugin: 'scala'
+ apply plugin: 'org.graalvm.buildtools.native'
+
group = 'org.mule.weave.native'
version = nativeVersion
@@ -21,8 +33,8 @@ subprojects {
compileJava {
- sourceCompatibility = '11'
- targetCompatibility = '11'
+ sourceCompatibility = '17'
+ targetCompatibility = '17'
}
repositories {
diff --git a/gradle.properties b/gradle.properties
index 01b5261..7b51347 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,10 +1,10 @@
-weaveVersion=2.11.0-20251023
-weaveTestSuiteVersion=2.11.0-20251023
+weaveVersion=2.12.0-SNAPSHOT
+weaveTestSuiteVersion=2.12.0-SNAPSHOT
nativeVersion=100.100.100
scalaVersion=2.12.18
-ioVersion=2.11.0-SNAPSHOT
+ioVersion=2.12.0-SNAPSHOT
graalvmVersion=24.0.2
-weaveSuiteVersion=2.11.0-20251023
+weaveSuiteVersion=2.12.0-SNAPSHOT
#Libaries
scalaTestVersion=3.2.15
scalaTestPluginVersion=0.33
diff --git a/native-cli-integration-tests/src/test/scala/org/mule/weave/clinative/TCKCliTest.scala b/native-cli-integration-tests/src/test/scala/org/mule/weave/clinative/TCKCliTest.scala
index 29a1ac0..66c23ba 100644
--- a/native-cli-integration-tests/src/test/scala/org/mule/weave/clinative/TCKCliTest.scala
+++ b/native-cli-integration-tests/src/test/scala/org/mule/weave/clinative/TCKCliTest.scala
@@ -259,6 +259,7 @@ class TCKCliTest extends AnyFunSpec with Matchers
"lazy_metadata_definition",
"module-singleton",
"multipart-write-binary",
+ "private_scope_directives",
"read-binary-files",
"underflow",
"try",
@@ -345,12 +346,16 @@ class TCKCliTest extends AnyFunSpec with Matchers
baseArray ++
Array(
"math-toRadians",
- "try-handle-lazy-values-with-failures"
+ "try-handle-lazy-values-with-failures",
+ "weave_ast_example",
+ "weave_ast_module"
)
} else if (versionString == "2.10") {
baseArray ++
Array(
- "try-handle-lazy-values-with-failures"
+ "try-handle-lazy-values-with-failures",
+ "weave_ast_example",
+ "weave_ast_module"
)
} else {
baseArray
diff --git a/native-cli/build.gradle b/native-cli/build.gradle
index b567687..5ce3b5e 100644
--- a/native-cli/build.gradle
+++ b/native-cli/build.gradle
@@ -2,8 +2,6 @@ plugins {
id "com.github.maiflai.scalatest" version "${scalaTestPluginVersion}"
id 'application'
- // Apply GraalVM Native Image plugin
- id 'org.graalvm.buildtools.native' version '0.11.2'
}
sourceSets {
diff --git a/native-cli/src/main/scala/org/mule/weave/dwnative/NativeRuntime.scala b/native-cli/src/main/scala/org/mule/weave/dwnative/NativeRuntime.scala
index 6a80b38..f5fbca8 100644
--- a/native-cli/src/main/scala/org/mule/weave/dwnative/NativeRuntime.scala
+++ b/native-cli/src/main/scala/org/mule/weave/dwnative/NativeRuntime.scala
@@ -211,9 +211,3 @@ case class WeaveFailureResult(message: String) extends WeaveExecutionResult {
override def result(): String = message
}
-
-
-class CustomWeaveDataFormat(moduleManager: ModuleLoaderManager) extends WeaveDataFormat {
- override def createModuleLoader(): ModuleLoaderManager = moduleManager
-}
-
diff --git a/native-lib/.gitignore b/native-lib/.gitignore
new file mode 100644
index 0000000..0e845cc
--- /dev/null
+++ b/native-lib/.gitignore
@@ -0,0 +1,4 @@
+python/src/dataweave/native/
+python/src/dataweave_native.egg-info/
+python/dist/
+python/build/
diff --git a/native-lib/README.md b/native-lib/README.md
new file mode 100644
index 0000000..aa993d9
--- /dev/null
+++ b/native-lib/README.md
@@ -0,0 +1,222 @@
+# native-lib
+
+## Overview
+
+`native-lib` builds a **GraalVM native shared library** that embeds the MuleSoft **DataWeave runtime** and exposes a small C-compatible API.
+
+The main purpose is to allow non-JVM consumers (most notably the Python package in `native-lib/python`) to execute DataWeave scripts **without running a JVM**, while still using the official DataWeave runtime.
+
+## Architecture (GraalVM + FFI)
+
+```
+┌─────────────────────────────────────────────┐
+│ Python Process │
+│ │
+│ ┌────────────────────────────────────────┐ │
+│ │ Application Script │ │
+│ │ - Python: ctypes │ │
+│ └──────────────┬─────────────────────────┘ │
+│ │ │
+│ │ FFI Call │
+│ ▼ │
+│ ┌────────────────────────────────────────┐ │
+│ │ Native Shared Library (dwlib) │ │
+│ │ ┌──────────────────────────────────┐ │ │
+│ │ │ GraalVM Isolate │ │ │
+│ │ │ - NativeLib.run_script() │ │ │
+│ │ │ - DataWeave script execution │ │ │
+│ │ └──────────────────────────────────┘ │ │
+│ └────────────────────────────────────────┘ │
+└─────────────────────────────────────────────┘
+```
+
+## Building with Gradle
+
+### Prerequisites
+
+- A GraalVM distribution installed that includes `native-image`.
+- Enough memory for native-image (this build config uses `-J-Xmx6G`).
+
+### Build the shared library
+
+From the repository root:
+
+```bash
+./gradlew :native-lib:nativeCompile
+```
+
+The shared library is produced under:
+
+- `native-lib/build/native/nativeCompile/`
+
+and is named:
+
+- macOS: `dwlib.dylib`
+- Linux: `dwlib.so`
+- Windows: `dwlib.dll`
+
+### Stage the library into the Python package (dev workflow)
+
+```bash
+./gradlew :native-lib:stagePythonNativeLib
+```
+
+This copies `dwlib.*` into:
+
+- `native-lib/python/src/dataweave/native/`
+
+### Build a Python wheel (bundles the native library)
+
+```bash
+./gradlew :native-lib:buildPythonWheel
+```
+
+The wheel will be created in:
+
+- `native-lib/python/dist/`
+
+## Installing for use in a Python project
+
+### Option A: Install the produced wheel (recommended)
+
+After `:native-lib:buildPythonWheel`:
+
+```bash
+python3 -m pip install native-lib/python/dist/dataweave_native-0.0.1-*.whl
+```
+
+This wheel includes the `dwlib.*` shared library inside the Python package.
+
+### Option B: Editable install for development
+
+1. Stage the native library:
+
+```bash
+./gradlew :native-lib:stagePythonNativeLib
+```
+
+2. Install the Python package in editable mode:
+
+```bash
+python3 -m pip install -e native-lib/python
+```
+
+### Option C: Use an externally-built library via an environment variable
+
+If you want to point Python at a specific built artifact, set:
+
+- `DATAWEAVE_NATIVE_LIB=/absolute/path/to/dwlib.(dylib|so|dll)`
+
+The Python module will also try a few fallbacks (including the wheel-bundled location).
+
+## Using the library (Python examples)
+
+All examples below assume:
+
+```python
+import dataweave
+```
+
+### 1) Simple script
+
+```python
+result = dataweave.run_script("2 + 2")
+assert result.success is True
+print(result.get_string()) # "4"
+```
+
+### 2) Script with inputs (no explicit `mimeType`)
+
+Inputs can be plain Python values. The wrapper auto-encodes them as JSON or text.
+
+```python
+result = dataweave.run_script(
+ "num1 + num2",
+ {"num1": 25, "num2": 17},
+)
+print(result.get_string()) # "42"
+```
+
+### 3) Script with inputs (explicit `mimeType`, `charset`, `properties`)
+
+Use an explicit input dict when you need full control over how DataWeave interprets bytes.
+
+```python
+script = "payload.person"
+xml_bytes = b"<person><name>Billy</name><age>31</age></person>".decode("utf-8").encode("utf-16")
+
+result = dataweave.run_script(
+ script,
+ {
+ "payload": {
+ "content": xml_bytes,
+ "mimeType": "application/xml",
+ "charset": "UTF-16",
+ "properties": {
+ "nullValueOn": "empty",
+ "maxAttributeSize": 256
+ },
+ }
+ },
+)
+
+if result.success:
+ print(result.get_string())
+else:
+ print(result.error)
+```
+
+You can also use `InputValue` for the same purpose:
+
+```python
+input_value = dataweave.InputValue(
+ content="1234567",
+ mimeType="application/csv",
+ properties={"header": False, "separator": "4"},
+)
+
+result = dataweave.run_script("in0.column_1[0]", {"in0": input_value})
+print(result.get_string()) # '"567"'
+```
+
+### 4) Reusing a DataWeave context to run multiple scripts quicker
+
+Creating an isolate/runtime has overhead. For repeated executions, reuse a single `DataWeave` instance:
+
+```python
+with dataweave.DataWeave() as dw:
+ r1 = dw.run("2 + 2")
+ r2 = dw.run("x + y", {"x": 10, "y": 32})
+
+ print(r1.get_string()) # "4"
+ print(r2.get_string()) # "42"
+```
+
+### 5) Error handling
+
+There are two common classes of errors:
+
+- The native library cannot be located/loaded.
+- Script compilation/execution fails (reported as an unsuccessful `ExecutionResult`).
+
+```python
+try:
+ result = dataweave.run_script("invalid syntax here")
+
+ if not result.success:
+ raise dataweave.DataWeaveError(result.error or "Unknown DataWeave error")
+
+ print(result.get_string())
+
+except dataweave.DataWeaveLibraryNotFoundError as e:
+ # Build it (and/or install a wheel) first.
+ # Example build command (from repo root): ./gradlew :native-lib:nativeCompile
+ raise
+
+except dataweave.DataWeaveError:
+ raise
+
+finally:
+ # Optional: if you used the global API and want to force cleanup
+ dataweave.cleanup()
+```
diff --git a/native-lib/build.gradle b/native-lib/build.gradle
new file mode 100644
index 0000000..8746d15
--- /dev/null
+++ b/native-lib/build.gradle
@@ -0,0 +1,115 @@
+dependencies {
+ api group: 'org.mule.weave', name: 'runtime', version: weaveVersion
+ api group: 'org.mule.weave', name: 'core-modules', version: weaveVersion
+
+ implementation group: 'org.mule.weave', name: 'parser', version: weaveVersion
+ implementation group: 'org.mule.weave', name: 'wlang', version: weaveVersion
+ compileOnly group: 'org.graalvm.sdk', name: 'graal-sdk', version: graalvmVersion
+ compileOnly group: 'org.graalvm.nativeimage', name: 'svm', version: graalvmVersion
+
+ implementation "org.scala-lang:scala-library:${scalaVersion}"
+
+ testImplementation platform('org.junit:junit-bom:5.10.0')
+ testImplementation 'org.junit.jupiter:junit-jupiter'
+ testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
+}
+
+test {
+ useJUnitPlatform()
+}
+
+tasks.matching { it.name == 'nativeCompileClasspathJar' }.configureEach { t ->
+ t.exclude('META-INF/services/org.mule.weave.v2.module.DataFormat')
+ t.from("${projectDir}/src/main/resources/META-INF/services/org.mule.weave.v2.module.DataFormat") {
+ into('META-INF/services')
+ rename { 'org.mule.weave.v2.module.DataFormat' }
+ }
+ t.exclude('META-INF/services/org.mule.weave.v2.parser.phase.ModuleLoader')
+ t.from("${projectDir}/src/main/resources/META-INF/services/org.mule.weave.v2.parser.phase.ModuleLoader") {
+ into('META-INF/services')
+ rename { 'org.mule.weave.v2.parser.phase.ModuleLoader' }
+ }
+}
+
+// Configure GraalVM native-image to build a shared library
+graalvmNative {
+// toolchainDetection = true
+ binaries {
+ main {
+ sharedLibrary = true
+ debug = true
+ verbose = true
+ fallback = false
+ //agent = false
+ useFatJar = true
+ //buildArgs.add('-Ob') // quick build mode to speed up builds during development
+ buildArgs.add('--no-fallback')
+ buildArgs.add('-H:Name=dwlib')
+ buildArgs.add('--verbose')
+ buildArgs.add("--report-unsupported-elements-at-runtime")
+ buildArgs.add("-J-Xmx6G")
+
+ buildArgs.add("-H:+ReportExceptionStackTraces")
+ buildArgs.add("-H:+UnlockExperimentalVMOptions")
+ buildArgs.add("--initialize-at-build-time=sun.instrument.InstrumentationImpl")
+ buildArgs.add("-H:DeadlockWatchdogInterval=1000")
+ buildArgs.add("-H:CompilationExpirationPeriod=0")
+ buildArgs.add("-H:+AddAllCharsets")
+ buildArgs.add("-H:+IncludeAllLocales")
+ // Pass project directory as system property for header path resolution
+ buildArgs.add("-Dproject.root=${projectDir}")
+ }
+ }
+}
+
+def pythonExe = (project.findProperty('pythonExe') ?: 'python3') as String
+
+tasks.register('stagePythonNativeLib', Copy) {
+ dependsOn tasks.named('nativeCompile')
+ from("${buildDir}/native/nativeCompile") {
+ include('dwlib.*')
+ }
+ into("${projectDir}/python/src/dataweave/native")
+}
+
+tasks.register('buildPythonWheel', Exec) {
+ dependsOn tasks.named('stagePythonNativeLib')
+ workingDir("${projectDir}/python")
+ // Track inputs so Gradle rebuilds when native lib or Python sources change
+ inputs.dir("${projectDir}/python/src/dataweave")
+ inputs.file("${projectDir}/python/setup.py")
+ inputs.file("${projectDir}/python/setup.cfg")
+ inputs.file("${projectDir}/python/pyproject.toml")
+ outputs.dir("${projectDir}/python/dist")
+ doFirst {
+ // Clean old wheels and build artifacts to ensure fresh build
+ delete("${projectDir}/python/dist")
+ delete("${projectDir}/python/build")
+ file("${projectDir}/python/dist").mkdirs()
+ }
+ // Use setup.py bdist_wheel to generate platform-specific wheel tags
+ // The custom setup.py overrides bdist_wheel to set correct platform tags
+ // for the bundled native library (dwlib)
+ commandLine(pythonExe, 'setup.py', 'bdist_wheel', '-d', 'dist')
+}
+
+tasks.register('pythonTest', Exec) {
+ if (project.findProperty('skipPythonTests')?.toString()?.toBoolean() == true) {
+ enabled = false
+ }
+
+ dependsOn tasks.named('stagePythonNativeLib')
+ workingDir("${projectDir}/python")
+ commandLine(pythonExe, 'tests/test_dataweave_module.py')
+}
+
+tasks.named('test') {
+ dependsOn tasks.named('pythonTest')
+}
+
+tasks.named('clean') {
+ delete("${projectDir}/python/dist")
+ delete("${projectDir}/python/build")
+ delete("${projectDir}/python/src/dataweave/native")
+ delete("${projectDir}/python/src/dataweave_native.egg-info")
+}
diff --git a/native-lib/example_dataweave_module.py b/native-lib/example_dataweave_module.py
new file mode 100755
index 0000000..d1740a2
--- /dev/null
+++ b/native-lib/example_dataweave_module.py
@@ -0,0 +1,200 @@
+#!/usr/bin/env python3
+"""
+Example demonstrating the simplified DataWeave Python module.
+
+This shows how easy it is to use DataWeave without dealing with
+any GraalVM or native library complexity.
+"""
+
+import sys
+from pathlib import Path
+
+_PYTHON_SRC_DIR = Path(__file__).resolve().parent / "python" / "src"
+sys.path.insert(0, str(_PYTHON_SRC_DIR))
+
+import dataweave
+
+def example_simple_functions():
+ """Example using simple function API"""
+ print("="*70)
+ print("Example 1: Simple Function API")
+ print("="*70)
+
+ ok = True
+
+ # Simple script execution
+ print("\n[*] Simple arithmetic:")
+ script = "2 + 2"
+ result = dataweave.run_script(script)
+ ok = assert_result(script, result, "4") and ok
+
+ print("\n[*] Square root:")
+ script = "sqrt(144)"
+ result = dataweave.run_script(script)
+ ok = assert_result(script, result, "12") and ok
+
+ print("\n[*] Array operations:")
+ script = "[1, 2, 3] map $ * 2"
+ result = dataweave.run_script(script)
+ ok = assert_result(script, result, "[\n 2, \n 4, \n 6\n]") and ok
+
+ print("\n[*] String operations:")
+ script = "upper('hello world')"
+ result = dataweave.run_script(script)
+ ok = assert_result(script, result, '"HELLO WORLD"') and ok
+
+ # Script with inputs (simple values - auto-converted)
+ print("\n[*] Script with inputs (auto-converted):")
+ script = "num1 + num2"
+ result = dataweave.run_script(script, {"num1": 25, "num2": 17})
+ ok = assert_result(script, result, "42") and ok
+
+ # Script with complex inputs
+ print("\n[*] Script with complex object:")
+ script = "payload.name"
+ result = dataweave.run_script(script, {"payload": {"content": '{"name": "John", "age": 30}', "mimeType": "application/json"}})
+ ok = assert_result(script, result, '"John"') and ok
+
+ # Script with mixed input types
+ print("\n[*] Script with mixed input types:")
+ script = "greeting ++ ' ' ++ payload.name"
+ result = dataweave.run_script(script, {"greeting": "Hello", "payload": {"content": '{"name": "Alice", "role": "Developer"}', "mimeType": "application/json"}})
+ ok = assert_result(script, result, '"Hello Alice"') and ok
+
+ # Binary output
+ print("\n[*] Binary output:")
+    script = "output application/octet-stream\n---\ndw::core::Binaries::fromBase64(\"aG9sYW11bmQ=\")"
+ result = dataweave.run_script(script)
+ ok = assert_result(script, result, "holamund") and ok
+
+ # Script with InputValue
+ print("\n[*] Inputs:")
+ input_value = dataweave.InputValue(
+ content="1234567",
+ mimeType="application/csv",
+ properties={"header": False, "separator": "4"}
+ )
+ script = "in0.column_1[0]"
+ result = dataweave.run_script(script, {"in0": input_value})
+ ok = assert_result(script, result, '"567"') and ok
+
+ # Cleanup when done
+ dataweave.cleanup()
+ print("\n[OK] Cleanup completed")
+
+ return ok
+
+
+def assert_result(script, result, expected):
+ print(f" {script} = {result}")
+ ok = result.get_string() == expected
+ if ok:
+ status = "[OK]"
+ else:
+ status = f"[FAIL] (expected: {expected})"
+ print(f" result as string = {result.get_string()} {status}")
+ print(f" result as bytes = {result.get_bytes()}")
+ return ok
+
+
+def example_context_manager():
+ """Example using context manager (recommended)"""
+ print("\n" + "="*70)
+ print("Example 2: Context Manager API (Recommended)")
+ print("="*70)
+
+ ok = True
+
+ with dataweave.DataWeave() as dw:
+ print("\n[*] Multiple operations with same runtime:")
+
+ script = "2 + 2"
+ result = dw.run(script)
+ ok = assert_result(script, result, "4") and ok
+
+ script = "x + y + z"
+ result = dw.run(script, {"x": 1, "y": 2, "z": 3})
+ ok = assert_result(script, result, "6") and ok
+
+ script = "numbers map $ * multiplier"
+ result = dw.run(script, {"numbers": [1, 2, 3, 4, 5], "multiplier": 10})
+ ok = assert_result(script, result, "[\n 10, \n 20, \n 30, \n 40, \n 50\n]") and ok
+
+ print("\n[OK] Context manager automatically cleaned up resources")
+
+ return ok
+
+
+def example_explicit_format():
+ """Example using explicit content/mimeType format"""
+ print("\n" + "="*70)
+ print("Example 3: Explicit Format (Advanced)")
+ print("="*70)
+
+ print("\n[*] Using explicit content and mimeType:")
+
+ ok = True
+
+ script = "payload.message"
+ result = dataweave.run_script(script, {"payload": {"content": '{"message": "Hello from JSON!", "value": 42}', "mimeType": "application/json"}})
+ ok = assert_result(script, result, '"Hello from JSON!"') and ok
+
+ script = "payload.value + offset"
+ result = dataweave.run_script(script, {"payload": {"content": '{"value": 100}', "mimeType": "application/json"}, "offset": 50})
+ ok = assert_result(script, result, "150") and ok
+
+ return ok
+
+
+def example_error_handling():
+ """Example with error handling"""
+ print("\n" + "="*70)
+ print("Example 4: Error Handling")
+ print("="*70)
+
+ try:
+ print("\n[*] Invalid script (will show error):")
+ result = dataweave.run_script("invalid syntax here", {})
+ print(f" Result: {result} {'[OK]' if result.success == False else '[FAIL]'}")
+
+ except dataweave.DataWeaveLibraryNotFoundError as e:
+ print(f"[ERROR] Library not found: {e}")
+ print(" Please build the library first: ./gradlew nativeCompile")
+ except dataweave.DataWeaveError as e:
+ print(f"[ERROR] DataWeave error: {e}")
+
+
+def main():
+ """Run all examples"""
+ print("\n" + "="*70)
+ print("DataWeave Python Module - Examples")
+ print("="*70)
+ print("\nThis module abstracts all GraalVM/native complexity!")
+ print("Just import and use - no ctypes, no manual memory management.\n")
+
+ try:
+ all_ok = True
+ all_ok = example_simple_functions() and all_ok
+ all_ok = example_context_manager() and all_ok
+ all_ok = example_explicit_format() and all_ok
+ example_error_handling()
+
+ print("\n" + "="*70)
+ if all_ok:
+ print("[OK] All examples completed successfully!")
+ else:
+ print("[FAIL] One or more examples failed")
+ print("="*70)
+
+ except dataweave.DataWeaveLibraryNotFoundError as e:
+ print(f"\n[ERROR] {e}")
+ print("\nPlease build the native library first:")
+ print(" ./gradlew nativeCompile")
+ except Exception as e:
+ print(f"\n[ERROR] Unexpected error: {e}")
+ import traceback
+ traceback.print_exc()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/native-lib/example_streaming.py b/native-lib/example_streaming.py
new file mode 100755
index 0000000..b410568
--- /dev/null
+++ b/native-lib/example_streaming.py
@@ -0,0 +1,227 @@
+#!/usr/bin/env python3
+
+import sys
+from pathlib import Path
+
+_PYTHON_SRC_DIR = Path(__file__).resolve().parent / "python" / "src"
+sys.path.insert(0, str(_PYTHON_SRC_DIR))
+
+import dataweave
+import resource
+import psutil, os
+import threading
+import json
+import time
+
+def example_streaming_input_output_callback():
+ print("\nTesting streaming input and output using callbacks (square numbers)...")
+ try:
+ start_time = time.monotonic()
+ num_elements = 1_000_000 * 50
+
+ script = """output application/json deferred=true
+---
+payload map ($ * $)"""
+
+ # -- input generator (called by native on a background thread) --
+ input_iter = iter(range(num_elements))
+ input_started = False
+ input_done = False
+
+ def read_callback(buf_size):
+ nonlocal input_started, input_done
+ if input_done:
+ return b""
+ parts = []
+ if not input_started:
+ parts.append(b"[")
+ input_started = True
+ remaining = buf_size - sum(len(p) for p in parts)
+ try:
+ while remaining > 0:
+ i = next(input_iter)
+ token = (b"," if i > 0 else b"") + str(i).encode("utf-8")
+ if len(token) > remaining:
+ # put it back via a wrapper – simpler: just include it and break
+ parts.append(token)
+ break
+ parts.append(token)
+ remaining -= len(token)
+ except StopIteration:
+ parts.append(b"]")
+ input_done = True
+ return b"".join(parts)
+
+ # -- output collector (called by native on the calling thread) --
+ chunk_count = 0
+ total_bytes = 0
+
+ def write_callback(data):
+ nonlocal chunk_count, total_bytes
+ chunk_count += 1
+ total_bytes += len(data)
+ if chunk_count % 5000 == 0:
+ usage = resource.getrusage(resource.RUSAGE_SELF)
+ current_rss = psutil.Process(os.getpid()).memory_info().rss
+ print(f"--- chunk {chunk_count}: {len(data)} bytes, total: {total_bytes / 1048576:.1f} MB, Max RSS: {usage.ru_maxrss / 1048576:.1f} MB, Current RSS: {current_rss / 1048576:.1f} MB ---")
+ return 0
+
+ usage = resource.getrusage(resource.RUSAGE_SELF)
+ current_rss = psutil.Process(os.getpid()).memory_info().rss
+ print(f">>> Before run_input_output_callback, Max RSS: {usage.ru_maxrss / 1048576:.1f} MB, Current RSS: {current_rss / 1048576:.1f} MB ---")
+
+ result = dataweave.run_input_output_callback(
+ script,
+ input_name="payload",
+ input_mime_type="application/json",
+ read_callback=read_callback,
+ write_callback=write_callback,
+ input_charset="utf-8",
+ )
+
+ if not result.get("success", False):
+ raise Exception(result.get("error", "Unknown error"))
+
+ peak_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1048576
+ elapsed = time.monotonic() - start_time
+ mins, secs = divmod(elapsed, 60)
+ print(f"\n[OK] Streaming input/output callback done ({chunk_count} chunks, {total_bytes / 1048576:.1f} MB, {num_elements:,} elements) - Time: {int(mins)}:{secs:06.3f}")
+ print(f"Peak memory (max RSS): {peak_rss:.1f} MB")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Streaming input/output callback failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+
+def example_streaming_input_output():
+ print("\nTesting streaming input and output (square numbers)...")
+ try:
+ start_time = time.monotonic()
+ num_elements = 1_000_000 * 50
+ chunk_size = 1024 * 64
+
+ input_stream = dataweave.open_input_stream("application/json", "utf-8")
+
+ script = """output application/json deferred=true
+---
+payload map ($ * $)"""
+
+ def feed_input():
+ try:
+ input_stream.write(b"[")
+ for i in range(num_elements):
+ if i > 0:
+ input_stream.write(b",")
+ input_stream.write(str(i).encode("utf-8"))
+ input_stream.write(b"]")
+ finally:
+ input_stream.close()
+
+ feeder = threading.Thread(target=feed_input, daemon=True)
+ feeder.start()
+
+ with dataweave.run_stream(script, inputs={"payload": input_stream}) as stream:
+ usage = resource.getrusage(resource.RUSAGE_SELF)
+ current_rss = psutil.Process(os.getpid()).memory_info().rss
+ print(f">>> Output mimeType={stream.mimeType}, charset={stream.charset}, binary={stream.binary}, Max RSS: {usage.ru_maxrss / 1048576:.1f} MB, Current RSS: {current_rss / 1048576:.1f} MB ---")
+ chunk_count = 0
+ total_bytes = 0
+ while True:
+ chunk = stream.read(chunk_size)
+ if not chunk:
+ break
+ chunk_count += 1
+ total_bytes += len(chunk)
+ if chunk_count % 5000 == 0:
+ usage = resource.getrusage(resource.RUSAGE_SELF)
+ current_rss = psutil.Process(os.getpid()).memory_info().rss
+ print(f"--- chunk {chunk_count}: {len(chunk)} bytes, total: {total_bytes / 1048576:.1f} MB, Max RSS: {usage.ru_maxrss / 1048576:.1f} MB, Current RSS: {current_rss / 1048576:.1f} MB ---")
+
+ feeder.join()
+
+ peak_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1048576
+ elapsed = time.monotonic() - start_time
+ mins, secs = divmod(elapsed, 60)
+ print(f"\n[OK] Streaming input/output done ({chunk_count} chunks, {total_bytes/ 1048576:.1f} MB, {num_elements:,} elements) - Time: {int(mins)}:{secs:06.3f}")
+ print(f"Peak memory (max RSS): {peak_rss:.1f} MB")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Streaming input/output failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def example_streaming_output_larger_than_memory():
+ print("\nTesting streaming output larger than memory...")
+ try:
+ script = """output application/json deferred=true
+---
+{items: (1 to pow(1000, 2)*10) map {id: $, name: "item_" ++ $}}"""
+ with dataweave.run_stream(script) as stream:
+ print(f">>> Output mimeType={stream.mimeType}, charset={stream.charset}, binary={stream.binary}")
+ chunk_count = 0
+ total_bytes = 0
+ while True:
+ chunk = stream.read(1024*1024*10) # deferred=true uses 8k chunks
+ if not chunk:
+ break
+ chunk_count += 1
+ total_bytes += len(chunk)
+ usage = resource.getrusage(resource.RUSAGE_SELF)
+ current_rss = psutil.Process(os.getpid()).memory_info().rss
+ # print script output
+# sys.stdout.write(chunk.decode(stream.charset or "utf-8"))
+# sys.stdout.flush()
+ print(f"--- chunk {chunk_count}: {len(chunk)} bytes, total: {total_bytes / 1048576:.1f} MB, Max RSS: {usage.ru_maxrss / 1048576:.1f} MB, Current RSS: {current_rss / 1048576:.1f} MB ---")
+ peak_rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1048576
+ print(f"\n[OK] Streaming done ({chunk_count} chunks, {total_bytes} bytes)")
+ print(f"Peak memory (max RSS): {peak_rss:.1f} MB")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Streaming chunked read failed: {e}")
+ return False
+
+
+def example_streaming_adding_chunks():
+ print("\nTesting streaming adding chunks...")
+ try:
+ script = """output application/json deferred=true
+---
+{items: (1 to pow(1000, 2)*10) map {id: $, name: "item_" ++ $}}"""
+ with dataweave.run_stream(script) as stream:
+ print(f">>> Output mimeType={stream.mimeType}, charset={stream.charset}, binary={stream.binary}")
+ chunks = []
+ total_bytes = 0
+ while True:
+ chunk = stream.read(1024*1024*16)
+ if not chunk:
+ break
+ total_bytes += len(chunk)
+ chunks.append(chunk)
+ usage = resource.getrusage(resource.RUSAGE_SELF)
+ current_rss = psutil.Process(os.getpid()).memory_info().rss
+ if len(chunks) % 1000 == 0:
+ print(f"--- chunk {len(chunks)}: {len(chunk)} bytes, total: {total_bytes / 1048576:.1f} MB, Max RSS: {usage.ru_maxrss / 1048576:.1f} MB, Current RSS: {current_rss / 1048576:.1f} MB ---")
+ full = b"".join(chunks)
+ assert len(chunks) > 1, f"Expected multiple chunks, got {len(chunks)}"
+ assert b"item_1" in full, "Expected 'item_1' in result"
+ assert b"item_100" in full, "Expected 'item_100' in result"
+ print(f"\n[OK] Streaming chunked read works ({len(chunks)} chunks)")
+ #print(f"\n[OK] Streaming done ({chunk_count} chunks, {total_bytes} bytes)")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Streaming chunked read failed: {e}")
+ return False
+
+
+def main():
+ example_streaming_input_output_callback()
+# example_streaming_input_output()
+# example_streaming_output_larger_than_memory()
+# example_streaming_adding_chunks()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/native-lib/python/pyproject.toml b/native-lib/python/pyproject.toml
new file mode 100644
index 0000000..642ab3c
--- /dev/null
+++ b/native-lib/python/pyproject.toml
@@ -0,0 +1,3 @@
+[build-system]
+requires = ["setuptools>=68", "wheel"]
+build-backend = "setuptools.build_meta"
diff --git a/native-lib/python/setup.cfg b/native-lib/python/setup.cfg
new file mode 100644
index 0000000..a1635ae
--- /dev/null
+++ b/native-lib/python/setup.cfg
@@ -0,0 +1,18 @@
+[metadata]
+name = dataweave-native
+version = 0.0.1
+description = Python bindings for the DataWeave native library
+
+[options]
+package_dir =
+ = src
+packages = find:
+include_package_data = True
+python_requires = >=3.9
+
+[options.packages.find]
+where = src
+
+[options.package_data]
+dataweave =
+ native/*
diff --git a/native-lib/python/setup.py b/native-lib/python/setup.py
new file mode 100644
index 0000000..8c32d49
--- /dev/null
+++ b/native-lib/python/setup.py
@@ -0,0 +1,109 @@
+"""
+Custom setup.py to build platform-specific wheels for dataweave-native.
+
+Since this package bundles a native shared library (dwlib), the wheel must
+be tagged with the correct platform (e.g., macosx_11_0_arm64, manylinux, etc.)
+rather than the generic 'any' platform.
+
+This is achieved by overriding the bdist_wheel command to set platform-specific
+tags based on the current build environment.
+"""
+
+import platform
+import struct
+import sys
+
+from setuptools import setup
+
+try:
+ from wheel.bdist_wheel import bdist_wheel as _bdist_wheel
+except ImportError:
+ _bdist_wheel = None
+
+
+def get_platform_tag():
+ """
+ Determine the platform tag for the wheel based on the current system.
+ """
+ system = platform.system().lower()
+ machine = platform.machine().lower()
+
+ # Normalize machine architecture names
+ if machine in ("x86_64", "amd64"):
+ machine = "x86_64"
+ elif machine in ("arm64", "aarch64"):
+ machine = "arm64"
+ elif machine in ("i386", "i686"):
+ machine = "i686"
+
+ if system == "darwin":
+ # macOS: use macosx_11_0 as minimum for universal compatibility
+ # Adjust based on actual deployment target if needed
+ mac_ver = platform.mac_ver()[0]
+ if mac_ver:
+ parts = mac_ver.split(".")
+ major = int(parts[0])
+ minor = int(parts[1]) if len(parts) > 1 else 0
+ # Use at least 11.0 for arm64, 10.9 for x86_64
+ if machine == "arm64":
+ major = max(major, 11)
+ minor = 0
+ else:
+ major = max(major, 10)
+ minor = max(minor, 9) if major == 10 else 0
+ else:
+ major, minor = (11, 0) if machine == "arm64" else (10, 9)
+
+ return f"macosx_{major}_{minor}_{machine}"
+
+ elif system == "linux":
+ # Linux: use manylinux2014 for broad compatibility
+ # manylinux2014 supports glibc 2.17+
+ return f"manylinux2014_{machine}"
+
+ elif system == "windows":
+ # Windows: win_amd64 or win32
+ bits = struct.calcsize("P") * 8
+ if bits == 64:
+ return "win_amd64"
+ else:
+ return "win32"
+
+ else:
+ # Fallback: use the platform module's platform tag
+ return None
+
+
+if _bdist_wheel is not None:
+
+ class bdist_wheel(_bdist_wheel):
+ """
+ Custom bdist_wheel that forces platform-specific tags for native library wheels.
+ """
+
+ def finalize_options(self):
+ super().finalize_options()
+ # Mark as platform-specific (not pure Python)
+ self.root_is_pure = False
+
+ def get_tag(self):
+ # Get the default tags
+ python, abi, plat = super().get_tag()
+
+ # Override with platform-specific tag
+ platform_tag = get_platform_tag()
+ if platform_tag:
+ plat = platform_tag
+
+ # Use py3 and none for Python/ABI since we don't have compiled Python extensions
+ return "py3", "none", plat
+
+else:
+ bdist_wheel = None
+
+
+cmdclass = {}
+if bdist_wheel is not None:
+ cmdclass["bdist_wheel"] = bdist_wheel
+
+setup(cmdclass=cmdclass)
diff --git a/native-lib/python/src/dataweave/__init__.py b/native-lib/python/src/dataweave/__init__.py
new file mode 100644
index 0000000..041ef6f
--- /dev/null
+++ b/native-lib/python/src/dataweave/__init__.py
@@ -0,0 +1,978 @@
+"""
+DataWeave Python Module
+
+A simple Python wrapper for executing DataWeave scripts via the native library.
+This module abstracts all GraalVM and native library complexity, providing a
+clean Python API for executing DataWeave scripts with or without inputs.
+
+Basic Usage:
+ import dataweave
+
+ result = dataweave.run_script("2 + 2")
+ print(result.get_string())
+
+ # Call cleanup() when done to release native resources
+ dataweave.cleanup()
+
+Using context manager (recommended for automatic cleanup):
+ from dataweave import DataWeave
+
+ with DataWeave() as dw:
+ result = dw.run("2 + 2")
+ print(result.get_string())
+ # Resources are automatically released when exiting the 'with' block
+
+Handling errors:
+ import dataweave
+
+ result = dataweave.run_script("1 / 0")
+ if not result.success:
+ print(f"Error: {result.error}")
+ else:
+ print(result.get_string())
+"""
+
+import base64
+import ctypes
+import io
+import json
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any, Dict, Iterator, Optional, Union
+
+
+class DataWeaveError(Exception):
+ pass
+
+
+class DataWeaveLibraryNotFoundError(Exception):
+ pass
+
+
+_DEFAULT_CHUNK_SIZE = 8192
+
+# ctypes callback signatures matching NativeCallbacks.WriteCallback / ReadCallback.
+# Buffer parameters use c_void_p (not c_char_p) because ctypes gives c_char_p
+# special treatment that prevents writing into the buffer.
+# int (*WriteCallback)(void *ctx, const char *buffer, int length)
+WRITE_CALLBACK = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int)
+# int (*ReadCallback)(void *ctx, char *buffer, int bufferSize)
+READ_CALLBACK = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int)
+
+
+_ENV_NATIVE_LIB = "DATAWEAVE_NATIVE_LIB"
+
+
+@dataclass
+class InputValue:
+ content: Union[str, bytes]
+ mimeType: Optional[str] = None
+ charset: Optional[str] = None
+ properties: Optional[Dict[str, Union[str, int, bool]]] = None
+
+ def encode_content(self) -> str:
+ if isinstance(self.content, bytes):
+ raw = self.content
+ else:
+ raw = self.content.encode(self.charset or "utf-8")
+ return base64.b64encode(raw).decode("ascii")
+
+
+@dataclass
+class ExecutionResult:
+ success: bool
+ result: Optional[str]
+ error: Optional[str]
+ binary: bool
+ mimeType: Optional[str]
+ charset: Optional[str]
+
+ def get_bytes(self) -> Optional[bytes]:
+ if not self.success or self.result is None:
+ return None
+ return base64.b64decode(self.result)
+
+ def get_string(self) -> Optional[str]:
+ if not self.success or self.result is None:
+ return None
+ if self.binary:
+ return self.result
+ return self.get_bytes().decode(self.charset or "utf-8")
+
+
+class DataWeaveStream(io.RawIOBase):
+ """A file-like stream that reads script execution results from the native library
+ without loading the entire output into memory.
+
+ Implements :class:`io.RawIOBase` so it can be wrapped with
+ :func:`io.BufferedReader` or used anywhere a binary file-like object is expected.
+
+ Usage::
+
+ with dw.run_stream("output application/json --- payload") as stream:
+ for chunk in stream:
+ process(chunk)
+
+ The stream **must** be closed (or used as a context manager) to release native
+ resources. Metadata (``mimeType``, ``charset``, ``binary``) is available as
+ attributes immediately after creation.
+ """
+
+ def __init__(self, lib, thread, handle: int, metadata: dict):
+ super().__init__()
+ self._lib = lib
+ self._thread = thread
+ self._handle = handle
+ self.mimeType: Optional[str] = metadata.get("mimeType")
+ self.charset: Optional[str] = metadata.get("charset")
+ self.binary: bool = bool(metadata.get("binary", False))
+ self._closed_native = False
+
+ # ── io.RawIOBase interface ──────────────────────────────────────────
+
+ def readable(self) -> bool:
+ return True
+
+ def readinto(self, b) -> Optional[int]:
+ """Read up to ``len(b)`` bytes into the pre-allocated buffer *b*."""
+ if self.closed:
+ raise ValueError("I/O operation on closed stream")
+ buf = (ctypes.c_char * len(b))()
+ n = self._lib.run_script_read(self._thread, self._handle, buf, len(b))
+ if n <= 0:
+ return 0
+ b[:n] = buf[:n]
+ return n
+
+ def read(self, size: int = -1) -> bytes:
+ """Read up to *size* bytes. ``-1`` reads until EOF."""
+ if self.closed:
+ raise ValueError("I/O operation on closed stream")
+ if size == 0:
+ return b""
+ if size < 0:
+ chunks = []
+ while True:
+ chunk = self.read(_DEFAULT_CHUNK_SIZE)
+ if not chunk:
+ break
+ chunks.append(chunk)
+ return b"".join(chunks)
+ buf = ctypes.create_string_buffer(size)
+ n = self._lib.run_script_read(self._thread, self._handle, buf, size)
+ if n <= 0:
+ return b""
+ return buf.raw[:n]
+
+ def close(self):
+ if not self._closed_native and self._handle is not None:
+ try:
+ self._lib.run_script_close(self._thread, self._handle)
+ except Exception:
+ pass
+ self._closed_native = True
+ super().close()
+
+ # ── convenience helpers ─────────────────────────────────────────────
+
+ def read_all_string(self) -> str:
+ """Read the full result and decode it as a string using the session charset."""
+ raw = self.read(-1)
+ return raw.decode(self.charset or "utf-8")
+
+ def __iter__(self) -> Iterator[bytes]:
+ """Iterate over the stream in chunks."""
+ while True:
+ chunk = self.read(_DEFAULT_CHUNK_SIZE)
+ if not chunk:
+ break
+ yield chunk
+
+
+class DataWeaveInputStream:
+ """Wraps a native input stream session handle, allowing the caller to
+ push data into the DW engine as an input binding.
+
+ The caller writes bytes via :meth:`write` and signals EOF via :meth:`close`.
+ This object is intended to be used from a **separate thread** that feeds
+ input while the main thread reads the output stream.
+
+ Usage::
+
+ dw = DataWeave()
+ dw.initialize()
+ input_handle = dw.open_input_stream("application/json")
+
+ def feed():
+ with open("large.json", "rb") as f:
+ while chunk := f.read(8192):
+ input_handle.write(chunk)
+ input_handle.close()
+
+ import threading
+ t = threading.Thread(target=feed)
+ t.start()
+
+ with dw.run_stream("payload", inputs={"payload": input_handle}) as out:
+ for chunk in out:
+ process(chunk)
+ t.join()
+ """
+
+ def __init__(self, lib, isolate, isolate_t_ptr, isolatethread_t_ptr, handle: int, mime_type: str, charset: Optional[str] = None):
+ self._lib = lib
+ self._isolate = isolate
+ self._isolate_t_ptr = isolate_t_ptr
+ self._isolatethread_t_ptr = isolatethread_t_ptr
+ self._handle = handle
+ self.mime_type: str = mime_type
+ self.charset: Optional[str] = charset
+ self._closed = False
+ self._attached_thread = None
+
+ @property
+ def handle(self) -> int:
+ """The native handle for this input stream session."""
+ return self._handle
+
+ def _ensure_thread_attached(self):
+ """Attach the current OS thread to the GraalVM isolate if not already attached.
+
+ Each OS thread that calls a ``@CEntryPoint`` function must have its own
+ ``IsolateThread`` token. This method calls ``graal_attach_thread`` to
+ obtain one for the feeder thread.
+ """
+ if self._attached_thread is not None:
+ return
+ self._lib.graal_attach_thread.argtypes = [
+ self._isolate_t_ptr,
+ ctypes.POINTER(self._isolatethread_t_ptr),
+ ]
+ self._lib.graal_attach_thread.restype = ctypes.c_int
+
+ thread = self._isolatethread_t_ptr()
+ rc = self._lib.graal_attach_thread(self._isolate, ctypes.byref(thread))
+ if rc != 0:
+ raise DataWeaveError(f"Failed to attach feeder thread to GraalVM isolate (error {rc})")
+ self._attached_thread = thread
+
+ def _detach_thread(self):
+ """Detach the feeder thread from the GraalVM isolate."""
+ if self._attached_thread is not None:
+ try:
+ self._lib.graal_detach_thread.argtypes = [self._isolatethread_t_ptr]
+ self._lib.graal_detach_thread.restype = ctypes.c_int
+ self._lib.graal_detach_thread(self._attached_thread)
+ except Exception:
+ pass
+ self._attached_thread = None
+
+ def write(self, data: bytes) -> None:
+ """Write bytes into the input stream.
+
+ :param data: the bytes to write
+ :raises DataWeaveError: on I/O failure or if the stream is closed
+ """
+ if self._closed:
+ raise DataWeaveError("Input stream is already closed")
+ if not data:
+ return
+ self._ensure_thread_attached()
+ buf = ctypes.create_string_buffer(data)
+ rc = self._lib.input_stream_write(self._attached_thread, self._handle, buf, len(data))
+ if rc != 0:
+ raise DataWeaveError("Failed to write to input stream")
+
+ def close(self) -> None:
+ """Close the write end of the pipe, signalling EOF to the DW engine."""
+ if not self._closed:
+ self._ensure_thread_attached()
+ self._lib.input_stream_close(self._attached_thread, self._handle)
+ self._closed = True
+ self._detach_thread()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.close()
+ return False
+
+
+def _parse_native_encoded_response(raw: str) -> ExecutionResult:
+ if raw is None:
+ return ExecutionResult(False, None, "Native returned null", False, None, None)
+
+ if raw == "":
+ return ExecutionResult(False, None, "Native returned empty response", False, None, None)
+
+ try:
+ parsed = json.loads(raw)
+ except Exception as e:
+ return ExecutionResult(False, None, f"Failed to parse native JSON response: {e}", False, None, None)
+
+ if not isinstance(parsed, dict):
+ return ExecutionResult(False, None, "Native response JSON is not an object", False, None, None)
+
+ success = bool(parsed.get("success", False))
+ if not success:
+ return ExecutionResult(False, None, parsed.get("error"), False, None, None)
+
+ return ExecutionResult(
+ success=True,
+ result=parsed.get("result"),
+ error=None,
+ binary=bool(parsed.get("binary", False)),
+ mimeType=parsed.get("mimeType"),
+ charset=parsed.get("charset"),
+ )
+
+
+def _candidate_library_paths() -> list[Path]:
+ paths: list[Path] = []
+
+ env_value = (__import__("os").environ.get(_ENV_NATIVE_LIB) or "").strip()
+ if env_value:
+ paths.append(Path(env_value))
+
+ pkg_dir = Path(__file__).resolve().parent
+ native_dir = pkg_dir / "native"
+ paths.append(native_dir / "dwlib.dylib")
+ paths.append(native_dir / "dwlib.so")
+ paths.append(native_dir / "dwlib.dll")
+
+ # Dev fallback: if this package is being used from the data-weave-cli repo
+ # tree, locate native-lib/build/native/nativeCompile.
+ for parent in pkg_dir.parents:
+ build_dir = parent / "build" / "native" / "nativeCompile"
+ if build_dir.exists():
+ paths.append(build_dir / "dwlib.dylib")
+ paths.append(build_dir / "dwlib.so")
+ paths.append(build_dir / "dwlib.dll")
+ break
+
+ # CWD fallback
+ paths.append(Path("dwlib.dylib"))
+ paths.append(Path("dwlib.so"))
+ paths.append(Path("dwlib.dll"))
+
+ return paths
+
+
+def _find_library() -> str:
+ for p in _candidate_library_paths():
+ if p.exists() and p.is_file():
+ return str(p)
+
+ raise DataWeaveLibraryNotFoundError(
+ "Could not find DataWeave native library (dwlib). "
+ f"Set {_ENV_NATIVE_LIB} to an absolute path or install a wheel that bundles the native library."
+ )
+
+
+def _normalize_input_value(value: Any, mime_type: Optional[str] = None) -> Dict[str, Any]:
+ if isinstance(value, dict):
+ allowed_keys = {"content", "mimeType", "charset", "properties"}
+ extra_keys = set(value.keys()) - allowed_keys
+ if extra_keys:
+ raise DataWeaveError(
+ "Explicit input dict contains unsupported keys: " + ", ".join(sorted(extra_keys))
+ )
+
+ if "content" in value or "mimeType" in value:
+ if "content" not in value or "mimeType" not in value:
+ raise DataWeaveError(
+ "Explicit input dict must include both 'content' and 'mimeType'"
+ )
+
+ raw_content = value.get("content")
+ charset = value.get("charset") or "utf-8"
+ if isinstance(raw_content, bytes):
+ encoded_content = base64.b64encode(raw_content).decode("ascii")
+ else:
+ encoded_content = base64.b64encode(str(raw_content).encode(charset)).decode("ascii")
+
+ normalized: Dict[str, Any] = {
+ "content": encoded_content,
+ "mimeType": value.get("mimeType"),
+ }
+ if "charset" in value:
+ normalized["charset"] = value.get("charset")
+ if "properties" in value:
+ normalized["properties"] = value.get("properties")
+ return normalized
+
+ if isinstance(value, InputValue):
+ out: Dict[str, Any] = {
+ "content": value.encode_content(),
+ "mimeType": value.mimeType or mime_type,
+ }
+ if value.charset is not None:
+ out["charset"] = value.charset
+ if value.properties is not None:
+ out["properties"] = value.properties
+ return out
+
+ if isinstance(value, str):
+ content = value
+ default_mime = "text/plain"
+ elif isinstance(value, (int, float, bool)):
+ content = json.dumps(value)
+ default_mime = "application/json"
+ elif value is None:
+ content = "null"
+ default_mime = "application/json"
+ else:
+ try:
+ content = json.dumps(value)
+ default_mime = "application/json"
+ except (TypeError, ValueError):
+ content = str(value)
+ default_mime = "text/plain"
+
+ charset = "utf-8"
+ encoded_content = base64.b64encode(content.encode(charset)).decode("ascii")
+
+ return {
+ "content": encoded_content,
+ "mimeType": mime_type or default_mime,
+ "charset": charset,
+ "properties": None,
+ }
+
+
+class DataWeave:
+ def __init__(self, lib_path: Optional[str] = None):
+ self._lib_path = lib_path or _find_library()
+ self._lib = None
+ self._isolate = None
+ self._thread = None
+ self._initialized = False
+
+ def _load_library(self):
+ try:
+ self._lib = ctypes.CDLL(self._lib_path)
+ except OSError as e:
+ raise DataWeaveError(f"Failed to load library from {self._lib_path}: {e}")
+
+ def _setup_graal_structures(self):
+ class graal_isolate_t(ctypes.Structure):
+ pass
+
+ class graal_isolatethread_t(ctypes.Structure):
+ pass
+
+ self._graal_isolate_t_ptr = ctypes.POINTER(graal_isolate_t)
+ self._graal_isolatethread_t_ptr = ctypes.POINTER(graal_isolatethread_t)
+
+ def _create_isolate(self):
+ self._lib.graal_create_isolate.argtypes = [
+ ctypes.c_void_p,
+ ctypes.POINTER(self._graal_isolate_t_ptr),
+ ctypes.POINTER(self._graal_isolatethread_t_ptr),
+ ]
+ self._lib.graal_create_isolate.restype = ctypes.c_int
+
+ self._isolate = self._graal_isolate_t_ptr()
+ self._thread = self._graal_isolatethread_t_ptr()
+
+ result = self._lib.graal_create_isolate(None, ctypes.byref(self._isolate), ctypes.byref(self._thread))
+ if result != 0:
+ raise DataWeaveError(f"Failed to create GraalVM isolate. Error code: {result}")
+
+ def _setup_functions(self):
+ if not hasattr(self._lib, "run_script"):
+ raise DataWeaveError("Native library does not export run_script")
+
+ self._lib.run_script.argtypes = [
+ self._graal_isolatethread_t_ptr,
+ ctypes.c_char_p,
+ ctypes.c_char_p,
+ ]
+ self._lib.run_script.restype = ctypes.c_void_p
+
+ if hasattr(self._lib, "free_cstring"):
+ self._lib.free_cstring.argtypes = [self._graal_isolatethread_t_ptr, ctypes.c_void_p]
+ self._lib.free_cstring.restype = None
+
+ # Streaming API
+ if hasattr(self._lib, "run_script_open"):
+ self._lib.run_script_open.argtypes = [
+ self._graal_isolatethread_t_ptr,
+ ctypes.c_char_p,
+ ctypes.c_char_p,
+ ]
+ self._lib.run_script_open.restype = ctypes.c_long
+
+ self._lib.run_script_read.argtypes = [
+ self._graal_isolatethread_t_ptr,
+ ctypes.c_long,
+ ctypes.c_char_p,
+ ctypes.c_int,
+ ]
+ self._lib.run_script_read.restype = ctypes.c_int
+
+ self._lib.run_script_metadata.argtypes = [
+ self._graal_isolatethread_t_ptr,
+ ctypes.c_long,
+ ]
+ self._lib.run_script_metadata.restype = ctypes.c_void_p
+
+ self._lib.run_script_stream_error.argtypes = [
+ self._graal_isolatethread_t_ptr,
+ ctypes.c_long,
+ ]
+ self._lib.run_script_stream_error.restype = ctypes.c_void_p
+
+ self._lib.run_script_close.argtypes = [
+ self._graal_isolatethread_t_ptr,
+ ctypes.c_long,
+ ]
+ self._lib.run_script_close.restype = None
+
+ self._has_streaming = True
+ else:
+ self._has_streaming = False
+
+ # Streaming Input API
+ if hasattr(self._lib, "input_stream_open"):
+ self._lib.input_stream_open.argtypes = [
+ self._graal_isolatethread_t_ptr,
+ ctypes.c_char_p,
+ ctypes.c_char_p,
+ ]
+ self._lib.input_stream_open.restype = ctypes.c_long
+
+ self._lib.input_stream_write.argtypes = [
+ self._graal_isolatethread_t_ptr,
+ ctypes.c_long,
+ ctypes.c_char_p,
+ ctypes.c_int,
+ ]
+ self._lib.input_stream_write.restype = ctypes.c_int
+
+ self._lib.input_stream_close.argtypes = [
+ self._graal_isolatethread_t_ptr,
+ ctypes.c_long,
+ ]
+ self._lib.input_stream_close.restype = ctypes.c_int
+
+ self._has_streaming_input = True
+ else:
+ self._has_streaming_input = False
+
+ # Callback-based Streaming API
+ if hasattr(self._lib, "run_script_callback"):
+ self._lib.run_script_callback.argtypes = [
+ self._graal_isolatethread_t_ptr,
+ ctypes.c_char_p,
+ ctypes.c_char_p,
+ WRITE_CALLBACK,
+ ctypes.c_void_p,
+ ]
+ self._lib.run_script_callback.restype = ctypes.c_void_p
+
+ self._has_callback_streaming = True
+ else:
+ self._has_callback_streaming = False
+
+ if hasattr(self._lib, "run_script_input_output_callback"):
+ self._lib.run_script_input_output_callback.argtypes = [
+ self._graal_isolatethread_t_ptr,
+ ctypes.c_char_p,
+ ctypes.c_char_p,
+ ctypes.c_char_p,
+ ctypes.c_char_p,
+ ctypes.c_char_p,
+ READ_CALLBACK,
+ WRITE_CALLBACK,
+ ctypes.c_void_p,
+ ]
+ self._lib.run_script_input_output_callback.restype = ctypes.c_void_p
+
+ self._has_callback_input_output = True
+ else:
+ self._has_callback_input_output = False
+
+ def _decode_and_free(self, ptr: Optional[int]) -> str:
+ if not ptr:
+ return ""
+
+ try:
+ result_bytes = ctypes.string_at(ptr)
+ return result_bytes.decode("utf-8")
+ finally:
+ if self._lib is not None and hasattr(self._lib, "free_cstring"):
+ self._lib.free_cstring(self._thread, ptr)
+
+ def initialize(self):
+ if self._initialized:
+ return
+
+ self._load_library()
+ self._setup_graal_structures()
+ self._create_isolate()
+ self._setup_functions()
+ self._initialized = True
+
+ def cleanup(self):
+ if not self._initialized:
+ return
+
+ if hasattr(self._lib, "graal_detach_thread") and self._thread:
+ try:
+ self._lib.graal_detach_thread.argtypes = [self._graal_isolatethread_t_ptr]
+ self._lib.graal_detach_thread.restype = ctypes.c_int
+ self._lib.graal_detach_thread(self._thread)
+ except Exception:
+ pass
+
+ self._initialized = False
+ self._thread = None
+ self._isolate = None
+ self._lib = None
+
+ def open_input_stream(self, mime_type: str, charset: Optional[str] = None) -> DataWeaveInputStream:
+ """Create a new streaming input that can be written to from a separate thread.
+
+ The returned :class:`DataWeaveInputStream` can be passed as a value in the
+ ``inputs`` dict of :meth:`run_stream`. The caller **must** write data and
+ call :meth:`DataWeaveInputStream.close` (or use it as a context manager)
+ from a **separate thread** to avoid deadlocks.
+
+ :param mime_type: the MIME type of the data being streamed
+ :param charset: the charset (default UTF-8)
+ :return: a :class:`DataWeaveInputStream`
+ :raises DataWeaveError: if the runtime is not initialized or streaming input is unsupported
+ """
+ if not self._initialized:
+ raise DataWeaveError("DataWeave runtime not initialized. Call initialize() first.")
+ if not self._has_streaming_input:
+ raise DataWeaveError("Native library does not support streaming input API (input_stream_open not found).")
+
+ charset_arg = charset.encode("utf-8") if charset else None
+ handle = self._lib.input_stream_open(
+ self._thread,
+ mime_type.encode("utf-8"),
+ charset_arg,
+ )
+ if handle <= 0:
+ raise DataWeaveError("Failed to create input stream session")
+ return DataWeaveInputStream(
+ self._lib, self._isolate,
+ self._graal_isolate_t_ptr, self._graal_isolatethread_t_ptr,
+ handle, mime_type, charset,
+ )
+
+ def run_stream(self, script: str, inputs: Optional[Dict[str, Any]] = None) -> DataWeaveStream:
+ """Execute a DataWeave script and return a :class:`DataWeaveStream` for
+ reading the result incrementally.
+
+ The returned stream **must** be closed (or used as a context manager) to
+ release native resources.
+
+ :param script: the DataWeave script source
+ :param inputs: optional input bindings
+ :return: a :class:`DataWeaveStream`
+ :raises DataWeaveError: if the runtime is not initialized or streaming is unsupported
+ """
+ if not self._initialized:
+ raise DataWeaveError("DataWeave runtime not initialized. Call initialize() first.")
+ if not self._has_streaming:
+ raise DataWeaveError("Native library does not support the streaming API (run_script_open not found).")
+
+ if inputs is None:
+ inputs = {}
+
+ normalized_inputs = {}
+ for key, val in inputs.items():
+ if isinstance(val, DataWeaveInputStream):
+ normalized_inputs[key] = {
+ "streamHandle": str(val.handle),
+ "mimeType": val.mime_type,
+ }
+ if val.charset:
+ normalized_inputs[key]["charset"] = val.charset
+ else:
+ normalized_inputs[key] = _normalize_input_value(val)
+ inputs_json = json.dumps(normalized_inputs)
+
+ try:
+ handle = self._lib.run_script_open(
+ self._thread,
+ script.encode("utf-8"),
+ inputs_json.encode("utf-8"),
+ )
+
+ # Check for error session
+ err_ptr = self._lib.run_script_stream_error(self._thread, handle)
+ err_msg = ""
+ if err_ptr:
+ try:
+ err_msg = ctypes.string_at(err_ptr).decode("utf-8")
+ finally:
+ if hasattr(self._lib, "free_cstring"):
+ self._lib.free_cstring(self._thread, err_ptr)
+
+ if err_msg:
+ self._lib.run_script_close(self._thread, handle)
+ raise DataWeaveError(err_msg)
+
+ # Fetch metadata
+ meta_ptr = self._lib.run_script_metadata(self._thread, handle)
+ metadata = {}
+ if meta_ptr:
+ try:
+ meta_raw = ctypes.string_at(meta_ptr).decode("utf-8")
+ metadata = json.loads(meta_raw)
+ finally:
+ if hasattr(self._lib, "free_cstring"):
+ self._lib.free_cstring(self._thread, meta_ptr)
+
+ return DataWeaveStream(self._lib, self._thread, handle, metadata)
+ except DataWeaveError:
+ raise
+ except Exception as e:
+ raise DataWeaveError(f"Failed to open streaming session: {e}")
+
+ def run_callback(
+ self,
+ script: str,
+ write_callback,
+ inputs: Optional[Dict[str, Any]] = None,
+ ) -> dict:
+ """Execute a DataWeave script and stream the output via a write callback.
+
+ Instead of the session-based ``run_stream`` API, the native side reads the
+ output internally and invokes *write_callback* for each chunk.
+
+ :param script: the DataWeave script source
+ :param write_callback: callable ``(data: bytes) -> int`` invoked with each
+ output chunk. Must return ``0`` on success or non-zero to abort.
+ :param inputs: optional input bindings (same format as :meth:`run`)
+ :return: a dict with ``success``, ``mimeType``, ``charset``, ``binary`` on
+ success, or ``success`` and ``error`` on failure
+ :raises DataWeaveError: if the runtime is not initialized or the callback API
+ is not available
+ """
+ if not self._initialized:
+ raise DataWeaveError("DataWeave runtime not initialized. Call initialize() first.")
+ if not self._has_callback_streaming:
+ raise DataWeaveError(
+ "Native library does not support callback streaming API (run_script_callback not found)."
+ )
+
+ if inputs is None:
+ inputs = {}
+
+ normalized_inputs = {}
+ for key, val in inputs.items():
+ if isinstance(val, DataWeaveInputStream):
+ normalized_inputs[key] = {
+ "streamHandle": str(val.handle),
+ "mimeType": val.mime_type,
+ }
+ if val.charset:
+ normalized_inputs[key]["charset"] = val.charset
+ else:
+ normalized_inputs[key] = _normalize_input_value(val)
+ inputs_json = json.dumps(normalized_inputs)
+
+ @WRITE_CALLBACK
+ def _write_cb(_ctx, buf, length):
+ try:
+ data = ctypes.string_at(buf, length)
+ return write_callback(data)
+ except Exception:
+ return -1
+
+ try:
+ result_ptr = self._lib.run_script_callback(
+ self._thread,
+ script.encode("utf-8"),
+ inputs_json.encode("utf-8"),
+ _write_cb,
+ None,
+ )
+ raw = self._decode_and_free(result_ptr)
+ return json.loads(raw) if raw else {"success": False, "error": "Empty response"}
+ except Exception as e:
+ raise DataWeaveError(f"Failed to execute callback streaming: {e}")
+
+ def run_input_output_callback(
+ self,
+ script: str,
+ input_name: str,
+ input_mime_type: str,
+ read_callback,
+ write_callback,
+ input_charset: Optional[str] = None,
+ inputs: Optional[Dict[str, Any]] = None,
+ ) -> dict:
+ """Execute a DataWeave script with callback-driven input *and* output streaming.
+
+ The native side calls *read_callback* on a background thread to pull input
+ data for the binding named *input_name*, and calls *write_callback* on the
+ calling thread to push output chunks.
+
+ :param script: the DataWeave script source
+ :param input_name: the binding name for the callback-supplied input
+ :param input_mime_type: MIME type of the callback-supplied input
+ :param read_callback: callable ``(buf_size: int) -> bytes`` returning the
+ next chunk, empty bytes ``b""`` on EOF, or raising on error
+ :param write_callback: callable ``(data: bytes) -> int`` returning ``0`` on
+ success or non-zero to abort
+ :param input_charset: charset of the callback-supplied input (default UTF-8)
+ :param inputs: optional additional input bindings
+ :return: a dict with metadata on success, or error info on failure
+ :raises DataWeaveError: if the runtime is not initialized or the API is missing
+ """
+ if not self._initialized:
+ raise DataWeaveError("DataWeave runtime not initialized. Call initialize() first.")
+ if not self._has_callback_input_output:
+ raise DataWeaveError(
+ "Native library does not support callback input/output API "
+ "(run_script_input_output_callback not found)."
+ )
+
+ if inputs is None:
+ inputs = {}
+
+ normalized_inputs = {key: _normalize_input_value(val) for key, val in inputs.items()}
+ inputs_json = json.dumps(normalized_inputs)
+
+ @READ_CALLBACK
+ def _read_cb(_ctx, buf, buf_size):
+ try:
+ data = read_callback(buf_size)
+ if not data:
+ return 0 # EOF
+ n = len(data)
+ ctypes.memmove(buf, data, n)
+ return n
+ except Exception:
+ return -1
+
+ @WRITE_CALLBACK
+ def _write_cb(_ctx, buf, length):
+ try:
+ data = ctypes.string_at(buf, length)
+ return write_callback(data)
+ except Exception:
+ return -1
+
+ try:
+ result_ptr = self._lib.run_script_input_output_callback(
+ self._thread,
+ script.encode("utf-8"),
+ inputs_json.encode("utf-8"),
+ input_name.encode("utf-8"),
+ input_mime_type.encode("utf-8"),
+ input_charset.encode("utf-8") if input_charset else None,
+ _read_cb,
+ _write_cb,
+ None,
+ )
+ raw = self._decode_and_free(result_ptr)
+ return json.loads(raw) if raw else {"success": False, "error": "Empty response"}
+ except Exception as e:
+ raise DataWeaveError(f"Failed to execute callback input/output streaming: {e}")
+
+ def run(self, script: str, inputs: Optional[Dict[str, Any]] = None) -> ExecutionResult:
+ if not self._initialized:
+ raise DataWeaveError("DataWeave runtime not initialized. Call initialize() first.")
+
+ if inputs is None:
+ inputs = {}
+
+ normalized_inputs = {key: _normalize_input_value(val) for key, val in inputs.items()}
+ inputs_json = json.dumps(normalized_inputs)
+
+ try:
+ result_ptr = self._lib.run_script(
+ self._thread,
+ script.encode("utf-8"),
+ inputs_json.encode("utf-8"),
+ )
+ raw = self._decode_and_free(result_ptr)
+ return _parse_native_encoded_response(raw)
+ except Exception as e:
+ raise DataWeaveError(f"Failed to execute script: {e}")
+
+ def __enter__(self):
+ self.initialize()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ self.cleanup()
+ return False
+
+
+_global_instance: Optional[DataWeave] = None
+
+
+def _get_global_instance() -> DataWeave:
+ global _global_instance
+ if _global_instance is None:
+ _global_instance = DataWeave()
+ _global_instance.initialize()
+ return _global_instance
+
+
+def run_script(script: str, inputs: Optional[Dict[str, Any]] = None) -> ExecutionResult:
+ return _get_global_instance().run(script, inputs)
+
+
+def run_stream(script: str, inputs: Optional[Dict[str, Any]] = None) -> DataWeaveStream:
+ """Execute a script and return a :class:`DataWeaveStream` for incremental reading."""
+ return _get_global_instance().run_stream(script, inputs)
+
+
+def open_input_stream(mime_type: str, charset: Optional[str] = None) -> DataWeaveInputStream:
+ """Create a streaming input session. See :meth:`DataWeave.open_input_stream`."""
+ return _get_global_instance().open_input_stream(mime_type, charset)
+
+
+def run_callback(script: str, write_callback, inputs: Optional[Dict[str, Any]] = None) -> dict:
+ """Execute a script and stream output via a write callback. See :meth:`DataWeave.run_callback`."""
+ return _get_global_instance().run_callback(script, write_callback, inputs)
+
+
+def run_input_output_callback(
+ script: str,
+ input_name: str,
+ input_mime_type: str,
+ read_callback,
+ write_callback,
+ input_charset: Optional[str] = None,
+ inputs: Optional[Dict[str, Any]] = None,
+) -> dict:
+ """Execute a script with callback-driven input and output. See :meth:`DataWeave.run_input_output_callback`."""
+ return _get_global_instance().run_input_output_callback(
+ script, input_name, input_mime_type, read_callback, write_callback, input_charset, inputs,
+ )
+
+
+def cleanup():
+ global _global_instance
+ if _global_instance is not None:
+ _global_instance.cleanup()
+ _global_instance = None
+
+
+__all__ = [
+ "DataWeaveError",
+ "DataWeaveInputStream",
+ "DataWeaveLibraryNotFoundError",
+ "DataWeaveStream",
+ "ExecutionResult",
+ "InputValue",
+ "READ_CALLBACK",
+ "WRITE_CALLBACK",
+ "open_input_stream",
+ "run_callback",
+ "run_input_output_callback",
+ "run_script",
+ "run_stream",
+ "cleanup",
+]
diff --git a/native-lib/python/tests/person.xml b/native-lib/python/tests/person.xml
new file mode 100644
index 0000000..376a6b7
Binary files /dev/null and b/native-lib/python/tests/person.xml differ
diff --git a/native-lib/python/tests/test_dataweave_module.py b/native-lib/python/tests/test_dataweave_module.py
new file mode 100755
index 0000000..835990b
--- /dev/null
+++ b/native-lib/python/tests/test_dataweave_module.py
@@ -0,0 +1,486 @@
+#!/usr/bin/env python3
+"""
+Quick test script for the DataWeave Python module.
+"""
+
+import sys
+from pathlib import Path
+
+_PYTHON_SRC_DIR = Path(__file__).resolve().parents[1] / "src"
+sys.path.insert(0, str(_PYTHON_SRC_DIR))
+
+import dataweave
+
+def test_basic():
+ """Test basic functionality"""
+ print("Testing basic script execution...")
+ try:
+ result = dataweave.run_script("2 + 2", {})
+ assert result.get_string() == "4", f"Expected '4', got '{result.get_string()}'"
+ print("[OK] Basic script execution works")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Basic script execution failed: {e}")
+ return False
+
+def test_with_inputs():
+ """Test script with inputs"""
+ print("\nTesting script with inputs...")
+ try:
+ result = dataweave.run_script("num1 + num2", {"num1": 25, "num2": 17})
+ assert result.get_string() == "42", f"Expected '42', got '{result.get_string()}'"
+ print("[OK] Script with inputs works")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Script with inputs failed: {e}")
+ return False
+
+def test_context_manager():
+ """Test context manager"""
+ print("\nTesting with context manager...")
+ try:
+ with dataweave.DataWeave() as dw:
+
+ result = dw.run("sqrt(144)")
+ assert result.get_string() == "12", f"Expected '12', got '{result.get_string()}'"
+ result = dw.run("sqrt(10000)")
+ assert result.get_string() == "100", f"Expected '100', got '{result.get_string()}'"
+ print("[OK] Script execution with context manager works")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Script execution with context manager failed: {e}")
+ return False
+
+def test_encoding():
+ """Test reading UTF-16 XML input and producing CSV output"""
+ print("\nTesting encoding (UTF-16 XML -> CSV)...")
+ try:
+ xml_path = (
+ Path(__file__).resolve().parent / "person.xml"
+ )
+ xml_bytes = xml_path.read_bytes()
+
+ script = """output application/csv header=true
+---
+[payload.person]
+"""
+
+ result = dataweave.run_script(
+ script,
+ {
+ "payload": {
+ "content": xml_bytes,
+ "mimeType": "application/xml",
+ "charset": "UTF-16",
+ }
+ },
+ )
+
+ out = result.get_string() or ""
+ print(f"out: \n{out}")
+ assert result.success is True, f"Expected success=true, got: {result}"
+ assert "name" in out and "age" in out, f"CSV header missing, got: {out!r}"
+ assert "Billy" in out, f"Expected name 'Billy' in CSV, got: {out!r}"
+ assert "31" in out, f"Expected age '31' in CSV, got: {out!r}"
+
+ print("[OK] Encoding conversion works")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Encoding conversion failed: {e}")
+ return False
+
+def test_auto_conversion():
+ """Test auto-conversion of different types"""
+ print("\nTesting auto-conversion...")
+ try:
+
+ # Test array
+ result = dataweave.run_script(
+ "numbers[0]",
+ {"numbers": [1, 2, 3]}
+ )
+ assert result.get_string() == "1", f"Expected '1', got '{result.get_string()}'"
+
+ print("[OK] Auto-conversion works")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Auto-conversion failed: {e}")
+ return False
+
+def test_streaming_basic():
+ """Test basic streaming execution"""
+ print("\nTesting streaming basic execution...")
+ try:
+ with dataweave.run_stream("2 + 2") as stream:
+ assert stream.mimeType is not None, "Expected mimeType in metadata"
+ result = stream.read_all_string()
+ assert result == "4", f"Expected '4', got '{result}'"
+ print("[OK] Streaming basic execution works")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Streaming basic execution failed: {e}")
+ return False
+
+def test_streaming_with_inputs():
+ """Test streaming execution with inputs"""
+ print("\nTesting streaming with inputs...")
+ try:
+ with dataweave.run_stream("num1 + num2", {"num1": 25, "num2": 17}) as stream:
+ result = stream.read_all_string()
+ assert result == "42", f"Expected '42', got '{result}'"
+ print("[OK] Streaming with inputs works")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Streaming with inputs failed: {e}")
+ return False
+
+def test_streaming_chunked_read():
+ """Test streaming with small chunk reads"""
+ print("\nTesting streaming chunked read...")
+ try:
+ script = """output application/json
+---
+{items: (1 to 100) map {id: $, name: "item_" ++ $}}"""
+ with dataweave.run_stream(script) as stream:
+ chunks = []
+ while True:
+ chunk = stream.read(32)
+ if not chunk:
+ break
+ chunks.append(chunk)
+ full = b"".join(chunks)
+ assert len(chunks) > 1, f"Expected multiple chunks, got {len(chunks)}"
+ assert b"item_1" in full, "Expected 'item_1' in result"
+ assert b"item_100" in full, "Expected 'item_100' in result"
+ print(f"[OK] Streaming chunked read works ({len(chunks)} chunks)")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Streaming chunked read failed: {e}")
+ return False
+
+def test_streaming_iterator():
+ """Test streaming via iterator protocol"""
+ print("\nTesting streaming iterator...")
+ try:
+ with dataweave.run_stream("output application/json --- [1,2,3]") as stream:
+ chunks = list(stream)
+ full = b"".join(chunks)
+ text = full.decode(stream.charset or "utf-8")
+ assert "1" in text and "3" in text, f"Expected [1,2,3] in result, got '{text}'"
+ print("[OK] Streaming iterator works")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Streaming iterator failed: {e}")
+ return False
+
+def test_streaming_context_manager():
+ """Test that streaming context manager properly cleans up"""
+ print("\nTesting streaming context manager cleanup...")
+ try:
+ with dataweave.DataWeave() as dw:
+ with dw.run_stream("sqrt(144)") as stream:
+ result = stream.read_all_string()
+ assert result == "12", f"Expected '12', got '{result}'"
+ print("[OK] Streaming context manager works")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Streaming context manager failed: {e}")
+ return False
+
+def test_streaming_input_basic():
+ """Test streaming input with a separate feeder thread"""
+ print("\nTesting streaming input basic...")
+ try:
+ import threading
+
+ with dataweave.DataWeave() as dw:
+ input_stream = dw.open_input_stream("application/json")
+
+ def feed():
+ try:
+ input_stream.write(b'{"name": "Alice", "age": 30}')
+ input_stream.close()
+ except Exception as e:
+ print(f" Feed error: {e}")
+
+ t = threading.Thread(target=feed)
+ t.start()
+
+ with dw.run_stream(
+ "output application/json\n---\npayload.name",
+ inputs={"payload": input_stream},
+ ) as out:
+ result = out.read_all_string()
+
+ t.join(timeout=5)
+ assert result == '"Alice"', f"Expected '\"Alice\"', got '{result}'"
+
+ print("[OK] Streaming input basic works")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Streaming input basic failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_streaming_input_large():
+ """Test streaming a large input in chunks from a feeder thread"""
+ print("\nTesting streaming input large...")
+ try:
+ import threading
+
+ with dataweave.DataWeave() as dw:
+ input_stream = dw.open_input_stream("application/json")
+
+ def feed():
+ try:
+ # Build a large JSON array and stream it in chunks
+ data = b"["
+ for i in range(1, 501):
+ if i > 1:
+ data += b","
+ data += f'{{"id":{i},"val":"item_{i}"}}'.encode()
+ data += b"]"
+ chunk_size = 4096
+ for offset in range(0, len(data), chunk_size):
+ input_stream.write(data[offset:offset + chunk_size])
+ input_stream.close()
+ except Exception as e:
+ print(f" Feed error: {e}")
+
+ t = threading.Thread(target=feed)
+ t.start()
+
+ with dw.run_stream(
+ "output application/json\n---\nsizeOf(payload)",
+ inputs={"payload": input_stream},
+ ) as out:
+ result = out.read_all_string()
+
+ t.join(timeout=10)
+ assert result == "500", f"Expected '500', got '{result}'"
+
+ print("[OK] Streaming input large works")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Streaming input large failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_streaming_input_context_manager():
+ """Test DataWeaveInputStream as a context manager"""
+ print("\nTesting streaming input context manager...")
+ try:
+ import threading
+
+ with dataweave.DataWeave() as dw:
+ input_stream = dw.open_input_stream("application/json")
+
+ def feed():
+ try:
+ with input_stream:
+ input_stream.write(b'[1, 2, 3]')
+ except Exception as e:
+ print(f" Feed error: {e}")
+
+ t = threading.Thread(target=feed)
+ t.start()
+
+ with dw.run_stream(
+ "output application/json\n---\npayload[2]",
+ inputs={"payload": input_stream},
+ ) as out:
+ result = out.read_all_string()
+
+ t.join(timeout=5)
+ assert result == "3", f"Expected '3', got '{result}'"
+
+ print("[OK] Streaming input context manager works")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Streaming input context manager failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_callback_output_basic():
+ """Test callback-based output streaming"""
+ print("\nTesting callback output basic...")
+ try:
+ chunks = []
+
+ def on_write(data: bytes) -> int:
+ chunks.append(data)
+ return 0
+
+ result = dataweave.run_callback("2 + 2", on_write)
+ assert result.get("success") is True, f"Expected success, got: {result}"
+ full = b"".join(chunks)
+ text = full.decode(result.get("charset", "utf-8"))
+ assert text == "4", f"Expected '4', got '{text}'"
+ print(f"[OK] Callback output basic works (chunks={len(chunks)}, result='{text}')")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Callback output basic failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_callback_output_with_inputs():
+ """Test callback-based output streaming with inputs"""
+ print("\nTesting callback output with inputs...")
+ try:
+ chunks = []
+
+ def on_write(data: bytes) -> int:
+ chunks.append(data)
+ return 0
+
+ result = dataweave.run_callback("num1 + num2", on_write, inputs={"num1": 25, "num2": 17})
+ assert result.get("success") is True, f"Expected success, got: {result}"
+ full = b"".join(chunks)
+ text = full.decode(result.get("charset", "utf-8"))
+ assert text == "42", f"Expected '42', got '{text}'"
+ print(f"[OK] Callback output with inputs works (result='{text}')")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Callback output with inputs failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_callback_input_output():
+ """Test callback-based input and output streaming"""
+ print("\nTesting callback input+output...")
+ try:
+ import io as _io
+
+ source = _io.BytesIO(b'[10, 20, 30, 40, 50]')
+ output_chunks = []
+
+ def on_read(buf_size: int) -> bytes:
+ return source.read(buf_size)
+
+ def on_write(data: bytes) -> int:
+ output_chunks.append(data)
+ return 0
+
+ script = "output application/json\n---\npayload map ($ * 2)"
+ result = dataweave.run_input_output_callback(
+ script,
+ input_name="payload",
+ input_mime_type="application/json",
+ read_callback=on_read,
+ write_callback=on_write,
+ )
+ assert result.get("success") is True, f"Expected success, got: {result}"
+ full = b"".join(output_chunks)
+ text = full.decode(result.get("charset", "utf-8"))
+ assert "20" in text, f"Expected 20 in result (10*2), got: {text}"
+ assert "100" in text, f"Expected 100 in result (50*2), got: {text}"
+ print(f"[OK] Callback input+output works (chunks={len(output_chunks)}, result={text.strip()[:80]}...)")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Callback input+output failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def test_callback_input_output_large():
+ """Test callback-based input+output streaming with large data"""
+ print("\nTesting callback input+output large...")
+ try:
+ import io as _io
+
+ # Build a large JSON array
+ parts = [b"["]
+ for i in range(1, 1001):
+ if i > 1:
+ parts.append(b",")
+ parts.append(f'{{"id":{i}}}'.encode())
+ parts.append(b"]")
+ source = _io.BytesIO(b"".join(parts))
+ output_chunks = []
+
+ def on_read(buf_size: int) -> bytes:
+ return source.read(buf_size)
+
+ def on_write(data: bytes) -> int:
+ output_chunks.append(data)
+ return 0
+
+ result = dataweave.run_input_output_callback(
+ "output application/json\n---\nsizeOf(payload)",
+ input_name="payload",
+ input_mime_type="application/json",
+ read_callback=on_read,
+ write_callback=on_write,
+ )
+ assert result.get("success") is True, f"Expected success, got: {result}"
+ full = b"".join(output_chunks)
+ text = full.decode(result.get("charset", "utf-8"))
+ assert text == "1000", f"Expected '1000', got '{text}'"
+ print(f"[OK] Callback input+output large works (result='{text}')")
+ return True
+ except Exception as e:
+ print(f"[FAIL] Callback input+output large failed: {e}")
+ import traceback
+ traceback.print_exc()
+ return False
+
+def main():
+ """Run all tests"""
+ print("="*70)
+ print("DataWeave Python Module - Test Suite")
+ print("="*70)
+
+ try:
+ results = []
+ results.append(test_basic())
+ results.append(test_with_inputs())
+ results.append(test_context_manager())
+ results.append(test_encoding())
+ results.append(test_auto_conversion())
+ results.append(test_streaming_basic())
+ results.append(test_streaming_with_inputs())
+ results.append(test_streaming_chunked_read())
+ results.append(test_streaming_iterator())
+ results.append(test_streaming_context_manager())
+ results.append(test_streaming_input_basic())
+ results.append(test_streaming_input_large())
+ results.append(test_streaming_input_context_manager())
+ results.append(test_callback_output_basic())
+ results.append(test_callback_output_with_inputs())
+ results.append(test_callback_input_output())
+ results.append(test_callback_input_output_large())
+
+ # Cleanup
+ dataweave.cleanup()
+
+ print("\n" + "="*70)
+ passed = sum(results)
+ total = len(results)
+ print(f"Results: {passed}/{total} tests passed")
+ print("="*70)
+
+ if passed == total:
+ print("\n[OK] All tests passed!")
+ sys.exit(0)
+ else:
+ print(f"\n[FAIL] {total - passed} test(s) failed")
+ sys.exit(1)
+
+ except dataweave.DataWeaveLibraryNotFoundError as e:
+ print(f"\n[ERROR] {e}")
+ print("\nPlease build the native library first:")
+ print(" ./gradlew nativeCompile")
+ sys.exit(2)
+ except Exception as e:
+ print(f"\n[ERROR] Unexpected error: {e}")
+ import traceback
+ traceback.print_exc()
+ sys.exit(1)
+
+if __name__ == "__main__":
+ main()
diff --git a/native-lib/src/main/java/org/mule/weave/lib/InputStreamSession.java b/native-lib/src/main/java/org/mule/weave/lib/InputStreamSession.java
new file mode 100644
index 0000000..6d1537b
--- /dev/null
+++ b/native-lib/src/main/java/org/mule/weave/lib/InputStreamSession.java
@@ -0,0 +1,125 @@
+package org.mule.weave.lib;
+
+import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Manages a {@link PipedInputStream}/{@link PipedOutputStream} pair that allows FFI callers
+ * to stream data into the DataWeave engine as an input binding.
+ *
+ * <p>The caller writes bytes into the {@link PipedOutputStream} via {@link #write(byte[], int)}
+ * while the DW engine reads from the paired {@link PipedInputStream} on a separate thread.
+ *
+ * Instances are stored in a static registry keyed by a monotonically increasing handle so that
+ * native callers can reference them across {@code @CEntryPoint} invocations.
+ */
+public class InputStreamSession {
+
+ private static final ConcurrentHashMap<Long, InputStreamSession> SESSIONS = new ConcurrentHashMap<>();
+ private static final AtomicLong NEXT_HANDLE = new AtomicLong(1);
+
+ private static final int PIPE_BUFFER_SIZE = 64 * 1024;
+
+ private final PipedInputStream pipedInputStream;
+ private final PipedOutputStream pipedOutputStream;
+ private final String mimeType;
+ private final String charset;
+
+ /**
+ * Creates a new input stream session with the given metadata.
+ *
+ * @param mimeType the MIME type of the input data
+ * @param charset the character set of the input data (may be {@code null}, defaults to UTF-8)
+ */
+ public InputStreamSession(String mimeType, String charset) {
+ try {
+ this.pipedInputStream = new PipedInputStream(PIPE_BUFFER_SIZE);
+ this.pipedOutputStream = new PipedOutputStream(pipedInputStream);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create piped streams", e);
+ }
+ this.mimeType = mimeType;
+ this.charset = charset != null ? charset : "UTF-8";
+ }
+
+ /**
+ * Registers this session and returns its handle.
+ *
+ * @return a unique handle that callers use to reference this session
+ */
+ public long register() {
+ long handle = NEXT_HANDLE.getAndIncrement();
+ SESSIONS.put(handle, this);
+ return handle;
+ }
+
+ /**
+ * Looks up a previously registered session.
+ *
+ * @param handle the handle returned by {@link #register()}
+ * @return the session, or {@code null} if not found
+ */
+ public static InputStreamSession get(long handle) {
+ return SESSIONS.get(handle);
+ }
+
+ /**
+ * Removes a session from the registry and closes both ends of the pipe.
+ *
+ * @param handle the session handle
+ */
+ public static void close(long handle) {
+ InputStreamSession session = SESSIONS.remove(handle);
+ if (session != null) {
+ try {
+ session.pipedOutputStream.close();
+ } catch (IOException ignored) {
+ }
+ try {
+ session.pipedInputStream.close();
+ } catch (IOException ignored) {
+ }
+ }
+ }
+
+ /**
+ * Writes bytes into the pipe. The DW engine will read these from the paired
+ * {@link PipedInputStream}.
+ *
+ * @param data the byte array to write from
+ * @param length the number of bytes to write
+ * @throws IOException if an I/O error occurs
+ */
+ public void write(byte[] data, int length) throws IOException {
+ pipedOutputStream.write(data, 0, length);
+ }
+
+ /**
+ * Closes the write end of the pipe, signalling EOF to the reader.
+ *
+ * @throws IOException if an I/O error occurs
+ */
+ public void closeWriter() throws IOException {
+ pipedOutputStream.close();
+ }
+
+ /**
+ * Returns the {@link PipedInputStream} that the DW engine should read from.
+ *
+ * @return the read end of the pipe
+ */
+ public PipedInputStream getInputStream() {
+ return pipedInputStream;
+ }
+
+ public String getMimeType() {
+ return mimeType;
+ }
+
+ public String getCharset() {
+ return charset;
+ }
+}
diff --git a/native-lib/src/main/java/org/mule/weave/lib/NativeCallbacks.java b/native-lib/src/main/java/org/mule/weave/lib/NativeCallbacks.java
new file mode 100644
index 0000000..3872354
--- /dev/null
+++ b/native-lib/src/main/java/org/mule/weave/lib/NativeCallbacks.java
@@ -0,0 +1,49 @@
+package org.mule.weave.lib;
+
+import org.graalvm.nativeimage.c.function.CFunctionPointer;
+import org.graalvm.nativeimage.c.function.InvokeCFunctionPointer;
+import org.graalvm.nativeimage.c.type.CCharPointer;
+import org.graalvm.word.PointerBase;
+
+/**
+ * Function-pointer (callback) interfaces used by the callback-based streaming API in
+ * {@link NativeLib}.
+ *
+ * FFI callers pass C function pointers that conform to these signatures. The Java side
+ * invokes them via {@link InvokeCFunctionPointer} to push/pull data without requiring
+ * the session-based open/read|write/close round-trips.
+ */
+public final class NativeCallbacks {
+
+ private NativeCallbacks() {
+ }
+
+ /**
+ * Callback the native caller provides to receive output data.
+ *
+ * Signature in C:
+ * <pre>{@code int (*WriteCallback)(void *ctx, const char *buffer, int length);}</pre>
+ *
+ * The Java side calls this repeatedly with chunks of the script result.
+ * The callback must return {@code 0} on success or a non-zero value to abort.
+ */
+ public interface WriteCallback extends CFunctionPointer {
+ @InvokeCFunctionPointer
+ int invoke(PointerBase ctx, CCharPointer buffer, int length);
+ }
+
+ /**
+ * Callback the native caller provides to supply input data.
+ *
+ * Signature in C:
+ * <pre>{@code int (*ReadCallback)(void *ctx, char *buffer, int bufferSize);}</pre>
+ *
+ * The Java side calls this to pull the next chunk of input bytes. The callback must
+ * write up to {@code bufferSize} bytes into {@code buffer} and return the number of
+ * bytes written, {@code 0} on EOF, or {@code -1} on error.
+ */
+ public interface ReadCallback extends CFunctionPointer {
+ @InvokeCFunctionPointer
+ int invoke(PointerBase ctx, CCharPointer buffer, int bufferSize);
+ }
+}
diff --git a/native-lib/src/main/java/org/mule/weave/lib/NativeLib.java b/native-lib/src/main/java/org/mule/weave/lib/NativeLib.java
new file mode 100644
index 0000000..e4f259e
--- /dev/null
+++ b/native-lib/src/main/java/org/mule/weave/lib/NativeLib.java
@@ -0,0 +1,524 @@
+package org.mule.weave.lib;
+
+import org.graalvm.nativeimage.IsolateThread;
+import org.graalvm.nativeimage.UnmanagedMemory;
+import org.graalvm.nativeimage.c.function.CEntryPoint;
+import org.graalvm.nativeimage.c.type.CCharPointer;
+import org.graalvm.nativeimage.c.type.CTypeConversion;
+import org.graalvm.word.PointerBase;
+import org.graalvm.word.WordFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * GraalVM native entry points exposed for FFI consumers.
+ *
+ * This class provides C-callable functions to execute DataWeave scripts and to free the returned
+ * unmanaged strings.
+ */
+public class NativeLib {
+
+ /**
+ * Native method that executes a DataWeave script with inputs and returns the result.
+ * Can be called from Python via FFI.
+ *
+ * @param thread the isolate thread (automatically provided by GraalVM)
+ * @param script the DataWeave script to execute (C string pointer)
+ * @param inputsJson JSON string containing the inputs map with content (base64 encoded), mimeType, properties and charset for each binding
+ * @return the script execution result base64 encoded (C string pointer)
+ */
+ @CEntryPoint(name = "run_script")
+ public static CCharPointer runDwScriptEncoded(IsolateThread thread, CCharPointer script, CCharPointer inputsJson) {
+ String dwScript = CTypeConversion.toJavaString(script);
+ String inputs = CTypeConversion.toJavaString(inputsJson);
+
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+ String result = runtime.run(dwScript, inputs);
+ return toUnmanagedCString(result);
+ }
+
+ /**
+ * Frees a C string previously returned by {@link #runDwScriptEncoded(IsolateThread, CCharPointer, CCharPointer)}.
+ *
+ * @param thread the isolate thread (automatically provided by GraalVM)
+ * @param pointer the pointer to the unmanaged C string to free; if null, this is a no-op
+ */
+ @CEntryPoint(name = "free_cstring")
+ public static void freeCString(IsolateThread thread, CCharPointer pointer) {
+ if (pointer.isNull()) {
+ return;
+ }
+ UnmanagedMemory.free(pointer);
+ }
+
+ // ── Streaming Input API ─────────────────────────────────────────────
+
+ /**
+ * Creates a new input stream session that the caller can write data into.
+ * The returned handle references a {@link java.io.PipedInputStream}/{@link java.io.PipedOutputStream}
+ * pair. The caller writes chunks via {@link #inputStreamWrite} and signals EOF
+ * with {@link #inputStreamClose}. The handle can then be referenced in the inputs
+ * JSON passed to {@link #runScriptOpen} using a {@code "streamHandle"} key.
+ *
+ * @param thread the isolate thread
+ * @param mimeType the MIME type of the data being streamed (C string)
+ * @param charset the charset of the data (C string), may be null for UTF-8
+ * @return a positive handle for the input stream session
+ */
+ @CEntryPoint(name = "input_stream_open")
+ public static long inputStreamOpen(IsolateThread thread, CCharPointer mimeType, CCharPointer charset) {
+ String mime = CTypeConversion.toJavaString(mimeType);
+ String cs = charset.isNull() ? null : CTypeConversion.toJavaString(charset);
+ InputStreamSession session = new InputStreamSession(mime, cs);
+ return session.register();
+ }
+
+ /**
+ * Writes a chunk of bytes into an open input stream session.
+ *
+ * @param thread the isolate thread
+ * @param handle the input stream session handle
+ * @param buffer the data to write
+ * @param bufferSize number of bytes to write from the buffer
+ * @return {@code 0} on success, {@code -1} on error (invalid handle or I/O failure)
+ */
+ @CEntryPoint(name = "input_stream_write")
+ public static int inputStreamWrite(IsolateThread thread, long handle, CCharPointer buffer, int bufferSize) {
+ InputStreamSession session = InputStreamSession.get(handle);
+ if (session == null) {
+ return -1;
+ }
+ try {
+ byte[] tmp = new byte[bufferSize];
+ for (int i = 0; i < bufferSize; i++) {
+ tmp[i] = buffer.read(i);
+ }
+ session.write(tmp, bufferSize);
+ return 0;
+ } catch (IOException e) {
+ return -1;
+ }
+ }
+
+ /**
+ * Closes the write end of an input stream session, signalling EOF to the reader.
+ * The session remains in the registry so the DW engine can finish reading.
+ * It will be fully cleaned up when the output streaming session is closed.
+ *
+ * @param thread the isolate thread
+ * @param handle the input stream session handle
+ * @return {@code 0} on success, {@code -1} on error
+ */
+ @CEntryPoint(name = "input_stream_close")
+ public static int inputStreamClose(IsolateThread thread, long handle) {
+ InputStreamSession session = InputStreamSession.get(handle);
+ if (session == null) {
+ return -1;
+ }
+ try {
+ session.closeWriter();
+ return 0;
+ } catch (IOException e) {
+ return -1;
+ }
+ }
+
+ // ── Streaming Output API ──────────────────────────────────────────────
+
+ /**
+ * Executes a DataWeave script and returns an opaque handle to a streaming session.
+ * The result can then be read incrementally via {@link #runScriptRead} and must be
+ * closed with {@link #runScriptClose} when done.
+ *
+ * A handle value of {@code -1} indicates an error during compilation or execution.
+ * In that case call {@link #runScriptStreamError} to retrieve the error message.
+ *
+ * @param thread the isolate thread
+ * @param script the DataWeave script (C string)
+ * @param inputsJson JSON-encoded inputs map (C string), may be null
+ * @return a positive session handle, or {@code -1} on error
+ */
+ @CEntryPoint(name = "run_script_open")
+ public static long runScriptOpen(IsolateThread thread, CCharPointer script, CCharPointer inputsJson) {
+ String dwScript = CTypeConversion.toJavaString(script);
+ String inputs = inputsJson.isNull() ? null : CTypeConversion.toJavaString(inputsJson);
+
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+ StreamSession session = runtime.runStreaming(dwScript, inputs);
+ return session.register();
+ }
+
+ /**
+ * Reads the next chunk of bytes from an open streaming session.
+ *
+ * @param thread the isolate thread
+ * @param handle the session handle returned by {@link #runScriptOpen}
+ * @param buffer caller-allocated buffer to write into
+ * @param bufferSize size of the buffer in bytes
+ * @return the number of bytes written into {@code buffer}, {@code 0} on EOF,
+ * or {@code -1} on error (invalid handle or I/O failure)
+ */
+ @CEntryPoint(name = "run_script_read")
+ public static int runScriptRead(IsolateThread thread, long handle, CCharPointer buffer, int bufferSize) {
+ StreamSession session = StreamSession.get(handle);
+ if (session == null) {
+ return -1;
+ }
+ try {
+ byte[] tmp = new byte[bufferSize];
+ int n = session.read(tmp, bufferSize);
+ if (n <= 0) {
+ return 0;
+ }
+ for (int i = 0; i < n; i++) {
+ buffer.write(i, tmp[i]);
+ }
+ return n;
+ } catch (IOException e) {
+ return -1;
+ }
+ }
+
+ /**
+ * Returns JSON metadata for an open streaming session:
+ * {@code {"mimeType":"...","charset":"...","binary":true/false}}.
+ *
+ * The caller must free the returned pointer with {@link #freeCString}.
+ *
+ * @param thread the isolate thread
+ * @param handle the session handle
+ * @return an unmanaged C string with JSON metadata, or a null pointer if the handle is invalid
+ */
+ @CEntryPoint(name = "run_script_metadata")
+ public static CCharPointer runScriptMetadata(IsolateThread thread, long handle) {
+ StreamSession session = StreamSession.get(handle);
+ if (session == null) {
+ return CTypeConversion.toCString("").get();
+ }
+ String json = "{"
+ + "\"mimeType\":\"" + session.getMimeType() + "\","
+ + "\"charset\":\"" + session.getCharset() + "\","
+ + "\"binary\":" + session.isBinary()
+ + "}";
+ return toUnmanagedCString(json);
+ }
+
+ /**
+ * Retrieves the error message for a failed streaming session.
+ *
+ * When a session was created from an execution failure its handle is still valid
+ * and the error message can be obtained here. The caller must free the returned pointer
+ * with {@link #freeCString}.
+ *
+ * @param thread the isolate thread
+ * @param handle the session handle
+ * @return an unmanaged C string with the error message, or empty string if not an error session
+ */
+ @CEntryPoint(name = "run_script_stream_error")
+ public static CCharPointer runScriptStreamError(IsolateThread thread, long handle) {
+ StreamSession session = StreamSession.get(handle);
+ if (session == null || session.getError() == null) {
+ return toUnmanagedCString("");
+ }
+ return toUnmanagedCString(session.getError());
+ }
+
+ /**
+ * Closes a streaming session, releasing the underlying {@link java.io.InputStream} and
+ * removing the session from the registry.
+ *
+ * @param thread the isolate thread
+ * @param handle the session handle
+ */
+ @CEntryPoint(name = "run_script_close")
+ public static void runScriptClose(IsolateThread thread, long handle) {
+ StreamSession.close(handle);
+ }
+
+ // ── Callback-based Streaming API ─────────────────────────────────────
+
+ private static final int CALLBACK_BUFFER_SIZE = 8 * 1024;
+
+ /**
+ * Executes a DataWeave script and streams the result to a caller-supplied write callback.
+ *
+ * Instead of the session-based open/read/close cycle, the caller passes a
+ * {@code WriteCallback} function pointer. The Java side reads the output stream in chunks
+ * and invokes the callback for each chunk until the stream is exhausted.
+ *
+ * The returned C string is a JSON object with the execution metadata:
+ *
+ * - On success: {@code {"success":true,"mimeType":"...","charset":"...","binary":true/false}}
+ * - On error: {@code {"success":false,"error":"..."}}
+ *
+ * The caller must free the returned pointer with {@link #freeCString}.
+ *
+ * @param thread the isolate thread
+ * @param script the DataWeave script (C string)
+ * @param inputsJson JSON-encoded inputs map (C string), may be null
+ * @param writeCallback function pointer invoked with each output chunk; must return 0 on success
+ * @param ctx opaque context pointer forwarded to every callback invocation
+ * @return an unmanaged C string with JSON metadata/error
+ */
+    @CEntryPoint(name = "run_script_callback")
+    public static CCharPointer runScriptCallback(
+            IsolateThread thread,
+            CCharPointer script,
+            CCharPointer inputsJson,
+            NativeCallbacks.WriteCallback writeCallback,
+            PointerBase ctx) {
+
+        // Convert the C strings eagerly; inputsJson is optional and may be NULL.
+        String dwScript = CTypeConversion.toJavaString(script);
+        String inputs = inputsJson.isNull() ? null : CTypeConversion.toJavaString(inputsJson);
+
+        ScriptRuntime runtime = ScriptRuntime.getInstance();
+        StreamSession session = runtime.runStreaming(dwScript, inputs);
+
+        // Compilation/execution failures surface as an error session rather than an exception.
+        if (session.isError()) {
+            return toUnmanagedCString("{\"success\":false,\"error\":\""
+                    + escapeJsonString(session.getError()) + "\"}");
+        }
+
+        try {
+            // Double-buffer: read into a Java byte[] (session.read needs one), then copy
+            // byte-by-byte into unmanaged memory so the native callback can see it.
+            byte[] buf = new byte[CALLBACK_BUFFER_SIZE];
+            CCharPointer nativeBuf = UnmanagedMemory.malloc(CALLBACK_BUFFER_SIZE);
+            try {
+                int n;
+                while ((n = session.read(buf, buf.length)) > 0) {
+                    for (int i = 0; i < n; i++) {
+                        nativeBuf.write(i, buf[i]);
+                    }
+                    // Contract: callback returns 0 on success; any other value aborts streaming.
+                    int rc = writeCallback.invoke(ctx, nativeBuf, n);
+                    if (rc != 0) {
+                        return toUnmanagedCString("{\"success\":false,\"error\":\""
+                                + "Write callback returned error: " + rc + "\"}");
+                    }
+                }
+            } finally {
+                UnmanagedMemory.free(nativeBuf);
+            }
+        } catch (IOException e) {
+            return toUnmanagedCString("{\"success\":false,\"error\":\""
+                    + escapeJsonString(e.getMessage()) + "\"}");
+        }
+
+        // NOTE(review): the session's underlying InputStream is never closed on any path
+        // here (StreamSession exposes only the static close(handle), and this session is
+        // not registered) — confirm whether the engine closes it on EOF.
+        // NOTE(review): mimeType/charset are interpolated unescaped; assumed not to
+        // contain quotes — verify against possible engine-produced values.
+        return toUnmanagedCString("{\"success\":true"
+                + ",\"mimeType\":\"" + session.getMimeType() + "\""
+                + ",\"charset\":\"" + session.getCharset() + "\""
+                + ",\"binary\":" + session.isBinary()
+                + "}");
+    }
+
+    /**
+     * Executes a DataWeave script whose output is streamed via a write callback, and whose
+     * input named {@code inputName} is fed via a read callback.
+     *
+     * The read callback is invoked on a background thread to pull input data while the
+     * output is pushed to the write callback on the calling thread. This allows fully
+     * callback-driven input and output streaming in a single call.
+     *
+     * The returned C string follows the same JSON schema as
+     * {@link #runScriptCallback}.
+     *
+     * @param thread the isolate thread
+     * @param script the DataWeave script (C string)
+     * @param inputsJson JSON-encoded inputs map (C string), may be null; entries for
+     *        {@code inputName} are ignored since the read callback supplies that input
+     * @param inputName the binding name for the callback-supplied input (C string)
+     * @param inputMimeType the MIME type of the callback-supplied input (C string)
+     * @param inputCharset the charset of the callback-supplied input (C string), may be null for UTF-8
+     * @param readCallback function pointer invoked to read the next chunk; must return bytes written,
+     *        0 on EOF, or -1 on error
+     * @param writeCallback function pointer invoked with each output chunk; must return 0 on success
+     * @param ctx opaque context pointer forwarded to every callback invocation
+     * @return an unmanaged C string with JSON metadata/error
+     */
+    @CEntryPoint(name = "run_script_input_output_callback")
+    public static CCharPointer runScriptInputOutputCallback(
+            IsolateThread thread,
+            CCharPointer script,
+            CCharPointer inputsJson,
+            CCharPointer inputName,
+            CCharPointer inputMimeType,
+            CCharPointer inputCharset,
+            NativeCallbacks.ReadCallback readCallback,
+            NativeCallbacks.WriteCallback writeCallback,
+            PointerBase ctx) {
+
+        String dwScript = CTypeConversion.toJavaString(script);
+        String inputs = inputsJson.isNull() ? null : CTypeConversion.toJavaString(inputsJson);
+        String inName = CTypeConversion.toJavaString(inputName);
+        String inMime = CTypeConversion.toJavaString(inputMimeType);
+        String inCharset = inputCharset.isNull() ? null : CTypeConversion.toJavaString(inputCharset);
+
+        // Create a piped input stream session for the callback-supplied input
+        InputStreamSession inputSession = new InputStreamSession(inMime, inCharset);
+        long inputHandle = inputSession.register();
+
+        // Merge the stream handle into the inputs JSON so parseJsonInputsToBindings can
+        // look the session up by handle and bind its InputStream under inName.
+        String streamEntry = "{\"streamHandle\":\"" + inputHandle + "\",\"mimeType\":\"" + inMime + "\""
+                + (inCharset != null ? ",\"charset\":\"" + inCharset + "\"" : "") + "}";
+        String mergedInputs = mergeInputEntry(inputs, inName, streamEntry);
+
+        // Start a background thread that calls the readCallback and feeds data into the pipe.
+        // Word types (CCharPointer, CFunctionPointer, PointerBase) cannot be captured in
+        // lambdas in GraalVM Native Image, so we use an explicit Runnable that stores their
+        // raw addresses and reconstitutes them via WordFactory.
+        CCharPointer readBuf = UnmanagedMemory.malloc(CALLBACK_BUFFER_SIZE);
+        final long readCallbackAddr = readCallback.rawValue();
+        final long ctxAddr = ctx.rawValue();
+        final long readBufAddr = readBuf.rawValue();
+        Thread feeder = new Thread(new InputCallbackFeeder(
+                readCallbackAddr, ctxAddr, readBufAddr, inputSession), "dw-input-callback-feeder");
+        feeder.setDaemon(true);
+        feeder.start();
+
+        // Execute the script and stream output via the writeCallback
+        ScriptRuntime runtime = ScriptRuntime.getInstance();
+        StreamSession session = runtime.runStreaming(dwScript, mergedInputs);
+
+        // Every exit path must run cleanupFeeder to free readBuf and unregister the
+        // input session; the feeder thread owns readBuf until it terminates.
+        if (session.isError()) {
+            cleanupFeeder(feeder, readBuf, inputHandle);
+            return toUnmanagedCString("{\"success\":false,\"error\":\""
+                    + escapeJsonString(session.getError()) + "\"}");
+        }
+
+        try {
+            byte[] buf = new byte[CALLBACK_BUFFER_SIZE];
+            CCharPointer writeBuf = UnmanagedMemory.malloc(CALLBACK_BUFFER_SIZE);
+            try {
+                int n;
+                while ((n = session.read(buf, buf.length)) > 0) {
+                    for (int i = 0; i < n; i++) {
+                        writeBuf.write(i, buf[i]);
+                    }
+                    int rc = writeCallback.invoke(ctx, writeBuf, n);
+                    if (rc != 0) {
+                        cleanupFeeder(feeder, readBuf, inputHandle);
+                        return toUnmanagedCString("{\"success\":false,\"error\":\""
+                                + "Write callback returned error: " + rc + "\"}");
+                    }
+                }
+            } finally {
+                UnmanagedMemory.free(writeBuf);
+            }
+        } catch (IOException e) {
+            cleanupFeeder(feeder, readBuf, inputHandle);
+            return toUnmanagedCString("{\"success\":false,\"error\":\""
+                    + escapeJsonString(e.getMessage()) + "\"}");
+        }
+
+        cleanupFeeder(feeder, readBuf, inputHandle);
+
+        return toUnmanagedCString("{\"success\":true"
+                + ",\"mimeType\":\"" + session.getMimeType() + "\""
+                + ",\"charset\":\"" + session.getCharset() + "\""
+                + ",\"binary\":" + session.isBinary()
+                + "}");
+    }
+
+    /**
+     * Merges a single input entry into an existing JSON inputs string.
+     *
+     * Works purely textually: the entry is spliced in before the final closing brace
+     * of the existing object, or a fresh one-entry object is built when the existing
+     * JSON is null/empty/{} or has no closing brace.
+     *
+     * NOTE(review): if {@code existingJson} already contains a key equal to
+     * {@code name}, the result carries a duplicate key; presumably the downstream
+     * binding parser lets the later (spliced) entry win — verify against
+     * parseJsonInputsToBindings.
+     *
+     * @param existingJson the caller-supplied inputs JSON object, may be null
+     * @param name the binding name to insert
+     * @param entryJson the JSON object value for that binding
+     * @return a JSON object string containing the merged entries
+     */
+    private static String mergeInputEntry(String existingJson, String name, String entryJson) {
+        if (existingJson == null || existingJson.trim().isEmpty() || existingJson.trim().equals("{}")) {
+            return "{\"" + name + "\":" + entryJson + "}";
+        }
+        // Insert before the final closing brace
+        String trimmed = existingJson.trim();
+        int lastBrace = trimmed.lastIndexOf('}');
+        if (lastBrace <= 0) {
+            return "{\"" + name + "\":" + entryJson + "}";
+        }
+        String prefix = trimmed.substring(0, lastBrace).trim();
+        // Add comma if there is already content
+        if (prefix.length() > 1) {
+            return prefix + ",\"" + name + "\":" + entryJson + "}";
+        }
+        return "{\"" + name + "\":" + entryJson + "}";
+    }
+
+    /**
+     * Waits for the feeder thread to finish and frees native resources.
+     *
+     * @param feeder the input-feeder thread started by the caller
+     * @param readBuf the unmanaged buffer the feeder writes callback data into
+     * @param inputHandle the registered {@link InputStreamSession} handle to close
+     */
+    private static void cleanupFeeder(Thread feeder, CCharPointer readBuf, long inputHandle) {
+        try {
+            feeder.join(5000);
+        } catch (InterruptedException ignored) {
+            // Preserve the caller's interrupt status instead of swallowing it.
+            Thread.currentThread().interrupt();
+        }
+        // Fix: only release readBuf once the feeder has provably terminated. The old
+        // code freed it unconditionally, so a feeder still blocked in its read callback
+        // after the 5s join timeout would keep writing into freed unmanaged memory
+        // (use-after-free). Leaking one CALLBACK_BUFFER_SIZE buffer is the safer outcome.
+        if (!feeder.isAlive()) {
+            UnmanagedMemory.free(readBuf);
+        }
+        // Closing the session tears down the pipe; a still-running feeder then fails
+        // its next write with an IOException and exits.
+        InputStreamSession.close(inputHandle);
+    }
+
+    /**
+     * Explicit {@link Runnable} that drives the read-callback loop on a background thread.
+     *
+     * GraalVM Native Image forbids capturing {@code Word} types (such as
+     * {@link CCharPointer} or {@link CFunctionPointer}) inside lambdas. This class stores
+     * the raw addresses as plain {@code long} values and reconstitutes the pointers via
+     * {@link WordFactory#pointer(long)} inside {@link #run()}.
+     */
+    private static final class InputCallbackFeeder implements Runnable {
+        // Raw address of the native read callback function pointer.
+        private final long readCallbackAddr;
+        // Raw address of the opaque caller context forwarded to every invocation.
+        private final long ctxAddr;
+        // Raw address of the unmanaged buffer (owned by the caller, freed in cleanupFeeder).
+        private final long readBufAddr;
+        // Pipe into which chunks are written for the DataWeave engine to consume.
+        private final InputStreamSession inputSession;
+
+        InputCallbackFeeder(long readCallbackAddr, long ctxAddr, long readBufAddr,
+                            InputStreamSession inputSession) {
+            this.readCallbackAddr = readCallbackAddr;
+            this.ctxAddr = ctxAddr;
+            this.readBufAddr = readBufAddr;
+            this.inputSession = inputSession;
+        }
+
+        @Override
+        public void run() {
+            // Rebuild the word-typed values from their raw addresses (see class comment).
+            NativeCallbacks.ReadCallback cb = WordFactory.pointer(readCallbackAddr);
+            PointerBase ctx = WordFactory.pointer(ctxAddr);
+            CCharPointer buf = WordFactory.pointer(readBufAddr);
+            try {
+                while (true) {
+                    int n = cb.invoke(ctx, buf, CALLBACK_BUFFER_SIZE);
+                    if (n <= 0) {
+                        break; // 0 = EOF, negative = error
+                    }
+                    // Copy out of unmanaged memory before handing to the Java-side pipe.
+                    byte[] tmp = new byte[n];
+                    for (int i = 0; i < n; i++) {
+                        tmp[i] = buf.read(i);
+                    }
+                    inputSession.write(tmp, n);
+                }
+            } catch (IOException e) {
+                // pipe broken – DW engine will see the error
+            } finally {
+                // Always signal EOF to the reader so the engine does not block forever.
+                try {
+                    inputSession.closeWriter();
+                } catch (IOException ignored) {
+                }
+            }
+        }
+    }
+
+ private static String escapeJsonString(String input) {
+ if (input == null) return "";
+ return input
+ .replace("\\", "\\\\")
+ .replace("\"", "\\\"")
+ .replace("\n", "\\n")
+ .replace("\r", "\\r")
+ .replace("\t", "\\t");
+ }
+
+    /**
+     * Copies a Java string into a freshly malloc'd, NUL-terminated UTF-8 C string.
+     *
+     * The returned pointer lives in unmanaged memory; the native caller must free it
+     * (see freeCString referenced in the entry-point javadoc above this view).
+     *
+     * @param value the Java string to convert
+     * @return a NUL-terminated unmanaged C string
+     */
+    private static CCharPointer toUnmanagedCString(String value) {
+        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
+        // +1 for the trailing NUL terminator.
+        CCharPointer ptr = UnmanagedMemory.malloc(bytes.length + 1);
+        for (int i = 0; i < bytes.length; i++) {
+            ptr.write(i, bytes[i]);
+        }
+        ptr.write(bytes.length, (byte) 0);
+        return ptr;
+    }
+
+}
diff --git a/native-lib/src/main/java/org/mule/weave/lib/ScriptRuntime.java b/native-lib/src/main/java/org/mule/weave/lib/ScriptRuntime.java
new file mode 100644
index 0000000..849e63f
--- /dev/null
+++ b/native-lib/src/main/java/org/mule/weave/lib/ScriptRuntime.java
@@ -0,0 +1,472 @@
+package org.mule.weave.lib;
+
+import org.mule.weave.v2.runtime.BindingValue;
+import org.mule.weave.v2.runtime.DataWeaveResult;
+import org.mule.weave.v2.runtime.ScriptingBindings;
+import org.mule.weave.v2.runtime.api.DWResult;
+import org.mule.weave.v2.runtime.api.DWScript;
+import org.mule.weave.v2.runtime.api.DWScriptingEngine;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.immutable.Map;
+import scala.collection.immutable.Map$;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Base64;
+
+/**
+ * Singleton wrapper around a {@link DWScriptingEngine} used to compile and execute DataWeave scripts.
+ *
+ * Execution results are returned as a JSON string containing a base64-encoded payload plus metadata
+ * (mime type, charset, and whether the result is binary). Errors are returned as a JSON string with
+ * {@code success=false} and an escaped error message.
+ */
+public class ScriptRuntime {
+
+    // Eagerly-initialized singleton; the engine is built once and reused for all calls.
+    private static final ScriptRuntime INSTANCE = new ScriptRuntime();
+
+    /**
+     * Returns the singleton instance.
+     *
+     * @return the shared {@link ScriptRuntime}
+     */
+    public static ScriptRuntime getInstance() {
+        return INSTANCE;
+    }
+
+    // Shared scripting engine, created with default builder settings in the constructor.
+    private DWScriptingEngine engine;
+
+    private ScriptRuntime() {
+        engine = DWScriptingEngine.builder().build();
+    }
+
+    /**
+     * Executes a DataWeave script with no input bindings.
+     *
+     * Convenience overload that delegates to {@link #run(String, String)} with a
+     * {@code null} inputs JSON.
+     *
+     * @param script the DataWeave script source
+     * @return a JSON string describing either the successful result or an error
+     */
+    public String run(String script) {
+        return run(script, null);
+    }
+
+    /**
+     * Executes a DataWeave script with optional input bindings encoded as JSON.
+     *
+     * The expected JSON structure maps binding names to an object containing {@code content}
+     * (base64), {@code mimeType}, optional {@code charset}, and optional {@code properties}.
+     *
+     * The whole result is buffered in memory and base64-encoded; for large outputs use
+     * {@link #runStreaming(String, String)} instead.
+     *
+     * @param script the DataWeave script source
+     * @param inputsJson JSON string encoding the input bindings map, or {@code null}
+     * @return a JSON string describing either the successful result or an error
+     */
+    public String run(String script, String inputsJson) {
+        ScriptingBindings bindings = parseJsonInputsToBindings(inputsJson);
+        String[] inputs = bindings.bindingNames();
+
+        try {
+            DWScript compiled = engine.compileDWScript(script, inputs);
+            DWResult dwResult = compiled.writeDWResult(bindings);
+
+            String encodedResult;
+            if (dwResult.getContent() instanceof InputStream) {
+                try {
+                    // Fully drain the stream; result size is bounded by available heap.
+                    byte[] ba = ((InputStream) dwResult.getContent()).readAllBytes();
+                    encodedResult = Base64.getEncoder().encodeToString(ba);
+                } catch (IOException e) {
+                    // Re-thrown so the outer catch converts it to the error JSON shape.
+                    throw new RuntimeException(e);
+                }
+            } else {
+                throw new RuntimeException("Result is not an InputStream: " + dwResult.getContent().getClass().getName());
+            }
+
+            // NOTE(review): mimeType/charset are interpolated unescaped; assumed to never
+            // contain quotes — verify against engine-produced values.
+            return "{"
+                    + "\"success\":true,"
+                    + "\"result\":\"" + encodedResult + "\","
+                    + "\"mimeType\":\"" + dwResult.getMimeType() + "\","
+                    + "\"charset\":\"" + dwResult.getCharset() + "\","
+                    + "\"binary\":" + ((DataWeaveResult) dwResult).isBinary()
+                    + "}";
+        } catch (Exception e) {
+            // Some engine exceptions carry no message; fall back to the class name form.
+            String message = e.getMessage();
+            if (message == null || message.trim().isEmpty()) {
+                message = e.toString();
+            }
+
+            return "{"
+                    + "\"success\":false,"
+                    + "\"error\":\"" + escapeJsonString(message) + "\""
+                    + "}";
+        }
+    }
+
+    /**
+     * Executes a DataWeave script and returns a {@link StreamSession} whose {@link java.io.InputStream}
+     * can be read incrementally, avoiding loading the entire result into memory.
+     *
+     * Never throws: both compile/runtime failures and a non-stream result are reported
+     * via {@link StreamSession#ofError(String)}.
+     *
+     * @param script the DataWeave script source
+     * @param inputsJson JSON string encoding the input bindings map, or {@code null}
+     * @return a {@link StreamSession} with the result stream and metadata, or an error session
+     */
+    public StreamSession runStreaming(String script, String inputsJson) {
+        ScriptingBindings bindings = parseJsonInputsToBindings(inputsJson);
+        String[] inputs = bindings.bindingNames();
+
+        try {
+            DWScript compiled = engine.compileDWScript(script, inputs);
+            DWResult dwResult = compiled.writeDWResult(bindings);
+
+            if (dwResult.getContent() instanceof InputStream) {
+                return new StreamSession(
+                        (InputStream) dwResult.getContent(),
+                        dwResult.getMimeType(),
+                        dwResult.getCharset().name(),
+                        ((DataWeaveResult) dwResult).isBinary()
+                );
+            } else {
+                return StreamSession.ofError("Result is not an InputStream: " + dwResult.getContent().getClass().getName());
+            }
+        } catch (Exception e) {
+            // Same message fallback as run(): some exceptions carry no message.
+            String message = e.getMessage();
+            if (message == null || message.trim().isEmpty()) {
+                message = e.toString();
+            }
+            return StreamSession.ofError(message);
+        }
+    }
+
+    /**
+     * Parses the inputs JSON (a flat object of binding-name -> descriptor objects) into
+     * {@link ScriptingBindings} using a small hand-rolled scanner.
+     *
+     * Each descriptor either references a registered streaming input via
+     * {@code streamHandle} or carries inline base64 {@code content}; both may add
+     * {@code mimeType}, {@code charset} and (inline only) {@code properties}.
+     * Parse failures are logged to stderr and yield whatever bindings were collected
+     * so far — deliberate best-effort behavior.
+     *
+     * @param inputsJson the inputs object as JSON text, may be null/empty
+     * @return the (possibly empty) bindings; never null
+     */
+    private ScriptingBindings parseJsonInputsToBindings(String inputsJson) {
+        ScriptingBindings bindings = new ScriptingBindings();
+
+        if (inputsJson == null || inputsJson.trim().isEmpty()) {
+            return bindings;
+        }
+
+        try {
+            String json = inputsJson.trim();
+
+            // Parse top-level entries: "name": { ... }
+            int pos = 1; // Skip opening brace
+
+            while (pos < json.length()) {
+                // Skip whitespace, commas
+                while (pos < json.length() && (Character.isWhitespace(json.charAt(pos)) || json.charAt(pos) == ',')) {
+                    pos++;
+                }
+
+                if (pos >= json.length() || json.charAt(pos) == '}') break;
+
+                // Expect a quoted string (binding name)
+                if (json.charAt(pos) != '"') break;
+
+                int nameEnd = findClosingQuote(json, pos + 1);
+                if (nameEnd == -1) break;
+
+                // NOTE(review): the raw (still-escaped) substring is used as the name;
+                // assumed binding names never contain escapes — confirm with callers.
+                String name = json.substring(pos + 1, nameEnd);
+                pos = nameEnd + 1; // Move past the closing quote
+
+                // Skip whitespace and colon
+                while (pos < json.length() && (Character.isWhitespace(json.charAt(pos)) || json.charAt(pos) == ':')) {
+                    pos++;
+                }
+
+                // Expect opening brace for nested object
+                if (pos >= json.length() || json.charAt(pos) != '{') break;
+
+                int objEnd = findClosingBrace(json, pos + 1);
+                if (objEnd == -1) break;
+
+                String nestedContent = json.substring(pos + 1, objEnd);
+                pos = objEnd + 1;
+
+                String streamHandleRaw = extractStringValue(nestedContent, "streamHandle");
+                String contentRaw = extractStringValue(nestedContent, "content");
+
+                if (streamHandleRaw != null) {
+                    // Streaming input: look up the InputStreamSession by handle
+                    long streamHandle = Long.parseLong(streamHandleRaw);
+                    InputStreamSession inputSession = InputStreamSession.get(streamHandle);
+                    if (inputSession == null) {
+                        throw new RuntimeException("Invalid streamHandle " + streamHandle + " for input '" + name + "'");
+                    }
+                    // Descriptor-level mimeType/charset win; fall back to the session's.
+                    String mimeTypeRaw = extractStringValue(nestedContent, "mimeType");
+                    if (mimeTypeRaw == null) {
+                        mimeTypeRaw = inputSession.getMimeType();
+                    }
+                    String charsetRaw = extractStringValue(nestedContent, "charset");
+                    Charset charset = Charset.forName(charsetRaw != null ? charsetRaw : inputSession.getCharset());
+                    Option mimeType = Option.apply(mimeTypeRaw);
+
+                    BindingValue bindingValue = new BindingValue(inputSession.getInputStream(), mimeType, Map$.MODULE$.empty(), charset);
+                    bindings.addBinding(name, bindingValue);
+
+                } else if (contentRaw != null) {
+                    String mimeTypeRaw = extractStringValue(nestedContent, "mimeType");
+                    String propertiesRaw = null;
+                    // NOTE(review): this matches only the exact text '"properties": {'
+                    // (one space after the colon) and pairs it with lastIndexOf("}"),
+                    // which can grab the wrong brace when other objects follow —
+                    // fragile; confirm producers always emit this exact shape.
+                    if (nestedContent.indexOf("\"properties\": {") != -1) {
+                        propertiesRaw = nestedContent.substring(nestedContent.indexOf("\"properties\": {") + 14, nestedContent.lastIndexOf("}") + 1);
+                    }
+                    String charsetRaw = extractStringValue(nestedContent, "charset");
+
+                    Map properties = Map$.MODULE$.empty();
+                    if (propertiesRaw != null) {
+                        properties = parseJsonProperties(propertiesRaw);
+                    }
+                    Charset charset = Charset.forName(charsetRaw != null ? charsetRaw : "UTF-8");
+                    Option mimeType = Option.apply(mimeTypeRaw);
+
+                    byte[] content = Base64.getDecoder().decode(contentRaw);
+                    BindingValue bindingValue = new BindingValue(content, mimeType, properties, charset);
+                    bindings.addBinding(name, bindingValue);
+
+                }
+            }
+        } catch (Exception e) {
+            // Best-effort: report and return the bindings parsed so far.
+            System.err.println("Error parsing JSON inputs: " + e.getMessage());
+            e.printStackTrace();
+        }
+
+        return bindings;
+    }
+
+    /**
+     * Parses a strict, flat JSON object of primitive values (string / boolean / number)
+     * into an immutable Scala {@code Map}.
+     *
+     * Unlike {@link #parseJsonInputsToBindings(String)}, this parser is strict: nested
+     * objects/arrays, null values, malformed keys, and trailing content all raise
+     * {@link IllegalArgumentException}.
+     *
+     * @param jsonProperties the JSON object text (may include the braces), may be null/empty
+     * @return an immutable Scala map of key -> String/Boolean/Long/Double
+     * @throws IllegalArgumentException on any structural or value error
+     */
+    private Map parseJsonProperties(String jsonProperties) {
+        if (jsonProperties == null || jsonProperties.trim().isEmpty()) {
+            return Map$.MODULE$.empty();
+        }
+
+        String json = jsonProperties.trim();
+        if (json.charAt(0) != '{') {
+            throw new IllegalArgumentException("properties must be a JSON object (must start with '{'): " + jsonProperties);
+        }
+
+        int end = findClosingBrace(json, 1);
+        if (end == -1) {
+            throw new IllegalArgumentException("properties must be a valid JSON object (missing closing '}'): " + jsonProperties);
+        }
+
+        // Disallow trailing non-whitespace after the object
+        for (int i = end + 1; i < json.length(); i++) {
+            if (!Character.isWhitespace(json.charAt(i))) {
+                throw new IllegalArgumentException("properties must contain a single JSON object (unexpected trailing content): " + jsonProperties);
+            }
+        }
+
+        Map result = Map$.MODULE$.empty();
+        int pos = 1; // skip '{'
+
+        while (pos < end) {
+            // Skip separators between entries.
+            while (pos < end && (Character.isWhitespace(json.charAt(pos)) || json.charAt(pos) == ',')) {
+                pos++;
+            }
+
+            if (pos >= end) {
+                break;
+            }
+
+            if (json.charAt(pos) != '"') {
+                throw new IllegalArgumentException("properties keys must be quoted strings at position " + pos + ": " + jsonProperties);
+            }
+
+            int keyEnd = findClosingQuote(json, pos + 1);
+            if (keyEnd == -1 || keyEnd > end) {
+                throw new IllegalArgumentException("properties has an unterminated key string: " + jsonProperties);
+            }
+
+            String key = unescapeJsonString(json.substring(pos + 1, keyEnd));
+            pos = keyEnd + 1;
+
+            // Require a ':' separator between key and value.
+            while (pos < end && Character.isWhitespace(json.charAt(pos))) {
+                pos++;
+            }
+            if (pos >= end || json.charAt(pos) != ':') {
+                throw new IllegalArgumentException("properties expected ':' after key '" + key + "': " + jsonProperties);
+            }
+            pos++;
+
+            while (pos < end && Character.isWhitespace(json.charAt(pos))) {
+                pos++;
+            }
+            if (pos >= end) {
+                throw new IllegalArgumentException("properties missing value for key '" + key + "': " + jsonProperties);
+            }
+
+            // Dispatch on the first character of the value: string, boolean, null,
+            // nested structure (rejected), or number.
+            Object value;
+            char c = json.charAt(pos);
+            if (c == '"') {
+                int valueEnd = findClosingQuote(json, pos + 1);
+                if (valueEnd == -1 || valueEnd > end) {
+                    throw new IllegalArgumentException("properties has an unterminated string value for key '" + key + "': " + jsonProperties);
+                }
+                value = unescapeJsonString(json.substring(pos + 1, valueEnd));
+                pos = valueEnd + 1;
+            } else if (c == 't' || c == 'f') {
+                if (json.startsWith("true", pos)) {
+                    value = Boolean.TRUE;
+                    pos += 4;
+                } else if (json.startsWith("false", pos)) {
+                    value = Boolean.FALSE;
+                    pos += 5;
+                } else {
+                    throw new IllegalArgumentException("properties invalid boolean value for key '" + key + "' at position " + pos + ": " + jsonProperties);
+                }
+            } else if (c == 'n') {
+                throw new IllegalArgumentException("properties values cannot be null (key '" + key + "'): " + jsonProperties);
+            } else if (c == '{' || c == '[') {
+                throw new IllegalArgumentException("properties values must be primitive (string/number/boolean) (key '" + key + "'): " + jsonProperties);
+            } else {
+                // Number: scan to the next delimiter, then choose Long vs Double by
+                // the presence of a decimal point or exponent marker.
+                int numEnd = pos;
+                while (numEnd < end) {
+                    char nc = json.charAt(numEnd);
+                    if (nc == ',' || nc == '}' || Character.isWhitespace(nc)) {
+                        break;
+                    }
+                    numEnd++;
+                }
+                String numStr = json.substring(pos, numEnd);
+                if (numStr.isEmpty()) {
+                    throw new IllegalArgumentException("properties invalid number value for key '" + key + "' at position " + pos + ": " + jsonProperties);
+                }
+                try {
+                    if (numStr.indexOf('.') >= 0 || numStr.indexOf('e') >= 0 || numStr.indexOf('E') >= 0) {
+                        value = Double.parseDouble(numStr);
+                    } else {
+                        value = Long.parseLong(numStr);
+                    }
+                } catch (NumberFormatException nfe) {
+                    throw new IllegalArgumentException("properties invalid number value for key '" + key + "': " + numStr, nfe);
+                }
+                pos = numEnd;
+            }
+
+            // Immutable map: each entry produces a new map instance.
+            result = (Map) result.$plus(new Tuple2<>(key, value));
+        }
+
+        return result;
+    }
+
+    /**
+     * Parses a JSON string value whose opening quote sits at {@code startPos}.
+     *
+     * @param json the JSON text being scanned
+     * @param startPos index of the expected opening quote
+     * @return the unescaped string content (without quotes), or null when no
+     *         well-formed string starts at {@code startPos}
+     */
+    private String parseString(String json, int startPos) {
+        if (json.charAt(startPos) == '"') {
+            int closing = findClosingQuote(json, startPos + 1);
+            if (closing != -1) {
+                return unescapeJsonString(json.substring(startPos + 1, closing));
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Extracts the string value associated with {@code key} from a JSON object body.
+     *
+     * Simplified lookup: finds the first occurrence of the quoted key, advances past
+     * the colon and whitespace, and parses a string value; returns null when the key
+     * is absent or its value is not a string.
+     *
+     * @param json the object body to search
+     * @param key the property name (without quotes)
+     * @return the unescaped string value, or null
+     */
+    private String extractStringValue(String json, String key) {
+        int keyPos = json.indexOf("\"" + key + "\"");
+        if (keyPos == -1) return null;
+
+        // Advance from the end of the quoted key up to the ':' separator.
+        int cursor = keyPos + key.length() + 2;
+        while (cursor < json.length() && json.charAt(cursor) != ':') {
+            cursor++;
+        }
+        if (cursor >= json.length()) return null;
+
+        // Step past the colon and any whitespace before the value.
+        cursor++;
+        while (cursor < json.length() && Character.isWhitespace(json.charAt(cursor))) {
+            cursor++;
+        }
+
+        if (cursor < json.length() && json.charAt(cursor) == '"') {
+            return parseString(json, cursor);
+        }
+        return null;
+    }
+
+    /**
+     * Returns the index of the next unescaped double quote at or after
+     * {@code startPos}, or -1 when none exists.
+     *
+     * A backslash consumes the character following it, so escaped quotes
+     * (and escaped backslashes) are skipped correctly.
+     */
+    private int findClosingQuote(String str, int startPos) {
+        int i = startPos;
+        while (i < str.length()) {
+            char ch = str.charAt(i);
+            if (ch == '"') {
+                return i;
+            }
+            // An escape sequence spans two characters; everything else spans one.
+            i += (ch == '\\' && i + 1 < str.length()) ? 2 : 1;
+        }
+        return -1;
+    }
+
+    /**
+     * Returns the index of the '}' that closes the object whose '{' precedes
+     * {@code startPos}, honoring nested objects and brace characters inside
+     * string literals; -1 when unbalanced.
+     */
+    private int findClosingBrace(String str, int startPos) {
+        int depth = 1;
+        boolean insideString = false;
+        int i = startPos;
+        while (i < str.length()) {
+            char ch = str.charAt(i);
+            if (insideString) {
+                if (ch == '\\' && i + 1 < str.length()) {
+                    i++; // escape: skip the next character
+                } else if (ch == '"') {
+                    insideString = false;
+                }
+            } else if (ch == '"') {
+                insideString = true;
+            } else if (ch == '{') {
+                depth++;
+            } else if (ch == '}' && --depth == 0) {
+                return i;
+            }
+            i++;
+        }
+        return -1;
+    }
+
+    /**
+     * Unescapes the supported JSON escape sequences (\\ \" \n \r \t) in a single
+     * left-to-right pass; unrecognized escapes are kept verbatim.
+     *
+     * Fix: the previous replace-chain used U+0000 as a temporary placeholder for
+     * escaped backslashes, so any input that genuinely contained a NUL character
+     * was corrupted (its NULs were rewritten into backslashes). A character walk
+     * has no placeholder and cannot collide.
+     *
+     * @param input the escaped string content, may be null
+     * @return the unescaped string, or null when input is null
+     */
+    private String unescapeJsonString(String input) {
+        if (input == null) {
+            return null;
+        }
+        StringBuilder out = new StringBuilder(input.length());
+        for (int i = 0; i < input.length(); i++) {
+            char c = input.charAt(i);
+            if (c == '\\' && i + 1 < input.length()) {
+                char next = input.charAt(++i);
+                switch (next) {
+                    case 'n':  out.append('\n'); break;
+                    case 'r':  out.append('\r'); break;
+                    case 't':  out.append('\t'); break;
+                    case '"':  out.append('"');  break;
+                    case '\\': out.append('\\'); break;
+                    default:
+                        // Same behavior as before for unsupported escapes: pass through.
+                        out.append('\\').append(next);
+                }
+            } else {
+                out.append(c);
+            }
+        }
+        return out.toString();
+    }
+
+ private String escapeJsonString(String input) {
+ if (input == null) {
+ return "";
+ }
+
+ return input
+ .replace("\\", "\\\\")
+ .replace("\"", "\\\"")
+ .replace("\n", "\\n")
+ .replace("\r", "\\r")
+ .replace("\t", "\\t");
+ }
+}
diff --git a/native-lib/src/main/java/org/mule/weave/lib/StreamSession.java b/native-lib/src/main/java/org/mule/weave/lib/StreamSession.java
new file mode 100644
index 0000000..627f172
--- /dev/null
+++ b/native-lib/src/main/java/org/mule/weave/lib/StreamSession.java
@@ -0,0 +1,124 @@
+package org.mule.weave.lib;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Holds an open {@link InputStream} and associated metadata for a streaming script execution.
+ *
+ * Instances are stored in a static registry keyed by a monotonically increasing handle so that
+ * native callers can reference them across {@code @CEntryPoint} invocations.
+ */
+public class StreamSession {
+
+    // Fix: the registry was declared as a raw ConcurrentHashMap, making get()/remove()
+    // return Object — which does not compile against the StreamSession-typed uses in
+    // get(long) and close(long) below. Parameterize it properly.
+    private static final ConcurrentHashMap<Long, StreamSession> SESSIONS = new ConcurrentHashMap<>();
+    private static final AtomicLong NEXT_HANDLE = new AtomicLong(1);
+
+    // Null for error sessions (see the private error constructor).
+    private final InputStream inputStream;
+    private final String mimeType;
+    private final String charset;
+    private final boolean binary;
+    // Non-null iff this session represents a failed execution.
+    private final String error;
+
+    StreamSession(InputStream inputStream, String mimeType, String charset, boolean binary) {
+        this.inputStream = inputStream;
+        this.mimeType = mimeType;
+        this.charset = charset;
+        this.binary = binary;
+        this.error = null;
+    }
+
+    private StreamSession(String error) {
+        this.inputStream = null;
+        this.mimeType = null;
+        this.charset = null;
+        this.binary = false;
+        this.error = error;
+    }
+
+    /**
+     * Creates an error session that carries only an error message.
+     *
+     * @param error the error message
+     * @return an error session
+     */
+    public static StreamSession ofError(String error) {
+        return new StreamSession(error);
+    }
+
+    /**
+     * Registers this session and returns its handle.
+     *
+     * @return a unique handle that callers use to reference this session
+     */
+    public long register() {
+        long handle = NEXT_HANDLE.getAndIncrement();
+        SESSIONS.put(handle, this);
+        return handle;
+    }
+
+    /**
+     * Looks up a previously registered session.
+     *
+     * @param handle the handle returned by {@link #register()}
+     * @return the session, or {@code null} if not found
+     */
+    public static StreamSession get(long handle) {
+        return SESSIONS.get(handle);
+    }
+
+    /**
+     * Removes and closes a session. Safe to call with an unknown handle.
+     *
+     * @param handle the session handle
+     */
+    public static void close(long handle) {
+        StreamSession session = SESSIONS.remove(handle);
+        if (session != null && session.inputStream != null) {
+            try {
+                session.inputStream.close();
+            } catch (IOException ignored) {
+                // Best-effort close; nothing actionable for the native caller.
+            }
+        }
+    }
+
+    /**
+     * Reads up to {@code len} bytes into the provided byte array.
+     *
+     * Callers must check {@link #isError()} first: error sessions have no stream.
+     *
+     * @param buf destination buffer
+     * @param len maximum number of bytes to read
+     * @return number of bytes actually read, or {@code -1} on EOF
+     * @throws IOException if an I/O error occurs
+     */
+    public int read(byte[] buf, int len) throws IOException {
+        return inputStream.read(buf, 0, len);
+    }
+
+    public String getMimeType() {
+        return mimeType;
+    }
+
+    public String getCharset() {
+        return charset;
+    }
+
+    public boolean isBinary() {
+        return binary;
+    }
+
+    /**
+     * Returns the error message if this is an error session, or {@code null} otherwise.
+     */
+    public String getError() {
+        return error;
+    }
+
+    /**
+     * Returns {@code true} if this session represents a failed execution.
+     */
+    public boolean isError() {
+        return error != null;
+    }
+}
diff --git a/native-lib/src/main/resources/META-INF/services/org.mule.weave.v2.module.DataFormat b/native-lib/src/main/resources/META-INF/services/org.mule.weave.v2.module.DataFormat
new file mode 100644
index 0000000..b1a22df
--- /dev/null
+++ b/native-lib/src/main/resources/META-INF/services/org.mule.weave.v2.module.DataFormat
@@ -0,0 +1,9 @@
+org.mule.weave.v2.interpreted.module.WeaveDataFormat
+org.mule.weave.v2.module.core.json.JsonDataFormat
+org.mule.weave.v2.module.core.xml.XmlDataFormat
+org.mule.weave.v2.module.core.csv.CSVDataFormat
+org.mule.weave.v2.module.core.octetstream.OctetStreamDataFormat
+org.mule.weave.v2.module.core.textplain.TextPlainDataFormat
+org.mule.weave.v2.module.core.urlencoded.UrlEncodedDataFormat
+org.mule.weave.v2.module.core.multipart.MultiPartDataFormat
+org.mule.weave.v2.module.core.properties.PropertiesDataFormat
\ No newline at end of file
diff --git a/native-lib/src/main/resources/META-INF/services/org.mule.weave.v2.parser.phase.ModuleLoader b/native-lib/src/main/resources/META-INF/services/org.mule.weave.v2.parser.phase.ModuleLoader
new file mode 100644
index 0000000..ef3215b
--- /dev/null
+++ b/native-lib/src/main/resources/META-INF/services/org.mule.weave.v2.parser.phase.ModuleLoader
@@ -0,0 +1 @@
+org.mule.weave.v2.compilation.loader.WeaveBinaryResourceModuleLoader
\ No newline at end of file
diff --git a/native-lib/src/test/java/org/mule/weave/lib/ScriptRuntimeTest.java b/native-lib/src/test/java/org/mule/weave/lib/ScriptRuntimeTest.java
new file mode 100644
index 0000000..d6ab1d6
--- /dev/null
+++ b/native-lib/src/test/java/org/mule/weave/lib/ScriptRuntimeTest.java
@@ -0,0 +1,617 @@
+package org.mule.weave.lib;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Base64;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+class ScriptRuntimeTest {
+
+ @Test
+ void runSimpleScript() {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Running sqrt(144) 20 times with timing:");
+ System.out.println("=".repeat(50));
+
+ for (int i = 1; i <= 20; i++) {
+ long startTime = System.nanoTime();
+ String result = runtime.run("sqrt(144)");
+ long endTime = System.nanoTime();
+ double executionTimeMs = (endTime - startTime) / 1_000_000.0;
+
+ assertEquals("12", Result.parse(result).result);
+ System.out.printf("Run %2d: %.3f ms - Result: %s%n", i, executionTimeMs, result);
+ }
+
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void runParseError() {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Running a script with a parse error:");
+ System.out.println("=".repeat(50));
+
+ String result = runtime.run("invalid syntax here");
+
+ String error = Result.parse(result).error;
+ assertTrue(error.contains("Unable to resolve reference"));
+ System.out.printf("Error: %s%n", result);
+
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void runWithInputs() {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing runWithInputs with two integer numbers:");
+ System.out.println("=".repeat(50));
+
+ // Test 1: Sum 25 + 17
+ int num1 = 25;
+ int num2 = 17;
+ int expected = num1 + num2;
+
+ // Create inputs JSON with content and mimeType for each binding
+ String inputsJson = String.format(
+ "{\"num1\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}, " +
+ "\"num2\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}}",
+ encode(num1), encode(num2)
+ );
+
+ String script = "num1 + num2";
+
+ System.out.printf("Test 1: %d + %d%n", num1, num2);
+ System.out.printf("Script: %s%n", script);
+ System.out.printf("Inputs: %s%n", inputsJson);
+
+ long startTime = System.nanoTime();
+ String result = Result.parse(runtime.run(script, inputsJson)).result;
+ long endTime = System.nanoTime();
+ double executionTimeMs = (endTime - startTime) / 1_000_000.0;
+
+ System.out.printf("Result: %s%n", result);
+ System.out.printf("Expected: %d%n", expected);
+ System.out.printf("Execution time: %.3f ms%n", executionTimeMs);
+
+ assertEquals(String.valueOf(expected), result);
+ System.out.println("✓ Test 1 passed!");
+
+ System.out.println("-".repeat(50));
+
+ // Test 2: Sum 100 + 250
+ num1 = 100;
+ num2 = 250;
+ expected = num1 + num2;
+
+ inputsJson = String.format(
+ "{\"num1\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}, " +
+ "\"num2\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}}",
+ encode(num1), encode(num2)
+ );
+
+ System.out.printf("Test 2: %d + %d%n", num1, num2);
+ System.out.printf("Script: %s%n", script);
+
+ startTime = System.nanoTime();
+ result = Result.parse(runtime.run(script, inputsJson)).result;
+ endTime = System.nanoTime();
+ executionTimeMs = (endTime - startTime) / 1_000_000.0;
+
+ System.out.printf("Result: %s%n", result);
+ System.out.printf("Expected: %d%n", expected);
+ System.out.printf("Execution time: %.3f ms%n", executionTimeMs);
+
+ assertEquals(String.valueOf(expected), result);
+ System.out.println("✓ Test 2 passed!");
+
+ System.out.println("=".repeat(50));
+ }
+
+ private String encode(Object value) {
+ byte[] bytes = value instanceof byte[] ? (byte[]) value : String.valueOf(value).getBytes();
+ return Base64.getEncoder().encodeToString(bytes);
+
+ }
+
+ @Test
+ void runWithXmlInput() {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing runWithInputs with XML input to calculate average age:");
+ System.out.println("=".repeat(50));
+
+ // XML input with two people
+ String xmlInput = """
+ <people>
+ <person>
+ <age>19</age>
+ <name>john</name>
+ </person>
+ <person>
+ <age>25</age>
+ <name>jane</name>
+ </person>
+ </people>
+ """;
+
+ String inputsJson = String.format(
+ "{\"people\": {\"content\": \"%s\", \"mimeType\": \"application/xml\"}}",
+ encode(xmlInput)
+ );
+
+ // DataWeave script to calculate average age
+ String script = """
+ output application/json
+ ---
+ avg(people.people.*person.age)
+ """;
+
+ System.out.printf("XML Input:%n%s%n", xmlInput);
+ System.out.printf("Script:%n%s%n", script);
+
+ long startTime = System.nanoTime();
+ String result = runtime.run(script, inputsJson);
+ long endTime = System.nanoTime();
+ double executionTimeMs = (endTime - startTime) / 1_000_000.0;
+
+ System.out.printf("Result: %s%n", result);
+ System.out.printf("Expected: 22 (average of 19 and 25)%n");
+ System.out.printf("Execution time: %.3f ms%n", executionTimeMs);
+
+ // The average of 19 and 25 is 22
+ assertEquals("22", Result.parse(result).result);
+ System.out.println("✓ Test passed!");
+
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void runWithJsonObjectInput() {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing runWithInputs with JSON object input:");
+ System.out.println("=".repeat(50));
+
+ String jsonInput = "{\"name\": \"John\", \"age\": 30}";
+
+ String inputsJson = String.format(
+ "{\"payload\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}}",
+ encode(jsonInput)
+ );
+
+ // DataWeave script to extract name
+ String script = "output application/json\n---\npayload.name";
+
+ System.out.printf("JSON Input: %s%n", jsonInput);
+ System.out.printf("Script: %s%n", script);
+
+ long startTime = System.nanoTime();
+ String result = Result.parse(runtime.run(script, inputsJson)).result;
+ long endTime = System.nanoTime();
+ double executionTimeMs = (endTime - startTime) / 1_000_000.0;
+
+ System.out.printf("Result: %s%n", result);
+ System.out.printf("Expected: \"John\"%n");
+ System.out.printf("Execution time: %.3f ms%n", executionTimeMs);
+
+ assertEquals("\"John\"", result);
+ System.out.println("✓ Test passed!");
+
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void runWithBinaryResult() {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Running fromBase64 10 times with timing:");
+ System.out.println("=".repeat(50));
+
+ for (int i = 1; i <= 10; i++) {
+ long startTime = System.nanoTime();
+ Result result = Result.parse(runtime.run("import fromBase64 from dw::core::Binaries\n" +
+ "output application/octet-stream\n" +
+ "---\n" +
+ "fromBase64(\"12345678\")", ""));
+ long endTime = System.nanoTime();
+ double executionTimeMs = (endTime - startTime) / 1_000_000.0;
+
+ assertEquals("12345678", result.result);
+ System.out.printf("Run %2d: %.3f ms - Result: %s%n", i, executionTimeMs, result.result);
+ }
+
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void runWithInputProperties() {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+ String encodedIn0 = Base64.getEncoder().encodeToString("1234567".getBytes());
+ Result result = Result.parse(runtime.run("in0.column_1[0] as Number",
+ "{\"in0\": " +
+ "{\"content\": \"" + encodedIn0 + "\", " +
+ "\"mimeType\": \"application/csv\", " +
+ "\"properties\": {\"header\": false, \"separator\": \"4\"}}}"));
+ assertEquals("567", result.result);
+
+ }
+
+ @Test
+ void streamSimpleScript() throws IOException {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing streaming simple script:");
+ System.out.println("=".repeat(50));
+
+ StreamSession session = runtime.runStreaming("sqrt(144)", null);
+ assertFalse(session.isError(), "Expected successful session");
+ assertNull(session.getError());
+ assertNotNull(session.getMimeType());
+
+ byte[] buf = new byte[64];
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ int n;
+ while ((n = session.read(buf, buf.length)) > 0) {
+ bos.write(buf, 0, n);
+ }
+ String result = bos.toString(session.getCharset());
+ assertEquals("12", result);
+ StreamSession.close(session.register()); // clean up handle
+
+ System.out.println("Result: " + result);
+ System.out.println("✓ Streaming simple script passed!");
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void streamWithInputs() throws IOException {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing streaming with inputs:");
+ System.out.println("=".repeat(50));
+
+ String inputsJson = String.format(
+ "{\"num1\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}, " +
+ "\"num2\": {\"content\": \"%s\", \"mimeType\": \"application/json\"}}",
+ encode(25), encode(17)
+ );
+
+ StreamSession session = runtime.runStreaming("num1 + num2", inputsJson);
+ assertFalse(session.isError());
+
+ byte[] buf = new byte[64];
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ int n;
+ while ((n = session.read(buf, buf.length)) > 0) {
+ bos.write(buf, 0, n);
+ }
+ String result = bos.toString(session.getCharset());
+ assertEquals("42", result);
+ StreamSession.close(session.register());
+
+ System.out.println("Result: " + result);
+ System.out.println("✓ Streaming with inputs passed!");
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void streamChunkedRead() throws IOException {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing streaming chunked read:");
+ System.out.println("=".repeat(50));
+
+ String script = "output application/json\n---\n{items: (1 to 100) map {id: $, name: \"item_\" ++ $}}";
+
+ StreamSession session = runtime.runStreaming(script, null);
+ assertFalse(session.isError());
+
+ byte[] smallBuf = new byte[32];
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ int n;
+ int chunkCount = 0;
+ while ((n = session.read(smallBuf, smallBuf.length)) > 0) {
+ bos.write(smallBuf, 0, n);
+ chunkCount++;
+ }
+ String result = bos.toString(session.getCharset());
+ assertTrue(chunkCount > 1, "Expected multiple chunks, got " + chunkCount);
+ assertTrue(result.contains("item_1"));
+ assertTrue(result.contains("item_100"));
+ StreamSession.close(session.register());
+
+ System.out.printf("Read %d chunks, total %d bytes%n", chunkCount, bos.size());
+ System.out.println("✓ Streaming chunked read passed!");
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void streamWithStreamingInput() throws Exception {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing streaming with streaming input:");
+ System.out.println("=".repeat(50));
+
+ // Create an input stream session for JSON data
+ InputStreamSession inputSession = new InputStreamSession("application/json", "UTF-8");
+ long inputHandle = inputSession.register();
+
+ // Build inputs JSON referencing the streamHandle
+ String inputsJson = "{\"payload\": {\"streamHandle\": \"" + inputHandle + "\", \"mimeType\": \"application/json\"}}";
+
+ // The DW engine will read from the PipedInputStream on the main thread,
+ // so we must feed data from a separate thread.
+ CountDownLatch started = new CountDownLatch(1);
+ AtomicReference<Exception> feedError = new AtomicReference<>();
+
+ Thread feeder = new Thread(() -> {
+ try {
+ started.countDown();
+ String jsonData = "{\"name\": \"Alice\", \"age\": 30}";
+ byte[] bytes = jsonData.getBytes("UTF-8");
+ inputSession.write(bytes, bytes.length);
+ inputSession.closeWriter();
+ } catch (Exception e) {
+ feedError.set(e);
+ }
+ });
+ feeder.start();
+ started.await();
+
+ // Run streaming with the piped input
+ StreamSession session = runtime.runStreaming("output application/json\n---\npayload.name", inputsJson);
+ assertFalse(session.isError(), "Expected successful session but got: " + session.getError());
+
+ byte[] buf = new byte[64];
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ int n;
+ while ((n = session.read(buf, buf.length)) > 0) {
+ bos.write(buf, 0, n);
+ }
+ String result = bos.toString(session.getCharset());
+ assertEquals("\"Alice\"", result);
+ StreamSession.close(session.register());
+ InputStreamSession.close(inputHandle);
+ feeder.join(5000);
+ assertNull(feedError.get(), "Feeder thread threw: " + feedError.get());
+
+ System.out.println("Result: " + result);
+ System.out.println("✓ Streaming with streaming input passed!");
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void streamWithLargeStreamingInput() throws Exception {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing streaming with large streaming input:");
+ System.out.println("=".repeat(50));
+
+ InputStreamSession inputSession = new InputStreamSession("application/json", "UTF-8");
+ long inputHandle = inputSession.register();
+
+ String inputsJson = "{\"payload\": {\"streamHandle\": \"" + inputHandle + "\", \"mimeType\": \"application/json\"}}";
+
+ // Feed a large JSON array from a separate thread
+ AtomicReference<Exception> feedError = new AtomicReference<>();
+ Thread feeder = new Thread(() -> {
+ try {
+ StringBuilder sb = new StringBuilder("[");
+ for (int i = 1; i <= 1000; i++) {
+ if (i > 1) sb.append(",");
+ sb.append("{\"id\":").append(i).append(",\"val\":\"item_").append(i).append("\"}");
+ }
+ sb.append("]");
+ byte[] bytes = sb.toString().getBytes("UTF-8");
+ // Write in chunks to simulate streaming
+ int chunkSize = 4096;
+ for (int off = 0; off < bytes.length; off += chunkSize) {
+ int len = Math.min(chunkSize, bytes.length - off);
+ byte[] chunk = new byte[len];
+ System.arraycopy(bytes, off, chunk, 0, len);
+ inputSession.write(chunk, len);
+ }
+ inputSession.closeWriter();
+ } catch (Exception e) {
+ feedError.set(e);
+ }
+ });
+ feeder.start();
+
+ StreamSession session = runtime.runStreaming("output application/json\n---\nsizeOf(payload)", inputsJson);
+ assertFalse(session.isError(), "Expected successful session but got: " + session.getError());
+
+ byte[] buf = new byte[256];
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ int n;
+ while ((n = session.read(buf, buf.length)) > 0) {
+ bos.write(buf, 0, n);
+ }
+ String result = bos.toString(session.getCharset());
+ assertEquals("1000", result);
+ StreamSession.close(session.register());
+ InputStreamSession.close(inputHandle);
+ feeder.join(10000);
+ assertNull(feedError.get(), "Feeder thread threw: " + feedError.get());
+
+ System.out.println("Result: " + result);
+ System.out.println("✓ Streaming with large streaming input passed!");
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void streamErrorSession() {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing streaming error session:");
+ System.out.println("=".repeat(50));
+
+ StreamSession session = runtime.runStreaming("invalid syntax here", null);
+ assertTrue(session.isError());
+ assertNotNull(session.getError());
+ assertTrue(session.getError().contains("Unable to resolve reference"));
+
+ System.out.println("Error: " + session.getError());
+ System.out.println("✓ Streaming error session passed!");
+ System.out.println("=".repeat(50));
+ }
+
+ // ── Callback-based streaming pattern tests ──────────────────────────
+
+ @Test
+ void callbackOutputStreaming() throws IOException {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing callback-based output streaming:");
+ System.out.println("=".repeat(50));
+
+ StreamSession session = runtime.runStreaming("output application/json\n---\n{items: (1 to 50) map {id: $}}", null);
+ assertFalse(session.isError());
+
+ // Simulate the write-callback pattern: read chunks and collect them
+ ByteArrayOutputStream collected = new ByteArrayOutputStream();
+ byte[] buf = new byte[64];
+ int callbackCount = 0;
+ int n;
+ while ((n = session.read(buf, buf.length)) > 0) {
+ // This is what the write callback would receive
+ collected.write(buf, 0, n);
+ callbackCount++;
+ }
+ String result = collected.toString(session.getCharset());
+ assertTrue(result.contains("\"id\": 1"), "Expected id 1 in result");
+ assertTrue(result.contains("\"id\": 50"), "Expected id 50 in result");
+ assertTrue(callbackCount > 0, "Expected at least one callback invocation");
+ StreamSession.close(session.register());
+
+ System.out.printf("Callback invoked %d times, total %d bytes%n", callbackCount, collected.size());
+ System.out.println("✓ Callback output streaming passed!");
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void callbackInputOutputStreaming() throws Exception {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing callback-based input+output streaming:");
+ System.out.println("=".repeat(50));
+
+ // Simulate the read-callback pattern: a feeder thread pulls from a data source
+ // and pushes into an InputStreamSession, while the main thread reads the output.
+ InputStreamSession inputSession = new InputStreamSession("application/json", "UTF-8");
+ long inputHandle = inputSession.register();
+
+ String inputsJson = "{\"payload\": {\"streamHandle\": \"" + inputHandle + "\", \"mimeType\": \"application/json\"}}";
+
+ // Simulate read callback: feeds a JSON array in chunks
+ byte[] sourceData = "[10, 20, 30, 40, 50]".getBytes("UTF-8");
+ AtomicReference<Exception> feedError = new AtomicReference<>();
+
+ Thread feeder = new Thread(() -> {
+ try {
+ int chunkSize = 8;
+ for (int off = 0; off < sourceData.length; off += chunkSize) {
+ int len = Math.min(chunkSize, sourceData.length - off);
+ byte[] chunk = new byte[len];
+ System.arraycopy(sourceData, off, chunk, 0, len);
+ inputSession.write(chunk, len);
+ }
+ inputSession.closeWriter();
+ } catch (Exception e) {
+ feedError.set(e);
+ }
+ }, "test-read-callback-feeder");
+ feeder.start();
+
+ StreamSession session = runtime.runStreaming("output application/json\n---\npayload map ($ * 2)", inputsJson);
+ assertFalse(session.isError(), "Expected successful session but got: " + session.getError());
+
+ // Simulate write callback: collect output chunks
+ ByteArrayOutputStream collected = new ByteArrayOutputStream();
+ byte[] buf = new byte[32];
+ int callbackCount = 0;
+ int n;
+ while ((n = session.read(buf, buf.length)) > 0) {
+ collected.write(buf, 0, n);
+ callbackCount++;
+ }
+ String result = collected.toString(session.getCharset());
+ assertTrue(result.contains("20"), "Expected 20 in result (10*2)");
+ assertTrue(result.contains("100"), "Expected 100 in result (50*2)");
+
+ StreamSession.close(session.register());
+ InputStreamSession.close(inputHandle);
+ feeder.join(5000);
+ assertNull(feedError.get(), "Feeder thread threw: " + feedError.get());
+
+ System.out.printf("Read callback fed %d bytes, write callback invoked %d times, output: %s%n",
+ sourceData.length, callbackCount, result.trim());
+ System.out.println("✓ Callback input+output streaming passed!");
+ System.out.println("=".repeat(50));
+ }
+
+ @Test
+ void callbackOutputStreamingError() {
+ ScriptRuntime runtime = ScriptRuntime.getInstance();
+
+ System.out.println("Testing callback-based output streaming with error:");
+ System.out.println("=".repeat(50));
+
+ StreamSession session = runtime.runStreaming("invalid syntax here", null);
+ assertTrue(session.isError());
+ assertNotNull(session.getError());
+
+ System.out.println("Error correctly returned: " + session.getError());
+ System.out.println("✓ Callback output streaming error passed!");
+ System.out.println("=".repeat(50));
+ }
+
+ static class Result {
+ boolean success;
+ String result;
+ String error;
+ boolean binary;
+ String mimeType;
+ String charset;
+
+ static Result parse(String json) {
+ Result result = new Result();
+
+ String successString = json.substring(json.indexOf(":") + 1, json.indexOf(","));
+ result.success = Boolean.parseBoolean(successString);
+ if (result.success) {
+ String binaryString = json.substring(json.indexOf(",\"binary\":") + 10, json.indexOf("}"));
+ result.binary = Boolean.parseBoolean(binaryString);
+ String resultString = json.substring(json.indexOf(",\"result\":") + 11, json.indexOf(",\"mimeType\":")-1);
+ String mimeTypeString = json.substring(json.indexOf(",\"mimeType\":") + 13, json.indexOf(",\"charset\":")-1);
+ result.mimeType = mimeTypeString;
+ String charsetString = json.substring(json.indexOf(",\"charset\":") + 12, json.indexOf(",\"binary\":")-1);
+ result.charset = charsetString;
+ if (result.binary) {
+ result.result = resultString;
+ } else {
+ result.result = new String(Base64.getDecoder().decode(resultString), Charset.forName(result.charset));
+ }
+
+ } else {
+ result.error = json.substring(json.indexOf(",\"error\":") + 10, json.length()-2);
+ }
+ return result;
+ }
+ }
+
+}
diff --git a/settings.gradle b/settings.gradle
index a47c02c..befbb96 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,3 +1,3 @@
include 'native-cli'
include 'native-cli-integration-tests'
-
+include 'native-lib'