Documentation ¶
Overview ¶
Package controller implements K8s-style reconciliation loop controllers for self-healing background operations.
Controllers periodically reconcile the desired state of the system with its actual state. Each controller runs in its own goroutine and handles a specific aspect of the system:
- AgentHealthController: Marks stale agents as offline, cleans up expired leases
- JobRecoveryController: Recovers stuck jobs and re-queues them
- QueuePriorityController: Recalculates queue priorities for fair scheduling
- TokenCleanupController: Cleans up expired bootstrap tokens
- AuditRetentionController: Manages audit log retention
Design principles:
- Each controller is independent and can fail without affecting the others
- Controllers are idempotent: running one several times has the same effect as running it once
- Controllers use optimistic locking to handle concurrent modifications
- All state changes are logged for debugging and monitoring
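The optimistic locking principle can be illustrated with a version check on write. The following is a minimal sketch only: the in-memory store, the row type, the version counter, and errVersionConflict are all hypothetical and are not taken from this package, which may implement the check differently (for example in SQL).

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errVersionConflict signals that the row changed after it was read (hypothetical name).
var errVersionConflict = errors.New("version conflict")

// row is a hypothetical record that carries a version counter.
type row struct {
	Status  string
	Version int
}

// store is an in-memory stand-in for a database table.
type store struct {
	mu   sync.Mutex
	rows map[string]row
}

// newStore returns a store seeded with one online agent at version 1.
func newStore() *store {
	return &store{rows: map[string]row{"agent-1": {Status: "online", Version: 1}}}
}

// get returns a copy of the row stored under id.
func (s *store) get(id string) row {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.rows[id]
}

// setStatus writes only if the stored version still equals expected,
// similar in spirit to "UPDATE ... WHERE id = ? AND version = ?".
func (s *store) setStatus(id, status string, expected int) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	cur, ok := s.rows[id]
	if !ok || cur.Version != expected {
		return errVersionConflict
	}
	s.rows[id] = row{Status: status, Version: cur.Version + 1}
	return nil
}

func main() {
	s := newStore()
	seen := s.get("agent-1")
	// The first writer succeeds and bumps the version.
	fmt.Println(s.setStatus("agent-1", "offline", seen.Version))
	// A second writer holding the old version is rejected and should re-read.
	fmt.Println(s.setStatus("agent-1", "online", seen.Version))
}
```

A rejected write is not fatal for an idempotent controller: the next reconciliation run re-reads the current state and decides again.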
Index ¶
- type AgentHealthController
- type AgentHealthControllerConfig
- type AuditRetentionController
- type AuditRetentionControllerConfig
- type Controller
- type JobRecoveryController
- type JobRecoveryControllerConfig
- type Manager
- type ManagerConfig
- type Metrics
- type NoopMetrics
- func (m *NoopMetrics) IncrementReconcileErrors(controller string)
- func (m *NoopMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)
- func (m *NoopMetrics) SetControllerRunning(controller string, running bool)
- func (m *NoopMetrics) SetLastReconcileTime(controller string, t time.Time)
- type PrometheusMetrics
- func (m *PrometheusMetrics) IncrementReconcileErrors(controller string)
- func (m *PrometheusMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)
- func (m *PrometheusMetrics) SetControllerRunning(controller string, running bool)
- func (m *PrometheusMetrics) SetLastReconcileTime(controller string, t time.Time)
- type QueuePriorityController
- type QueuePriorityControllerConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AgentHealthController ¶
type AgentHealthController struct {
	// contains filtered or unexported fields
}
AgentHealthController periodically checks agent health and marks stale agents as offline. This is a K8s-style controller that reconciles the desired state (agents with recent heartbeats are online, agents without recent heartbeats are offline) with the actual state.
func NewAgentHealthController ¶
func NewAgentHealthController(agentRepo agent.Repository, config *AgentHealthControllerConfig) *AgentHealthController
NewAgentHealthController creates a new AgentHealthController.
func (*AgentHealthController) Interval ¶
func (c *AgentHealthController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*AgentHealthController) Name ¶
func (c *AgentHealthController) Name() string
Name returns the controller name.
type AgentHealthControllerConfig ¶
type AgentHealthControllerConfig struct {
	// Interval is how often to run the health check.
	// Default: 30 seconds.
	Interval time.Duration

	// StaleTimeout is how long since last heartbeat before marking an agent as offline.
	// Default: 90 seconds (1.5x the typical heartbeat interval of 60s).
	StaleTimeout time.Duration

	// Logger for logging.
	Logger *logger.Logger
}
AgentHealthControllerConfig configures the AgentHealthController.
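To make the StaleTimeout rule concrete, here is a minimal sketch of the staleness check. The agentRecord type, the sample data, and the strict "older than" comparison are assumptions made for illustration; the controller's actual query is not shown in this documentation.

```go
package main

import (
	"fmt"
	"time"
)

// defaultStaleTimeout mirrors the documented default for StaleTimeout.
const defaultStaleTimeout = 90 * time.Second

// refTime is a fixed clock value so the example is deterministic.
var refTime = time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)

// agentRecord is a hypothetical view of an agent row.
type agentRecord struct {
	ID            string
	LastHeartbeat time.Time
}

// sampleAgents returns two agents whose last heartbeats were
// 30 seconds and 120 seconds before now.
func sampleAgents(now time.Time) []agentRecord {
	return []agentRecord{
		{ID: "agent-a", LastHeartbeat: now.Add(-30 * time.Second)},
		{ID: "agent-b", LastHeartbeat: now.Add(-120 * time.Second)},
	}
}

// staleAgents returns the IDs of agents whose last heartbeat is older
// than timeout, measured against now.
func staleAgents(agents []agentRecord, now time.Time, timeout time.Duration) []string {
	var stale []string
	for _, a := range agents {
		if now.Sub(a.LastHeartbeat) > timeout {
			stale = append(stale, a.ID)
		}
	}
	return stale
}

func main() {
	// Only agent-b exceeds the 90 second timeout.
	fmt.Println(staleAgents(sampleAgents(refTime), refTime, defaultStaleTimeout))
}
```

Passing the clock in as a parameter keeps the check deterministic and easy to test.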
type AuditRetentionController ¶
type AuditRetentionController struct {
	// contains filtered or unexported fields
}
AuditRetentionController manages audit log retention. This is a compliance-critical controller that:
1. Deletes audit logs older than the retention period
2. Logs deletion activities for meta-audit purposes
The retention period should be configured based on compliance requirements:
- GDPR: Typically 2-7 years depending on data type
- SOC 2: At least 1 year
- PCI DSS: At least 1 year
- HIPAA: 6 years
IMPORTANT: Ensure proper backup before running this controller. Deleted audit logs cannot be recovered.
func NewAuditRetentionController ¶
func NewAuditRetentionController(auditRepo admin.AuditLogRepository, config *AuditRetentionControllerConfig) *AuditRetentionController
NewAuditRetentionController creates a new AuditRetentionController.
func (*AuditRetentionController) Interval ¶
func (c *AuditRetentionController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*AuditRetentionController) Name ¶
func (c *AuditRetentionController) Name() string
Name returns the controller name.
type AuditRetentionControllerConfig ¶
type AuditRetentionControllerConfig struct {
	// Interval is how often to run the retention check.
	// Default: 24 hours (once a day).
	Interval time.Duration

	// RetentionDays is how long to keep audit logs.
	// Logs older than this will be deleted.
	// Default: 365 days (1 year).
	RetentionDays int

	// BatchSize is the maximum number of logs to delete in one batch.
	// This prevents long-running transactions.
	// Default: 10000.
	BatchSize int

	// DryRun if true, only counts logs that would be deleted without actually deleting.
	// Useful for testing retention policies.
	// Default: false.
	DryRun bool

	// Logger for logging.
	Logger *logger.Logger
}
AuditRetentionControllerConfig configures the AuditRetentionController.
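How RetentionDays, BatchSize, and DryRun might interact can be sketched over an in-memory slice of timestamps. This is an illustration under stated assumptions, not the controller's implementation: retentionConfig, logsAgedDays, and purge are hypothetical names, and the real controller deletes through admin.AuditLogRepository rather than from a slice.

```go
package main

import (
	"fmt"
	"time"
)

// retentionConfig mirrors three fields of AuditRetentionControllerConfig.
type retentionConfig struct {
	RetentionDays int
	BatchSize     int
	DryRun        bool
}

// refTime is a fixed clock value so the example is deterministic.
var refTime = time.Date(2024, 6, 1, 0, 0, 0, 0, time.UTC)

// logsAgedDays builds hypothetical log timestamps that are the given
// number of days older than now.
func logsAgedDays(now time.Time, days ...int) []time.Time {
	logs := make([]time.Time, 0, len(days))
	for _, d := range days {
		logs = append(logs, now.AddDate(0, 0, -d))
	}
	return logs
}

// purge removes entries older than the retention cutoff, at most BatchSize
// per pass, and returns the surviving entries plus the number affected.
// With DryRun set it only counts and leaves the input untouched.
func purge(logs []time.Time, now time.Time, cfg retentionConfig) ([]time.Time, int) {
	if cfg.BatchSize <= 0 {
		cfg.BatchSize = 10000 // documented default
	}
	cutoff := now.AddDate(0, 0, -cfg.RetentionDays)
	if cfg.DryRun {
		count := 0
		for _, t := range logs {
			if t.Before(cutoff) {
				count++
			}
		}
		return logs, count
	}
	total := 0
	for {
		kept := make([]time.Time, 0, len(logs))
		deleted := 0
		for _, t := range logs {
			if deleted < cfg.BatchSize && t.Before(cutoff) {
				deleted++
				continue
			}
			kept = append(kept, t)
		}
		logs, total = kept, total+deleted
		if deleted < cfg.BatchSize {
			return logs, total // a short batch means nothing is left to delete
		}
	}
}

func main() {
	logs := logsAgedDays(refTime, 10, 400, 500, 800)
	kept, n := purge(logs, refTime, retentionConfig{RetentionDays: 365, BatchSize: 2})
	fmt.Println(len(kept), n)
}
```

Running with DryRun first and comparing the count against expectations is a cheap safeguard before enabling real deletion.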
type Controller ¶
type Controller interface {
	// Name returns the unique name of this controller.
	Name() string

	// Interval returns how often this controller should run.
	Interval() time.Duration

	// Reconcile performs the reconciliation logic.
	// It should be idempotent - running multiple times should have the same effect.
	// Returns the number of items processed and any error encountered.
	Reconcile(ctx context.Context) (int, error)
}
Controller defines the interface for a reconciliation loop controller. Controllers are responsible for maintaining a specific aspect of system state.
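A custom controller only needs these three methods. The sketch below redeclares the interface locally so that it compiles on its own; expiryController and runOnce are hypothetical names used purely for illustration.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Controller is copied from the interface documented above.
type Controller interface {
	Name() string
	Interval() time.Duration
	Reconcile(ctx context.Context) (int, error)
}

// expiryController is a hypothetical controller that clears pending items.
type expiryController struct {
	pending int
}

func (c *expiryController) Name() string            { return "expiry" }
func (c *expiryController) Interval() time.Duration { return time.Minute }

// Reconcile processes every pending item. A second call finds nothing
// left to do, which is what makes the operation idempotent.
func (c *expiryController) Reconcile(ctx context.Context) (int, error) {
	if err := ctx.Err(); err != nil {
		return 0, err // respect cancellation before doing any work
	}
	n := c.pending
	c.pending = 0
	return n, nil
}

// runOnce invokes a single reconciliation with a background context.
func runOnce(c Controller) (int, error) {
	return c.Reconcile(context.Background())
}

func main() {
	var c Controller = &expiryController{pending: 3}
	first, _ := runOnce(c)
	second, _ := runOnce(c)
	fmt.Println(c.Name(), c.Interval(), first, second)
}
```

Returning the processed count from Reconcile gives the caller something to report, for instance through the Metrics interface.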
type JobRecoveryController ¶
type JobRecoveryController struct {
	// contains filtered or unexported fields
}
JobRecoveryController recovers stuck jobs and re-queues them. This is a K8s-style controller that ensures jobs don't get lost if an agent goes offline or fails to complete them.
The controller performs three main tasks:
- Recover stuck jobs: Return jobs to the queue if they've been assigned but haven't progressed (agent went offline or crashed)
- Expire old jobs: Mark jobs as expired if they've been in queue too long
- Clean up: Mark orphaned jobs as failed if they exceed retry limit
func NewJobRecoveryController ¶
func NewJobRecoveryController(commandRepo command.Repository, config *JobRecoveryControllerConfig) *JobRecoveryController
NewJobRecoveryController creates a new JobRecoveryController.
func (*JobRecoveryController) Interval ¶
func (c *JobRecoveryController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*JobRecoveryController) Name ¶
func (c *JobRecoveryController) Name() string
Name returns the controller name.
type JobRecoveryControllerConfig ¶
type JobRecoveryControllerConfig struct {
	// Interval is how often to run the job recovery check.
	// Default: 60 seconds.
	Interval time.Duration

	// StuckThresholdMinutes is how long a job can be in acknowledged/running state
	// without progress before being considered stuck.
	// Default: 30 minutes.
	StuckThresholdMinutes int

	// TenantStuckThresholdMinutes is how long a tenant command can be assigned
	// to an agent without being picked up before being reassigned.
	// Default: 10 minutes (shorter than platform jobs as tenant agents poll more frequently).
	TenantStuckThresholdMinutes int

	// MaxRetries is the maximum number of retry attempts for a job.
	// After this many retries, the job will be marked as failed.
	// Default: 3.
	MaxRetries int

	// MaxQueueMinutes is how long a job can wait in the queue before expiring.
	// Default: 60 minutes.
	MaxQueueMinutes int

	// Logger for logging.
	Logger *logger.Logger
}
JobRecoveryControllerConfig configures the JobRecoveryController.
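The thresholds above can be read as a small decision table. The sketch below is one possible reading, not the controller's code: the job type, the "queued" state name, the action strings, and the exact comparison operators are assumptions, and the separate tenant threshold is left out for brevity.

```go
package main

import "fmt"

// recoveryConfig mirrors three fields of JobRecoveryControllerConfig.
type recoveryConfig struct {
	StuckThresholdMinutes int
	MaxRetries            int
	MaxQueueMinutes       int
}

// defaultRecoveryConfig returns the documented defaults.
func defaultRecoveryConfig() recoveryConfig {
	return recoveryConfig{StuckThresholdMinutes: 30, MaxRetries: 3, MaxQueueMinutes: 60}
}

// job is a hypothetical view of a queued or assigned job.
type job struct {
	State          string // "queued", "acknowledged" or "running" (assumed names)
	MinutesInState int    // time spent in State without progress
	Retries        int    // retry attempts used so far
}

// decide returns "expire", "requeue", "fail" or "none" for a single job.
func decide(j job, cfg recoveryConfig) string {
	switch j.State {
	case "queued":
		// Jobs that waited too long in the queue are expired.
		if j.MinutesInState > cfg.MaxQueueMinutes {
			return "expire"
		}
	case "acknowledged", "running":
		if j.MinutesInState <= cfg.StuckThresholdMinutes {
			return "none" // still within the progress window
		}
		if j.Retries >= cfg.MaxRetries {
			return "fail" // retry budget exhausted
		}
		return "requeue"
	}
	return "none"
}

func main() {
	cfg := defaultRecoveryConfig()
	fmt.Println(decide(job{State: "running", MinutesInState: 45, Retries: 1}, cfg))
	fmt.Println(decide(job{State: "running", MinutesInState: 45, Retries: 3}, cfg))
	fmt.Println(decide(job{State: "queued", MinutesInState: 90}, cfg))
}
```

Because the decision depends only on the job's current state, running it repeatedly over the same rows yields the same actions, in line with the idempotency principle.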
type Manager ¶
type Manager struct {
	// contains filtered or unexported fields
}
Manager manages multiple controllers, running them in parallel goroutines.
func NewManager ¶
func NewManager(cfg *ManagerConfig) *Manager
NewManager creates a new controller manager.
func (*Manager) ControllerCount ¶
ControllerCount returns the number of registered controllers.
func (*Manager) ControllerNames ¶
ControllerNames returns the names of all registered controllers.
func (*Manager) Register ¶
func (m *Manager) Register(c Controller)
Register adds a controller to the manager.
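The Manager's start and stop methods are not shown in this documentation, so the following is only a sketch of what "running controllers in parallel goroutines" could look like. runAll, runForMillis, and tickController are hypothetical, and reconciling once before the first tick is a choice made for this example, not a documented behavior.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Controller is copied from the interface documented in this package.
type Controller interface {
	Name() string
	Interval() time.Duration
	Reconcile(ctx context.Context) (int, error)
}

// tickController is a hypothetical controller that counts its own runs.
type tickController struct {
	name string
	runs int
}

func (c *tickController) Name() string            { return c.name }
func (c *tickController) Interval() time.Duration { return 10 * time.Millisecond }
func (c *tickController) Reconcile(ctx context.Context) (int, error) {
	c.runs++
	return 1, nil
}

// runAll starts one goroutine per controller and blocks until ctx is done.
// An error in one controller is logged and does not stop the others.
func runAll(ctx context.Context, controllers []Controller) {
	var wg sync.WaitGroup
	for _, c := range controllers {
		wg.Add(1)
		go func(c Controller) {
			defer wg.Done()
			ticker := time.NewTicker(c.Interval())
			defer ticker.Stop()
			for {
				// Reconcile first, then wait for the next tick or shutdown.
				if _, err := c.Reconcile(ctx); err != nil {
					fmt.Printf("%s: reconcile failed: %v\n", c.Name(), err)
				}
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
				}
			}
		}(c)
	}
	wg.Wait()
}

// runForMillis runs the controllers for roughly ms milliseconds.
func runForMillis(ms int, controllers []Controller) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(ms)*time.Millisecond)
	defer cancel()
	runAll(ctx, controllers)
}

func main() {
	a, b := &tickController{name: "a"}, &tickController{name: "b"}
	runForMillis(50, []Controller{a, b})
	fmt.Println(a.runs > 0, b.runs > 0)
}
```

Each controller gets its own ticker, so a slow reconciliation in one goroutine delays only that controller.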
type ManagerConfig ¶
type ManagerConfig struct {
	// Metrics collector (optional)
	Metrics Metrics

	// Logger (required)
	Logger *logger.Logger
}
ManagerConfig configures the controller manager.
type Metrics ¶
type Metrics interface {
	// RecordReconcile records a reconciliation run.
	RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)

	// SetControllerRunning sets whether a controller is running.
	SetControllerRunning(controller string, running bool)

	// IncrementReconcileErrors increments the error counter.
	IncrementReconcileErrors(controller string)

	// SetLastReconcileTime sets the last reconcile timestamp.
	SetLastReconcileTime(controller string, t time.Time)
}
Metrics defines the interface for controller metrics collection.
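Any type with these four methods can serve as a collector. As an illustration, here is a minimal in-memory implementation that could back assertions in tests; memoryMetrics and its fields are hypothetical, and the interface is redeclared locally so that the sketch compiles on its own.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Metrics is copied from the interface documented above.
type Metrics interface {
	RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)
	SetControllerRunning(controller string, running bool)
	IncrementReconcileErrors(controller string)
	SetLastReconcileTime(controller string, t time.Time)
}

// memoryMetrics keeps per-controller values in maps guarded by a mutex.
type memoryMetrics struct {
	mu      sync.Mutex
	runs    map[string]int
	items   map[string]int
	errors  map[string]int
	running map[string]bool
	lastRun map[string]time.Time
}

// Compile-time check that memoryMetrics satisfies Metrics.
var _ Metrics = (*memoryMetrics)(nil)

func newMemoryMetrics() *memoryMetrics {
	return &memoryMetrics{
		runs:    map[string]int{},
		items:   map[string]int{},
		errors:  map[string]int{},
		running: map[string]bool{},
		lastRun: map[string]time.Time{},
	}
}

// RecordReconcile counts the run and accumulates processed items.
// This sketch ignores duration and err; errors are counted separately.
func (m *memoryMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.runs[controller]++
	m.items[controller] += itemsProcessed
}

func (m *memoryMetrics) SetControllerRunning(controller string, running bool) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.running[controller] = running
}

func (m *memoryMetrics) IncrementReconcileErrors(controller string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.errors[controller]++
}

func (m *memoryMetrics) SetLastReconcileTime(controller string, t time.Time) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.lastRun[controller] = t
}

func main() {
	m := newMemoryMetrics()
	m.SetControllerRunning("agent-health", true)
	m.RecordReconcile("agent-health", 4, 12*time.Millisecond, nil)
	m.SetLastReconcileTime("agent-health", time.Now())
	fmt.Println(m.runs["agent-health"], m.items["agent-health"], m.running["agent-health"])
}
```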
type NoopMetrics ¶
type NoopMetrics struct{}
NoopMetrics is a no-op implementation of Metrics for testing.
func (*NoopMetrics) IncrementReconcileErrors ¶
func (m *NoopMetrics) IncrementReconcileErrors(controller string)
IncrementReconcileErrors does nothing.
func (*NoopMetrics) RecordReconcile ¶
func (m *NoopMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)
RecordReconcile does nothing.
func (*NoopMetrics) SetControllerRunning ¶
func (m *NoopMetrics) SetControllerRunning(controller string, running bool)
SetControllerRunning does nothing.
func (*NoopMetrics) SetLastReconcileTime ¶
func (m *NoopMetrics) SetLastReconcileTime(controller string, t time.Time)
SetLastReconcileTime does nothing.
type PrometheusMetrics ¶
type PrometheusMetrics struct {
	// contains filtered or unexported fields
}
PrometheusMetrics implements the Metrics interface using Prometheus.
func NewPrometheusMetrics ¶
func NewPrometheusMetrics(namespace string) *PrometheusMetrics
NewPrometheusMetrics creates a new PrometheusMetrics.
func (*PrometheusMetrics) IncrementReconcileErrors ¶
func (m *PrometheusMetrics) IncrementReconcileErrors(controller string)
IncrementReconcileErrors increments the error counter.
func (*PrometheusMetrics) RecordReconcile ¶
func (m *PrometheusMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)
RecordReconcile records a reconciliation run.
func (*PrometheusMetrics) SetControllerRunning ¶
func (m *PrometheusMetrics) SetControllerRunning(controller string, running bool)
SetControllerRunning sets whether a controller is running.
func (*PrometheusMetrics) SetLastReconcileTime ¶
func (m *PrometheusMetrics) SetLastReconcileTime(controller string, t time.Time)
SetLastReconcileTime sets the last reconcile timestamp.
type QueuePriorityController ¶
type QueuePriorityController struct {
	// contains filtered or unexported fields
}
QueuePriorityController periodically recalculates queue priorities for platform jobs. This ensures fair scheduling across tenants by adjusting priorities based on:
- Tenant's plan tier (higher tiers get base priority boost)
- Job age (older jobs get priority bonus to prevent starvation)
- Tenant's current queue depth (tenants with fewer jobs get slight boost)
The priority calculation is done in the database for efficiency:
new_priority = plan_base_priority + (wait_time_minutes * age_bonus_per_minute)
This is a soft-priority system - higher priority jobs are processed first, but no tenant can completely starve others.
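A worked example of the formula shows how the age bonus prevents starvation. The function below restates the formula in Go for illustration only, since the real calculation runs in the database; the base priorities (100 and 10) and the bonus of 2 per minute are made-up values, not the package's configuration.

```go
package main

import "fmt"

// priority applies the documented formula:
// new_priority = plan_base_priority + (wait_time_minutes * age_bonus_per_minute)
func priority(planBase, waitMinutes, ageBonusPerMinute int) int {
	return planBase + waitMinutes*ageBonusPerMinute
}

func main() {
	fresh := priority(100, 0, 2)   // higher-tier job that was just queued
	waiting := priority(10, 60, 2) // lower-tier job that has waited an hour
	fmt.Println(fresh, waiting)    // prints "100 130"
}
```

With these sample numbers the lower-tier job overtakes the fresh higher-tier job once it has waited more than 45 minutes, since 10 + 45*2 = 100.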
func NewQueuePriorityController ¶
func NewQueuePriorityController(commandRepo command.Repository, config *QueuePriorityControllerConfig) *QueuePriorityController
NewQueuePriorityController creates a new QueuePriorityController.
func (*QueuePriorityController) Interval ¶
func (c *QueuePriorityController) Interval() time.Duration
Interval returns the reconciliation interval.
func (*QueuePriorityController) Name ¶
func (c *QueuePriorityController) Name() string
Name returns the controller name.
type QueuePriorityControllerConfig ¶
type QueuePriorityControllerConfig struct {
	// Interval is how often to recalculate queue priorities.
	// Default: 60 seconds.
	Interval time.Duration

	// Logger for logging.
	Logger *logger.Logger
}
QueuePriorityControllerConfig configures the QueuePriorityController.