
WaveSpeedAI Python SDK

Official Python SDK for the WaveSpeedAI inference platform. It provides a clean, unified, high-performance API and a serverless integration layer for your applications, so you can connect to all WaveSpeedAI models and inference services with zero infrastructure overhead.

🌐 Visit wavespeed.ai · 📖 Documentation · 💬 Issues


Installation

pip install wavespeed

API Client

Run WaveSpeedAI models with a simple API:

import wavespeed

output = wavespeed.run(
    "wavespeed-ai/z-image/turbo",
    input={"prompt": "Cat"},
)

print(output["outputs"][0])  # Output URL
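
The output contains one or more result URLs. A minimal way to save the first one locally, using only the standard library (the filename here is just an example):

import urllib.request

urllib.request.urlretrieve(output["outputs"][0], "result.png")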

Authentication

Set your API key via an environment variable (you can get your API key from https://wavespeed.ai/accesskey):

export WAVESPEED_API_KEY="your-api-key"

Or pass it directly:

from wavespeed import Client

client = Client(api_key="your-api-key")
output = client.run("wavespeed-ai/z-image/turbo", input={"prompt": "Cat"})
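
If you want to read the key from the environment explicitly instead of relying on the client's default lookup, a small sketch:

import os

from wavespeed import Client

client = Client(api_key=os.environ["WAVESPEED_API_KEY"])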

Options

output = wavespeed.run(
    "wavespeed-ai/z-image/turbo",
    input={"prompt": "Cat"},
    timeout=36000.0,        # Max wait time in seconds (default: 36000.0)
    poll_interval=1.0,      # Status check interval in seconds (default: 1.0)
    enable_sync_mode=False, # Single request mode, no polling (default: False)
)

Sync Mode

Use enable_sync_mode=True for a single request that waits for the result (no polling).

Note: Not all models support sync mode. Check the model documentation for availability.

output = wavespeed.run(
    "wavespeed-ai/z-image/turbo",
    input={"prompt": "Cat"},
    enable_sync_mode=True,
)

Retry Configuration

Configure retries at the client level:

from wavespeed import Client

client = Client(
    api_key="your-api-key",
    max_retries=0,            # Task-level retries (default: 0)
    max_connection_retries=5, # HTTP connection retries (default: 5)
    retry_interval=1.0,       # Base delay between retries in seconds (default: 1.0)
)

Upload Files

Upload images, videos, or audio files:

import wavespeed

url = wavespeed.upload("/path/to/image.png")
print(url)
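
The returned URL can then be passed as model input. A sketch assuming a hypothetical image model whose schema accepts an image field (check your target model's documentation for its actual input fields):

import wavespeed

url = wavespeed.upload("/path/to/image.png")
output = wavespeed.run(
    "wavespeed-ai/example-image-model",  # hypothetical model ID
    input={"image": url, "prompt": "Watercolor style"},
)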

Serverless Worker

Build serverless workers for the WaveSpeed platform.

Basic Handler

import wavespeed.serverless as serverless

def handler(job):
    job_input = job["input"]
    result = job_input.get("prompt", "").upper()
    return {"output": result}

serverless.start({"handler": handler})

Async Handler

import asyncio

import wavespeed.serverless as serverless

async def process_async(job_input):
    # Stand-in for real asynchronous work (model call, network I/O, etc.).
    await asyncio.sleep(0)
    return job_input.get("prompt", "").upper()

async def handler(job):
    job_input = job["input"]
    result = await process_async(job_input)
    return {"output": result}

serverless.start({"handler": handler})

Generator Handler (Streaming)

import wavespeed.serverless as serverless

def handler(job):
    for i in range(10):
        yield {"progress": i, "partial": f"chunk-{i}"}

serverless.start({"handler": handler})

Input Validation

from wavespeed.serverless.utils import validate

INPUT_SCHEMA = {
    "prompt": {"type": str, "required": True},
    "max_tokens": {"type": int, "required": False, "default": 100},
    "temperature": {
        "type": float,
        "required": False,
        "default": 0.7,
        "constraints": lambda x: 0 <= x <= 2,
    },
}

def handler(job):
    result = validate(job["input"], INPUT_SCHEMA)
    if "errors" in result:
        return {"error": result["errors"]}

    validated = result["validated_input"]
    # process with validated input...
    return {"output": "done"}

Concurrent Execution

Enable concurrent job processing with concurrency_modifier:

import wavespeed.serverless as serverless

def handler(job):
    return {"output": job["input"]["data"]}

def concurrency_modifier(current_concurrency):
    return 2  # Process 2 jobs concurrently

serverless.start({
    "handler": handler,
    "concurrency_modifier": concurrency_modifier
})
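
Because the modifier receives the worker's current concurrency on each call, it can also scale dynamically. A sketch that ramps up to a fixed cap (the cap and ramp policy are illustrative, not part of the SDK):

import wavespeed.serverless as serverless

MAX_CONCURRENCY = 4

def handler(job):
    return {"output": job["input"]["data"]}

def concurrency_modifier(current_concurrency):
    # Add one slot per call until the cap is reached.
    return min(current_concurrency + 1, MAX_CONCURRENCY)

serverless.start({
    "handler": handler,
    "concurrency_modifier": concurrency_modifier,
})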

Local Development

Test with JSON Input

# Using CLI argument
python handler.py --test_input '{"input": {"prompt": "hello"}}'

# Using test_input.json file (auto-detected)
echo '{"input": {"prompt": "hello"}}' > test_input.json
python handler.py

Running Tests

# Run all tests
python -m pytest

# Run a single test file
python -m pytest tests/test_api.py

# Run a specific test
python -m pytest tests/test_api.py::TestClient::test_run_success -v

FastAPI Development Server

python handler.py --waverless_serve_api --waverless_api_port 8000

Then use the interactive Swagger UI at http://localhost:8000/ or make requests:

# Synchronous execution
curl -X POST http://localhost:8000/runsync \
  -H "Content-Type: application/json" \
  -d '{"input": {"prompt": "hello"}}'

# Async execution
curl -X POST http://localhost:8000/run \
  -H "Content-Type: application/json" \
  -d '{"input": {"prompt": "hello"}}'

CLI Options

Option                         Description
--test_input JSON              Run locally with JSON test input
--waverless_serve_api          Start FastAPI development server
--waverless_api_host HOST      API server host (default: localhost)
--waverless_api_port PORT      API server port (default: 8000)
--waverless_log_level LEVEL    Log level (DEBUG, INFO, WARN, ERROR)

Environment Variables

API Client

Variable             Description
WAVESPEED_API_KEY    WaveSpeed API key

Serverless Worker

Variable                        Description
WAVERLESS_POD_ID                Worker/pod identifier
WAVERLESS_API_KEY               API authentication key
WAVERLESS_WEBHOOK_GET_JOB       Job fetch endpoint
WAVERLESS_WEBHOOK_POST_OUTPUT   Result submission endpoint

License

MIT
