Documentation
¶
Overview ¶
Package ewf implements an Extensible Workflow Framework.
This file contains the custom backoff implementation that supports: - Constant backoff: fixed delay between retries - Exponential backoff: delay increases exponentially with each retry - Exponential backoff with jitter: adds randomness to prevent thundering herd - Zero backoff: no delay between retries - Stop backoff: signals that no more retries should be attempted
All backoff implementations are serializable to JSON, allowing workflows with retry policies to be persisted and resumed.
Package ewf provides a workflow engine for defining and executing workflows in Go.
Index ¶
- Constants
- Variables
- func IsPermanent(err error) bool
- func MarshalBackOff(b BackOff) ([]byte, error)
- func Retry(operation func() error, b BackOff) error
- func RetryNotify(operation func() error, b BackOff, notify func(error, time.Duration)) error
- type AfterStepHook
- type AfterWorkflowHook
- type BackOff
- func ConstantBackoff(delay time.Duration) BackOff
- func ExponentialBackoff(initialInterval, maxInterval time.Duration, multiplier float64) BackOff
- func ExponentialBackoffWithJitter(initialInterval, maxInterval time.Duration, multiplier float64, jitter float64) BackOff
- func NewConstantBackOff(interval time.Duration) BackOff
- func NewExponentialBackOff(initialInterval, maxInterval time.Duration, multiplier float64) BackOff
- func NewExponentialBackOffWithJitter(initialInterval, maxInterval time.Duration, multiplier float64, jitter float64) BackOff
- func NewStopBackOff() BackOff
- func NewZeroBackOff() BackOff
- func UnmarshalBackOff(data []byte) (BackOff, error)
- func ZeroBackoff() BackOff
- type BackOffContext
- type BackOffData
- type BackOffType
- type BeforeStepHook
- type BeforeWorkflowHook
- type Clock
- type ConstantBackOffImpl
- type Engine
- func (e *Engine) Close(ctx context.Context) error
- func (e *Engine) CloseQueue(ctx context.Context, queueName string) error
- func (e *Engine) CreateQueue(ctx context.Context, queueName string, workerDef WorkersDefinition, ...) error
- func (e *Engine) NewWorkflow(name string, opts ...WorkflowOption) (Workflow, error)
- func (e *Engine) Register(name string, activity StepFn)
- func (e *Engine) RegisterTemplate(name string, def *WorkflowTemplate)
- func (e *Engine) ResumeWorkflows()
- func (e *Engine) Run(ctx context.Context, w Workflow, opts ...RunOption) error
- func (e *Engine) Store() Store
- type EngineOption
- type ExponentialBackOffImpl
- type Permanent
- type Queue
- type QueueEngine
- type QueueMetadata
- type QueueOptions
- type RetryPolicy
- type RunOption
- type SQLiteStore
- func (s *SQLiteStore) Close() error
- func (s *SQLiteStore) DeleteQueueMetadata(ctx context.Context, name string) error
- func (s *SQLiteStore) DeleteWorkflow(ctx context.Context, uuid string) error
- func (s *SQLiteStore) ListWorkflowUUIDsByStatus(ctx context.Context, status WorkflowStatus) ([]string, error)
- func (s *SQLiteStore) LoadAllQueueMetadata(ctx context.Context) ([]*QueueMetadata, error)
- func (s *SQLiteStore) LoadAllWorkflowTemplates(ctx context.Context) (map[string]*WorkflowTemplate, error)
- func (s *SQLiteStore) LoadWorkflowByName(ctx context.Context, name string) (Workflow, error)
- func (s *SQLiteStore) LoadWorkflowByUUID(ctx context.Context, uuid string) (Workflow, error)
- func (s *SQLiteStore) LoadWorkflowTemplate(ctx context.Context, name string) (*WorkflowTemplate, error)
- func (s *SQLiteStore) SaveQueueMetadata(ctx context.Context, settings *QueueMetadata) error
- func (s *SQLiteStore) SaveWorkflow(ctx context.Context, workflow Workflow) error
- func (s *SQLiteStore) SaveWorkflowTemplate(ctx context.Context, name string, tmpl *WorkflowTemplate) error
- func (s *SQLiteStore) Setup() error
- type State
- type Step
- type StepFn
- type StopBackOffImpl
- type Store
- type SystemClock
- type WorkersDefinition
- type Workflow
- type WorkflowOption
- type WorkflowStatus
- type WorkflowTemplate
- type ZeroBackOffImpl
Constants ¶
const (
// StepNameContextKey is used to store the current step name in the context
StepNameContextKey contextKey = "stepName"
)
Context keys used by the workflow engine
const Stop time.Duration = -1
Stop is a special duration value that indicates that no more retries should be made.
Variables ¶
var ErrFailWorkflowNow = errors.New("fail workflow now")
ErrFailWorkflowNow is a special error that can be returned by a step to indicate that the workflow should be failed immediately.
var ErrQueueAlreadyExists error = errors.New("queue already exists")
ErrQueueAlreadyExists indicates that the queue already exists in queue engine
var ErrQueueNotFound error = errors.New("queue doesn't exist")
ErrQueueNotFound indicates that the queue is deleted or never created
var ErrWorkflowNotFound = errors.New("workflow not found")
ErrWorkflowNotFound is returned when a workflow is not found in the database.
Functions ¶
func IsPermanent ¶
IsPermanent returns true if the given error is a Permanent error.
func MarshalBackOff ¶
MarshalBackOff serializes a BackOff implementation to JSON
func Retry ¶
Retry retries the operation function using the specified BackOff policy. If the operation returns nil, Retry returns nil. If the operation returns an error, Retry retries as specified by the BackOff policy. If the BackOff policy indicates no more retries, the most recent error is returned.
Types ¶
type AfterStepHook ¶
AfterStepHook is a function run after a step finishes.
type AfterWorkflowHook ¶
AfterWorkflowHook is a function run after a workflow finishes.
type BackOff ¶
type BackOff interface {
// NextBackOff returns the duration to wait before retrying the operation.
// If the implementation returns Stop, the operation should not be retried.
NextBackOff() time.Duration
// Reset resets the backoff to its initial state.
// This is typically called before starting a new sequence of backoff operations.
Reset()
}
BackOff interface defines the operations required for backoff implementations. This is the core interface that all backoff strategies must implement. It provides methods to calculate the next backoff duration and reset the backoff state.
func ConstantBackoff ¶
ConstantBackoff returns a BackOff that always waits for the specified delay.
func ExponentialBackoff ¶
ExponentialBackoff returns a BackOff with exponential delays. initialInterval: first delay; maxInterval: cap for delay; multiplier: growth factor (e.g. 2.0).
func ExponentialBackoffWithJitter ¶
func ExponentialBackoffWithJitter(initialInterval, maxInterval time.Duration, multiplier float64, jitter float64) BackOff
ExponentialBackoffWithJitter returns a BackOff with exponential delays and jitter. initialInterval: first delay; maxInterval: cap for delay; multiplier: growth factor (e.g. 2.0). jitter: randomization factor between 0 and 1 that determines the amount of randomness.
func NewConstantBackOff ¶
NewConstantBackOff creates a backoff policy that always returns the same backoff delay.
func NewExponentialBackOff ¶
NewExponentialBackOff creates a new exponential backoff policy. The formula used is: currentInterval = initialInterval * multiplier^(n-1) with jitter.
func NewExponentialBackOffWithJitter ¶
func NewExponentialBackOffWithJitter(initialInterval, maxInterval time.Duration, multiplier float64, jitter float64) BackOff
NewExponentialBackOffWithJitter creates a new exponential backoff policy with jitter.
func NewStopBackOff ¶
func NewStopBackOff() BackOff
NewStopBackOff creates a backoff policy that always returns Stop.
func NewZeroBackOff ¶
func NewZeroBackOff() BackOff
NewZeroBackOff creates a backoff policy that always returns 0 (no waiting).
func UnmarshalBackOff ¶
UnmarshalBackOff deserializes a JSON representation into a BackOff implementation
func ZeroBackoff ¶
func ZeroBackoff() BackOff
ZeroBackoff returns a BackOff that always returns zero delay (for immediate retries).
type BackOffContext ¶
BackOffContext wraps a BackOff with a context to support cancellation.
func WithContext ¶
func WithContext(b BackOff, ctx context.Context) BackOffContext
WithContext returns a BackOffContext with the given context and backoff.
type BackOffData ¶
type BackOffData struct {
Type BackOffType `json:"type"`
Interval time.Duration `json:"interval,omitempty"` // For ConstantBackOffImpl
InitialInterval time.Duration `json:"initial_interval,omitempty"` // For ExponentialBackOffImpl
CurrentInterval time.Duration `json:"current_interval,omitempty"` // For ExponentialBackOffImpl
MaxInterval time.Duration `json:"max_interval,omitempty"` // For ExponentialBackOffImpl
Multiplier float64 `json:"multiplier,omitempty"` // For ExponentialBackOffImpl
RandomizationFactor float64 `json:"randomization_factor,omitempty"` // For ExponentialBackOffImpl with jitter
MaxElapsedTime time.Duration `json:"max_elapsed_time,omitempty"` // For ExponentialBackOffImpl
}
BackOffData is used for serializing/deserializing BackOff implementations
type BackOffType ¶
type BackOffType string
BackOffType identifies the type of backoff implementation
const ( ConstantBackOffType BackOffType = "constant" ExponentialBackOffType BackOffType = "exponential" ZeroBackOffType BackOffType = "zero" StopBackOffType BackOffType = "stop" )
type BeforeStepHook ¶
BeforeStepHook is a function run before a step starts.
type BeforeWorkflowHook ¶
BeforeWorkflowHook is a function run before a workflow starts.
type ConstantBackOffImpl ¶
ConstantBackOffImpl implements a constant backoff strategy.
func (*ConstantBackOffImpl) NextBackOff ¶
func (b *ConstantBackOffImpl) NextBackOff() time.Duration
NextBackOff returns the constant interval.
func (*ConstantBackOffImpl) Reset ¶
func (b *ConstantBackOffImpl) Reset()
Reset is a no-op for constant backoff.
type Engine ¶
type Engine struct {
// contains filtered or unexported fields
}
Engine is the central component for managing and executing workflows. It holds a registry of all available activities and a store for persistence. Engine is the central component for managing and executing workflows.
func NewEngine ¶
func NewEngine(opts ...EngineOption) (*Engine, error)
NewEngine creates a new workflow engine.
func (*Engine) CloseQueue ¶
CloseQueue closes a queue by its name
func (*Engine) CreateQueue ¶
func (e *Engine) CreateQueue(ctx context.Context, queueName string, workerDef WorkersDefinition, queueOptions QueueOptions) error
CreateQueue creates a new queue and starts workers for it
func (*Engine) NewWorkflow ¶
func (e *Engine) NewWorkflow(name string, opts ...WorkflowOption) (Workflow, error)
NewWorkflow creates a new workflow instance from a registered definition.
func (*Engine) Register ¶
Register registers an activity function with the engine. This allows the activity to be used in workflow steps by its name. Register registers an activity function with the engine.
func (*Engine) RegisterTemplate ¶
func (e *Engine) RegisterTemplate(name string, def *WorkflowTemplate)
NewWorkflow creates a new workflow instance with the given name and steps. The workflow is associated with the engine's store. RegisterTemplate registers a workflow template with the engine.
func (*Engine) ResumeWorkflows ¶
func (e *Engine) ResumeWorkflows()
ResumeWorkflows resumes all workflows in the background. It resumes running workflows and pending workflows.
func (*Engine) Run ¶
Run executes a workflow. By default, it runs synchronously and blocks until completion. Use WithAsync() option to run asynchronously in a goroutine. If the workflow has a queue name set, it will be enqueued instead of executed directly. Accepts a copy (value) to prevent sharing issues.
func (*Engine) Store ¶
Store returns the store associated with the engine. This allows external access to the workflow store for querying workflow status and information. Users can use this method to retrieve workflow details by UUID, check workflow execution status, or list workflows with specific statuses. For example, after starting a workflow and receiving its UUID, clients can use engine.Store().LoadWorkflowByUUID(ctx, uuid) to get the workflow's current state. Store returns the store associated with the engine.
type EngineOption ¶
type EngineOption func(*Engine)
EngineOption defines options for creating a new Engine.
func WithQueueEngine ¶
func WithQueueEngine(queueEngine QueueEngine) EngineOption
func WithStore ¶
func WithStore(store Store) EngineOption
type ExponentialBackOffImpl ¶
type ExponentialBackOffImpl struct {
InitialInterval time.Duration `json:"initial_interval"`
CurrentInterval time.Duration `json:"-"` // Not serialized
Multiplier float64 `json:"multiplier"`
MaxInterval time.Duration `json:"max_interval"`
RandomizationFactor float64 `json:"randomization_factor"` // For jitter
MaxElapsedTime time.Duration `json:"max_elapsed_time"`
Clock Clock `json:"-"` // Not serialized
StartTime time.Time `json:"-"` // Not serialized
// contains filtered or unexported fields
}
ExponentialBackOffImpl implements an exponential backoff strategy.
func (*ExponentialBackOffImpl) NextBackOff ¶
func (b *ExponentialBackOffImpl) NextBackOff() time.Duration
NextBackOff calculates the next backoff interval using the exponential formula.
func (*ExponentialBackOffImpl) Reset ¶
func (b *ExponentialBackOffImpl) Reset()
Reset resets the backoff to its initial state.
type Permanent ¶
type Permanent struct {
Err error
}
Permanent wraps an error to indicate that it should not be retried.
func PermanentError ¶
PermanentError wraps an error to indicate that it should not be retried.
type Queue ¶
type Queue interface {
Name() string
Enqueue(ctx context.Context, workflow Workflow) error
Dequeue(ctx context.Context) (Workflow, error)
Close(ctx context.Context) error
CloseCh() <-chan struct{}
ActivityCh() <-chan struct{}
Length(ctx context.Context) (int64, error)
}
Queue represents a workflow queue.
type QueueEngine ¶
type QueueEngine interface {
CreateQueue(ctx context.Context, queueName string, queueOptions QueueOptions) (Queue, error)
GetQueue(ctx context.Context, queueName string) (Queue, error)
CloseQueue(ctx context.Context, queueName string) error
Close(ctx context.Context) error
}
QueueEngine defines the interface for a queue engine that manages multiple queues
type QueueMetadata ¶
type QueueMetadata struct {
Name string `json:"name"`
WorkersDef WorkersDefinition `json:"worker_def"`
QueueOptions QueueOptions `json:"queue_options"`
}
QueueMetadata defines the queue data that needs to persist
type QueueOptions ¶
type QueueOptions struct {
AutoDelete bool `json:"auto_delete"` // Delete when no longer in use
DeleteAfter time.Duration `json:"delete_after"` // Ignored if AutoDelete is false
PopTimeout time.Duration `json:"pop_timeout"` // timeout for dequeue operations
}
QueueOptions defines options for the queue
type RetryPolicy ¶
RetryPolicy defines the retry behavior for a step.
func (*RetryPolicy) MarshalJSON ¶
func (rp *RetryPolicy) MarshalJSON() ([]byte, error)
MarshalJSON implements custom JSON marshaling for RetryPolicy
func (*RetryPolicy) UnmarshalJSON ¶
func (rp *RetryPolicy) UnmarshalJSON(data []byte) error
UnmarshalJSON implements custom JSON unmarshaling for RetryPolicy
type SQLiteStore ¶
type SQLiteStore struct {
// contains filtered or unexported fields
}
SQLiteStore implements the Store interface using SQLite for persistence.
func NewSQLiteStore ¶
func NewSQLiteStore(dsn string) (*SQLiteStore, error)
NewSQLiteStore creates a new SQLiteStore with the given DSN.
func (*SQLiteStore) Close ¶
func (s *SQLiteStore) Close() error
Close closes the SQLite database connection.
func (*SQLiteStore) DeleteQueueMetadata ¶
func (s *SQLiteStore) DeleteQueueMetadata(ctx context.Context, name string) error
DeleteQueueMetadata removes a queue by name from the SQLite store.
func (*SQLiteStore) DeleteWorkflow ¶
func (s *SQLiteStore) DeleteWorkflow(ctx context.Context, uuid string) error
func (*SQLiteStore) ListWorkflowUUIDsByStatus ¶
func (s *SQLiteStore) ListWorkflowUUIDsByStatus(ctx context.Context, status WorkflowStatus) ([]string, error)
func (*SQLiteStore) LoadAllQueueMetadata ¶
func (s *SQLiteStore) LoadAllQueueMetadata(ctx context.Context) ([]*QueueMetadata, error)
LoadAllQueues loads all queues from the SQLite database.
func (*SQLiteStore) LoadAllWorkflowTemplates ¶
func (s *SQLiteStore) LoadAllWorkflowTemplates(ctx context.Context) (map[string]*WorkflowTemplate, error)
LoadAllWorkflowTemplates loads all workflow templates from the SQLite database.
func (*SQLiteStore) LoadWorkflowByName ¶
func (*SQLiteStore) LoadWorkflowByUUID ¶
func (*SQLiteStore) LoadWorkflowTemplate ¶
func (s *SQLiteStore) LoadWorkflowTemplate(ctx context.Context, name string) (*WorkflowTemplate, error)
LoadWorkflowTemplate loads a workflow template by name from the SQLite database.
func (*SQLiteStore) SaveQueueMetadata ¶
func (s *SQLiteStore) SaveQueueMetadata(ctx context.Context, settings *QueueMetadata) error
SaveQueueMetadata saves the QueueMetadata into sqlite store
func (*SQLiteStore) SaveWorkflow ¶
func (s *SQLiteStore) SaveWorkflow(ctx context.Context, workflow Workflow) error
SaveWorkflow saves the given workflow to the SQLite database. Receives a copy to prevent sharing issues.
func (*SQLiteStore) SaveWorkflowTemplate ¶
func (s *SQLiteStore) SaveWorkflowTemplate(ctx context.Context, name string, tmpl *WorkflowTemplate) error
SaveWorkflowTemplate saves a workflow template to the SQLite database.
func (*SQLiteStore) Setup ¶
func (s *SQLiteStore) Setup() error
Setup prepares the SQLite database for use.
type Step ¶
type Step struct {
Name string
RetryPolicy *RetryPolicy
Timeout time.Duration // Maximum execution time for the step (including retries)
}
Step represents a single step in a workflow.
type StopBackOffImpl ¶
type StopBackOffImpl struct{}
StopBackOffImpl implements a backoff that always returns Stop.
func (*StopBackOffImpl) NextBackOff ¶
func (b *StopBackOffImpl) NextBackOff() time.Duration
NextBackOff always returns Stop.
func (*StopBackOffImpl) Reset ¶
func (b *StopBackOffImpl) Reset()
Reset is a no-op for stop backoff.
type Store ¶
type Store interface {
Setup() error // could be a no-op, no problem.
SaveWorkflow(ctx context.Context, workflow Workflow) error
DeleteWorkflow(ctx context.Context, uuid string) error
LoadWorkflowByName(ctx context.Context, name string) (Workflow, error)
LoadWorkflowByUUID(ctx context.Context, uuid string) (Workflow, error)
ListWorkflowUUIDsByStatus(ctx context.Context, status WorkflowStatus) ([]string, error)
SaveWorkflowTemplate(ctx context.Context, name string, tmpl *WorkflowTemplate) error
LoadWorkflowTemplate(ctx context.Context, name string) (*WorkflowTemplate, error)
LoadAllWorkflowTemplates(ctx context.Context) (map[string]*WorkflowTemplate, error)
SaveQueueMetadata(ctx context.Context, meta *QueueMetadata) error
DeleteQueueMetadata(ctx context.Context, name string) error
LoadAllQueueMetadata(ctx context.Context) ([]*QueueMetadata, error)
Close() error // could be a no-op, no problem.
}
Store defines the interface for workflow persistence.
type SystemClock ¶
type SystemClock struct{}
SystemClock implements the Clock interface using the system clock.
type WorkersDefinition ¶
type WorkersDefinition struct {
Count int `json:"worker_count"` // number of workers
PollInterval time.Duration `json:"poll_interval"` // interval between polling the queue for new workflows
WorkTimeout time.Duration `json:"work_timeout"` // timeout for worker to process work
}
WorkersDefinition defines the worker pool for processing workflows in the queue
type Workflow ¶
type Workflow struct {
UUID string `json:"uuid"`
Name string `json:"name"`
Status WorkflowStatus `json:"status"`
State State `json:"state"`
CurrentStep int `json:"current_step"`
CreatedAt time.Time `json:"created_at"`
Steps []Step `json:"steps"`
QueueName string `json:"queue_name"`
DisplayName string `json:"display_name"`
Metadata map[string]string `json:"metadata"`
// contains filtered or unexported fields
}
Workflow represents a workflow instance.
func NewWorkflow ¶
func NewWorkflow(name string, opts ...WorkflowOption) Workflow
NewWorkflow creates a new workflow instance with the given name and options.
type WorkflowOption ¶
type WorkflowOption func(w *Workflow)
WorkflowOption defines options for creating a new Workflow.
func WithDisplayName ¶
func WithDisplayName(displayName string) WorkflowOption
WithDisplayName specifies the display name of the workflow.
func WithMetadata ¶
func WithMetadata(metadata map[string]string) WorkflowOption
WithMetadata specifies the metadata of the workflow.
func WithQueue ¶
func WithQueue(queueName string) WorkflowOption
WithQueue specifies the queue name to enqueue the workflow into.
type WorkflowStatus ¶
type WorkflowStatus string
WorkflowStatus represents the status of a workflow.
const ( // StatusPending indicates the workflow is pending and has not started. StatusPending WorkflowStatus = "pending" // StatusRunning indicates the workflow is currently running. StatusRunning WorkflowStatus = "running" // StatusCompleted indicates the workflow has completed successfully. StatusCompleted WorkflowStatus = "completed" // StatusFailed indicates the workflow has failed. StatusFailed WorkflowStatus = "failed" )
type WorkflowTemplate ¶
type WorkflowTemplate struct {
Steps []Step
BeforeWorkflowHooks []BeforeWorkflowHook
AfterWorkflowHooks []AfterWorkflowHook
BeforeStepHooks []BeforeStepHook
AfterStepHooks []AfterStepHook
}
WorkflowTemplate defines the structure and hooks for a workflow definition.
type ZeroBackOffImpl ¶
type ZeroBackOffImpl struct{}
ZeroBackOffImpl implements a backoff that always returns zero.
func (*ZeroBackOffImpl) NextBackOff ¶
func (b *ZeroBackOffImpl) NextBackOff() time.Duration
NextBackOff always returns zero.
func (*ZeroBackOffImpl) Reset ¶
func (b *ZeroBackOffImpl) Reset()
Reset is a no-op for zero backoff.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
examples
|
|
|
cliexample
command
Package main provides a CLI example for using the ewf workflow engine.
|
Package main provides a CLI example for using the ewf workflow engine. |
|
httpexample
command
Package main provides an HTTP example for using the ewf workflow engine.
|
Package main provides an HTTP example for using the ewf workflow engine. |
|
structexample
command
Package main provides an example of storing and retrieving complex structs in workflow state.
|
Package main provides an example of storing and retrieving complex structs in workflow state. |