ewf

package module
v0.0.0-...-96a8e62 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 23, 2025 License: Apache-2.0 Imports: 12 Imported by: 0

README

EWF - Embeddable Workflow Framework for Go

EWF is a simple, lightweight, and embeddable workflow framework for Go applications. It allows you to define stateful, multi-step processes that are resilient to application crashes and interruptions.

Core Features

  • Centralized Engine: A powerful Engine manages workflow definitions, activities, and execution.
  • Queue-Based Execution (New): Integrates a lightweight queue for simultaneous and concurrent workflow processing.
  • Stateful & Resilient Workflows: Each workflow maintains its own state, which is persisted after each step to a Store.
  • Automatic Resumption: The engine automatically finds and resumes interrupted workflows on startup, ensuring no work is lost.
  • Asynchronous Execution: Run workflows in the background using Run with WithAsync() option, perfect for use in HTTP servers and other concurrent applications.
  • Pluggable Storage: Comes with a built-in SQLiteStore, but you can implement the Store interface to use any key-value backend.
  • Context-Aware Retries: Define robust retry policies for steps that might fail, with delays that respect context cancellation to prevent resource leaks.
  • Lifecycle Hooks: Execute custom logic before or after a workflow or a specific step.

Feature Matrix

Feature Supported Notes
Step Retry Policies Per-step, with customizable attempts and flexible backoff (constant, exponential, etc)
Step Timeouts Per-step, context-based cancellation
Idempotency Helpers/Patterns Ergonomic, context-based, with docs/examples
Before/After Workflow Hooks For setup, teardown, logging, etc.
Before/After Step Hooks For auditing, metrics, etc.
State Persistence SQLite built-in; pluggable interface
Workflow Resumption Survives crashes/restarts
Asynchronous Execution Run workflows in background
Synchronous Execution For tests and CLI
Pluggable Storage Implement your own Store
Queue Engine Provides an interface for concurrent background processing for queued tasks and workflows
Queue Built-in interface; supports enqueue, dequeue, and worker pool management
CLI/HTTP Example Workflows See examples/ directory
Context Propagation Step context carries deadlines, values
Step Metadata in Context Step name injected for idempotency
Testing Support Unit, integration, E2E patterns
GoDoc & User Guide See docs/userguide.md

Installation

go get github.com/xmonader/ewf

Concepts

  • Engine: The central hub of the framework. It holds registered Activity functions and WorkflowTemplate definitions. It's responsible for creating and running workflows.
  • Activity: A simple Go function (StepFn) that represents a single unit of work. Activities are registered with the engine by a unique name.
  • WorkflowTemplate: A blueprint for a workflow, defining the sequence of activities (steps) to be executed.
  • Workflow: A running instance of a WorkflowTemplate. Each workflow has a unique ID, its own state, and tracks its progress through the steps.
  • Store: A persistence layer (e.g., SQLiteStore) that saves and loads workflow state and queue metadata, enabling resilience.
  • Queue: A concurrent-safe structure that holds pending tasks or workflow jobs.
  • QueueEngine: It acts as a scheduler and execution manager for queued jobs, ensuring:
    • Automatic worker startup when a queue is created.
    • Graceful shutdowns respecting context cancellation.
    • Optional persistence layer integration for durable queues.

Basic Usage

Here's a simple example of a two-step workflow using the modern, engine-centric approach.

package main

import (
 "context"
 "log"
 "time"

 "github.com/xmonader/ewf"
)

// An activity that waits for a given duration.
func waitActivity(duration time.Duration) ewf.StepFn {
 return func(ctx context.Context, state ewf.State) error {
  log.Printf("Waiting for %s...", duration)
  time.Sleep(duration)
  return nil
 }
}

func main() {
 // 1. Set up a store for persistence.
 store, err := ewf.NewSQLiteStore("cli_example.db")
 if err != nil {
  log.Fatalf("store error: %v", err)
 }
 defer store.Close()

// 2. Create a new engine.
engine, err := ewf.NewEngine(ewf.WithStore(store))
 if err != nil {
  log.Fatalf("engine error: %v", err)
 }

 // 3. Register your activities (the building blocks of workflows).
 engine.Register("wait_5s", waitActivity(5*time.Second))
 engine.Register("wait_10s", waitActivity(10*time.Second))

 // 4. Define and register a workflow template.
 myWorkflow := &ewf.WorkflowTemplate{
  Steps: []ewf.Step{
   {
    Name: "wait_5s",
    RetryPolicy: &ewf.RetryPolicy{
     MaxAttempts: 3,
     BackOff:     ewf.ConstantBackoff(2 * time.Second),
    },
   },
   {
    Name: "wait_10s",
    RetryPolicy: &ewf.RetryPolicy{
     MaxAttempts: 5,
     BackOff:     ewf.ExponentialBackoff(500*time.Millisecond, 10*time.Second, 2.0),
    },
   },
  },
 }
 engine.RegisterTemplate("my_workflow", myWorkflow)

 // 5. Create a new workflow instance from the template.
 wf, err := engine.NewWorkflow("my_workflow")
 if err != nil {
  log.Fatalf("failed to create workflow: %v", err)
 }

// 6. Run the workflow synchronously.
log.Println("Starting workflow...")
if err := engine.Run(context.Background(), wf); err != nil {
 log.Fatalf("Workflow failed: %v", err)
}

// Reload from the store to inspect the most recent workflow state.
latest, err := store.LoadWorkflowByUUID(context.Background(), wf.UUID)
if err != nil {
 log.Fatalf("failed to load workflow: %v", err)
}
log.Printf("Workflow completed successfully at step %d!\n", latest.CurrentStep)
}

This example shows the usage of the Queue Engine:

// first, implement QueueEngine, Queue Interface

 wfengine, err := NewEngine(WithQueueEngine(qEngine))
 if err != nil {
  log.Fatalf("wf engine error: %v", err)
 }
 defer func() {
  if err := wfengine.Close(context); err != nil {
   log.Fatalf("failed to close engine: %v", err)
  }
 }

    // queue with custom options
 err = wfengine.CreateQueue(
  context,
  name,
  WorkersDefinition{
   Count:        1,
   PollInterval: 300 * time.Millisecond,
  },
  QueueOptions{
   AutoDelete:  true,
   DeleteAfter: 2 * time.Second,
   PopTimeout:  1 * time.Second,
  },
 )
 if err != nil {
  log.Fatalf("failed to create queue: %v", err)
 }

 workflow, err := wfengine.NewWorkflow(wfName, WithQueue(name))
 if err != nil {
  log.Fatalf("failed to create workflow: %v", err)
 }
 wfengine.Run(context, workflow)
Workflow Options

When creating a workflow instance you can attach metadata or routing hints via options:

wf, err := engine.NewWorkflow(
    "my_workflow",
    ewf.WithQueue("billing-jobs"),                   // enqueue instead of running inline
    ewf.WithDisplayName("Quarterly Billing Run"),    // nicer display for dashboards/logs
    ewf.WithMetadata(map[string]string{"region": "us"}),// arbitrary key/value context
)
  • WithQueue wires the workflow to a queue so engine.Run enqueues instead of executing immediately.
  • WithDisplayName is useful for logs, observability tools, or UIs that show human-readable labels.
  • WithMetadata persists arbitrary contextual data alongside the workflow and is available wherever the workflow is reloaded.

Retry Policy & Backoff Examples

You can use helpers from backoffs.go for ergonomic retry strategies. For example:

step := ewf.Step{
    Name: "StepA",
    RetryPolicy: &ewf.RetryPolicy{
        MaxAttempts: 3,
        BackOff:     ewf.ConstantBackoff(2 * time.Second),
    },
}

step := ewf.Step{
    Name: "StepB",
    RetryPolicy: &ewf.RetryPolicy{
        MaxAttempts: 5,
        BackOff:     ewf.ExponentialBackoff(500*time.Millisecond, 10*time.Second, 2.0),
    },
}
  • MaxAttempts is the total number of attempts (including the first try).
  • BackOff controls the delay pattern (constant, exponential, etc.).
  • If BackOff is nil, the step will not be retried.
  • Return ewf.ErrFailWorkflowNow to fail the workflow immediately, skipping retries.

HTTP Server Example

The framework is perfect for building robust, asynchronous services. The included httpexample shows how to:

  • Run the engine in a standard Go HTTP server.
  • Start workflows asynchronously from an API endpoint.
  • Immediately return a workflow_id to the client.
  • Provide a separate /status endpoint to check the progress of a workflow.
  • Automatically resume interrupted workflows when the server restarts.

To run the example:

cd examples/httpexample
go run main.go

In another terminal:

# Start a new workflow
curl -v http://localhost:8090/greet/EWF

# Check its status using the returned ID
curl http://localhost:8090/status/<workflow-id>

Complex Struct State Example

The structexample demonstrates how to store and retrieve complex, nested structs in workflow state with type safety. Important Note: The workflow state uses map[string]any, so when retrieving structs, you must use type assertions (e.g., person, ok := state["person"].(Person)). This provides runtime type safety but requires careful type handling to avoid panics.

Key points:

  • Store structs directly in state: state["key"] = myStruct
  • Retrieve with type assertion: myStruct, ok := state["key"].(MyStructType)
  • Always check the ok boolean to handle type assertion failures gracefully
  • Nested structs work seamlessly with this approach

To run the example:

cd examples/structexample
go run main.go

Running Tests

To run the test suite for the library:

go test -v ./...

Documentation

Overview

Package ewf implements an Extensible Workflow Framework.

This file contains the custom backoff implementation that supports: - Constant backoff: fixed delay between retries - Exponential backoff: delay increases exponentially with each retry - Exponential backoff with jitter: adds randomness to prevent thundering herd - Zero backoff: no delay between retries - Stop backoff: signals that no more retries should be attempted

All backoff implementations are serializable to JSON, allowing workflows with retry policies to be persisted and resumed.

Package ewf provides a workflow engine for defining and executing workflows in Go.

Index

Constants

View Source
const (
	// StepNameContextKey is used to store the current step name in the context
	StepNameContextKey contextKey = "stepName"
)

Context keys used by the workflow engine

View Source
const Stop time.Duration = -1

Stop is a special duration value that indicates that no more retries should be made.

Variables

View Source
var ErrFailWorkflowNow = errors.New("fail workflow now")

ErrFailWorkflowNow is a special error that can be returned by a step to indicate that the workflow should be failed immediately.

View Source
var ErrQueueAlreadyExists error = errors.New("queue already exists")

ErrQueueAlreadyExists indicates that the queue already exists in queue engine

View Source
var ErrQueueNotFound error = errors.New("queue doesn't exist")

ErrQueueNotFound indicates that the queue is deleted or never created

View Source
var ErrWorkflowNotFound = errors.New("workflow not found")

ErrWorkflowNotFound is returned when a workflow is not found in the database.

Functions

func IsPermanent

func IsPermanent(err error) bool

IsPermanent returns true if the given error is a Permanent error.

func MarshalBackOff

func MarshalBackOff(b BackOff) ([]byte, error)

MarshalBackOff serializes a BackOff implementation to JSON

func Retry

func Retry(operation func() error, b BackOff) error

Retry retries the operation function using the specified BackOff policy. If the operation returns nil, Retry returns nil. If the operation returns an error, Retry retries as specified by the BackOff policy. If the BackOff policy indicates no more retries, the most recent error is returned.

func RetryNotify

func RetryNotify(operation func() error, b BackOff, notify func(error, time.Duration)) error

RetryNotify is the same as Retry but calls notify after each failed attempt.

Types

type AfterStepHook

type AfterStepHook func(ctx context.Context, w *Workflow, step *Step, err error)

AfterStepHook is a function run after a step finishes.

type AfterWorkflowHook

type AfterWorkflowHook func(ctx context.Context, w *Workflow, err error)

AfterWorkflowHook is a function run after a workflow finishes.

type BackOff

type BackOff interface {
	// NextBackOff returns the duration to wait before retrying the operation.
	// If the implementation returns Stop, the operation should not be retried.
	NextBackOff() time.Duration

	// Reset resets the backoff to its initial state.
	// This is typically called before starting a new sequence of backoff operations.
	Reset()
}

BackOff interface defines the operations required for backoff implementations. This is the core interface that all backoff strategies must implement. It provides methods to calculate the next backoff duration and reset the backoff state.

func ConstantBackoff

func ConstantBackoff(delay time.Duration) BackOff

ConstantBackoff returns a BackOff that always waits for the specified delay.

func ExponentialBackoff

func ExponentialBackoff(initialInterval, maxInterval time.Duration, multiplier float64) BackOff

ExponentialBackoff returns a BackOff with exponential delays. initialInterval: first delay; maxInterval: cap for delay; multiplier: growth factor (e.g. 2.0).

func ExponentialBackoffWithJitter

func ExponentialBackoffWithJitter(initialInterval, maxInterval time.Duration, multiplier float64, jitter float64) BackOff

ExponentialBackoffWithJitter returns a BackOff with exponential delays and jitter. initialInterval: first delay; maxInterval: cap for delay; multiplier: growth factor (e.g. 2.0). jitter: randomization factor between 0 and 1 that determines the amount of randomness.

func NewConstantBackOff

func NewConstantBackOff(interval time.Duration) BackOff

NewConstantBackOff creates a backoff policy that always returns the same backoff delay.

func NewExponentialBackOff

func NewExponentialBackOff(initialInterval, maxInterval time.Duration, multiplier float64) BackOff

NewExponentialBackOff creates a new exponential backoff policy. The formula used is: currentInterval = initialInterval * multiplier^(n-1) with jitter.

func NewExponentialBackOffWithJitter

func NewExponentialBackOffWithJitter(initialInterval, maxInterval time.Duration, multiplier float64, jitter float64) BackOff

NewExponentialBackOffWithJitter creates a new exponential backoff policy with jitter.

func NewStopBackOff

func NewStopBackOff() BackOff

NewStopBackOff creates a backoff policy that always returns Stop.

func NewZeroBackOff

func NewZeroBackOff() BackOff

NewZeroBackOff creates a backoff policy that always returns 0 (no waiting).

func UnmarshalBackOff

func UnmarshalBackOff(data []byte) (BackOff, error)

UnmarshalBackOff deserializes a JSON representation into a BackOff implementation

func ZeroBackoff

func ZeroBackoff() BackOff

ZeroBackoff returns a BackOff that always returns zero delay (for immediate retries).

type BackOffContext

type BackOffContext interface {
	BackOff
	Context() context.Context
}

BackOffContext wraps a BackOff with a context to support cancellation.

func WithContext

func WithContext(b BackOff, ctx context.Context) BackOffContext

WithContext returns a BackOffContext with the given context and backoff.

type BackOffData

type BackOffData struct {
	Type                BackOffType   `json:"type"`
	Interval            time.Duration `json:"interval,omitempty"`             // For ConstantBackOffImpl
	InitialInterval     time.Duration `json:"initial_interval,omitempty"`     // For ExponentialBackOffImpl
	CurrentInterval     time.Duration `json:"current_interval,omitempty"`     // For ExponentialBackOffImpl
	MaxInterval         time.Duration `json:"max_interval,omitempty"`         // For ExponentialBackOffImpl
	Multiplier          float64       `json:"multiplier,omitempty"`           // For ExponentialBackOffImpl
	RandomizationFactor float64       `json:"randomization_factor,omitempty"` // For ExponentialBackOffImpl with jitter
	MaxElapsedTime      time.Duration `json:"max_elapsed_time,omitempty"`     // For ExponentialBackOffImpl
}

BackOffData is used for serializing/deserializing BackOff implementations

type BackOffType

type BackOffType string

BackOffType identifies the type of backoff implementation

const (
	ConstantBackOffType    BackOffType = "constant"
	ExponentialBackOffType BackOffType = "exponential"
	ZeroBackOffType        BackOffType = "zero"
	StopBackOffType        BackOffType = "stop"
)

type BeforeStepHook

type BeforeStepHook func(ctx context.Context, w *Workflow, step *Step)

BeforeStepHook is a function run before a step starts.

type BeforeWorkflowHook

type BeforeWorkflowHook func(ctx context.Context, w *Workflow)

BeforeWorkflowHook is a function run before a workflow starts.

type Clock

type Clock interface {
	Now() time.Time
}

Clock interface abstracts time operations for easier testing.

type ConstantBackOffImpl

type ConstantBackOffImpl struct {
	Interval time.Duration `json:"interval"` // Serializable field
}

ConstantBackOffImpl implements a constant backoff strategy.

func (*ConstantBackOffImpl) NextBackOff

func (b *ConstantBackOffImpl) NextBackOff() time.Duration

NextBackOff returns the constant interval.

func (*ConstantBackOffImpl) Reset

func (b *ConstantBackOffImpl) Reset()

Reset is a no-op for constant backoff.

type Engine

type Engine struct {
	// contains filtered or unexported fields
}

Engine is the central component for managing and executing workflows. It holds a registry of all available activities and a store for persistence. Engine is the central component for managing and executing workflows.

func NewEngine

func NewEngine(opts ...EngineOption) (*Engine, error)

NewEngine creates a new workflow engine.

func (*Engine) Close

func (e *Engine) Close(ctx context.Context) error

Close shutdowns the engine gracefully

func (*Engine) CloseQueue

func (e *Engine) CloseQueue(ctx context.Context, queueName string) error

CloseQueue closes a queue by its name

func (*Engine) CreateQueue

func (e *Engine) CreateQueue(ctx context.Context, queueName string, workerDef WorkersDefinition, queueOptions QueueOptions) error

CreateQueue creates a new queue and starts workers for it

func (*Engine) NewWorkflow

func (e *Engine) NewWorkflow(name string, opts ...WorkflowOption) (Workflow, error)

NewWorkflow creates a new workflow instance from a registered definition.

func (*Engine) Register

func (e *Engine) Register(name string, activity StepFn)

Register registers an activity function with the engine. This allows the activity to be used in workflow steps by its name. Register registers an activity function with the engine.

func (*Engine) RegisterTemplate

func (e *Engine) RegisterTemplate(name string, def *WorkflowTemplate)

NewWorkflow creates a new workflow instance with the given name and steps. The workflow is associated with the engine's store. RegisterTemplate registers a workflow template with the engine.

func (*Engine) ResumeWorkflows

func (e *Engine) ResumeWorkflows()

ResumeWorkflows resumes all workflows in the background. It resumes running workflows and pending workflows.

func (*Engine) Run

func (e *Engine) Run(ctx context.Context, w Workflow, opts ...RunOption) error

Run executes a workflow. By default, it runs synchronously and blocks until completion. Use WithAsync() option to run asynchronously in a goroutine. If the workflow has a queue name set, it will be enqueued instead of executed directly. Accepts a copy (value) to prevent sharing issues.

func (*Engine) Store

func (e *Engine) Store() Store

Store returns the store associated with the engine. This allows external access to the workflow store for querying workflow status and information. Users can use this method to retrieve workflow details by UUID, check workflow execution status, or list workflows with specific statuses. For example, after starting a workflow and receiving its UUID, clients can use engine.Store().LoadWorkflowByUUID(ctx, uuid) to get the workflow's current state. Store returns the store associated with the engine.

type EngineOption

type EngineOption func(*Engine)

EngineOption defines options for creating a new Engine.

func WithQueueEngine

func WithQueueEngine(queueEngine QueueEngine) EngineOption

func WithStore

func WithStore(store Store) EngineOption

type ExponentialBackOffImpl

type ExponentialBackOffImpl struct {
	InitialInterval     time.Duration `json:"initial_interval"`
	CurrentInterval     time.Duration `json:"-"` // Not serialized
	Multiplier          float64       `json:"multiplier"`
	MaxInterval         time.Duration `json:"max_interval"`
	RandomizationFactor float64       `json:"randomization_factor"` // For jitter
	MaxElapsedTime      time.Duration `json:"max_elapsed_time"`
	Clock               Clock         `json:"-"` // Not serialized
	StartTime           time.Time     `json:"-"` // Not serialized
	// contains filtered or unexported fields
}

ExponentialBackOffImpl implements an exponential backoff strategy.

func (*ExponentialBackOffImpl) NextBackOff

func (b *ExponentialBackOffImpl) NextBackOff() time.Duration

NextBackOff calculates the next backoff interval using the exponential formula.

func (*ExponentialBackOffImpl) Reset

func (b *ExponentialBackOffImpl) Reset()

Reset resets the backoff to its initial state.

type Permanent

type Permanent struct {
	Err error
}

Permanent wraps an error to indicate that it should not be retried.

func PermanentError

func PermanentError(err error) *Permanent

PermanentError wraps an error to indicate that it should not be retried.

func (*Permanent) Error

func (p *Permanent) Error() string

Error returns the wrapped error's message.

func (*Permanent) Unwrap

func (p *Permanent) Unwrap() error

Unwrap returns the wrapped error.

type Queue

type Queue interface {
	Name() string
	Enqueue(ctx context.Context, workflow Workflow) error
	Dequeue(ctx context.Context) (Workflow, error)
	Close(ctx context.Context) error
	CloseCh() <-chan struct{}
	ActivityCh() <-chan struct{}
	Length(ctx context.Context) (int64, error)
}

Queue represents a workflow queue.

type QueueEngine

type QueueEngine interface {
	CreateQueue(ctx context.Context, queueName string, queueOptions QueueOptions) (Queue, error)
	GetQueue(ctx context.Context, queueName string) (Queue, error)
	CloseQueue(ctx context.Context, queueName string) error
	Close(ctx context.Context) error
}

QueueEngine defines the interface for a queue engine that manages multiple queues

type QueueMetadata

type QueueMetadata struct {
	Name         string            `json:"name"`
	WorkersDef   WorkersDefinition `json:"worker_def"`
	QueueOptions QueueOptions      `json:"queue_options"`
}

QueueMetadata defines the queue data that needs to persist

type QueueOptions

type QueueOptions struct {
	AutoDelete  bool          `json:"auto_delete"`  // Delete when no longer in use
	DeleteAfter time.Duration `json:"delete_after"` // Ignored if AutoDelete is false
	PopTimeout  time.Duration `json:"pop_timeout"`  // timeout for dequeue operations
}

QueueOptions defines options for the queue

type RetryPolicy

type RetryPolicy struct {
	MaxAttempts uint
	BackOff     BackOff
}

RetryPolicy defines the retry behavior for a step.

func (*RetryPolicy) MarshalJSON

func (rp *RetryPolicy) MarshalJSON() ([]byte, error)

MarshalJSON implements custom JSON marshaling for RetryPolicy

func (*RetryPolicy) UnmarshalJSON

func (rp *RetryPolicy) UnmarshalJSON(data []byte) error

UnmarshalJSON implements custom JSON unmarshaling for RetryPolicy

type RunOption

type RunOption func(*runOptions)

RunOption defines options for running workflows.

func WithAsync

func WithAsync() RunOption

WithAsync specifies whether to run the workflow asynchronously.

type SQLiteStore

type SQLiteStore struct {
	// contains filtered or unexported fields
}

SQLiteStore implements the Store interface using SQLite for persistence.

func NewSQLiteStore

func NewSQLiteStore(dsn string) (*SQLiteStore, error)

NewSQLiteStore creates a new SQLiteStore with the given DSN.

func (*SQLiteStore) Close

func (s *SQLiteStore) Close() error

Close closes the SQLite database connection.

func (*SQLiteStore) DeleteQueueMetadata

func (s *SQLiteStore) DeleteQueueMetadata(ctx context.Context, name string) error

DeleteQueueMetadata removes a queue by name from the SQLite store.

func (*SQLiteStore) DeleteWorkflow

func (s *SQLiteStore) DeleteWorkflow(ctx context.Context, uuid string) error

func (*SQLiteStore) ListWorkflowUUIDsByStatus

func (s *SQLiteStore) ListWorkflowUUIDsByStatus(ctx context.Context, status WorkflowStatus) ([]string, error)

func (*SQLiteStore) LoadAllQueueMetadata

func (s *SQLiteStore) LoadAllQueueMetadata(ctx context.Context) ([]*QueueMetadata, error)

LoadAllQueues loads all queues from the SQLite database.

func (*SQLiteStore) LoadAllWorkflowTemplates

func (s *SQLiteStore) LoadAllWorkflowTemplates(ctx context.Context) (map[string]*WorkflowTemplate, error)

LoadAllWorkflowTemplates loads all workflow templates from the SQLite database.

func (*SQLiteStore) LoadWorkflowByName

func (s *SQLiteStore) LoadWorkflowByName(ctx context.Context, name string) (Workflow, error)

func (*SQLiteStore) LoadWorkflowByUUID

func (s *SQLiteStore) LoadWorkflowByUUID(ctx context.Context, uuid string) (Workflow, error)

func (*SQLiteStore) LoadWorkflowTemplate

func (s *SQLiteStore) LoadWorkflowTemplate(ctx context.Context, name string) (*WorkflowTemplate, error)

LoadWorkflowTemplate loads a workflow template by name from the SQLite database.

func (*SQLiteStore) SaveQueueMetadata

func (s *SQLiteStore) SaveQueueMetadata(ctx context.Context, settings *QueueMetadata) error

SaveQueueMetadata saves the QueueMetadata into sqlite store

func (*SQLiteStore) SaveWorkflow

func (s *SQLiteStore) SaveWorkflow(ctx context.Context, workflow Workflow) error

SaveWorkflow saves the given workflow to the SQLite database. Receives a copy to prevent sharing issues.

func (*SQLiteStore) SaveWorkflowTemplate

func (s *SQLiteStore) SaveWorkflowTemplate(ctx context.Context, name string, tmpl *WorkflowTemplate) error

SaveWorkflowTemplate saves a workflow template to the SQLite database.

func (*SQLiteStore) Setup

func (s *SQLiteStore) Setup() error

Setup prepares the SQLite database for use.

type State

type State map[string]any

State represents the workflow state as a generic key-value map.

type Step

type Step struct {
	Name        string
	RetryPolicy *RetryPolicy
	Timeout     time.Duration // Maximum execution time for the step (including retries)
}

Step represents a single step in a workflow.

type StepFn

type StepFn func(ctx context.Context, state State) error

StepFn defines the function signature for a workflow step.

type StopBackOffImpl

type StopBackOffImpl struct{}

StopBackOffImpl implements a backoff that always returns Stop.

func (*StopBackOffImpl) NextBackOff

func (b *StopBackOffImpl) NextBackOff() time.Duration

NextBackOff always returns Stop.

func (*StopBackOffImpl) Reset

func (b *StopBackOffImpl) Reset()

Reset is a no-op for stop backoff.

type Store

type Store interface {
	Setup() error // could be a no-op, no problem.
	SaveWorkflow(ctx context.Context, workflow Workflow) error
	DeleteWorkflow(ctx context.Context, uuid string) error
	LoadWorkflowByName(ctx context.Context, name string) (Workflow, error)
	LoadWorkflowByUUID(ctx context.Context, uuid string) (Workflow, error)
	ListWorkflowUUIDsByStatus(ctx context.Context, status WorkflowStatus) ([]string, error)
	SaveWorkflowTemplate(ctx context.Context, name string, tmpl *WorkflowTemplate) error
	LoadWorkflowTemplate(ctx context.Context, name string) (*WorkflowTemplate, error)
	LoadAllWorkflowTemplates(ctx context.Context) (map[string]*WorkflowTemplate, error)
	SaveQueueMetadata(ctx context.Context, meta *QueueMetadata) error
	DeleteQueueMetadata(ctx context.Context, name string) error
	LoadAllQueueMetadata(ctx context.Context) ([]*QueueMetadata, error)
	Close() error // could be a no-op, no problem.
}

Store defines the interface for workflow persistence.

type SystemClock

type SystemClock struct{}

SystemClock implements the Clock interface using the system clock.

func (SystemClock) Now

func (c SystemClock) Now() time.Time

Now returns the current time.

type WorkersDefinition

type WorkersDefinition struct {
	Count        int           `json:"worker_count"`  // number of workers
	PollInterval time.Duration `json:"poll_interval"` // interval between polling the queue for new workflows
	WorkTimeout  time.Duration `json:"work_timeout"`  // timeout for worker to process work
}

WorkersDefinition defines the worker pool for processing workflows in the queue

type Workflow

type Workflow struct {
	UUID        string            `json:"uuid"`
	Name        string            `json:"name"`
	Status      WorkflowStatus    `json:"status"`
	State       State             `json:"state"`
	CurrentStep int               `json:"current_step"`
	CreatedAt   time.Time         `json:"created_at"`
	Steps       []Step            `json:"steps"`
	QueueName   string            `json:"queue_name"`
	DisplayName string            `json:"display_name"`
	Metadata    map[string]string `json:"metadata"`
	// contains filtered or unexported fields
}

Workflow represents a workflow instance.

func NewWorkflow

func NewWorkflow(name string, opts ...WorkflowOption) Workflow

NewWorkflow creates a new workflow instance with the given name and options.

type WorkflowOption

type WorkflowOption func(w *Workflow)

WorkflowOption defines options for creating a new Workflow.

func WithDisplayName

func WithDisplayName(displayName string) WorkflowOption

WithDisplayName specifies the display name of the workflow.

func WithMetadata

func WithMetadata(metadata map[string]string) WorkflowOption

WithMetadata specifies the metadata of the workflow.

func WithQueue

func WithQueue(queueName string) WorkflowOption

WithQueue specifies the queue name to enqueue the workflow into.

type WorkflowStatus

type WorkflowStatus string

WorkflowStatus represents the status of a workflow.

const (
	// StatusPending indicates the workflow is pending and has not started.
	StatusPending WorkflowStatus = "pending"
	// StatusRunning indicates the workflow is currently running.
	StatusRunning WorkflowStatus = "running"
	// StatusCompleted indicates the workflow has completed successfully.
	StatusCompleted WorkflowStatus = "completed"
	// StatusFailed indicates the workflow has failed.
	StatusFailed WorkflowStatus = "failed"
)

type WorkflowTemplate

type WorkflowTemplate struct {
	Steps               []Step
	BeforeWorkflowHooks []BeforeWorkflowHook
	AfterWorkflowHooks  []AfterWorkflowHook
	BeforeStepHooks     []BeforeStepHook
	AfterStepHooks      []AfterStepHook
}

WorkflowTemplate defines the structure and hooks for a workflow definition.

type ZeroBackOffImpl

type ZeroBackOffImpl struct{}

ZeroBackOffImpl implements a backoff that always returns zero.

func (*ZeroBackOffImpl) NextBackOff

func (b *ZeroBackOffImpl) NextBackOff() time.Duration

NextBackOff always returns zero.

func (*ZeroBackOffImpl) Reset

func (b *ZeroBackOffImpl) Reset()

Reset is a no-op for zero backoff.

Directories

Path Synopsis
examples
cliexample command
Package main provides a CLI example for using the ewf workflow engine.
Package main provides a CLI example for using the ewf workflow engine.
httpexample command
Package main provides an HTTP example for using the ewf workflow engine.
Package main provides an HTTP example for using the ewf workflow engine.
structexample command
Package main provides an example of storing and retrieving complex structs in workflow state.
Package main provides an example of storing and retrieving complex structs in workflow state.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL