### `examples/boundary-pushing-wow/wow-end2end-enterprise/README.md`

---

# 🚀 End-to-End Enterprise Workflow

### **Environment:** Saturn Cloud | **Stack:** Python 3.10, Feast, Spark, MLflow, FastAPI

This template provides a production-grade blueprint for an **Ingest → Feature → Train → Serve** machine learning pipeline. It is optimized for the [Saturn Cloud](https://saturncloud.io/) environment to ensure scalability and reproducibility through automated environment management.

---

## 📂 Directory Structure

The following tree represents the organization of your workspace:

```text
/workspace
├── data/                     # Local storage for raw and processed features
│   ├── user_features.parquet # Feature data generated by Spark
│   └── online_store.db       # SQLite online store for Feast real-time serving
├── feature_repo/             # Feast feature store repository
│   ├── data/                 # Feast local registry and metadata storage
│   ├── definitions.py        # Feature definitions (entities, feature views)
│   ├── feature_store.yaml    # Feast infrastructure configuration
│   ├── registry.db           # Centralized feature registry database
│   └── online_store.db       # Online store for Feast real-time retrieval
├── mlruns/                   # MLflow experiment tracking
│   └── 1/                    # Experiment ID 1 artifacts and models
├── src/                      # Source code
│   ├── ingestion.py          # PySpark data ingestion logic
│   ├── training.py           # Model training & MLflow logging
│   ├── main.py               # FastAPI serving with auto-discovery
│   └── test_api.py           # End-to-end API smoke test
├── requirements.txt          # Python library dependencies
├── setup_saturn.sh           # Automated environment setup script
├── run_workflow.sh           # One-click E2E pipeline execution script
└── virt-env/                 # Python 3.10 virtual environment
```

---

## 🏗️ Architecture & Workflow

The system follows a modular MLOps pattern to ensure data consistency and model reliability:

1. **Data Ingestion (PySpark)**: Transforms raw events into structured features.
2. **Feature Store (Feast)**: Ensures feature consistency between offline training and online serving.
3. **Experiment Tracking (MLflow)**: Logs every training run and manages model artifacts.
4. **Model Serving (FastAPI)**: A REST API with **Auto-Discovery** logic to load the latest model from MLflow automatically.
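
Note that `src/main.py` itself is not included in this diff. The sketch below shows one plausible shape for the auto-discovery step: query the local MLflow tracking store for the most recent run and load its model. The experiment ID `1` (matching the `mlruns/1/` directory above) and the `model` artifact path are assumptions.

```python
# Hypothetical sketch of the auto-discovery logic in src/main.py (not shown
# in this diff): find the newest run in the local store and load its model.
import mlflow

mlflow.set_tracking_uri("file:./mlruns")  # local store written by training.py

def load_latest_model(experiment_id: str = "1"):  # "1" per the mlruns/ tree above
    runs = mlflow.search_runs(
        experiment_ids=[experiment_id],
        order_by=["attributes.start_time DESC"],  # newest run first
        max_results=1,
    )
    if runs.empty:
        raise RuntimeError("No MLflow runs found; run src/training.py first.")
    run_id = runs.iloc[0]["run_id"]
    # Assumes training logged the model under the artifact path "model"
    return mlflow.pyfunc.load_model(f"runs:/{run_id}/model")
```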

---

## 🛠️ Automated Setup (`setup_saturn.sh`)

The setup script automates system-level and Python environment preparation to satisfy the pinned version requirements (Python 3.10 and Java 11).

### **1. Run the Setup Script**

Run the following commands to install system dependencies, configure Java, and build the virtual environment:

```bash
chmod +x setup_saturn.sh
./setup_saturn.sh
source virt-env/bin/activate
```

### **2. Virtual Environment Details**

* **Python Version**: 3.10 (Required for Feast/Dask stability).
* **Java Version**: OpenJDK 11 (Required for the PySpark JVM gateway).
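
To confirm the environment is active before running the pipeline, a quick sanity check (the expected values match what `setup_saturn.sh` installs and exports):

```bash
python --version   # should report Python 3.10.x
java -version      # should report openjdk version "11...."
echo "$JAVA_HOME"  # should be /usr/lib/jvm/java-11-openjdk-amd64
```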

---

## 🚀 Execution Guide

### **Option A: Automated One-Click Run**

For a fresh run that cleans old data and executes the entire pipeline automatically, use the workflow script:

```bash
chmod +x run_workflow.sh
./run_workflow.sh

```

### **Option B: Manual Step-by-Step Run**

1. **Ingest Features**: `python src/ingestion.py`
2. **Initialize Feast**:

   ```bash
   cd feature_repo && feast apply
   feast materialize-incremental $(date -u +"%Y-%m-%dT%H:%M:%S") && cd ..
   ```

3. **Train Model**: `python src/training.py` (a hedged sketch of this script follows this list).
4. **Start API**: `python src/main.py`
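
`src/training.py` is not included in this diff. As a minimal sketch under stated assumptions (the feature list comes from `definitions.py`; the target is a toy stand-in, since the real label is unknown), it might pull point-in-time features from Feast, fit a model, and log it to MLflow:

```python
# Hypothetical sketch of src/training.py: Feast offline features -> model -> MLflow.
import pandas as pd
import mlflow
import mlflow.sklearn
from feast import FeatureStore
from sklearn.linear_model import LinearRegression

store = FeatureStore(repo_path="feature_repo")

# Entity dataframe: which users/timestamps we want features for
entity_df = pd.DataFrame({
    "user_id": [101, 102],
    "event_timestamp": [pd.Timestamp.utcnow()] * 2,
})

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=["user_stats:total_spend"],
).to_df()

with mlflow.start_run():
    X = training_df[["total_spend"]]
    y = training_df["total_spend"]            # toy target, purely illustrative
    model = LinearRegression().fit(X, y)
    mlflow.sklearn.log_model(model, "model")  # artifact path "model" is assumed
```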

---

## 🌐 Accessing the API

Visit **`http://localhost:8000/docs`** within your Saturn Cloud environment to access the interactive Swagger UI for testing predictions.

---

## 🧪 API Testing Script (`src/test_api.py`)

This script allows you to verify that the entire pipeline—from Feast feature retrieval to MLflow model inference—is working correctly without using a browser.
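
The script itself is not part of this diff. A minimal sketch is below; the `/predict` route and its request schema are assumptions (adjust to whatever `src/main.py` actually exposes), and it assumes `requests` is installed, which is not pinned in `requirements.txt`.

```python
# Hypothetical sketch of src/test_api.py: smoke-test the running FastAPI server.
import requests

API_URL = "http://localhost:8000"

def main():
    # 1. Confirm the server is up via FastAPI's OpenAPI schema
    resp = requests.get(f"{API_URL}/openapi.json", timeout=5)
    resp.raise_for_status()
    print("API is up. Routes:", list(resp.json()["paths"]))

    # 2. Request a prediction for a user present in the Feast online store
    resp = requests.post(f"{API_URL}/predict", json={"user_id": 101}, timeout=5)
    resp.raise_for_status()
    print("Prediction:", resp.json())

if __name__ == "__main__":
    main()
```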

---


## 🛠️ Troubleshooting

| Issue | Cause | Resolution |
| --- | --- | --- |
| **`503 Service Unavailable`** | Model not loaded in FastAPI. | Ensure `src/training.py` finished and `mlruns/` exists in the root. |
| **`TypeError: descriptor...`** | Wrong Python version. | Ensure you are using **Python 3.10** (`python --version`). |
| **`Java Not Found`** | Spark cannot find JVM. | Run `setup_saturn.sh` to install OpenJDK 11 and set `JAVA_HOME`. |
| **`Port 8000 Occupied`** | Multiple API instances. | Run `pkill -f uvicorn` to stop old processes before restarting. |
| **`Feast Materialization Error`** | Online store sync failed. | Delete `feature_repo/online_store.db`, then re-run `feast apply` and `feast materialize-incremental`. |

---

### 🚀 Final Verification

1. **Execute** the full pipeline:

   ```bash
   chmod +x run_workflow.sh
   ./run_workflow.sh
   ```

2. **Run the test script** in a second terminal (while the API is running):

   ```bash
   source virt-env/bin/activate
   python src/test_api.py
   ```

---


## 🔗 Resources

* **Platform**: [Saturn Cloud Website](https://saturncloud.io/)
* **Documentation**: [Saturn Cloud Official Docs](https://saturncloud.io/docs/)
* **Knowledge Base**: [Building MLOps on Saturn Cloud](https://saturncloud.io/docs/user-guide/how-to/resources/deployments/)

---
### `feature_repo/definitions.py`
from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32

# Define the 'Entity' (the object we are tracking)
user = Entity(name="user_id", join_keys=["user_id"])

# Define the 'Source' (the Spark output)
user_source = FileSource(
    path="/workspace/data/user_features.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created_timestamp",
)

# Define the 'Feature View'
user_stats_view = FeatureView(
    name="user_stats",
    entities=[user],
    ttl=timedelta(days=1),
    schema=[
        Field(name="total_spend", dtype=Float32),
    ],
    online=True,
    source=user_source,
)
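
Not part of the original file, but as a usage sketch: once `feast apply` and materialization have run, the serving layer can read these features back from the SQLite online store. The printed value reflects the synthetic data generated by `src/ingestion.py`.

```python
from feast import FeatureStore

store = FeatureStore(repo_path="feature_repo")
online = store.get_online_features(
    features=["user_stats:total_spend"],
    entity_rows=[{"user_id": 101}],
).to_dict()
print(online)  # e.g. {'user_id': [101], 'total_spend': [200.5]} for the synthetic data
```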
### `feature_repo/feature_store.yaml`
project: enterprise_workflow
registry: data/registry.db  # paths are resolved relative to feature_repo/
provider: local
online_store:
  type: sqlite
  path: data/online_store.db
entity_key_serialization_version: 2
### `requirements.txt`
# --- Data Engineering & Feature Store ---
pyspark==3.5.1
feast[pyspark]==0.40.0
pyarrow # Efficient Parquet handling for Spark/Feast

# --- Experiment Tracking & ML ---
mlflow
scikit-learn # Standard CPU training
pandas<2.0.0 # Required for Feast registry compatibility

# --- Serving & API ---
fastapi
uvicorn[standard] # High-performance ASGI server
pydantic # Data validation for API schemas

# --- Hardware Acceleration (CPU/GPU) ---
xgboost # Supports both CPU and GPU training
torch # Deep learning support for GPU

# --- Utilities ---
python-multipart # For FastAPI form handling
boto3 # For S3/Managed store integration
dask
distributed
### `run_workflow.sh`
#!/bin/bash
set -e  # stop on the first failing phase

# 1. Clean previous runs
rm -rf data/*.db data/*.parquet mlruns/

# 2. Source environment
source virt-env/bin/activate

# 3. Ingest: Raw Data -> Spark -> Parquet
echo "--- Phase 1: Ingesting Data with Spark ---"
python src/ingestion.py

# 4. Feature: Register with Feast
echo "--- Phase 2: Registering Features with Feast ---"
cd feature_repo
feast apply
# Materialize data into the Online Store (SQLite) for FastAPI
CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S")
feast materialize-incremental $CURRENT_TIME
cd ..

# 5. Train: Feast -> MLflow
echo "--- Phase 3: Training Model with MLflow ---"
python src/training.py

# 6. Serve: MLflow + Feast -> FastAPI
echo "--- Phase 4: Launching FastAPI Serving Layer ---"
echo "API will be available at http://localhost:8000"
echo "Check Swagger Docs at http://localhost:8000/docs"
python src/main.py
### `setup_saturn.sh`
#!/bin/bash
set -e  # abort if any install step fails

# 1. Update and install system-level dependencies
echo "📦 Installing system dependencies (Python 3.10 and Java 11)..."
apt update
apt install -y software-properties-common
add-apt-repository ppa:deadsnakes/ppa -y
apt install -y python3.10 python3.10-venv python3.10-dev openjdk-11-jdk build-essential libffi-dev libssl-dev

# 2. Setup Java environment variables for Spark
echo "☕ Configuring JAVA_HOME..."
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
echo "export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64" >> ~/.bashrc

# 3. Create and configure the Virtual Environment
echo "🐍 Creating Python 3.10 virtual environment..."
python3.10 -m venv virt-env
source virt-env/bin/activate

# 4. Install Python libraries
echo "🚀 Installing Python requirements..."
pip install --upgrade pip
pip install -r requirements.txt

echo "✅ Setup complete! Run 'source virt-env/bin/activate' to start."
### `src/ingestion.py`
from datetime import datetime, timedelta

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

def run_ingestion():
    spark = SparkSession.builder.appName("EnterpriseIngestion").getOrCreate()

    # 1. Generate synthetic data
    data = [
        (101, 50.5, datetime.now() - timedelta(days=1)),
        (101, 150.0, datetime.now() - timedelta(hours=2)),
        (102, 20.0, datetime.now() - timedelta(days=2)),
    ]
    columns = ["user_id", "amount", "event_timestamp"]
    df = spark.createDataFrame(data, columns)

    # 2. Feature engineering: total spend per user.
    # Feast requires an 'event_timestamp' for point-in-time ("time travel") joins.
    features_df = df.groupBy("user_id").agg(
        F.sum("amount").alias("total_spend"),
        F.max("event_timestamp").alias("event_timestamp"),
    ).withColumn("created_timestamp", F.current_timestamp())

    # 3. Save to the offline store (Parquet)
    output_path = "/workspace/data/user_features.parquet"
    features_df.write.mode("overwrite").parquet(output_path)
    print(f"✅ Features ingested to {output_path}")

    spark.stop()

if __name__ == "__main__":
    run_ingestion()
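
As a quick sanity check after the ingestion step, the Parquet output can be read back with pandas (`pyarrow`, already in `requirements.txt`, handles the Spark-written directory of part-files):

```python
import pandas as pd

# Spark writes a directory of part-files; pandas/pyarrow reads it as one table
df = pd.read_parquet("/workspace/data/user_features.parquet")
print(df)  # expect user 101 with total_spend 200.5 and user 102 with 20.0
```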