diff --git a/SKILL.md b/SKILL.md index 1f84117..dd370be 100644 --- a/SKILL.md +++ b/SKILL.md @@ -1,6 +1,6 @@ --- name: temporal-developer -description: This skill should be used when the user asks to "create a Temporal workflow", "write a Temporal activity", "debug stuck workflow", "fix non-determinism error", "Temporal Python", "Temporal TypeScript", "workflow replay", "activity timeout", "signal workflow", "query workflow", "worker not starting", "activity keeps retrying", "Temporal heartbeat", "continue-as-new", "child workflow", "saga pattern", "workflow versioning", "durable execution", "reliable distributed systems", or mentions Temporal SDK development. +description: This skill should be used when the user asks to "create a Temporal workflow", "write a Temporal activity", "debug stuck workflow", "fix non-determinism error", "Temporal Python", "Temporal TypeScript", "Temporal Go", "Temporal Golang", "workflow replay", "activity timeout", "signal workflow", "query workflow", "worker not starting", "activity keeps retrying", "Temporal heartbeat", "continue-as-new", "child workflow", "saga pattern", "workflow versioning", "durable execution", "reliable distributed systems", or mentions Temporal SDK development. Provides multi-language guidance for Python, TypeScript, and Go. version: 1.0.0 --- @@ -8,7 +8,7 @@ version: 1.0.0 ## Overview -Temporal is a durable execution platform that makes workflows survive failures automatically. This skill provides guidance for building Temporal applications in Python and TypeScript. +Temporal is a durable execution platform that makes workflows survive failures automatically. This skill provides guidance for building Temporal applications in Python, TypeScript, and Go. ## Core Architecture @@ -59,15 +59,17 @@ See `references/core/determinism.md` for detailed explanation. 
## Determinism Quick Reference -| Forbidden | Python | TypeScript | -|-----------|--------|------------| -| Current time | `workflow.now()` | `Date.now()` (auto-replaced) | -| Random | `workflow.random()` | `Math.random()` (auto-replaced) | -| UUID | `workflow.uuid4()` | `uuid4()` from workflow | -| Sleep | `asyncio.sleep()` | `sleep()` from workflow | +| Forbidden | Python | TypeScript | Go | +|-----------|--------|------------|-----| +| Current time | `workflow.now()` | `Date.now()` (auto-replaced) | `workflow.Now(ctx)` | +| Random | `workflow.random()` | `Math.random()` (auto-replaced) | `workflow.SideEffect()` | +| UUID | `workflow.uuid4()` | `uuid4()` from workflow | `workflow.SideEffect()` | +| Sleep | `asyncio.sleep()` | `sleep()` from workflow | `workflow.Sleep(ctx, d)` | +| Concurrency | N/A (async) | N/A (async) | `workflow.Go(ctx, fn)` | **Python sandbox**: Explicit protection, use `workflow.unsafe.imports_passed_through()` for libraries **TypeScript sandbox**: V8 isolation, automatic replacements, use type-only imports for activities +**Go**: No sandbox - use `workflowcheck` static analysis tool and code review ## Language Selection @@ -85,16 +87,24 @@ See `references/core/determinism.md` for detailed explanation. 
- Webpack bundling for workflows - See `references/typescript/typescript.md` +### Go +- Regular functions with `workflow.Context` or `context.Context` +- No sandbox - determinism via code review and static analysis +- Use `workflow.Go()` instead of `go` keyword +- Use `workflow.Channel` instead of Go channels +- Run `workflowcheck ./...` in CI +- See `references/go/go.md` + ## Pattern Index -| Pattern | Use Case | Python | TypeScript | -|---------|----------|--------|------------| -| **Signals** | Fire-and-forget events to running workflow | `references/python/patterns.md` | `references/typescript/patterns.md` | -| **Queries** | Read-only state inspection | `references/python/patterns.md` | `references/typescript/patterns.md` | -| **Updates** | Synchronous state modification with response | `references/python/patterns.md` | `references/typescript/patterns.md` | -| **Child Workflows** | Break down large workflows, isolate failures | `references/python/patterns.md` | `references/typescript/patterns.md` | -| **Continue-as-New** | Prevent unbounded history growth | `references/python/advanced-features.md` | `references/typescript/advanced-features.md` | -| **Saga** | Distributed transactions with compensation | `references/python/patterns.md` | `references/typescript/patterns.md` | +| Pattern | Use Case | Python | TypeScript | Go | +|---------|----------|--------|------------|-----| +| **Signals** | Fire-and-forget events to running workflow | `references/python/patterns.md` | `references/typescript/patterns.md` | `references/go/patterns.md` | +| **Queries** | Read-only state inspection | `references/python/patterns.md` | `references/typescript/patterns.md` | `references/go/patterns.md` | +| **Updates** | Synchronous state modification with response | `references/python/patterns.md` | `references/typescript/patterns.md` | `references/go/advanced-features.md` | +| **Child Workflows** | Break down large workflows, isolate failures | `references/python/patterns.md` | 
`references/typescript/patterns.md` | `references/go/patterns.md` |
+| **Continue-as-New** | Prevent unbounded history growth | `references/python/advanced-features.md` | `references/typescript/advanced-features.md` | `references/go/advanced-features.md` |
+| **Saga** | Distributed transactions with compensation | `references/python/patterns.md` | `references/typescript/patterns.md` | `references/go/patterns.md` |

## Troubleshooting Quick Reference

@@ -157,6 +167,18 @@ See `references/core/versioning.md` for concepts, language-specific files for im
- **`references/typescript/advanced-features.md`** - Sinks, updates, schedules and more
- **`references/typescript/gotchas.md`** - TypeScript-specific anti-patterns

+### Go References
+- **`references/go/go.md`** - Go SDK overview, quick start
+- **`references/go/determinism.md`** - workflowcheck, safe alternatives, concurrency
+- **`references/go/patterns.md`** - Go pattern implementations (signals, queries, saga)
+- **`references/go/testing.md`** - TestWorkflowEnvironment, mocking, replay testing
+- **`references/go/error-handling.md`** - ApplicationError, retry policies, idempotency
+- **`references/go/data-handling.md`** - Data converters, protobuf, encryption
+- **`references/go/observability.md`** - Logging, metrics, tracing, Search Attributes
+- **`references/go/versioning.md`** - GetVersion API, Worker Versioning
+- **`references/go/advanced-features.md`** - Continue-as-new, updates, schedules, interceptors
+- **`references/go/gotchas.md`** - Go-specific anti-patterns
+
 ## Feedback

 If this skill's explanations are unclear, misleading, or missing important information—or if Temporal concepts are proving unexpectedly difficult to work with—draft a GitHub issue body describing the problem encountered and what would have helped, then ask the user to file it at https://github.com/temporalio/skill-temporal-developer/issues/new. Do not file the issue autonomously.
diff --git a/references/go/advanced-features.md b/references/go/advanced-features.md new file mode 100644 index 0000000..dfb5ef7 --- /dev/null +++ b/references/go/advanced-features.md @@ -0,0 +1,436 @@ +# Go SDK Advanced Features + +## Local Activities + +### WHY: Reduce latency for short, lightweight operations by skipping the task queue +### WHEN: +- **Short operations** - Activities completing in milliseconds/seconds +- **High-frequency calls** - When task queue overhead is significant +- **Low-latency requirements** - When you can't afford task queue round-trip + +**Tradeoffs:** Local activities don't appear in history until the workflow task completes, and don't benefit from task queue load balancing. + +```go +func WorkflowWithLocalActivity(ctx workflow.Context) error { + lao := workflow.LocalActivityOptions{ + ScheduleToCloseTimeout: 5 * time.Second, + } + ctx = workflow.WithLocalActivityOptions(ctx, lao) + + var result string + err := workflow.ExecuteLocalActivity(ctx, LocalDataLookup, "key").Get(ctx, &result) + if err != nil { + return err + } + return nil +} + +// Local activity - same signature as regular activity +func LocalDataLookup(ctx context.Context, key string) (string, error) { + // Short, local operation - e.g., cache lookup, simple computation + return lookupFromCache(key), nil +} +``` + +## Async Activity Completion + +### WHY: Complete activities from external systems (webhooks, human tasks, external services) +### WHEN: +- **Human approval workflows** - Wait for human to complete task externally +- **Webhook-based integrations** - External service calls back when done +- **Long-polling external systems** - Activity starts work, external system finishes it + +```go +// Activity that starts async work +func AsyncActivity(ctx context.Context, taskID string) (string, error) { + // Get task token for later completion + info := activity.GetInfo(ctx) + taskToken := info.TaskToken + + // Store task token for external service to use + 
storeTaskToken(taskID, taskToken)
+
+	// Signal external system to start work
+	startExternalWork(taskID)
+
+	// Return ErrResultPending - activity stays open until completed externally
+	return "", activity.ErrResultPending
+}
+
+// External service completes the activity using the client
+func CompleteActivityExternally(taskToken []byte, result string) error {
+	c, _ := client.Dial(client.Options{})
+	defer c.Close()
+
+	return c.CompleteActivity(context.Background(), taskToken, result, nil)
+}
+
+// Or complete by ID instead of token
+func CompleteActivityByID(workflowID, runID, activityID, result string) error {
+	c, _ := client.Dial(client.Options{})
+	defer c.Close()
+
+	return c.CompleteActivityByID(context.Background(),
+		"default", workflowID, runID, activityID, result, nil)
+}
+```
+
+## Continue-as-New
+
+Use continue-as-new to prevent unbounded history growth in long-running workflows.
+
+```go
+func BatchProcessingWorkflow(ctx workflow.Context, state ProcessingState) (string, error) {
+	ao := workflow.ActivityOptions{StartToCloseTimeout: 5 * time.Minute}
+	ctx = workflow.WithActivityOptions(ctx, ao)
+
+	for !state.IsComplete {
+		// Process next batch
+		err := workflow.ExecuteActivity(ctx, ProcessBatchActivity, state).Get(ctx, &state)
+		if err != nil {
+			return "", err
+		}
+
+		// Check history size and continue-as-new if needed
+		info := workflow.GetInfo(ctx)
+		if info.GetCurrentHistoryLength() > 10000 {
+			return "", workflow.NewContinueAsNewError(ctx, BatchProcessingWorkflow, state)
+		}
+	}
+
+	return "completed", nil
+}
+```
+
+### Continue-as-New with Options
+
+```go
+// Continue with modified options by deriving a new workflow context first;
+// NewContinueAsNewError itself takes only the workflow function and its arguments.
+ctx = workflow.WithWorkflowRunTimeout(ctx, 24*time.Hour)
+ctx = workflow.WithWorkflowTaskQueue(ctx, "batch-queue")
+return "", workflow.NewContinueAsNewError(ctx, BatchProcessingWorkflow, newState)
+```
+
+## Workflow Updates
+
+Updates allow synchronous interaction with running workflows.
+
+### Defining Update Handlers
+
+```go
+func OrderWorkflow(ctx workflow.Context, order Order) (string, error) {
+	var items []string
+
+	// Register update handler
+	err := workflow.SetUpdateHandler(ctx, "addItem", func(ctx workflow.Context, item string) (int, error) {
+		items = append(items, item)
+		return len(items), nil
+	})
+	if err != nil {
+		return "", err
+	}
+
+	// Register update handler with validator
+	err = workflow.SetUpdateHandlerWithOptions(ctx, "addItemValidated",
+		func(ctx workflow.Context, item string) (int, error) {
+			items = append(items, item)
+			return len(items), nil
+		},
+		workflow.UpdateHandlerOptions{
+			Validator: func(ctx workflow.Context, item string) error {
+				if item == "" {
+					return errors.New("item cannot be empty")
+				}
+				if len(items) >= 100 {
+					return errors.New("order is full")
+				}
+				return nil
+			},
+		},
+	)
+	if err != nil {
+		return "", err
+	}
+
+	// Wait for completion signal
+	workflow.GetSignalChannel(ctx, "complete").Receive(ctx, nil)
+
+	return fmt.Sprintf("Order with %d items completed", len(items)), nil
+}
+```
+
+### Calling Updates from Client
+
+```go
+// Execute update and wait for the result
+handle, err := c.UpdateWorkflow(ctx, client.UpdateWorkflowOptions{
+	WorkflowID:   "order-123",
+	UpdateName:   "addItem",
+	WaitForStage: client.WorkflowUpdateStageCompleted,
+	Args:         []interface{}{"new-item"},
+})
+if err != nil {
+	return err
+}
+
+var count int
+if err := handle.Get(ctx, &count); err != nil {
+	return err
+}
+fmt.Printf("Order now has %d items\n", count)
+```
+
+## Schedules
+
+Create recurring workflow executions.
+ +```go +import "go.temporal.io/sdk/client" + +// Create a schedule +scheduleID := "daily-report" +handle, err := c.ScheduleClient().Create(ctx, client.ScheduleOptions{ + ID: scheduleID, + Spec: client.ScheduleSpec{ + Intervals: []client.ScheduleIntervalSpec{ + {Every: 24 * time.Hour}, + }, + }, + Action: &client.ScheduleWorkflowAction{ + ID: "daily-report", + Workflow: DailyReportWorkflow, + TaskQueue: "reports", + }, +}) + +// Manage schedules +handle, _ = c.ScheduleClient().GetHandle(ctx, scheduleID) +handle.Pause(ctx, client.SchedulePauseOptions{Note: "Maintenance window"}) +handle.Unpause(ctx, client.ScheduleUnpauseOptions{}) +handle.Trigger(ctx, client.ScheduleTriggerOptions{}) // Run immediately +handle.Delete(ctx) +``` + +### Cron-style Schedules + +```go +handle, err := c.ScheduleClient().Create(ctx, client.ScheduleOptions{ + ID: "hourly-cleanup", + Spec: client.ScheduleSpec{ + CronExpressions: []string{"0 * * * *"}, // Every hour on the hour + }, + Action: &client.ScheduleWorkflowAction{ + Workflow: CleanupWorkflow, + TaskQueue: "maintenance", + }, +}) +``` + +## Worker Sessions + +Sessions ensure activities run on the same worker for resource affinity. 
+
+```go
+func FileProcessingWorkflow(ctx workflow.Context, files []string) error {
+	// Create session - all activities will run on the same worker
+	sessionOpts := &workflow.SessionOptions{
+		CreationTimeout:  time.Minute,
+		ExecutionTimeout: time.Hour,
+	}
+
+	sessionCtx, err := workflow.CreateSession(ctx, sessionOpts)
+	if err != nil {
+		return err
+	}
+	defer workflow.CompleteSession(sessionCtx)
+
+	ao := workflow.ActivityOptions{StartToCloseTimeout: 10 * time.Minute}
+	sessionCtx = workflow.WithActivityOptions(sessionCtx, ao)
+
+	// Download file (runs on session worker)
+	var localPath string
+	err = workflow.ExecuteActivity(sessionCtx, DownloadFileActivity, files[0]).Get(sessionCtx, &localPath)
+	if err != nil {
+		return err
+	}
+
+	// Process file (runs on same worker where file was downloaded)
+	err = workflow.ExecuteActivity(sessionCtx, ProcessFileActivity, localPath).Get(sessionCtx, nil)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+```
+
+Note: the hosting worker must set `EnableSessionWorker: true` in `worker.Options`, otherwise session creation will fail because no worker accepts session tasks.
+
+## Interceptors
+
+Interceptors allow cross-cutting concerns like logging, metrics, and auth.
+
+### Creating a Custom Interceptor
+
+```go
+import (
+	"context"
+
+	"go.temporal.io/sdk/activity"
+	"go.temporal.io/sdk/interceptor"
+	"go.temporal.io/sdk/worker"
+)
+
+type LoggingInterceptor struct {
+	interceptor.WorkerInterceptorBase
+}
+
+func (i *LoggingInterceptor) InterceptActivity(
+	ctx context.Context,
+	next interceptor.ActivityInboundInterceptor,
+) interceptor.ActivityInboundInterceptor {
+	return &loggingActivityInterceptor{
+		ActivityInboundInterceptorBase: interceptor.ActivityInboundInterceptorBase{Next: next},
+	}
+}
+
+type loggingActivityInterceptor struct {
+	interceptor.ActivityInboundInterceptorBase
+}
+
+func (i *loggingActivityInterceptor) ExecuteActivity(
+	ctx context.Context,
+	in *interceptor.ExecuteActivityInput,
+) (interface{}, error) {
+	logger := activity.GetLogger(ctx)
+	logger.Info("Activity starting", "activity", activity.GetInfo(ctx).ActivityType.Name)
+
+	result, err := i.Next.ExecuteActivity(ctx, in)
+
+	if err != nil {
+		logger.Error("Activity failed", "error", err)
+	} else {
+		logger.Info("Activity completed")
+	}
+	return result, err
+}
+
+// Apply to worker
+w := worker.New(c, "my-queue", worker.Options{
+	Interceptors: []interceptor.WorkerInterceptor{&LoggingInterceptor{}},
+})
+```
+
+## Dynamic Workflows and Activities
+
+Handle workflows/activities not known at compile time.
+ +### Dynamic Workflow Registration + +```go +func DynamicWorkflowHandler(ctx workflow.Context, args ...interface{}) (interface{}, error) { + workflowType := workflow.GetInfo(ctx).WorkflowType.Name + + // Route based on type + switch workflowType { + case "order-workflow": + return handleOrderWorkflow(ctx, args) + case "refund-workflow": + return handleRefundWorkflow(ctx, args) + default: + return nil, fmt.Errorf("unknown workflow type: %s", workflowType) + } +} + +// Register as dynamic handler +w.RegisterWorkflowWithOptions(DynamicWorkflowHandler, workflow.RegisterOptions{ + Name: "", // Empty name means dynamic +}) +``` + +### Dynamic Activity Registration + +```go +func DynamicActivityHandler(ctx context.Context, args ...interface{}) (interface{}, error) { + info := activity.GetInfo(ctx) + activityType := info.ActivityType.Name + + // Route based on type + switch activityType { + case "process-payment": + return processPayment(ctx, args) + default: + return nil, fmt.Errorf("unknown activity type: %s", activityType) + } +} + +// Register as dynamic handler +w.RegisterActivityWithOptions(DynamicActivityHandler, activity.RegisterOptions{ + Name: "", // Empty name means dynamic +}) +``` + +## Worker Tuning + +Configure worker performance settings. + +```go +w := worker.New(c, "my-queue", worker.Options{ + // Workflow task concurrency + MaxConcurrentWorkflowTaskExecutionSize: 100, + + // Activity task concurrency + MaxConcurrentActivityExecutionSize: 100, + + // Local activity concurrency + MaxConcurrentLocalActivityExecutionSize: 100, + + // Session worker options (for file processing etc.) + MaxConcurrentSessionExecutionSize: 1000, + + // Graceful stop timeout + WorkerStopTimeout: 30 * time.Second, +}) +``` + +## Workflow Info and Metadata + +Access workflow metadata from within workflows. 
+
+```go
+func MyWorkflow(ctx workflow.Context) error {
+	info := workflow.GetInfo(ctx)
+
+	workflowID := info.WorkflowExecution.ID
+	runID := info.WorkflowExecution.RunID
+	taskQueue := info.TaskQueueName
+	namespace := info.Namespace
+	attempt := info.Attempt
+	historyLength := info.GetCurrentHistoryLength()
+
+	workflow.GetLogger(ctx).Info("Workflow info",
+		"workflowID", workflowID,
+		"runID", runID,
+		"taskQueue", taskQueue,
+		"namespace", namespace,
+		"attempt", attempt,
+		"historyLength", historyLength,
+	)
+
+	return nil
+}
+```
+
+## slog Integration (Go 1.21+)
+
+### WHY: Use Go's standard structured logging with Temporal
+### WHEN:
+- **Go 1.21+ projects** - Native structured logging support
+- **Existing slog infrastructure** - Integrate Temporal with your logging setup
+
+```go
+import (
+	"log/slog"
+
+	"go.temporal.io/sdk/log"
+)
+
+// Create a Temporal logger from slog
+slogger := slog.Default()
+temporalLogger := log.NewStructuredLogger(slogger)
+
+// Use with client - workers created from this client inherit its logger
+// (worker.Options has no separate logger setting)
+c, _ := client.Dial(client.Options{
+	Logger: temporalLogger,
+})
+```
diff --git a/references/go/data-handling.md b/references/go/data-handling.md
new file mode 100644
index 0000000..a2e78b2
--- /dev/null
+++ b/references/go/data-handling.md
@@ -0,0 +1,318 @@
+# Go SDK Data Handling
+
+## Overview
+
+The Go SDK uses data converters to serialize/deserialize workflow inputs, outputs, and activity parameters.
+
+## Default Data Converter
+
+The default converter handles:
+- `nil`
+- `[]byte` (as binary)
+- Protobuf messages
+- JSON-serializable types (via encoding/json)
+
+## Using Protobuf
+
+```go
+import (
+	"google.golang.org/protobuf/proto"
+
+	"myapp/pb" // Generated protobuf code
+)
+
+// Activities and workflows can use protobuf messages directly
+func ProcessOrderActivity(ctx context.Context, order *pb.Order) (*pb.OrderResult, error) {
+	// Process order...
+	return &pb.OrderResult{
+		OrderId: order.Id,
+		Status:  pb.OrderStatus_COMPLETED,
+	}, nil
+}
+
+func OrderWorkflow(ctx workflow.Context, order *pb.Order) (*pb.OrderResult, error) {
+	ao := workflow.ActivityOptions{StartToCloseTimeout: time.Minute}
+	ctx = workflow.WithActivityOptions(ctx, ao)
+
+	var result *pb.OrderResult
+	err := workflow.ExecuteActivity(ctx, ProcessOrderActivity, order).Get(ctx, &result)
+	return result, err
+}
+```
+
+## Custom Data Converter
+
+Create custom converters for special serialization needs.
+
+```go
+import (
+	commonpb "go.temporal.io/api/common/v1"
+
+	"go.temporal.io/sdk/converter"
+)
+
+// Wrap an existing payload converter (JSON here) to customize serialization
+type CustomPayloadConverter struct {
+	converter.JSONPayloadConverter
+}
+
+func (c *CustomPayloadConverter) ToPayload(value interface{}) (*commonpb.Payload, error) {
+	// Custom serialization logic
+	return c.JSONPayloadConverter.ToPayload(value)
+}
+
+func (c *CustomPayloadConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error {
+	// Custom deserialization logic
+	return c.JSONPayloadConverter.FromPayload(payload, valuePtr)
+}
+
+// Apply custom converter
+dataConverter := converter.NewCompositeDataConverter(
+	converter.NewNilPayloadConverter(),
+	converter.NewByteSlicePayloadConverter(),
+	&CustomPayloadConverter{},
+)
+
+c, err := client.Dial(client.Options{
+	DataConverter: dataConverter,
+})
+```
+
+## Payload Encryption
+
+Encrypt sensitive workflow data using a codec.
+
+```go
+import (
+	"crypto/aes"
+	"crypto/cipher"
+	"crypto/rand"
+
+	commonpb "go.temporal.io/api/common/v1"
+	"go.temporal.io/sdk/converter"
+	"google.golang.org/protobuf/proto"
+)
+
+type EncryptionCodec struct {
+	gcm cipher.AEAD
+}
+
+func NewEncryptionCodec(key []byte) (*EncryptionCodec, error) {
+	block, err := aes.NewCipher(key)
+	if err != nil {
+		return nil, err
+	}
+	gcm, err := cipher.NewGCM(block)
+	if err != nil {
+		return nil, err
+	}
+	return &EncryptionCodec{gcm: gcm}, nil
+}
+
+func (c *EncryptionCodec) Encode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) {
+	result := make([]*commonpb.Payload, len(payloads))
+	for i, p := range payloads {
+		// Encrypt each payload
+		data, err := proto.Marshal(p)
+		if err != nil {
+			return nil, err
+		}
+		nonce := make([]byte, c.gcm.NonceSize())
+		if _, err := rand.Read(nonce); err != nil {
+			return nil, err
+		}
+		encrypted := c.gcm.Seal(nonce, nonce, data, nil)
+
+		result[i] = &commonpb.Payload{
+			Metadata: map[string][]byte{
+				"encoding": []byte("binary/encrypted"),
+			},
+			Data: encrypted,
+		}
+	}
+	return result, nil
+}
+
+func (c *EncryptionCodec) Decode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) {
+	result := make([]*commonpb.Payload, len(payloads))
+	for i, p := range payloads {
+		if string(p.Metadata["encoding"]) == "binary/encrypted" {
+			// Decrypt
+			nonceSize := c.gcm.NonceSize()
+			nonce, ciphertext := p.Data[:nonceSize], p.Data[nonceSize:]
+			decrypted, err := c.gcm.Open(nil, nonce, ciphertext, nil)
+			if err != nil {
+				return nil, err
+			}
+
+			decoded := &commonpb.Payload{}
+			if err := proto.Unmarshal(decrypted, decoded); err != nil {
+				return nil, err
+			}
+			result[i] = decoded
+		} else {
+			result[i] = p
+		}
+	}
+	return result, nil
+}
+
+// Apply encryption codec
+codec, _ := NewEncryptionCodec(encryptionKey)
+dataConverter := converter.NewCodecDataConverter(
+	converter.GetDefaultDataConverter(),
+	codec,
+)
+
+c, err := client.Dial(client.Options{
+	DataConverter: dataConverter,
+})
+```
+
+## Search Attributes
+
+Custom searchable fields for workflow visibility.
+
+```go
+import "go.temporal.io/sdk/temporal"
+
+// Define typed keys
+var (
+	OrderIDKey     = temporal.NewSearchAttributeKeyString("OrderId")
+	OrderStatusKey = temporal.NewSearchAttributeKeyString("OrderStatus")
+	OrderTotalKey  = temporal.NewSearchAttributeKeyFloat64("OrderTotal")
+	CreatedAtKey   = temporal.NewSearchAttributeKeyTime("CreatedAt")
+)
+
+// Set at workflow start (typed attributes go in the TypedSearchAttributes field)
+options := client.StartWorkflowOptions{
+	ID:        "order-123",
+	TaskQueue: "orders",
+	TypedSearchAttributes: temporal.NewSearchAttributes(
+		OrderIDKey.ValueSet("123"),
+		OrderStatusKey.ValueSet("pending"),
+		OrderTotalKey.ValueSet(99.99),
+		CreatedAtKey.ValueSet(time.Now()),
+	),
+}
+
+// Upsert from within workflow
+workflow.UpsertTypedSearchAttributes(ctx,
+	OrderStatusKey.ValueSet("completed"),
+)
+```
+
+## Workflow Memo
+
+Store arbitrary metadata with workflows (not searchable).
+
+```go
+// Set memo at workflow start
+options := client.StartWorkflowOptions{
+	ID:        "order-123",
+	TaskQueue: "orders",
+	Memo: map[string]interface{}{
+		"customerName": order.CustomerName,
+		"notes":        "Priority customer",
+	},
+}
+
+// Read memo from workflow
+func OrderWorkflow(ctx workflow.Context, order Order) (string, error) {
+	info := workflow.GetInfo(ctx)
+	memo := info.Memo
+	// Memo fields need to be decoded from payload
+	_ = memo
+	return "", nil
+}
+```
+
+## SideEffect for Non-Deterministic Values
+
+Use `SideEffect` to capture values that would otherwise be non-deterministic.
+ +```go +func WorkflowWithUUID(ctx workflow.Context) (string, error) { + var uuid string + err := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { + return generateUUID() + }).Get(&uuid) + if err != nil { + return "", err + } + + return uuid, nil +} + +func WorkflowWithRandom(ctx workflow.Context) (int, error) { + var randomNum int + err := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { + return rand.Intn(100) + }).Get(&randomNum) + if err != nil { + return 0, err + } + + return randomNum, nil +} +``` + +## MutableSideEffect + +Use `MutableSideEffect` when the value might change and you want to capture updates. + +```go +func WorkflowWithConfig(ctx workflow.Context) error { + var config Config + + // Get initial config, and update if it changes + encoded := workflow.MutableSideEffect(ctx, "config", func(ctx workflow.Context) interface{} { + return fetchCurrentConfig() + }, func(a, b interface{}) bool { + return reflect.DeepEqual(a, b) + }) + + err := encoded.Get(&config) + if err != nil { + return err + } + + // Use config... + return nil +} +``` + +## Large Payloads + +For large data, consider: + +1. **Store externally**: Put large data in S3/GCS, pass references in workflows +2. **Use compression codec**: Compress payloads automatically +3. 
**Chunk data**: Split large slices across multiple activities + +```go +// Example: Reference pattern for large data +func UploadToStorageActivity(ctx context.Context, data []byte) (string, error) { + key := fmt.Sprintf("data/%s", uuid.New().String()) + err := storageClient.Upload(ctx, key, data) + return key, err +} + +func DownloadFromStorageActivity(ctx context.Context, key string) ([]byte, error) { + return storageClient.Download(ctx, key) +} +``` + +## Serialization Requirements + +Workflow and activity inputs/outputs must be serializable: + +```go +// GOOD - struct with exported fields +type OrderInput struct { + OrderID string `json:"orderId"` + Items []Item `json:"items"` + Total float64 `json:"total"` +} + +// BAD - cannot serialize +type BadInput struct { + Conn net.Conn // Cannot serialize + Fn func() string // Cannot serialize + Ch chan int // Cannot serialize +} +``` + +## Best Practices + +1. Use protobuf for cross-language compatibility +2. Keep payloads small (< 2MB recommended) +3. Encrypt sensitive data with PayloadCodec +4. Store large data externally with references +5. Use structs with proper json tags for JSON converter +6. Use `SideEffect` for random/UUID values in workflows +7. Use typed Search Attribute keys for type safety +8. Avoid channels, functions, and interfaces in workflow data diff --git a/references/go/determinism.md b/references/go/determinism.md new file mode 100644 index 0000000..f21a983 --- /dev/null +++ b/references/go/determinism.md @@ -0,0 +1,287 @@ +# Go SDK Determinism + +## Overview + +The Go SDK has no sandbox. Determinism must be enforced through code review and static analysis tools. + +## Why Determinism Matters: History Replay + +Temporal achieves durability through **history replay**. Understanding this mechanism is key to writing correct Workflow code. + +### How Replay Works + +1. 
**Initial Execution**: When your Workflow runs for the first time, the SDK records Commands (like "schedule activity") to the Event History stored by Temporal Server. + +2. **Recovery/Continuation**: When a Worker restarts, loses connectivity, or picks up a Workflow Task, it must restore the Workflow's state by replaying the code from the beginning. + +3. **Command Matching**: During replay, the SDK re-executes your Workflow code but doesn't actually run Activities again. Instead, it compares the Commands your code generates against the Events in history. + +4. **Non-determinism Detection**: If your code generates different Commands than what's in history (e.g., different Activity name, different order), the Workflow Task fails. + +### Example: Why time.Now() Breaks Replay + +```go +// BAD - Non-deterministic +func BadWorkflow(ctx workflow.Context) error { + if time.Now().Hour() < 12 { // Different value on replay! + workflow.ExecuteActivity(ctx, MorningActivity).Get(ctx, nil) + } else { + workflow.ExecuteActivity(ctx, AfternoonActivity).Get(ctx, nil) + } + return nil +} +``` + +If this runs at 11:59 AM initially and replays at 12:01 PM, it will try to schedule a different Activity, causing a non-determinism error. 
+ +```go +// GOOD - Deterministic +func GoodWorkflow(ctx workflow.Context) error { + if workflow.Now(ctx).Hour() < 12 { // Consistent during replay + workflow.ExecuteActivity(ctx, MorningActivity).Get(ctx, nil) + } else { + workflow.ExecuteActivity(ctx, AfternoonActivity).Get(ctx, nil) + } + return nil +} +``` + +### Testing Replay Compatibility + +Use the `WorkflowReplayer` to verify your code changes are compatible with existing histories: + +```go +func TestReplayCompatibility(t *testing.T) { + replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflow(MyWorkflow) + + // Load a history from a JSON file + err := replayer.ReplayWorkflowHistoryFromJSONFile( + nil, + "testdata/workflow_history.json", + ) + require.NoError(t, err) +} +``` + +Or fetch history from a running cluster: + +```go +func TestReplayFromCluster(t *testing.T) { + c, _ := client.Dial(client.Options{}) + + replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflow(MyWorkflow) + + // Replay using history from server + err := replayer.ReplayWorkflowHistory( + nil, + getWorkflowHistory(c, "workflow-id", "run-id"), + ) + require.NoError(t, err) +} +``` + +## Safe Alternatives + +| Forbidden | Safe Alternative | +|-----------|------------------| +| `time.Now()` | `workflow.Now(ctx)` | +| `time.Sleep()` | `workflow.Sleep(ctx, duration)` | +| `go func()` | `workflow.Go(ctx, func(ctx workflow.Context))` | +| Go channels | `workflow.Channel` | +| `select` | `workflow.Selector` | +| `rand.Int()` | `workflow.SideEffect()` | +| `uuid.New()` | `workflow.SideEffect()` | +| `range map` | Sort keys first | + +## Static Analysis Tool + +```bash +# Install +go install go.temporal.io/sdk/contrib/tools/workflowcheck@latest + +# Run on your code +workflowcheck ./... + +# With config file +workflowcheck -config .workflowcheck.yaml ./... 
+``` + +### What workflowcheck Catches + +- Using `time.Now()` instead of `workflow.Now(ctx)` +- Using `time.Sleep()` instead of `workflow.Sleep(ctx, d)` +- Using `rand.*` instead of `workflow.SideEffect()` +- Using regular `map` iteration instead of sorted iteration +- Using `go func()` instead of `workflow.Go(ctx, fn)` +- And other determinism violations + +### Configuration (.workflowcheck.yaml) + +```yaml +# Suppress false positives for specific functions +decls: + your-package/helper.SafeMapCopy: false + maps.Copy: false + +# Skip generated files +skip: + - '.*_mock.go' + - '.*_gen.go' +``` + +### Suppress Inline with Comments + +```go +//workflowcheck:ignore +for k, v := range myMap { + // Safe because we only read, don't modify based on order +} +``` + +### Add to CI/CD + +```yaml +# .github/workflows/pull-request.yml +- name: Install workflowcheck + run: go install go.temporal.io/sdk/contrib/tools/workflowcheck@latest + +- name: Run workflowcheck + run: workflowcheck -config .workflowcheck.yaml ./... 
+``` + +## workflow.Go() for Concurrency + +```go +func MyWorkflow(ctx workflow.Context) error { + var result1, result2 string + + workflow.Go(ctx, func(ctx workflow.Context) { + workflow.ExecuteActivity(ctx, Activity1).Get(ctx, &result1) + }) + + workflow.Go(ctx, func(ctx workflow.Context) { + workflow.ExecuteActivity(ctx, Activity2).Get(ctx, &result2) + }) + + // Wait for both + workflow.Await(ctx, func() bool { + return result1 != "" && result2 != "" + }) + + return nil +} +``` + +## workflow.Channel + +```go +func ChannelWorkflow(ctx workflow.Context) error { + ch := workflow.NewChannel(ctx) + + workflow.Go(ctx, func(ctx workflow.Context) { + ch.Send(ctx, "data") + }) + + var value string + ch.Receive(ctx, &value) + + return nil +} +``` + +## workflow.Selector + +```go +func SelectorWorkflow(ctx workflow.Context) error { + selector := workflow.NewSelector(ctx) + + ch := workflow.GetSignalChannel(ctx, "my-signal") + selector.AddReceive(ch, func(c workflow.ReceiveChannel, more bool) { + var signal string + c.Receive(ctx, &signal) + // Handle signal + }) + + future := workflow.ExecuteActivity(ctx, MyActivity) + selector.AddFuture(future, func(f workflow.Future) { + var result string + f.Get(ctx, &result) + // Handle result + }) + + selector.Select(ctx) + return nil +} +``` + +## SideEffect for Non-Deterministic Values + +```go +func WorkflowWithUUID(ctx workflow.Context) (string, error) { + var uuid string + workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { + return generateUUID() + }).Get(&uuid) + + return uuid, nil +} + +func WorkflowWithRandom(ctx workflow.Context) (int, error) { + var randomNum int + workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { + return rand.Intn(100) + }).Get(&randomNum) + + return randomNum, nil +} +``` + +## Map Iteration Warning + +```go +// WRONG - non-deterministic order +for k, v := range myMap { + process(k, v) +} + +// CORRECT - sort keys first +keys := make([]string, 0, len(myMap)) +for k := range 
myMap { + keys = append(keys, k) +} +sort.Strings(keys) +for _, k := range keys { + process(k, myMap[k]) +} + +// OR use workflow.DeterministicKeys helper +keys := workflow.DeterministicKeys(myMap) +for _, k := range keys { + process(k, myMap[k]) +} +``` + +## Commands and Events + +Understanding the relationship between your code and the Event History: + +| Workflow Code | Command Generated | Event Created | +|--------------|-------------------|---------------| +| `workflow.ExecuteActivity()` | ScheduleActivityTask | ActivityTaskScheduled | +| `workflow.Sleep()` | StartTimer | TimerStarted | +| `workflow.ExecuteChildWorkflow()` | StartChildWorkflowExecution | ChildWorkflowExecutionStarted | +| `workflow.NewContinueAsNewError()` | ContinueAsNewWorkflowExecution | WorkflowExecutionContinuedAsNew | +| Return from workflow function | CompleteWorkflowExecution | WorkflowExecutionCompleted | + +## Best Practices + +1. Run `workflowcheck` in CI pipeline +2. Never use `go` keyword in workflows +3. Never use native Go channels in workflows +4. Sort map keys before iteration +5. Use `workflow.SideEffect()` for random/UUID values +6. Use `workflow.GetVersion()` for code changes +7. Test with replay to catch non-determinism early +8. Use `workflow.GetLogger(ctx)` instead of fmt.Println() diff --git a/references/go/error-handling.md b/references/go/error-handling.md new file mode 100644 index 0000000..47bc115 --- /dev/null +++ b/references/go/error-handling.md @@ -0,0 +1,312 @@ +# Go SDK Error Handling + +## Overview + +The Go SDK uses `temporal.ApplicationError` for application-specific errors with support for error types and non-retryable marking. 
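+Underneath, this classification scheme is Go's standard `errors.As` unwrapping. As a plain-Go warm-up with no Temporal imports — `AppError` below is a hypothetical stand-in for `temporal.ApplicationError`, not the SDK type — the pattern the rest of this file builds on looks like:
+
+```go
+package main
+
+import (
+	"errors"
+	"fmt"
+)
+
+// AppError is a hypothetical stand-in for temporal.ApplicationError:
+// a message plus a machine-readable type used for handling decisions.
+type AppError struct {
+	Msg     string
+	ErrType string
+}
+
+func (e *AppError) Error() string { return e.Msg }
+
+func validate(input string) error {
+	if input == "" {
+		// Wrap with %w so callers can still reach the typed error via errors.As.
+		return fmt.Errorf("validation step: %w", &AppError{Msg: "empty input", ErrType: "ValidationError"})
+	}
+	return nil
+}
+
+// classify extracts the error type name, mirroring the appErr.Type() checks below.
+func classify(err error) string {
+	var appErr *AppError
+	if errors.As(err, &appErr) {
+		return appErr.ErrType
+	}
+	return ""
+}
+
+func main() {
+	fmt.Println(classify(validate(""))) // ValidationError
+	fmt.Println(classify(validate("ok")) == "")
+}
+```
+
+The SDK's `temporal.ApplicationError` follows the same shape, with the error type string doubling as the retry-policy classification key.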
+ +## Application Errors + +```go +import "go.temporal.io/sdk/temporal" + +func ValidateActivity(ctx context.Context, input string) error { + if !isValid(input) { + return temporal.NewApplicationError( + "Invalid input: " + input, // message + "ValidationError", // error type + ) + } + return nil +} +``` + +### With Details + +```go +func ProcessOrderActivity(ctx context.Context, order Order) error { + if order.Total < 0 { + return temporal.NewApplicationErrorWithCause( + "Invalid order total", + "ValidationError", + errors.New("total cannot be negative"), + order, // details - can be retrieved by workflow + ) + } + return nil +} +``` + +## Non-Retryable Errors + +Mark errors as non-retryable when retrying would not help: + +```go +func PermanentFailureActivity(ctx context.Context) error { + return temporal.NewNonRetryableApplicationError( + "Credit card permanently declined", + "PaymentError", + nil, // cause + ) +} +``` + +## Handling Errors in Workflows + +```go +func WorkflowWithErrorHandling(ctx workflow.Context) error { + ao := workflow.ActivityOptions{StartToCloseTimeout: time.Minute} + ctx = workflow.WithActivityOptions(ctx, ao) + + err := workflow.ExecuteActivity(ctx, RiskyActivity).Get(ctx, nil) + if err != nil { + var appErr *temporal.ApplicationError + if errors.As(err, &appErr) { + logger := workflow.GetLogger(ctx) + logger.Error("Activity failed", + "type", appErr.Type(), + "message", appErr.Message(), + ) + + if appErr.Type() == "ValidationError" { + // Handle specific error type + return handleValidationError(ctx, appErr) + } + } + return err + } + return nil +} +``` + +### Extracting Error Details + +```go +func HandleErrorWorkflow(ctx workflow.Context) error { + ao := workflow.ActivityOptions{StartToCloseTimeout: time.Minute} + ctx = workflow.WithActivityOptions(ctx, ao) + + err := workflow.ExecuteActivity(ctx, ProcessOrder, order).Get(ctx, nil) + if err != nil { + var appErr *temporal.ApplicationError + if errors.As(err, &appErr) { + // Extract 
details from the error + var failedOrder Order + if appErr.HasDetails() { + appErr.Details(&failedOrder) + // Handle with details + } + } + return err + } + return nil +} +``` + +## Retry Policy Configuration + +```go +ao := workflow.ActivityOptions{ + StartToCloseTimeout: time.Minute * 10, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: time.Second, + BackoffCoefficient: 2.0, + MaximumInterval: time.Minute, + MaximumAttempts: 5, + NonRetryableErrorTypes: []string{ + "ValidationError", + "PaymentError", + }, + }, +} +ctx = workflow.WithActivityOptions(ctx, ao) +``` + +### Retry Policy Parameters + +| Parameter | Description | +|-----------|-------------| +| `InitialInterval` | Delay before first retry | +| `BackoffCoefficient` | Multiplier for subsequent retry intervals | +| `MaximumInterval` | Cap on retry interval | +| `MaximumAttempts` | Total attempts including initial (0 = unlimited) | +| `NonRetryableErrorTypes` | Error types that should not be retried | + +## Timeout Configuration + +```go +ao := workflow.ActivityOptions{ + // Time from activity scheduled to single attempt completion + StartToCloseTimeout: time.Minute * 5, + + // Time from activity scheduled to final completion (including retries) + ScheduleToCloseTimeout: time.Minute * 30, + + // Maximum time between heartbeats + HeartbeatTimeout: time.Second * 30, +} +``` + +### Timeout Types + +| Timeout | Description | +|---------|-------------| +| `StartToCloseTimeout` | Max duration for a single attempt | +| `ScheduleToCloseTimeout` | Max total duration including all retries | +| `HeartbeatTimeout` | Max time between heartbeat calls | +| `ScheduleToStartTimeout` | Max time waiting in task queue (rarely used) | + +## Workflow Failure + +```go +func MyWorkflow(ctx workflow.Context) error { + if someCondition { + // Returning error fails the workflow + return temporal.NewApplicationError( + "Cannot process", + "BusinessError", + ) + } + return nil +} +``` + +## Panic vs Return Error + +```go +// 
Returning error fails the workflow execution
+func FailWorkflow(ctx workflow.Context) error {
+	return errors.New("workflow failed")
+}
+
+// Panic fails only the current workflow task, which the server retries;
+// the workflow itself keeps running. Reserve panics for transient
+// infrastructure issues, not business logic failures
+func PanicWorkflow(ctx workflow.Context) error {
+	panic("temporary issue")
+}
+```
+
+## Idempotency Patterns
+
+When Activities interact with external systems, making them idempotent ensures correctness during retries and replay.
+
+### Using Workflow IDs as Idempotency Keys
+
+```go
+func ChargePaymentActivity(ctx context.Context, orderID string, amount float64) (string, error) {
+	// Use orderID as idempotency key with payment provider
+	result, err := paymentAPI.Charge(ctx, &ChargeRequest{
+		Amount:         amount,
+		IdempotencyKey: fmt.Sprintf("order-%s", orderID),
+	})
+	if err != nil {
+		return "", err
+	}
+	return result.TransactionID, nil
+}
+```
+
+### Tracking Operation Status in Workflow State
+
+```go
+type OrderState struct {
+	PaymentCompleted bool
+	TransactionID    string
+}
+
+// State is passed as workflow input so that completed steps are skipped
+// when the workflow is restarted via continue-as-new.
+func OrderWorkflow(ctx workflow.Context, order Order, state OrderState) (string, error) {
+	ao := workflow.ActivityOptions{StartToCloseTimeout: 5 * time.Minute}
+	ctx = workflow.WithActivityOptions(ctx, ao)
+
+	// Skip payment if it already completed in a previous run
+	if !state.PaymentCompleted {
+		err := workflow.ExecuteActivity(ctx, ChargePaymentActivity, order.ID, order.Total).
+			Get(ctx, &state.TransactionID)
+		if err != nil {
+			return "", err
+		}
+		state.PaymentCompleted = true
+	}
+
+	// Continue with order processing...
+	return state.TransactionID, nil
+}
+```
+
+### Designing Idempotent Activities
+
+1. **Use unique identifiers** as idempotency keys (workflow ID, activity ID, or business ID)
+2. **Check before acting**: Query external system state before making changes
+3. **Make operations repeatable**: Ensure calling twice produces the same result
+4. 
**Record outcomes**: Store transaction IDs or results for verification + +```go +func CreateUserActivity(ctx context.Context, req CreateUserRequest) (*User, error) { + // Check if user already exists (idempotent pattern) + existing, err := userService.GetByEmail(ctx, req.Email) + if err == nil && existing != nil { + return existing, nil // Already created, return existing + } + + // Create new user + return userService.Create(ctx, req) +} +``` + +## Activity Timeout Error Handling + +```go +func WorkflowWithTimeoutHandling(ctx workflow.Context) error { + ao := workflow.ActivityOptions{ + StartToCloseTimeout: time.Minute, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + err := workflow.ExecuteActivity(ctx, SlowActivity).Get(ctx, nil) + if err != nil { + var timeoutErr *temporal.TimeoutError + if errors.As(err, &timeoutErr) { + switch timeoutErr.TimeoutType() { + case enumspb.TIMEOUT_TYPE_START_TO_CLOSE: + // Activity took too long + case enumspb.TIMEOUT_TYPE_HEARTBEAT: + // Activity stopped heartbeating + case enumspb.TIMEOUT_TYPE_SCHEDULE_TO_CLOSE: + // Overall timeout including retries + } + } + return err + } + return nil +} +``` + +## Cancelled Error Handling + +```go +func WorkflowWithCancellation(ctx workflow.Context) error { + ao := workflow.ActivityOptions{StartToCloseTimeout: time.Hour} + ctx = workflow.WithActivityOptions(ctx, ao) + + err := workflow.ExecuteActivity(ctx, LongRunningActivity).Get(ctx, nil) + if err != nil { + var canceledErr *temporal.CanceledError + if errors.As(err, &canceledErr) { + // Workflow or activity was cancelled + return handleCancellation(ctx) + } + return err + } + return nil +} +``` + +## Best Practices + +1. Use specific error types for different failure modes +2. Use `NewNonRetryableApplicationError` for permanent failures +3. Configure `NonRetryableErrorTypes` in retry policy +4. Log errors with context before handling +5. Use `errors.As()` to check error types +6. 
Design activities to be idempotent for safe retries +7. Set appropriate timeouts for each activity +8. Use heartbeats for long-running activities to detect failures quickly diff --git a/references/go/go.md b/references/go/go.md new file mode 100644 index 0000000..f9ac889 --- /dev/null +++ b/references/go/go.md @@ -0,0 +1,185 @@ +# Temporal Go SDK Reference + +## Overview + +The Temporal Go SDK (`go.temporal.io/sdk`) provides idiomatic Go patterns for building durable workflows. Workflows and activities are regular Go functions with special context parameters. Unlike Python and TypeScript, Go has no sandbox - determinism must be enforced through code review and static analysis. + +## Quick Start + +**activities.go** - Activity definitions (regular functions with context.Context): +```go +package activities + +import "context" + +func GreetActivity(ctx context.Context, name string) (string, error) { + return "Hello, " + name + "!", nil +} +``` + +**workflows.go** - Workflow definition (uses workflow.Context): +```go +package workflows + +import ( + "time" + "go.temporal.io/sdk/workflow" + "myapp/activities" +) + +func GreetingWorkflow(ctx workflow.Context, name string) (string, error) { + ao := workflow.ActivityOptions{ + StartToCloseTimeout: time.Minute, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + var result string + err := workflow.ExecuteActivity(ctx, activities.GreetActivity, name).Get(ctx, &result) + return result, err +} +``` + +**worker/main.go** - Worker setup: +```go +package main + +import ( + "log" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + "myapp/activities" + "myapp/workflows" +) + +func main() { + c, err := client.Dial(client.Options{}) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + w := worker.New(c, "greeting-queue", worker.Options{}) + w.RegisterWorkflow(workflows.GreetingWorkflow) + w.RegisterActivity(activities.GreetActivity) + + err = w.Run(worker.InterruptCh()) + if err != 
nil { + log.Fatalln("Unable to start worker", err) + } +} +``` + +## Key Concepts + +### Workflow Definition +- Regular Go function with `workflow.Context` as first parameter +- Returns result and error +- Use `workflow.ExecuteActivity()` to call activities +- Use `workflow.Go()` for concurrent operations (not the `go` keyword) + +### Activity Definition +- Regular Go function with `context.Context` as first parameter +- Can perform I/O, network calls, use `time.Now()`, etc. +- Use `activity.GetLogger(ctx)` for logging +- Use `activity.RecordHeartbeat(ctx, details)` for long operations + +### Worker Setup +- Create client with `client.Dial()` +- Create worker with `worker.New()` +- Register workflows and activities explicitly + +## Why Determinism Matters: History Replay + +Temporal achieves durability through **history replay**. When a Worker restarts or recovers from a failure, it re-executes the Workflow code from the beginning, but instead of re-running Activities, it uses the results stored in the Event History. + +**This is why Workflow code must be deterministic:** +- During replay, the SDK compares Commands generated by your code against the Events in history +- If the sequence differs (non-determinism), the Worker cannot restore state +- Non-determinism causes workflow task failures and blocks Workflow progress + +**Go SDK has no sandbox** - you must ensure determinism through code review and static analysis. + +See `determinism.md` for detailed rules and the workflowcheck tool. + +## Determinism Rules Summary + +**Safe alternatives:** +- `workflow.Now(ctx)` instead of `time.Now()` +- `workflow.Sleep(ctx, d)` instead of `time.Sleep(d)` +- `workflow.Go(ctx, fn)` instead of `go fn()` +- `workflow.Channel` instead of Go channels +- `workflow.Selector` instead of `select` +- `workflow.SideEffect()` for random/UUID values + +**Run the static checker:** +```bash +go install go.temporal.io/sdk/contrib/tools/workflowcheck@latest +workflowcheck ./... 
+``` + +## Replay-Aware Logging + +Use `workflow.GetLogger(ctx)` inside Workflows for replay-safe logging that avoids duplicate messages: + +```go +func MyWorkflow(ctx workflow.Context, input string) (string, error) { + logger := workflow.GetLogger(ctx) + logger.Info("Workflow started", "input", input) // Only logs on first execution + + var result string + err := workflow.ExecuteActivity(ctx, MyActivity, input).Get(ctx, &result) + if err != nil { + logger.Error("Activity failed", "error", err) + return "", err + } + + logger.Info("Workflow completed", "result", result) + return result, nil +} +``` + +Use `activity.GetLogger(ctx)` in activities for context-aware logging: + +```go +func MyActivity(ctx context.Context, input string) (string, error) { + logger := activity.GetLogger(ctx) + logger.Info("Activity started", "input", input) // Includes activity context + // ... + return "result", nil +} +``` + +## Import Paths + +```go +import ( + "go.temporal.io/sdk/activity" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/temporal" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" +) +``` + +## Common Pitfalls + +1. **Using `go` keyword in workflows** - Use `workflow.Go()` instead +2. **Using Go channels** - Use `workflow.Channel` instead +3. **Using `time.Now()`** - Use `workflow.Now(ctx)` instead +4. **Using `time.Sleep()`** - Use `workflow.Sleep(ctx, d)` instead +5. **Iterating over maps** - Order is non-deterministic, sort keys first +6. **Missing activity registration** - Must register all activities on worker +7. 
**Using fmt.Println() in workflows** - Use `workflow.GetLogger()` instead + +## Additional Resources + +### Reference Files +- **`determinism.md`** - Safe alternatives, workflowcheck, concurrency patterns, history replay +- **`error-handling.md`** - ApplicationError, retry policies, error handling, idempotency +- **`testing.md`** - TestWorkflowEnvironment, mocking, replay testing +- **`patterns.md`** - Selectors, signals, queries, child workflows, saga pattern +- **`observability.md`** - Logging, metrics, tracing, Search Attributes +- **`advanced-features.md`** - Continue-as-new, schedules, updates, interceptors +- **`data-handling.md`** - Data converters, protobuf, payload encryption +- **`versioning.md`** - GetVersion API, workflow type versioning, Worker Versioning +- **`gotchas.md`** - Go-specific anti-patterns and common mistakes diff --git a/references/go/gotchas.md b/references/go/gotchas.md new file mode 100644 index 0000000..811072d --- /dev/null +++ b/references/go/gotchas.md @@ -0,0 +1,479 @@ +# Go Gotchas + +Go-specific mistakes and anti-patterns. See also [Common Gotchas](../core/common-gotchas.md) for language-agnostic concepts. 
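+A recurring theme below is replacing runtime-dependent behavior with explicit, stable computation. As a stdlib-only sketch (plain Go, no SDK imports), this is the sorted-key iteration that `workflow.DeterministicKeys` performs for you:
+
+```go
+package main
+
+import (
+	"fmt"
+	"sort"
+)
+
+// sortedKeys returns map keys in a stable, deterministic order —
+// the manual equivalent of the SDK's workflow.DeterministicKeys helper.
+func sortedKeys(m map[string]int) []string {
+	keys := make([]string, 0, len(m))
+	for k := range m {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+	return keys
+}
+
+func main() {
+	items := map[string]int{"b": 2, "a": 1, "c": 3}
+	// Range order over a map varies from run to run; the sorted slice does not.
+	for _, k := range sortedKeys(items) {
+		fmt.Println(k, items[k])
+	}
+}
+```
+
+Inside a workflow the same habit applies to any source of ordering or values: make it an explicit, repeatable computation rather than an accident of the runtime.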
+ +## Determinism Violations + +### Using Native Concurrency + +```go +// BAD - go keyword is non-deterministic +func BadWorkflow(ctx workflow.Context) error { + go func() { + // This goroutine won't be tracked + processAsync() + }() + return nil +} + +// GOOD - use workflow.Go +func GoodWorkflow(ctx workflow.Context) error { + workflow.Go(ctx, func(ctx workflow.Context) { + // Deterministic coroutine + processAsync(ctx) + }) + return nil +} +``` + +### Using Native Channels + +```go +// BAD - native channels are non-deterministic +func BadWorkflow(ctx workflow.Context) error { + ch := make(chan string) + go func() { + ch <- "data" + }() + <-ch + return nil +} + +// GOOD - use workflow.Channel +func GoodWorkflow(ctx workflow.Context) error { + ch := workflow.NewChannel(ctx) + workflow.Go(ctx, func(ctx workflow.Context) { + ch.Send(ctx, "data") + }) + var value string + ch.Receive(ctx, &value) + return nil +} +``` + +### Using time.Now() + +```go +// BAD - different time on replay +func BadWorkflow(ctx workflow.Context) error { + if time.Now().Hour() < 12 { + return morningLogic(ctx) + } + return afternoonLogic(ctx) +} + +// GOOD - consistent across replays +func GoodWorkflow(ctx workflow.Context) error { + if workflow.Now(ctx).Hour() < 12 { + return morningLogic(ctx) + } + return afternoonLogic(ctx) +} +``` + +### Using time.Sleep() + +```go +// BAD - blocks worker thread, not durable +func BadWorkflow(ctx workflow.Context) error { + time.Sleep(time.Hour) + return nil +} + +// GOOD - durable timer +func GoodWorkflow(ctx workflow.Context) error { + workflow.Sleep(ctx, time.Hour) + return nil +} +``` + +### Iterating Over Maps + +```go +// BAD - non-deterministic order +func BadWorkflow(ctx workflow.Context, items map[string]int) error { + for k, v := range items { + processItem(ctx, k, v) // Order varies! 
+	}
+	return nil
+}
+
+// GOOD - iterate in deterministic key order
+func GoodWorkflow(ctx workflow.Context, items map[string]int) error {
+	keys := workflow.DeterministicKeys(items)
+	for _, k := range keys {
+		processItem(ctx, k, items[k])
+	}
+	return nil
+}
+```
+
+### Random Values and UUIDs
+
+```go
+// BAD - different values on replay
+func BadWorkflow(ctx workflow.Context) (string, error) {
+	id := uuid.New().String() // new UUID on every replay
+	n := rand.Intn(100)       // new random number on every replay
+	return fmt.Sprintf("%s-%d", id, n), nil
+}
+
+// GOOD - use SideEffect
+func GoodWorkflow(ctx workflow.Context) (string, error) {
+	var id string
+	workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
+		return uuid.New().String()
+	}).Get(&id)
+	return id, nil
+}
+```
+
+## Idempotency
+
+```go
+// BAD - May charge customer multiple times on retry
+func ChargePaymentActivity(ctx context.Context, orderID string, amount float64) error {
+	return paymentAPI.Charge(amount)
+}
+
+// GOOD - Safe for retries
+func ChargePaymentActivity(ctx context.Context, orderID string, amount float64) error {
+	return paymentAPI.Charge(amount, paymentAPI.WithIdempotencyKey("order-"+orderID))
+}
+```
+
+## Replay Safety
+
+### Side Effects in Workflows
+
+```go
+// BAD - Prints on every replay, notification runs in workflow
+func BadWorkflow(ctx workflow.Context) error {
+	fmt.Println("Starting workflow") // Runs on replay too
+	sendSlackNotification("Started") // Side effect in workflow!
+ return nil +} + +// GOOD - Replay-safe +func GoodWorkflow(ctx workflow.Context) error { + logger := workflow.GetLogger(ctx) + logger.Info("Starting workflow") // Only logs on first execution + + ao := workflow.ActivityOptions{StartToCloseTimeout: time.Minute} + ctx = workflow.WithActivityOptions(ctx, ao) + return workflow.ExecuteActivity(ctx, SendNotificationActivity, "Started").Get(ctx, nil) +} +``` + +## Query Handlers + +### Modifying State in Queries + +```go +// BAD - Query modifies state +func BadWorkflow(ctx workflow.Context) error { + var counter int + + workflow.SetQueryHandler(ctx, "increment", func() (int, error) { + counter++ // Mutates state! + return counter, nil + }) + + // ... + return nil +} + +// GOOD - Query reads, Update modifies +func GoodWorkflow(ctx workflow.Context) error { + var counter int + + workflow.SetQueryHandler(ctx, "getCount", func() (int, error) { + return counter, nil // Read-only + }) + + workflow.SetUpdateHandler(ctx, "increment", func() (int, error) { + counter++ + return counter, nil + }) + + // ... + return nil +} +``` + +## Activity Registration + +### Missing Activity Registration + +```go +// BAD - Activity not registered, will fail at runtime +func main() { + w := worker.New(c, "my-queue", worker.Options{}) + w.RegisterWorkflow(MyWorkflow) + // Missing: w.RegisterActivity(MyActivity) + w.Run(worker.InterruptCh()) +} + +// GOOD - All activities registered +func main() { + w := worker.New(c, "my-queue", worker.Options{}) + w.RegisterWorkflow(MyWorkflow) + w.RegisterActivity(MyActivity) + w.RegisterActivity(AnotherActivity) + w.Run(worker.InterruptCh()) +} +``` + +### Activity Naming Collisions + +```go +// BAD - potential collision in multi-package projects +w.RegisterActivity(orders.ProcessActivity) +w.RegisterActivity(payments.ProcessActivity) // Same name! 
+ +// GOOD - prefixed +w.RegisterActivityWithOptions(orders.ProcessActivity, activity.RegisterOptions{ + Name: "orders.ProcessActivity", +}) +w.RegisterActivityWithOptions(payments.ProcessActivity, activity.RegisterOptions{ + Name: "payments.ProcessActivity", +}) +``` + +## Error Handling + +### Swallowing Errors + +```go +// BAD - Error is hidden +func BadWorkflow(ctx workflow.Context) error { + ao := workflow.ActivityOptions{StartToCloseTimeout: time.Minute} + ctx = workflow.WithActivityOptions(ctx, ao) + + err := workflow.ExecuteActivity(ctx, RiskyActivity).Get(ctx, nil) + if err != nil { + // Error is lost! + return nil + } + return nil +} + +// GOOD - Handle appropriately +func GoodWorkflow(ctx workflow.Context) error { + ao := workflow.ActivityOptions{StartToCloseTimeout: time.Minute} + ctx = workflow.WithActivityOptions(ctx, ao) + + err := workflow.ExecuteActivity(ctx, RiskyActivity).Get(ctx, nil) + if err != nil { + workflow.GetLogger(ctx).Error("Activity failed", "error", err) + return err // Or use fallback, compensate, etc. + } + return nil +} +``` + +### Wrong Retry Classification + +```go +// BAD - Network errors should be retried +func BadActivity(ctx context.Context) error { + err := callAPI() + if err != nil { + if isNetworkError(err) { + return temporal.NewNonRetryableApplicationError("network failed", "NetworkError", err) + } + } + return err +} + +// GOOD - Only permanent failures are non-retryable +func GoodActivity(ctx context.Context) error { + err := callAPI() + if err != nil { + if isInvalidCredentials(err) { + return temporal.NewNonRetryableApplicationError("invalid API key", "AuthError", err) + } + return err // Let Temporal retry network errors + } + return nil +} +``` + +## Retry Policies + +### Too Aggressive + +```go +// BAD - Gives up too easily +ao := workflow.ActivityOptions{ + StartToCloseTimeout: 30 * time.Second, + RetryPolicy: &temporal.RetryPolicy{ + MaximumAttempts: 1, // No retries! 
+ }, +} + +// GOOD - Resilient to transient failures +ao := workflow.ActivityOptions{ + StartToCloseTimeout: 10 * time.Minute, + RetryPolicy: &temporal.RetryPolicy{ + InitialInterval: time.Second, + BackoffCoefficient: 2.0, + MaximumInterval: time.Minute, + MaximumAttempts: 10, + }, +} +``` + +## Heartbeating + +### Forgetting to Heartbeat Long Activities + +```go +// BAD - No heartbeat, can't detect stuck activities +func BadActivity(ctx context.Context, items []string) error { + for _, item := range items { + processItem(item) // Takes hours, no heartbeat + } + return nil +} + +// GOOD - Regular heartbeats with progress +func GoodActivity(ctx context.Context, items []string) error { + for i, item := range items { + activity.RecordHeartbeat(ctx, fmt.Sprintf("Processing item %d/%d", i+1, len(items))) + + if ctx.Err() != nil { + return ctx.Err() // Check for cancellation + } + + processItem(item) + } + return nil +} +``` + +### Heartbeat Timeout Too Short + +```go +// BAD - Heartbeat timeout shorter than processing time +ao := workflow.ActivityOptions{ + StartToCloseTimeout: 30 * time.Minute, + HeartbeatTimeout: 10 * time.Second, // Too short! +} + +// GOOD - Heartbeat timeout allows for processing variance +ao := workflow.ActivityOptions{ + StartToCloseTimeout: 30 * time.Minute, + HeartbeatTimeout: 2 * time.Minute, +} +``` + +## Context Handling + +### Using context.Context in Workflows + +```go +// BAD - wrong context type +func BadWorkflow(ctx context.Context) error { // Should be workflow.Context! + return nil +} + +// GOOD - correct context type +func GoodWorkflow(ctx workflow.Context) error { + return nil +} +``` + +### Using workflow.Context in Activities + +```go +// BAD - wrong context type +func BadActivity(ctx workflow.Context, input string) error { // Should be context.Context! 
+ return nil +} + +// GOOD - correct context type +func GoodActivity(ctx context.Context, input string) error { + return nil +} +``` + +## Testing + +### Not Testing Failures + +```go +// Test failure scenarios +func (s *UnitTestSuite) TestActivityFailureHandling() { + s.env.RegisterWorkflow(MyWorkflow) + + // Mock activity to fail + s.env.OnActivity(MyActivity, mock.Anything, mock.Anything). + Return("", errors.New("activity failed")) + + s.env.ExecuteWorkflow(MyWorkflow, "input") + + s.True(s.env.IsWorkflowCompleted()) + s.Error(s.env.GetWorkflowError()) +} +``` + +### Not Testing Replay + +```go +func TestReplayCompatibility(t *testing.T) { + replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflow(MyWorkflow) + + // Load history from file (captured from production/staging) + err := replayer.ReplayWorkflowHistoryFromJSONFile(nil, "workflow_history.json") + + // Fails if current code is incompatible with history + require.NoError(t, err) +} +``` + +## Cancellation Handling + +### Not Using Disconnected Context for Cleanup + +```go +// BAD - cleanup won't run after cancellation +func BadWorkflow(ctx workflow.Context) error { + defer func() { + ao := workflow.ActivityOptions{StartToCloseTimeout: time.Minute} + ctx = workflow.WithActivityOptions(ctx, ao) + workflow.ExecuteActivity(ctx, CleanupActivity).Get(ctx, nil) // Will fail if cancelled + }() + + return workflow.ExecuteActivity(ctx, MainActivity).Get(ctx, nil) +} + +// GOOD - cleanup runs even after cancellation +func GoodWorkflow(ctx workflow.Context) error { + defer func() { + if errors.Is(ctx.Err(), workflow.ErrCanceled) { + newCtx, _ := workflow.NewDisconnectedContext(ctx) + ao := workflow.ActivityOptions{StartToCloseTimeout: time.Minute} + newCtx = workflow.WithActivityOptions(newCtx, ao) + workflow.ExecuteActivity(newCtx, CleanupActivity).Get(newCtx, nil) + } + }() + + ao := workflow.ActivityOptions{StartToCloseTimeout: time.Hour} + ctx = workflow.WithActivityOptions(ctx, ao) + return 
workflow.ExecuteActivity(ctx, MainActivity).Get(ctx, nil) +} +``` + +## Static Analysis + +### Not Running workflowcheck + +```bash +# Add to CI/CD pipeline +go install go.temporal.io/sdk/contrib/tools/workflowcheck@latest +workflowcheck ./... +``` + +This catches many determinism issues at compile time rather than runtime. diff --git a/references/go/observability.md b/references/go/observability.md new file mode 100644 index 0000000..2e9669f --- /dev/null +++ b/references/go/observability.md @@ -0,0 +1,309 @@ +# Go SDK Observability + +## Overview + +The Go SDK provides comprehensive observability through logging, metrics, tracing, and visibility (Search Attributes). + +## Logging + +### Workflow Logging (Replay-Safe) + +Use `workflow.GetLogger(ctx)` for replay-safe logging: + +```go +func MyWorkflow(ctx workflow.Context, input string) (string, error) { + logger := workflow.GetLogger(ctx) + + // These logs are automatically suppressed during replay + logger.Info("Workflow started", "input", input) + logger.Debug("Processing step 1") + + ao := workflow.ActivityOptions{StartToCloseTimeout: time.Minute} + ctx = workflow.WithActivityOptions(ctx, ao) + + var result string + err := workflow.ExecuteActivity(ctx, MyActivity, input).Get(ctx, &result) + if err != nil { + logger.Error("Activity failed", "error", err) + return "", err + } + + logger.Info("Workflow completed", "result", result) + return result, nil +} +``` + +The workflow logger: +- Suppresses duplicate logs during replay +- Includes workflow context (workflow ID, run ID, etc.) +- Uses structured key-value logging + +### Activity Logging + +Use `activity.GetLogger(ctx)` for context-aware activity logging: + +```go +func ProcessOrderActivity(ctx context.Context, orderID string) (string, error) { + logger := activity.GetLogger(ctx) + + logger.Info("Processing order", "orderID", orderID) + + // Perform work... 
+
+	logger.Info("Order processed successfully")
+	return "completed", nil
+}
+```
+
+Activity logger includes:
+- Activity ID, type, and task queue
+- Workflow ID and run ID
+- Attempt number (for retries)
+
+### Custom Logger
+
+```go
+import (
+	"go.temporal.io/sdk/client"
+	"go.temporal.io/sdk/log"
+	"github.com/sirupsen/logrus"
+	logrusadapter "logur.dev/adapter/logrus"
+)
+
+// Use logrus via adapter
+logrusLogger := logrus.New()
+logrusLogger.SetLevel(logrus.InfoLevel)
+logger := log.NewStructuredLogger(logrusadapter.New(logrusLogger))
+
+c, err := client.Dial(client.Options{
+	Logger: logger,
+})
+```
+
+### slog Integration (Go 1.21+)
+
+```go
+import (
+	"log/slog"
+	"go.temporal.io/sdk/log"
+)
+
+slogger := slog.Default()
+temporalLogger := log.NewStructuredLogger(slogger)
+
+c, err := client.Dial(client.Options{
+	Logger: temporalLogger,
+})
+```
+
+## Metrics
+
+### Enabling Prometheus Metrics
+
+```go
+import (
+	"time"
+
+	"github.com/uber-go/tally/v4"
+	tallyprometheus "github.com/uber-go/tally/v4/prometheus"
+	"go.temporal.io/sdk/client"
+	sdktally "go.temporal.io/sdk/contrib/tally"
+)
+
+// Create Prometheus reporter
+reporter := tallyprometheus.NewReporter(tallyprometheus.Options{})
+
+// Create metrics scope (the Prometheus reporter is a cached reporter)
+scope, closer := tally.NewRootScope(tally.ScopeOptions{
+	CachedReporter: reporter,
+}, time.Second)
+defer closer.Close()
+
+// Create metrics handler
+metricsHandler := sdktally.NewMetricsHandler(scope)
+
+// Apply to client
+c, err := client.Dial(client.Options{
+	MetricsHandler: metricsHandler,
+})
+```
+
+### Key SDK Metrics
+
+| Metric | Description |
+|--------|-------------|
+| `temporal_request` | Client requests to server |
+| `temporal_workflow_task_execution_latency` | Workflow task processing time |
+| `temporal_activity_execution_latency` | Activity execution time |
+| `temporal_workflow_task_replay_latency` | Replay duration |
+
+### Custom Metrics in Workflows
+
+```go
+func MyWorkflow(ctx workflow.Context) error {
+	metricsHandler := workflow.GetMetricsHandler(ctx)
+
+	// Record 
custom metrics (replay-safe)
+	counter := metricsHandler.Counter("my_custom_counter")
+	counter.Inc(1)
+
+	gauge := metricsHandler.Gauge("my_custom_gauge")
+	gauge.Update(42.0)
+
+	return nil
+}
+```
+
+## Tracing
+
+### OpenTelemetry Integration
+
+```go
+import (
+	"go.temporal.io/sdk/client"
+	"go.temporal.io/sdk/contrib/opentelemetry"
+	"go.temporal.io/sdk/interceptor"
+)
+
+// Create tracing interceptor
+tracingInterceptor, err := opentelemetry.NewTracingInterceptor(
+	opentelemetry.TracerOptions{},
+)
+if err != nil {
+	log.Fatal(err)
+}
+
+// Apply to client
+c, err := client.Dial(client.Options{
+	Interceptors: []interceptor.ClientInterceptor{tracingInterceptor},
+})
+
+// Apply to worker
+w := worker.New(c, "my-queue", worker.Options{
+	Interceptors: []interceptor.WorkerInterceptor{tracingInterceptor},
+})
+```
+
+### Datadog Integration
+
+```go
+import "go.temporal.io/sdk/contrib/datadog/tracing"
+
+tracingInterceptor, err := tracing.NewTracingInterceptor(tracing.TracerOptions{})
+```
+
+## Search Attributes (Visibility)
+
+### Setting Search Attributes at Start
+
+```go
+import "go.temporal.io/sdk/temporal"
+
+options := client.StartWorkflowOptions{
+	ID:        "order-123",
+	TaskQueue: "orders",
+	TypedSearchAttributes: temporal.NewSearchAttributes(
+		temporal.NewSearchAttributeKeyString("OrderId").ValueSet("123"),
+		temporal.NewSearchAttributeKeyString("CustomerType").ValueSet("premium"),
+		temporal.NewSearchAttributeKeyFloat64("OrderTotal").ValueSet(99.99),
+	),
+}
+
+we, err := c.ExecuteWorkflow(ctx, options, OrderWorkflow, order)
+```
+
+### Typed Search Attribute Keys
+
+```go
+import "go.temporal.io/sdk/temporal"
+
+// Define typed keys
+var (
+	OrderIDKey     = temporal.NewSearchAttributeKeyString("OrderId")
+	OrderStatusKey = temporal.NewSearchAttributeKeyString("OrderStatus")
+	OrderTotalKey  = temporal.NewSearchAttributeKeyFloat64("OrderTotal")
+	CreatedAtKey   = temporal.NewSearchAttributeKeyTime("CreatedAt")
+)
+
+// Use typed keys
+options := 
client.StartWorkflowOptions{
+	TypedSearchAttributes: temporal.NewSearchAttributes(
+		OrderIDKey.ValueSet("123"),
+		OrderStatusKey.ValueSet("pending"),
+		OrderTotalKey.ValueSet(99.99),
+		CreatedAtKey.ValueSet(time.Now()),
+	),
+}
+```
+
+### Upserting Search Attributes from Workflow
+
+```go
+func OrderWorkflow(ctx workflow.Context, order Order) (string, error) {
+	// Update status as workflow progresses
+	workflow.UpsertTypedSearchAttributes(ctx,
+		temporal.NewSearchAttributeKeyString("OrderStatus").ValueSet("processing"),
+	)
+
+	// Process order...
+
+	workflow.UpsertTypedSearchAttributes(ctx,
+		temporal.NewSearchAttributeKeyString("OrderStatus").ValueSet("completed"),
+	)
+
+	return "done", nil
+}
+```
+
+### Querying Workflows by Search Attributes
+
+```go
+import "go.temporal.io/api/workflowservice/v1"
+
+// List workflows using search attributes. ListWorkflow returns one page;
+// pass resp.NextPageToken in a follow-up request to fetch more results.
+resp, err := c.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{
+	Query: `OrderStatus = "processing" AND CustomerType = "premium"`,
+})
+if err != nil {
+	return err
+}
+
+for _, info := range resp.Executions {
+	fmt.Printf("Workflow %s is still processing\n", info.Execution.WorkflowId)
+}
+```
+
+## Debugging with Event History
+
+### Fetching Workflow History
+
+```go
+import enumspb "go.temporal.io/api/enums/v1"
+
+iter := c.GetWorkflowHistory(ctx, workflowID, runID, false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
+
+for iter.HasNext() {
+	event, err := iter.Next()
+	if err != nil {
+		return err
+	}
+	fmt.Printf("Event %d: %s\n", event.EventId, event.EventType)
+}
+```
+
+### Using Temporal CLI
+
+```bash
+# Get workflow history
+temporal workflow show -w my-workflow-id
+
+# View in JSON format
+temporal workflow show -w my-workflow-id --output json
+
+# List workflows with query
+temporal workflow list --query 'OrderStatus = "processing"'
+```
+
+## Best Practices
+
+1. Use `workflow.GetLogger()` in workflows, `activity.GetLogger()` in activities
+2. Don't use `fmt.Println()` in workflows - it produces duplicate output on replay
+3. Configure metrics for production monitoring
+4.
Use Search Attributes for business-level visibility +5. Add tracing for distributed debugging +6. Log with structured key-value pairs for easier querying +7. Use typed Search Attribute keys for type safety diff --git a/references/go/patterns.md b/references/go/patterns.md new file mode 100644 index 0000000..8ecc63b --- /dev/null +++ b/references/go/patterns.md @@ -0,0 +1,536 @@ +# Go SDK Patterns + +## Signals + +### WHY: Use signals to send data or commands to a running workflow from external sources +### WHEN: +- **Order approval workflows** - Wait for human approval before proceeding +- **Live configuration updates** - Change workflow behavior without restarting +- **External event notifications** - Notify workflow of events from other systems +- **Workflow coordination** - Allow workflows to communicate with each other + +**Signals vs Queries vs Updates:** +- Signals: Fire-and-forget, no response, can modify state +- Queries: Read-only, returns data, cannot modify state +- Updates: Synchronous, returns response, can modify state + +```go +func SignalWorkflow(ctx workflow.Context) error { + var approved bool + signalChan := workflow.GetSignalChannel(ctx, "approve") + + // Wait for signal + signalChan.Receive(ctx, &approved) + + if approved { + ao := workflow.ActivityOptions{StartToCloseTimeout: time.Minute} + ctx = workflow.WithActivityOptions(ctx, ao) + return workflow.ExecuteActivity(ctx, ProcessActivity).Get(ctx, nil) + } + return errors.New("not approved") +} + +// With selector for multiple signals +func MultiSignalWorkflow(ctx workflow.Context) error { + selector := workflow.NewSelector(ctx) + + approveChan := workflow.GetSignalChannel(ctx, "approve") + rejectChan := workflow.GetSignalChannel(ctx, "reject") + + var result string + selector.AddReceive(approveChan, func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, nil) + result = "approved" + }) + selector.AddReceive(rejectChan, func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, nil) + 
result = "rejected" + }) + + selector.Select(ctx) + return nil +} +``` + +### Sending Signals from Client + +```go +// From client code +err := c.SignalWorkflow(ctx, workflowID, runID, "approve", true) +if err != nil { + log.Fatalln("Unable to signal workflow", err) +} +``` + +## Signal-with-Start + +### WHY: Atomically start a workflow and send it a signal in a single operation +### WHEN: +- **Idempotent workflow triggering** - Ensure signal reaches workflow whether it exists or not +- **Event-driven workflow initialization** - Start workflow with initial event data +- **Race condition prevention** - Avoid window where workflow exists but hasn't received signal + +```go +// From client code - starts workflow if not running, then sends signal +run, err := c.SignalWithStartWorkflow( + ctx, + "order-123", // workflow ID + "add-item", // signal name + itemData, // signal arg + client.StartWorkflowOptions{ + TaskQueue: "orders", + }, + OrderWorkflow, // workflow function + orderInput, // workflow args +) +``` + +## Queries + +### WHY: Read workflow state without affecting execution - queries are read-only +### WHEN: +- **Progress tracking dashboards** - Display workflow progress to users +- **Status checks** - Check if workflow is ready for next step +- **Debugging** - Inspect internal workflow state +- **Health checks** - Verify workflow is functioning correctly + +**Important:** Queries must NOT modify workflow state or have side effects. 
+ +```go +func QueryableWorkflow(ctx workflow.Context) error { + var status string = "running" + var progress int = 0 + + // Register query handlers + workflow.SetQueryHandler(ctx, "status", func() (string, error) { + return status, nil + }) + + workflow.SetQueryHandler(ctx, "progress", func() (int, error) { + return progress, nil + }) + + ao := workflow.ActivityOptions{StartToCloseTimeout: time.Minute} + ctx = workflow.WithActivityOptions(ctx, ao) + + // Workflow logic + for i := 0; i < 100; i++ { + progress = i + workflow.ExecuteActivity(ctx, ProcessItem, i).Get(ctx, nil) + } + + status = "completed" + return nil +} +``` + +### Querying from Client + +```go +response, err := c.QueryWorkflow(ctx, workflowID, runID, "status") +if err != nil { + log.Fatalln("Unable to query workflow", err) +} + +var status string +err = response.Get(&status) +``` + +## Child Workflows + +### WHY: Break complex workflows into smaller, manageable units with independent failure domains +### WHEN: +- **Failure domain isolation** - Child failures don't automatically fail parent +- **Different retry policies** - Each child can have its own retry configuration +- **Reusability** - Share workflow logic across multiple parent workflows +- **Independent scaling** - Child workflows can run on different task queues +- **History size management** - Each child has its own event history + +**Use activities instead when:** Operation is short-lived, doesn't need its own failure domain, or doesn't need independent retry policies. 
+
+```go
+import enumspb "go.temporal.io/api/enums/v1"
+
+func ParentWorkflow(ctx workflow.Context, orders []Order) error {
+	for _, order := range orders {
+		cwo := workflow.ChildWorkflowOptions{
+			WorkflowID: "order-" + order.ID,
+			// ParentClosePolicy controls what happens to the child when the
+			// parent closes; ABANDON lets the child keep running
+			ParentClosePolicy: enumspb.PARENT_CLOSE_POLICY_ABANDON,
+		}
+		ctx := workflow.WithChildOptions(ctx, cwo)
+
+		err := workflow.ExecuteChildWorkflow(ctx, ProcessOrderWorkflow, order).Get(ctx, nil)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+```
+
+## External Workflow Signaling
+
+### WHY: Send signals to workflows that are not children of the current workflow
+### WHEN:
+- **Cross-workflow coordination** - Coordinate between independent workflows
+- **Event broadcasting** - Notify multiple unrelated workflows of an event
+- **Workflow-to-workflow communication** - Allow workflows to communicate without a parent-child relationship
+
+```go
+func CoordinatorWorkflow(ctx workflow.Context, targetWorkflowID string) error {
+	// Signal an external workflow (not a child); an empty run ID targets the latest run
+	err := workflow.SignalExternalWorkflow(ctx, targetWorkflowID, "", "data-ready", dataPayload).Get(ctx, nil)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+```
+
+## Parallel Execution
+
+### WHY: Execute multiple independent operations concurrently for better throughput
+### WHEN:
+- **Batch processing** - Process multiple items simultaneously
+- **Fan-out patterns** - Distribute work across multiple activities
+- **Independent operations** - Operations that don't depend on each other's results
+
+```go
+func ParallelWorkflow(ctx workflow.Context, items []string) ([]string, error) {
+	ao := workflow.ActivityOptions{StartToCloseTimeout: time.Minute}
+	ctx = workflow.WithActivityOptions(ctx, ao)
+
+	var futures []workflow.Future
+	for _, item := range items {
+		future := workflow.ExecuteActivity(ctx, ProcessItem, item)
+		futures = append(futures, future)
+	}
+
+	var results []string
+	for _, future := range futures {
+		var result
string + if err := future.Get(ctx, &result); err != nil { + return nil, err + } + results = append(results, result) + } + return results, nil +} +``` + +## Continue-as-New + +### WHY: Prevent unbounded event history growth in long-running or infinite workflows +### WHEN: +- **Event history approaching 10,000+ events** - Temporal recommends continue-as-new before hitting limits +- **Infinite/long-running workflows** - Polling, subscription, or daemon-style workflows +- **Memory optimization** - Reset workflow state to reduce memory footprint + +**Recommendation:** Check history length periodically and continue-as-new around 10,000 events. + +```go +func LongRunningWorkflow(ctx workflow.Context, state State) error { + ao := workflow.ActivityOptions{StartToCloseTimeout: 5 * time.Minute} + ctx = workflow.WithActivityOptions(ctx, ao) + + for { + err := workflow.ExecuteActivity(ctx, ProcessBatch, state).Get(ctx, &state) + if err != nil { + return err + } + + if state.IsComplete { + return nil + } + + // Check history size - continue-as-new before hitting limits + info := workflow.GetInfo(ctx) + if info.GetCurrentHistoryLength() > 10000 { + return workflow.NewContinueAsNewError(ctx, LongRunningWorkflow, state) + } + } +} +``` + +## Saga Pattern (Compensations) + +### WHY: Implement distributed transactions with compensating actions for rollback +### WHEN: +- **Multi-step transactions** - Operations that span multiple services +- **Eventual consistency** - When you can't use traditional ACID transactions +- **Rollback requirements** - When partial failures require undoing previous steps + +**Important:** Compensation activities should be idempotent - they may be retried. 
+ +```go +func SagaWorkflow(ctx workflow.Context, order Order) error { + ao := workflow.ActivityOptions{StartToCloseTimeout: time.Minute * 5} + ctx = workflow.WithActivityOptions(ctx, ao) + + var compensations []func(workflow.Context) error + + // Reserve inventory + err := workflow.ExecuteActivity(ctx, ReserveInventory, order).Get(ctx, nil) + if err != nil { + return err + } + compensations = append(compensations, func(ctx workflow.Context) error { + return workflow.ExecuteActivity(ctx, ReleaseInventory, order).Get(ctx, nil) + }) + + // Charge payment + err = workflow.ExecuteActivity(ctx, ChargePayment, order).Get(ctx, nil) + if err != nil { + return runCompensations(ctx, compensations) + } + compensations = append(compensations, func(ctx workflow.Context) error { + return workflow.ExecuteActivity(ctx, RefundPayment, order).Get(ctx, nil) + }) + + // Ship order + err = workflow.ExecuteActivity(ctx, ShipOrder, order).Get(ctx, nil) + if err != nil { + return runCompensations(ctx, compensations) + } + + return nil +} + +func runCompensations(ctx workflow.Context, compensations []func(workflow.Context) error) error { + logger := workflow.GetLogger(ctx) + // Run compensations in reverse order + for i := len(compensations) - 1; i >= 0; i-- { + if err := compensations[i](ctx); err != nil { + logger.Error("Compensation failed", "error", err) + // Continue with other compensations even if one fails + } + } + return errors.New("saga failed") +} +``` + +## Timers + +### WHY: Schedule delays or deadlines within workflows in a durable way +### WHEN: +- **Scheduled delays** - Wait for a specific duration before continuing +- **Deadlines** - Set timeouts for operations +- **Reminder patterns** - Schedule future notifications +- **Rate limiting** - Pace workflow operations + +```go +func TimerWorkflow(ctx workflow.Context) error { + // Simple sleep + workflow.Sleep(ctx, time.Hour) + + // Timer with selector for cancellation + timer := workflow.NewTimer(ctx, time.Hour) + cancelChan 
:= workflow.GetSignalChannel(ctx, "cancel") + + selector := workflow.NewSelector(ctx) + selector.AddFuture(timer, func(f workflow.Future) { + // Timer fired + }) + selector.AddReceive(cancelChan, func(c workflow.ReceiveChannel, more bool) { + // Cancelled + }) + selector.Select(ctx) + + return nil +} +``` + +## Cancellation Handling + +### WHY: Gracefully handle workflow cancellation requests and perform cleanup +### WHEN: +- **Graceful shutdown** - Clean up resources when workflow is cancelled +- **External cancellation** - Respond to cancellation requests from clients +- **Cleanup activities** - Run cleanup logic even after cancellation + +**Critical:** Use `workflow.NewDisconnectedContext()` to execute activities after cancellation. + +```go +func CancellableWorkflow(ctx workflow.Context) error { + ao := workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + HeartbeatTimeout: 5 * time.Second, + WaitForCancellation: true, // Wait for activities to handle cancellation + } + ctx = workflow.WithActivityOptions(ctx, ao) + + defer func() { + // Only run cleanup if workflow was cancelled + if !errors.Is(ctx.Err(), workflow.ErrCanceled) { + return + } + + // Create disconnected context for cleanup - this context won't be cancelled + newCtx, _ := workflow.NewDisconnectedContext(ctx) + err := workflow.ExecuteActivity(newCtx, CleanupActivity).Get(newCtx, nil) + if err != nil { + workflow.GetLogger(ctx).Error("Cleanup failed", "error", err) + } + }() + + // Main workflow logic + err := workflow.ExecuteActivity(ctx, LongRunningActivity).Get(ctx, nil) + return err +} +``` + +## Wait Condition with Timeout + +### WHY: Wait for a condition with a deadline +### WHEN: +- **Approval workflows with deadlines** - Auto-reject if not approved in time +- **Conditional waits with timeouts** - Proceed with default after timeout + +```go +func ApprovalWorkflow(ctx workflow.Context) (string, error) { + var approved bool + + // Set up signal handler + signalChan := 
workflow.GetSignalChannel(ctx, "approve")
+
+	// Wait for approval with 24-hour timeout
+	selector := workflow.NewSelector(ctx)
+	selector.AddReceive(signalChan, func(c workflow.ReceiveChannel, more bool) {
+		c.Receive(ctx, &approved)
+	})
+
+	timer := workflow.NewTimer(ctx, 24*time.Hour)
+	selector.AddFuture(timer, func(f workflow.Future) {
+		// Timeout reached
+	})
+
+	selector.Select(ctx)
+
+	if approved {
+		return "approved", nil
+	}
+	return "auto-rejected due to timeout", nil
+}
+```
+
+## Heartbeating Long-Running Activities
+
+### WHY: Report progress and detect worker failures for long-running activities
+### WHEN:
+- **Long-running operations** - Activities that take minutes or hours
+- **Progress reporting** - Track activity progress from workflow
+- **Fast failure detection** - Detect worker crashes quickly
+
+```go
+func LongRunningActivity(ctx context.Context, items []Item) error {
+	for i, item := range items {
+		// Process item...
+		processItem(item)
+
+		// Record heartbeat with progress
+		activity.RecordHeartbeat(ctx, i)
+
+		// Check for cancellation
+		if ctx.Err() != nil {
+			return ctx.Err()
+		}
+	}
+	return nil
+}
+```
+
+Configure heartbeat timeout in workflow:
+```go
+ao := workflow.ActivityOptions{
+	StartToCloseTimeout: time.Hour,
+	HeartbeatTimeout:    time.Minute,
+}
+```
+
+## Activity Heartbeat Details
+
+### WHY: Resume activity progress after worker failure
+### WHEN:
+- **Long-running activities** - Track progress for resumability
+- **Checkpointing** - Save progress periodically
+
+```go
+func ProcessLargeFileActivity(ctx context.Context, filePath string) error {
+	// Declare startLine outside the if so the loop below can use it;
+	// it stays 0 on the first attempt
+	var startLine int
+	if activity.HasHeartbeatDetails(ctx) {
+		// Resume from the last heartbeated line of the previous attempt
+		activity.GetHeartbeatDetails(ctx, &startLine)
+	}
+
+	// Process file and heartbeat progress
+	// (totalLines and processLine are illustrative helpers)
+	for lineNum := startLine; lineNum < totalLines; lineNum++ {
+		processLine(lineNum)
+		activity.RecordHeartbeat(ctx, lineNum)
+	}
+
+	return
nil +} +``` + +## Deterministic Map Iteration + +### WHY: Iterate over maps deterministically for workflow replay compatibility +### WHEN: +- **Iterating over maps in workflows** - Go map iteration order is non-deterministic + +```go +func WorkflowWithMap(ctx workflow.Context, data map[string]int) error { + // Use DeterministicKeys for deterministic iteration order + keys := workflow.DeterministicKeys(data) + for _, key := range keys { + value := data[key] + // Process key-value pair... + } + return nil +} +``` + +## Local Activities + +### WHY: Reduce latency for short, lightweight operations by skipping the task queue +### WHEN: +- **Short operations** - Activities completing in milliseconds/seconds +- **High-frequency calls** - When task queue overhead is significant +- **Low-latency requirements** - When you can't afford task queue round-trip + +```go +func WorkflowWithLocalActivity(ctx workflow.Context) error { + lao := workflow.LocalActivityOptions{ + ScheduleToCloseTimeout: 5 * time.Second, + } + ctx = workflow.WithLocalActivityOptions(ctx, lao) + + var result string + err := workflow.ExecuteLocalActivity(ctx, QuickLookup, "key").Get(ctx, &result) + return err +} +``` + +## Versioning with Patching + +### WHY: Safely deploy workflow code changes without breaking running workflows +### WHEN: +- **Adding new steps** - New code path for new executions, old path for replays +- **Changing activity calls** - Modify activity parameters or logic +- **Deprecating features** - Gradually remove old code paths + +```go +func VersionedWorkflow(ctx workflow.Context) error { + ao := workflow.ActivityOptions{StartToCloseTimeout: time.Minute} + ctx = workflow.WithActivityOptions(ctx, ao) + + v := workflow.GetVersion(ctx, "new-greeting", workflow.DefaultVersion, 1) + + if v == workflow.DefaultVersion { + // Old implementation + return workflow.ExecuteActivity(ctx, OldGreetActivity).Get(ctx, nil) + } + // New implementation + return workflow.ExecuteActivity(ctx, 
NewGreetActivity).Get(ctx, nil) +} +``` diff --git a/references/go/testing.md b/references/go/testing.md new file mode 100644 index 0000000..8fe3276 --- /dev/null +++ b/references/go/testing.md @@ -0,0 +1,392 @@ +# Go SDK Testing + +## Overview + +The Go SDK provides the `testsuite` package for unit testing workflows with activity mocking and time control. + +## Test Suite Setup + +```go +import ( + "testing" + "github.com/stretchr/testify/suite" + "go.temporal.io/sdk/testsuite" +) + +type UnitTestSuite struct { + suite.Suite + testsuite.WorkflowTestSuite + env *testsuite.TestWorkflowEnvironment +} + +func (s *UnitTestSuite) SetupTest() { + s.env = s.NewTestWorkflowEnvironment() +} + +func (s *UnitTestSuite) AfterTest(suiteName, testName string) { + s.env.AssertExpectations(s.T()) +} + +func TestUnitTestSuite(t *testing.T) { + suite.Run(t, new(UnitTestSuite)) +} +``` + +## Testing Workflows + +```go +func (s *UnitTestSuite) TestGreetingWorkflow() { + s.env.RegisterWorkflow(GreetingWorkflow) + s.env.RegisterActivity(GreetActivity) + + s.env.ExecuteWorkflow(GreetingWorkflow, "World") + + s.True(s.env.IsWorkflowCompleted()) + s.NoError(s.env.GetWorkflowError()) + + var result string + s.NoError(s.env.GetWorkflowResult(&result)) + s.Equal("Hello, World!", result) +} +``` + +## Mocking Activities + +```go +func (s *UnitTestSuite) TestWithMockedActivity() { + s.env.RegisterWorkflow(MyWorkflow) + + // Mock activity to return specific value + s.env.OnActivity(MyActivity, mock.Anything, "input").Return("mocked result", nil) + + s.env.ExecuteWorkflow(MyWorkflow, "input") + + s.True(s.env.IsWorkflowCompleted()) + s.NoError(s.env.GetWorkflowError()) +} + +func (s *UnitTestSuite) TestActivityError() { + s.env.RegisterWorkflow(MyWorkflow) + + // Mock activity to return error + s.env.OnActivity(MyActivity, mock.Anything, mock.Anything).Return("", errors.New("activity failed")) + + s.env.ExecuteWorkflow(MyWorkflow, "input") + + s.True(s.env.IsWorkflowCompleted()) + 
s.Error(s.env.GetWorkflowError()) +} +``` + +### Mock Expectations Best Practices + +**Avoid using `mock.Anything` for all parameters - be explicit:** + +```go +// WRONG - too permissive +s.env.OnActivity(ProcessPayment, mock.Anything, mock.Anything).Return("success", nil) + +// CORRECT - explicit expectations +s.env.OnActivity(ProcessPayment, mock.Anything, "order-123").Return("success", nil) +// ^^^^^^^^^^^^ ^^^^^^^^^^^^^ +// context OK explicit input +``` + +**Exception:** Use `mock.Anything` only for: +1. `context.Context` parameters (timing/values vary) +2. Complex inputs that need custom matchers + +```go +// Custom matcher for complex validation +s.env.OnActivity( + ProcessPayment, + mock.Anything, // context + mock.MatchedBy(func(req *PaymentRequest) bool { + return req.Amount > 0 && req.Currency == "USD" + }), +).Return("success", nil) +``` + +## Testing with Timers + +```go +func (s *UnitTestSuite) TestWorkflowWithTimer() { + s.env.RegisterWorkflow(TimerWorkflow) + + // Time automatically advances in test environment + s.env.ExecuteWorkflow(TimerWorkflow) + + s.True(s.env.IsWorkflowCompleted()) +} +``` + +## Testing Signals + +```go +func (s *UnitTestSuite) TestSignalWorkflow() { + s.env.RegisterWorkflow(SignalWorkflow) + + // Send signal during workflow execution + s.env.RegisterDelayedCallback(func() { + s.env.SignalWorkflow("approve-signal", true) + }, time.Second) + + s.env.ExecuteWorkflow(SignalWorkflow) + + s.True(s.env.IsWorkflowCompleted()) + s.NoError(s.env.GetWorkflowError()) +} +``` + +## Testing Queries + +```go +func (s *UnitTestSuite) TestQueryWorkflow() { + s.env.RegisterWorkflow(QueryableWorkflow) + + // Execute workflow + s.env.ExecuteWorkflow(QueryableWorkflow) + + // Query result + result, err := s.env.QueryWorkflow("status") + s.NoError(err) + + var status string + s.NoError(result.Get(&status)) + s.Equal("completed", status) +} +``` + +## Activity Unit Testing + +```go +func TestActivity(t *testing.T) { + // Activities can be tested 
directly without workflow environment + ctx := context.Background() + result, err := GreetActivity(ctx, "World") + + assert.NoError(t, err) + assert.Equal(t, "Hello, World!", result) +} + +// Or with the activity test environment +func TestActivityWithEnv(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestActivityEnvironment() + env.RegisterActivity(GreetActivity) + + result, err := env.ExecuteActivity(GreetActivity, "World") + require.NoError(t, err) + + var output string + require.NoError(t, result.Get(&output)) + require.Equal(t, "Hello, World!", output) +} +``` + +## Mock Workflow Start Time + +Control `workflow.Now(ctx)` in tests: + +```go +func (s *UnitTestSuite) TestTimeBasedWorkflow() { + s.env.RegisterWorkflow(TimeBasedWorkflow) + + // Set specific start time + startTime := time.Date(2024, 1, 15, 10, 0, 0, 0, time.UTC) + s.env.SetStartTime(startTime) + + // Now workflow.Now(ctx) returns this time + s.env.ExecuteWorkflow(TimeBasedWorkflow) + + s.True(s.env.IsWorkflowCompleted()) +} +``` + +## Table-Driven Tests + +Use named structs and snake_case for test names: + +```go +func TestValidateOrder(t *testing.T) { + type testCase struct { + name string + input string + expected error + wantErr bool + } + + tests := []testCase{ + { + name: "valid_order", // Use snake_case + input: "order-123", + expected: nil, + wantErr: false, + }, + { + name: "empty_order", // Not "empty order" + input: "", + expected: ErrEmptyInput, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := ValidateOrder(tt.input) + + if tt.wantErr { + require.Error(t, err) + require.Equal(t, tt.expected, err) + } else { + require.NoError(t, err) + } + }) + } +} +``` + +## Replay Testing for Determinism Verification + +### WHY: Verify workflow code changes don't break determinism for existing executions +### WHEN: +- **Before deploying workflow changes** - Ensure backwards compatibility +- **CI/CD pipelines** - 
Automate determinism checks +- **Debugging non-determinism** - Replay production history locally + +Replay testing executes workflow code against recorded event histories to detect non-deterministic changes. + +### Using WorkflowReplayer + +```go +import ( + "testing" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/client" +) + +func TestReplayWorkflowHistory(t *testing.T) { + // Create a replayer + replayer := worker.NewWorkflowReplayer() + + // Register the workflow(s) you want to replay + replayer.RegisterWorkflow(MyWorkflow) + + // Replay from a JSON history file (exported from Temporal UI or CLI) + err := replayer.ReplayWorkflowHistoryFromJSONFile(nil, "testdata/workflow_history.json") + if err != nil { + t.Fatalf("Replay failed: %v", err) + } +} +``` + +### Replaying from History Object + +```go +func TestReplayFromHistory(t *testing.T) { + replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflow(MyWorkflow) + + // Load history from JSON reader + file, _ := os.Open("testdata/workflow_history.json") + defer file.Close() + + history, err := client.HistoryFromJSON(file, client.HistoryJSONOptions{}) + if err != nil { + t.Fatalf("Failed to load history: %v", err) + } + + // Replay the history + err = replayer.ReplayWorkflowHistory(nil, history) + if err != nil { + t.Fatalf("Replay failed: %v", err) + } +} +``` + +### Exporting Workflow History + +To get history for replay testing: + +```bash +# Using temporal CLI +temporal workflow show --workflow-id my-workflow-id --output json > workflow_history.json + +# Or from Temporal UI: Download JSON from workflow execution page +``` + +### Replaying Multiple Production Histories + +```go +func TestProductionWorkflowReplays(t *testing.T) { + replayer := worker.NewWorkflowReplayer() + replayer.RegisterWorkflow(ProcessOrder) + replayer.RegisterWorkflow(HandleRefund) + + histories := []string{ + "testdata/order-success.json", + "testdata/order-with-retry.json", + "testdata/refund-flow.json", + } + + for _, 
history := range histories { + t.Run(history, func(t *testing.T) { + err := replayer.ReplayWorkflowHistoryFromJSONFile(nil, history) + require.NoError(t, err) + }) + } +} +``` + +### Partial Replay for Debugging + +```go +// Replay only up to event ID 16 (useful for debugging) +err := replayer.ReplayPartialWorkflowHistoryFromJSONFile(nil, "history.json", 16) +``` + +## Testing Failure Scenarios + +```go +func (s *UnitTestSuite) TestActivityFailureHandling() { + s.env.RegisterWorkflow(OrderWorkflow) + + // Mock activity to fail with non-retryable error + s.env.OnActivity(ProcessPayment, mock.Anything, mock.Anything). + Return("", temporal.NewNonRetryableApplicationError("payment declined", "PaymentError", nil)) + + s.env.ExecuteWorkflow(OrderWorkflow, testOrder) + + s.True(s.env.IsWorkflowCompleted()) + s.Error(s.env.GetWorkflowError()) +} +``` + +## Testing Child Workflows + +```go +func (s *UnitTestSuite) TestParentWithChildWorkflow() { + s.env.RegisterWorkflow(ParentWorkflow) + s.env.RegisterWorkflow(ChildWorkflow) + + // Mock child workflow + s.env.OnWorkflow(ChildWorkflow, mock.Anything).Return("child result", nil) + + s.env.ExecuteWorkflow(ParentWorkflow, "input") + + s.True(s.env.IsWorkflowCompleted()) + s.NoError(s.env.GetWorkflowError()) +} +``` + +## Best Practices + +1. Use test suite for consistent setup/teardown +2. Mock activities for isolated workflow testing +3. Register delayed callbacks for signal testing +4. Test error scenarios explicitly +5. Use explicit mock expectations (avoid `mock.Anything` for inputs) +6. **Include replay tests in CI/CD** to catch determinism issues before deployment +7. **Save production histories** from critical workflows for regression testing +8. Test both happy path and error/cancellation paths +9. 
Use snake_case for test names for consistent output diff --git a/references/go/versioning.md b/references/go/versioning.md new file mode 100644 index 0000000..8dbdf57 --- /dev/null +++ b/references/go/versioning.md @@ -0,0 +1,298 @@ +# Go SDK Versioning + +## Overview + +Workflow versioning allows you to safely deploy incompatible changes to Workflow code while maintaining backwards compatibility with open Workflow Executions. The Go SDK provides multiple approaches: the GetVersion API for code-level branching, Workflow Type versioning for simple cases, and Worker Versioning for deployment-level control. + +## Why Versioning Matters + +When Workers restart after deployment, they resume open Workflow Executions through **history replay**. During replay, the SDK re-executes your Workflow code and compares the generated Commands against the Events in history. If your updated code produces different Commands than the original execution, it causes a non-determinism error. + +Versioning allows old and new code paths to coexist, ensuring that: +- Open Workflow Executions replay correctly with original behavior +- New Workflow Executions use the updated behavior + +## Workflow Versioning with GetVersion API + +### GetVersion Function Signature + +```go +workflow.GetVersion(ctx workflow.Context, changeID string, minSupported, maxSupported workflow.Version) workflow.Version +``` + +**Parameters:** +- `ctx` - The Workflow context +- `changeID` - A string uniquely identifying this change +- `minSupported` - Minimum supported version (oldest compatible revision) +- `maxSupported` - Maximum supported version (current revision) + +**Return value:** The version number to use for branching logic. 
+ +### Basic Usage + +When you need to make an incompatible change, wrap the old and new code paths with `GetVersion`: + +```go +func NotifyWorkflow(ctx workflow.Context, customer Customer) error { + ao := workflow.ActivityOptions{StartToCloseTimeout: time.Minute} + ctx = workflow.WithActivityOptions(ctx, ao) + + // Branch based on version + version := workflow.GetVersion(ctx, "NotificationMethod", workflow.DefaultVersion, 1) + + if version == workflow.DefaultVersion { + // Original code path - send fax + return workflow.ExecuteActivity(ctx, SendFax, customer).Get(ctx, nil) + } + // New code path - send email + return workflow.ExecuteActivity(ctx, SendEmail, customer).Get(ctx, nil) +} +``` + +The `workflow.DefaultVersion` constant (value `-1`) identifies the original code that existed before `GetVersion` was added. + +### How GetVersion Works + +When `GetVersion` executes: +1. For **new Workflow Executions**: Records a `MarkerRecorded` Event in history with the current version number +2. For **replaying Workflow Executions**: Reads the version from the existing Marker and returns it + +This ensures the same code path executes during replay as during the original execution. 
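+Because the recorded marker, not the current code, decides which branch runs on replay, a replay test is the most direct pre-deployment check that a `GetVersion` change is compatible with pre-change histories. A minimal sketch using the replayer described in `references/go/testing.md` (the workflow name and history path are illustrative):
+
+```go
+import (
+	"testing"
+
+	"go.temporal.io/sdk/worker"
+)
+
+func TestNotifyWorkflowReplayAfterVersionChange(t *testing.T) {
+	replayer := worker.NewWorkflowReplayer()
+	replayer.RegisterWorkflow(NotifyWorkflow)
+
+	// History exported before the GetVersion change was deployed
+	if err := replayer.ReplayWorkflowHistoryFromJSONFile(nil, "testdata/notify_pre_change.json"); err != nil {
+		t.Fatalf("GetVersion change is not replay-compatible: %v", err)
+	}
+}
+```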
+ +### Branching with Multiple Versions + +A single Workflow can have multiple Change IDs, each tracking a different modification: + +```go +func OrderWorkflow(ctx workflow.Context, order Order) error { + ao := workflow.ActivityOptions{StartToCloseTimeout: 5 * time.Minute} + ctx = workflow.WithActivityOptions(ctx, ao) + + // First versioned change + v1 := workflow.GetVersion(ctx, "PaymentMethod", workflow.DefaultVersion, 1) + if v1 == workflow.DefaultVersion { + workflow.ExecuteActivity(ctx, ProcessCash, order).Get(ctx, nil) + } else { + workflow.ExecuteActivity(ctx, ProcessCard, order).Get(ctx, nil) + } + + // Second versioned change (independent) + v2 := workflow.GetVersion(ctx, "ShippingProvider", workflow.DefaultVersion, 1) + if v2 == workflow.DefaultVersion { + workflow.ExecuteActivity(ctx, ShipUSPS, order).Get(ctx, nil) + } else { + workflow.ExecuteActivity(ctx, ShipFedEx, order).Get(ctx, nil) + } + + return nil +} +``` + +You can use the same Change ID for multiple changes **only** if they are deployed together. Never reuse a Change ID for modifications deployed at different times. 
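+To make the "deployed together" rule concrete: one Change ID may guard two modifications only when both ship in the same release, so the single recorded marker selects the matching pair on replay. A sketch with hypothetical activity names:
+
+```go
+import (
+	"time"
+
+	"go.temporal.io/sdk/workflow"
+)
+
+func CheckoutWorkflow(ctx workflow.Context, order Order) error {
+	ao := workflow.ActivityOptions{StartToCloseTimeout: time.Minute}
+	ctx = workflow.WithActivityOptions(ctx, ao)
+
+	// One Change ID guards both steps because they were deployed together
+	v := workflow.GetVersion(ctx, "Checkout2024", workflow.DefaultVersion, 1)
+	if v == workflow.DefaultVersion {
+		// Original pair of steps, always selected together on replay
+		if err := workflow.ExecuteActivity(ctx, ValidateOrderV1, order).Get(ctx, nil); err != nil {
+			return err
+		}
+		return workflow.ExecuteActivity(ctx, ChargeV1, order).Get(ctx, nil)
+	}
+	// Revised pair of steps, shipped in the same release
+	if err := workflow.ExecuteActivity(ctx, ValidateOrderV2, order).Get(ctx, nil); err != nil {
+		return err
+	}
+	return workflow.ExecuteActivity(ctx, ChargeV2, order).Get(ctx, nil)
+}
+```
+
+Had the two changes shipped separately, the second would need its own Change ID so that histories recorded between the deployments still replay correctly.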
+
+### Adding Support for Additional Versions
+
+As requirements evolve, add new version branches:
+
+```go
+func NotifyWorkflow(ctx workflow.Context, customer Customer) error {
+    ao := workflow.ActivityOptions{StartToCloseTimeout: time.Minute}
+    ctx = workflow.WithActivityOptions(ctx, ao)
+
+    version := workflow.GetVersion(ctx, "NotificationMethod", workflow.DefaultVersion, 3)
+
+    switch version {
+    case workflow.DefaultVersion:
+        return workflow.ExecuteActivity(ctx, SendFax, customer).Get(ctx, nil)
+    case 1:
+        return workflow.ExecuteActivity(ctx, SendEmail, customer).Get(ctx, nil)
+    case 2:
+        return workflow.ExecuteActivity(ctx, SendSMS, customer).Get(ctx, nil)
+    default: // version 3
+        return workflow.ExecuteActivity(ctx, SendPushNotification, customer).Get(ctx, nil)
+    }
+}
+```
+
+### Removing Support for Old Versions
+
+Once no open Workflow Executions use an old version, you can remove its code path. First, verify no executions use that version using List Filters.
+
+**Find workflows using a specific version:**
+```
+WorkflowType = "NotifyWorkflow"
+  AND ExecutionStatus = "Running"
+  AND TemporalChangeVersion = "NotificationMethod-1"
+```
+
+**Find workflows predating GetVersion (no marker):**
+```
+WorkflowType = "NotifyWorkflow"
+  AND ExecutionStatus = "Running"
+  AND TemporalChangeVersion IS NULL
+```
+
+**CLI command:**
+```bash
+temporal workflow list --query 'WorkflowType = "NotifyWorkflow" AND ExecutionStatus = "Running" AND TemporalChangeVersion = "NotificationMethod-1"'
+```
+
+After confirming no open executions, update `minSupported` and remove old code:
+
+```go
+func NotifyWorkflow(ctx workflow.Context, customer Customer) error {
+    ao := workflow.ActivityOptions{StartToCloseTimeout: time.Minute}
+    ctx = workflow.WithActivityOptions(ctx, ao)
+
+    // Removed support for versions DefaultVersion and 1
+    version := workflow.GetVersion(ctx, "NotificationMethod", 2, 3)
+
+    if version == 2 {
+        return workflow.ExecuteActivity(ctx, SendSMS, customer).Get(ctx, nil)
+    }
+    return workflow.ExecuteActivity(ctx, SendPushNotification, customer).Get(ctx, nil)
+}
+```
+
+**Warning:** Any Workflow Execution with a version less than `minSupported` will fail. Verify no such executions exist before updating.
+
+### Two-Step Cleanup Process
+
+After your namespace's retention period passes (e.g., 30, 60, or 90 days), all closed workflow histories using old code will be purged, making it safe to remove versioning code.
+
+```go
+// Step 1: Keep GetVersion, remove old branch (first PR)
+_ = workflow.GetVersion(ctx, "NotificationMethod", workflow.DefaultVersion, 1)
+newLogic()
+
+// Step 2: Remove GetVersion entirely (separate PR after deployment)
+newLogic()
+```
+
+Why two steps? If `GetVersion` were removed in the same PR, Workflows started under that PR would record no marker; rolling the PR back would then replay those histories through `GetVersion`, which returns `DefaultVersion` when no marker exists and takes the wrong branch, causing non-determinism errors.
+
+## Workflow Type Versioning
+
+For simpler cases, create a new Workflow Type instead of using GetVersion:
+
+```go
+// Original workflow
+func PizzaWorkflow(ctx workflow.Context, order PizzaOrder) (OrderConfirmation, error) {
+    // Original implementation
+}
+
+// New version with incompatible changes
+func PizzaWorkflowV2(ctx workflow.Context, order PizzaOrder) (OrderConfirmation, error) {
+    // Updated implementation
+}
+```
+
+Register both with the Worker:
+
+```go
+w.RegisterWorkflow(PizzaWorkflow)
+w.RegisterWorkflow(PizzaWorkflowV2)
+```
+
+Update client code to start new executions with the new type. Use List Filters to monitor when the original type has no open executions:
+
+```
+WorkflowType = "PizzaWorkflow" AND ExecutionStatus = "Running"
+```
+
+**Trade-offs:**
+- Simpler than GetVersion branching
+- Requires duplicating code
+- Requires updating all code/commands that start the Workflow
+
+## Worker Versioning
+
+Worker Versioning shifts version management from code-level branching to deployment infrastructure, enabling blue-green and rainbow deployments.
+
+### Key Concepts
+
+**Worker Deployment:** A logical service grouping all versions of your Workers (e.g., "loan-processor"). Identified by a deployment name.
+
+**Worker Deployment Version:** A specific build within a deployment, identified by combining deployment name and Build ID (e.g., "loan-processor:abc123").
+
+**Build ID:** A unique identifier for a specific code build (git commit hash, version number, or timestamp).
+
+### Configuring Workers for Versioning
+
+```go
+import (
+    "log"
+    "os"
+
+    "go.temporal.io/sdk/client"
+    "go.temporal.io/sdk/worker"
+    "go.temporal.io/sdk/workflow"
+)
+
+func main() {
+    c, err := client.Dial(client.Options{})
+    if err != nil {
+        log.Fatalln("Unable to create Temporal client:", err)
+    }
+    defer c.Close()
+
+    buildID := os.Getenv("BUILD_ID") // e.g., git commit hash or version number
+
+    w := worker.New(c, "my-task-queue", worker.Options{
+        DeploymentOptions: worker.DeploymentOptions{
+            UseVersioning: true,
+            Version: worker.WorkerDeploymentVersion{
+                DeploymentName: "order-processor",
+                BuildId:        buildID,
+            },
+            DefaultVersioningBehavior: workflow.VersioningBehaviorPinned,
+        },
+    })
+
+    w.RegisterWorkflow(OrderWorkflow)
+    w.RegisterActivity(ProcessOrder)
+
+    if err := w.Run(worker.InterruptCh()); err != nil {
+        log.Fatalln("Unable to start Worker:", err)
+    }
+}
+```
+
+### Versioning Behaviors: PINNED vs AUTO_UPGRADE
+
+**PINNED:**
+- Workflow runs only on the Worker version assigned at first execution
+- No patching required - code never changes mid-execution
+- Cannot use other versioning APIs (GetVersion)
+
+**AUTO_UPGRADE:**
+- Workflow can move to newer Worker versions
+- Can be rerouted when Current Deployment Version changes
+- Patching (GetVersion) often required for safe transitions
+- Useful for long-running Workflows that need bug fixes
+
+### When to Use Each Behavior
+
+**Choose PINNED when:**
+- Workflows are short-running (minutes to hours)
+- Consistency and stability are critical
+- You want to eliminate version compatibility complexity
+
+**Choose AUTO_UPGRADE when:**
+- Workflows run for weeks or months
+- You need to apply bug fixes to in-progress Workflows
+- Infrastructure cost of maintaining old versions is prohibitive
+
+### Querying Workflows by Version
+
+The `TemporalWorkerDeploymentVersion` search attribute tracks which version processed each Workflow:
+
+```
+WorkflowType = "OrderWorkflow"
+  AND ExecutionStatus = "Running"
+  AND TemporalWorkerDeploymentVersion = "order-processor:v1.0.0"
+```
+
+## Best Practices
+
+1. **Identify open executions first** - Check for running Workflows before deciding if versioning is needed
+2. **Use meaningful Change IDs** - Names should describe what changed (e.g., "PaymentProviderSwitch")
+3. **Never reuse Change IDs** - Each deployment should use unique Change IDs
+4. **Test replay compatibility** - Use `worker.NewWorkflowReplayer()` to verify changes do not break existing histories
+5. **Clean up old versions** - Remove unsupported code paths once no executions use them
+6. **Choose the right approach:**
+   - GetVersion for code-level branching with gradual migration
+   - Workflow Type versioning for simple, one-time changes
+   - Worker Versioning for deployment-level control with rainbow deploys
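+
+As a sketch of the replay-compatibility check in best practice 4: the test below assumes a `NotifyWorkflow` as defined earlier and a hypothetical `notify_history.json` file exported from a real execution (for example via `temporal workflow show`):
+
+```go
+package notify
+
+import (
+    "testing"
+
+    "go.temporal.io/sdk/worker"
+)
+
+// TestNotifyWorkflowReplay verifies that the current NotifyWorkflow code
+// replays cleanly against a history recorded by an earlier build.
+func TestNotifyWorkflowReplay(t *testing.T) {
+    replayer := worker.NewWorkflowReplayer()
+    replayer.RegisterWorkflow(NotifyWorkflow)
+
+    // notify_history.json is a placeholder name for an exported history file.
+    err := replayer.ReplayWorkflowHistoryFromJSONFile(nil, "notify_history.json")
+    if err != nil {
+        t.Fatalf("replay failed - change is not backward compatible: %v", err)
+    }
+}
+```
+
+Run this test in CI before every deployment: a replay failure here means the change would produce non-determinism errors against open executions.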