Documentation
¶
Index ¶
- Variables
- func SetDefaultLogConfig(config LogConfig)
- func SetDefaultLogger(l Logger)
- func StartProcessor[In, Out any](ctx context.Context, in <-chan In, proc Processor[In, Out], ...) <-chan Out
- type BackoffFunc
- type CancelFunc
- type FanIn
- type FanInConfig
- type Generator
- type LogConfig
- type LogLevel
- type Logger
- type Metadata
- type MetadataProvider
- type Metrics
- type MetricsCollector
- type MiddlewareFunc
- type Option
- func WithBuffer[In, Out any](buffer int) Option[In, Out]
- func WithCancel[In, Out any](cancel func(In, error)) Option[In, Out]
- func WithCleanup[In, Out any](cleanup func(ctx context.Context), timeout time.Duration) Option[In, Out]
- func WithConcurrency[In, Out any](concurrency int) Option[In, Out]
- func WithLogConfig[In, Out any](logConfig LogConfig) Option[In, Out]
- func WithMetadataProvider[In, Out any](provider MetadataProvider[In]) Option[In, Out]
- func WithMetricsCollector[In, Out any](collector MetricsCollector) Option[In, Out]
- func WithMiddleware[In, Out any](middleware MiddlewareFunc[In, Out]) Option[In, Out]
- func WithRecover[In, Out any]() Option[In, Out]
- func WithRetryConfig[In, Out any](retryConfig RetryConfig) Option[In, Out]
- func WithTimeout[In, Out any](timeout time.Duration) Option[In, Out]
- func WithoutContextPropagation[In, Out any]() Option[In, Out]
- type Pipe
- func ApplyPipe[In, Inter, Out any](a Pipe[In, Inter], b Pipe[Inter, Out]) Pipe[In, Out]
- func NewBatchPipe[In any, Out any](handle func(context.Context, []In) ([]Out, error), maxSize int, ...) Pipe[In, Out]
- func NewFilterPipe[In any](handle func(context.Context, In) (bool, error), opts ...Option[In, In]) Pipe[In, In]
- func NewProcessPipe[In, Out any](handle func(context.Context, In) ([]Out, error), opts ...Option[In, Out]) Pipe[In, Out]
- func NewSinkPipe[In any](handle func(context.Context, In) error, opts ...Option[In, struct{}]) Pipe[In, struct{}]
- func NewTransformPipe[In, Out any](handle func(context.Context, In) (Out, error), opts ...Option[In, Out]) Pipe[In, Out]
- type ProcessFunc
- type Processor
- type RecoveryError
- type RetryConfig
- type RetryState
- type ShouldRetryFunc
Constants ¶
This section is empty.
Variables ¶
var ( // ErrFailure indicates a processing failure. ErrFailure = errors.New("gopipe: processing failed") // ErrCancel indicates that processing was canceled. ErrCancel = errors.New("gopipe: processing canceled") )
var ( // ErrRetry is the base error for retry operations ErrRetry = errors.New("gopipe retry") // ErrRetryMaxAttempts is returned when all retry attempts fail ErrRetryMaxAttempts = fmt.Errorf("%w: max attempts reached", ErrRetry) // ErrRetryTimeout is returned when the overall retry operation times out ErrRetryTimeout = fmt.Errorf("%w: timeout reached", ErrRetry) // ErrRetryNotRetryable is returned when an error is not retryable ErrRetryNotRetryable = fmt.Errorf("%w: not retryable", ErrRetry) )
Functions ¶
func SetDefaultLogConfig ¶ added in v0.7.0
func SetDefaultLogConfig(config LogConfig)
SetDefaultLogConfig sets the default logger configuration for all pipes. May be overridden per-pipe using WithLoggerConfig.
func SetDefaultLogger ¶ added in v0.7.0
func SetDefaultLogger(l Logger)
SetDefaultLogger sets the default logger for all pipes. slog.Default() is used by default.
func StartProcessor ¶ added in v0.6.0
func StartProcessor[In, Out any]( ctx context.Context, in <-chan In, proc Processor[In, Out], opts ...Option[In, Out], ) <-chan Out
StartProcessor processes items from the input channel using the provided processor and returns a channel that will receive the processed outputs.
Processing will continue until the input channel is closed or the context is canceled. The output channel is closed when processing is complete. Behavior can be customized with options.
Types ¶
type BackoffFunc ¶ added in v0.8.0
BackoffFunc returns the wait duration for a retry attempt. The attempt parameter is one-based (1 for first retry, 2 for second, etc.).
func ConstantBackoff ¶ added in v0.8.0
func ConstantBackoff( delay time.Duration, jitter float64, ) BackoffFunc
ConstantBackoff creates a backoff function that returns a constant duration with optional jitter. The delay parameter specifies the base wait time for all retry attempts. The jitter parameter controls randomization: 0.0 = no jitter, 0.2 = ±20% variation. Jitter helps prevent thundering herd problems in distributed systems.
func ExponentialBackoff ¶ added in v0.8.0
func ExponentialBackoff( initialDelay time.Duration, factor float64, maxDelay time.Duration, jitter float64, ) BackoffFunc
ExponentialBackoff creates a backoff function with exponential backoff and jitter. Each retry attempt uses baseDelay * factor^(attempt-1) with random jitter applied. The factor parameter controls growth rate (e.g., 2.0 doubles delay each attempt). The maxDelay parameter caps the maximum backoff duration (0 = no limit). The jitter parameter controls randomization: 0.0 = no jitter, 0.2 = ±20% variation. Jitter is applied after the exponential calculation and max delay capping.
type CancelFunc ¶ added in v0.3.0
CancelFunc is the function used by Processor.Cancel.
type FanIn ¶ added in v0.9.0
type FanIn[T any] struct { // contains filtered or unexported fields }
FanIn merges multiple input channels into a single output channel. It safely handles concurrent Add() calls and provides graceful shutdown.
func NewFanIn ¶ added in v0.9.0
func NewFanIn[T any](config FanInConfig) *FanIn[T]
NewFanIn creates a new FanIn instance. Add input channels with Add(), then call Start() exactly once.
func (*FanIn[T]) Add ¶ added in v0.9.0
Add registers an input channel to be merged into the output. Safe to call concurrently. Returns a done channel that closes when all messages from the input channel have been processed, and an error if FanIn is already closed.
type FanInConfig ¶ added in v0.9.0
type FanInConfig struct {
// Buffer size for the output channel
Buffer int
// ShutdownTimeout is the max time to wait for input channels to drain.
// If 0, waits indefinitely for clean shutdown.
ShutdownTimeout time.Duration
}
FanInConfig configures FanIn behavior.
type Generator ¶ added in v0.8.0
type Generator[Out any] interface { // Generate returns a channel that emits generated values until context cancellation or error. Generate(ctx context.Context) <-chan Out }
Generator produces a stream of values using a provided function.
func NewGenerator ¶ added in v0.8.0
func NewGenerator[Out any]( handle func(context.Context) ([]Out, error), opts ...Option[struct{}, Out], ) Generator[Out]
NewGenerator creates a Generator that produces values using the provided handle function. The handle function is called repeatedly until context cancellation.
type LogConfig ¶ added in v0.7.0
type LogConfig struct {
// Args are additional arguments to include in all log messages.
Args []any
// LevelSuccess is the log level used for successful processing.
// Defaults to LogLevelDebug.
LevelSuccess LogLevel
// LevelCancel is the log level used when processing is canceled.
// Defaults to LogLevelWarn.
LevelCancel LogLevel
// LevelFailure is the log level used when processing fails.
// Defaults to LogLevelError.
LevelFailure LogLevel
// LevelRetry is the log level used when a retry is attempted.
// Defaults to LogLevelWarn.
LevelRetry LogLevel
// MessageSuccess is the message logged on successful processing.
// Defaults to "GOPIPE: Success".
MessageSuccess string
// MessageCancel is the message logged when processing is canceled.
// Defaults to "GOPIPE: Cancel".
MessageCancel string
// MessageFailure is the message logged when processing fails.
// Defaults to "GOPIPE: Failure".
MessageFailure string
// MessageRetry is the message logged when a retry is attempted.
// Defaults to "GOPIPE: Retry".
MessageRetry string
// Disabled disables all logging when set to true.
Disabled bool
}
LogConfig holds configuration for the logger middleware. All fields can be customized individually. Defaults from the global defaultLoggerConfig are used for any fields not set.
type LogLevel ¶ added in v0.7.0
type LogLevel string
LogLevel represents the severity level for logging messages.
const ( // LogLevelDebug is used for detailed information. LogLevelDebug LogLevel = "debug" // LogLevelInfo is used for general information messages. LogLevelInfo LogLevel = "info" // LogLevelWarn is used for warning conditions. LogLevelWarn LogLevel = "warn" // LogLevelError is used for error conditions. LogLevelError LogLevel = "error" )
type Logger ¶ added in v0.7.0
type Logger interface {
// Debug logs a message at debug level.
Debug(msg string, args ...any)
// Info logs a message at info level.
Info(msg string, args ...any)
// Warn logs a message at warning level.
Warn(msg string, args ...any)
// Error logs a message at error level.
Error(msg string, args ...any)
}
Logger defines an interface for logging at different severity levels.
type Metadata ¶ added in v0.7.0
Metadata is a key-value store for additional information about pipeline items.
func MetadataFromContext ¶ added in v0.7.0
MetadataFromContext extracts metadata from a context. Returns nil if no metadata is present.
func MetadataFromError ¶ added in v0.7.0
MetadataFromError extracts metadata from an error. Returns nil if no metadata is present.
type MetadataProvider ¶ added in v0.7.0
MetadataProvider is a function that provides Metadata for a processing context. It may extract information from the input value.
type Metrics ¶ added in v0.7.0
type Metrics struct {
Start time.Time
Duration time.Duration
Input int
Output int
InFlight int
Metadata Metadata
RetryState *RetryState
Error error
}
Metrics holds processing metrics for a single input.
func (*Metrics) Cancel ¶ added in v0.7.0
Cancel returns a numeric indicator of cancellation (1 for cancel, 0 otherwise).
func (*Metrics) Failure ¶ added in v0.7.0
Failure returns a numeric indicator of failure (1 for failure, 0 otherwise).
type MetricsCollector ¶ added in v0.7.0
type MetricsCollector func(metrics *Metrics)
MetricsCollector defines a function that collects single input metrics.
type MiddlewareFunc ¶ added in v0.6.0
MiddlewareFunc wraps a Processor to add additional behavior to processing and cancellation.
type Option ¶
type Option[In, Out any] func(*config[In, Out])
Option configures behavior of a Pipe.
func WithBuffer ¶
WithBuffer sets output channel buffer size.
func WithCancel ¶ added in v0.6.0
WithCancel provides an additional cancel function to the processor.
func WithCleanup ¶ added in v0.9.0
func WithCleanup[In, Out any](cleanup func(ctx context.Context), timeout time.Duration) Option[In, Out]
WithCleanup adds a cleanup function to be called when processing is complete. If timeout is greater than zero, context will be canceled after the timeout duration.
func WithConcurrency ¶
WithConcurrency sets worker count for concurrent processing.
func WithLogConfig ¶ added in v0.7.0
WithLogConfig overrides the default logger configuration for the pipe.
func WithMetadataProvider ¶ added in v0.8.0
func WithMetadataProvider[In, Out any](provider MetadataProvider[In]) Option[In, Out]
WithMetadataProvider adds a metadata provider to enrich context with metadata for each input. Can be used multiple times to add multiple providers. Metadata is available via MetadataFromContext or MetadataFromError. Metadata is used in logging and metrics collection.
func WithMetricsCollector ¶ added in v0.8.0
func WithMetricsCollector[In, Out any](collector MetricsCollector) Option[In, Out]
WithMetricsCollector adds a metrics collector to the processing pipeline. Can be used multiple times to add multiple collectors.
func WithMiddleware ¶ added in v0.6.0
func WithMiddleware[In, Out any](middleware MiddlewareFunc[In, Out]) Option[In, Out]
WithMiddleware adds middleware to the processing pipeline. Can be used multiple times. Middleware is applied in reverse order: for middlewares A, B, C, the execution flow is A→B→C→process.
func WithRecover ¶ added in v0.7.0
WithRecover enables panic recovery in process functions. When enabled, any panic that occurs during processing is caught and converted into a RecoveryError. The stack trace is captured and included in the RecoveryError. The stack trace is also printed to stderr in the CancelFunc.
func WithRetryConfig ¶ added in v0.8.0
func WithRetryConfig[In, Out any](retryConfig RetryConfig) Option[In, Out]
WithRetryConfig adds retry middleware to the processing pipeline. Failed operations are retried based on ShouldRetry logic, with Backoff between attempts, until MaxAttempts is reached or Timeout expires. Nil fields use default values.
func WithTimeout ¶ added in v0.3.0
WithTimeout sets maximum duration for each process operation. If the processing exceeds the timeout, the context will be cancelled.
func WithoutContextPropagation ¶ added in v0.3.0
WithoutContextPropagation disables passing parent context to process functions. Each process call will receive a background context instead.
type Pipe ¶ added in v0.6.0
type Pipe[Pre, Out any] interface { // Start begins processing items from the input channel and returns a channel for outputs. // Processing continues until the input channel is closed or the context is canceled. Start(ctx context.Context, pre <-chan Pre) <-chan Out }
Pipe represents a complete processing pipeline that transforms input values to output values. It combines preprocessing with a Processor and optional configuration.
func ApplyPipe ¶ added in v0.10.0
ApplyPipe combines two Pipes into one, connecting the output of the first to the input of the second. The resulting Pipe takes inputs of type In and produces outputs of type Out.
func NewBatchPipe ¶ added in v0.6.0
func NewBatchPipe[In any, Out any]( handle func(context.Context, []In) ([]Out, error), maxSize int, maxDuration time.Duration, opts ...Option[[]In, Out], ) Pipe[In, Out]
NewBatchPipe creates a Pipe that groups inputs into batches before processing. Each batch is processed as a whole by the handle function, which can return multiple outputs. Batches are created when either maxSize items are collected or maxDuration elapses since the first item.
func NewFilterPipe ¶ added in v0.6.0
func NewFilterPipe[In any]( handle func(context.Context, In) (bool, error), opts ...Option[In, In], ) Pipe[In, In]
NewFilterPipe creates a Pipe that selectively passes through inputs based on a predicate function. If the handle function returns true, the input is passed through; if false, the input is discarded. If the handle function returns an error, processing for that item stops and the error is handled.
func NewProcessPipe ¶ added in v0.6.0
func NewProcessPipe[In, Out any]( handle func(context.Context, In) ([]Out, error), opts ...Option[In, Out], ) Pipe[In, Out]
NewProcessPipe creates a Pipe that can transform each input into multiple outputs. Unlike NewTransformPipe, this can produce zero, one, or many outputs for each input. The handle function receives a context and input item, and returns a slice of outputs or an error.
func NewSinkPipe ¶ added in v0.7.0
func NewSinkPipe[In any]( handle func(context.Context, In) error, opts ...Option[In, struct{}], ) Pipe[In, struct{}]
NewSinkPipe creates a Pipe that applies handle to each value from in. The returned channel is closed after in is closed and all values are processed.
func NewTransformPipe ¶ added in v0.6.0
func NewTransformPipe[In, Out any]( handle func(context.Context, In) (Out, error), opts ...Option[In, Out], ) Pipe[In, Out]
NewTransformPipe creates a Pipe that transforms each input into exactly one output. Unlike NewProcessPipe, this always produces exactly one output for each successful input. The handle function receives a context and input item, and returns a single output or an error.
type ProcessFunc ¶ added in v0.3.0
ProcessFunc is the function used by Processor.Process.
type Processor ¶ added in v0.5.0
type Processor[In, Out any] interface { // Process processes a single input item with context awareness. // It transforms the input into zero or more output items, or returns an error. Process(context.Context, In) ([]Out, error) // Cancel handles errors when processing fails. Cancel(In, error) }
Processor combines processing and cancellation logic into a single abstraction. This abstraction allows controlling and manipulating the flow of data and errors.
func NewProcessor ¶ added in v0.5.0
func NewProcessor[In, Out any]( process ProcessFunc[In, Out], cancel CancelFunc[In], ) Processor[In, Out]
NewProcessor creates a new Processor with the provided process and cancel functions.
Panics if process is nil. If cancel is nil, a no-op function is used.
type RecoveryError ¶ added in v0.7.0
type RecoveryError struct {
// PanicValue is the original value that was passed to panic().
PanicValue any
// StackTrace contains the full stack trace at the point of panic.
StackTrace string
}
RecoveryError wraps a panic value with the stack trace. This allows panics to be converted to regular errors and handled gracefully.
func (*RecoveryError) Error ¶ added in v0.7.0
func (e *RecoveryError) Error() string
type RetryConfig ¶ added in v0.8.0
type RetryConfig struct {
// ShouldRetry determines which errors trigger retry attempts.
// If nil, defaults to retrying all errors.
ShouldRetry ShouldRetryFunc
// Backoff produces the wait duration between retry attempts.
// If nil, defaults to 1 second constant backoff with jitter ±20%.
Backoff BackoffFunc
// MaxAttempts limits the total number of processing attempts, including the initial attempt.
// Default is 3 attempts. Negative values allow unlimited retries.
MaxAttempts int
// Timeout sets the overall time limit for all processing attempts combined.
// Zero or negative value means no timeout. Default is 1 minute.
Timeout time.Duration
}
RetryConfig configures retry behavior for failed operations.
type RetryState ¶ added in v0.8.0
type RetryState struct {
// Timeout is the configured overall timeout for all attempts.
Timeout time.Duration
// MaxAttempts is the configured maximum number of attempts.
MaxAttempts int
// Start is the time when the first attempt started. Duration and
// Attempts are measured relative to this timestamp.
Start time.Time
// Attempts is the total number of processing attempts made. It is
// 1-based and includes the initial (first) attempt. For example,
// Attempts==1 means the initial attempt has been made and no retries
// occurred yet.
Attempts int
// Duration is the total elapsed time since Start. This includes the
// time spent in each attempt as well as any backoff/wait time between
// attempts.
Duration time.Duration
// Causes is a list of all errors encountered during attempts.
Causes []error
// Err is the error that caused the retry process to abort (final error).
Err error
}
RetryState tracks the progress and history of retry attempts.
func RetryStateFromContext ¶ added in v0.8.0
func RetryStateFromContext(ctx context.Context) *RetryState
RetryStateFromContext extracts the RetryState from a context. Returns nil if no RetryState is present.
func RetryStateFromError ¶ added in v0.8.0
func RetryStateFromError(err error) *RetryState
RetryStateFromError extracts the RetryState from an error. Returns nil if no RetryState is present.
type ShouldRetryFunc ¶ added in v0.8.0
ShouldRetryFunc determines whether an error should trigger a retry attempt.
func ShouldNotRetry ¶ added in v0.8.0
func ShouldNotRetry(errs ...error) ShouldRetryFunc
ShouldNotRetry creates a function that skips retries on specific errors. If no errors are specified, no errors trigger retries. If errors are specified, matching errors (using errors.Is) skip retries.
func ShouldRetry ¶ added in v0.8.0
func ShouldRetry(errs ...error) ShouldRetryFunc
ShouldRetry creates a function that retries on specific errors. If no errors are specified, all errors trigger retries. If errors are specified, only matching errors (using errors.Is) trigger retries.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
examples
|
|
|
advanced-channel-operations
command
|
|
|
basic-channel-operations
command
|
|
|
batch-pipe
command
|
|
|
fanin
command
|
|
|
generator
command
|
|
|
message-ack
command
|
|
|
transform-pipe
command
|
|
|
internal
|
|
|
cloudevents
Package cloudevents provides CloudEvents v1.0.2 serialization and deserialization for gopipe messages.
|
Package cloudevents provides CloudEvents v1.0.2 serialization and deserialization for gopipe messages. |
|
cqrs
Package cqrs provides Command Query Responsibility Segregation patterns for gopipe.
|
Package cqrs provides Command Query Responsibility Segregation patterns for gopipe. |
|
Package middleware provides reusable middleware for message processing pipelines.
|
Package middleware provides reusable middleware for message processing pipelines. |
|
pipe
module
|