controller

package
v0.1.1
Warning

This package is not in the latest version of its module.

Published: Feb 12, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package controller implements K8s-style reconciliation loop controllers for self-healing background operations.

Controllers periodically reconcile the desired state of the system with its actual state. Each controller runs in its own goroutine and handles a specific aspect of the system:

  - AgentHealthController: marks stale agents as offline and cleans up expired leases
  - JobRecoveryController: recovers stuck jobs and re-queues them
  - QueuePriorityController: recalculates queue priorities for fair scheduling
  - TokenCleanupController: cleans up expired bootstrap tokens
  - AuditRetentionController: manages audit log retention

Design principles:

  - Each controller is independent and can fail without affecting the others.
  - Controllers are idempotent: running one multiple times has the same effect as running it once.
  - Controllers use optimistic locking to handle concurrent modifications.
  - All state changes are logged for debugging and monitoring.

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AgentHealthController

type AgentHealthController struct {
	// contains filtered or unexported fields
}

AgentHealthController periodically checks agent health and marks stale agents as offline. This is a K8s-style controller that reconciles the desired state (agents with recent heartbeats are online, agents without recent heartbeats are offline) with the actual state.

func NewAgentHealthController

func NewAgentHealthController(
	agentRepo agent.Repository,
	config *AgentHealthControllerConfig,
) *AgentHealthController

NewAgentHealthController creates a new AgentHealthController.

func (*AgentHealthController) Interval

func (c *AgentHealthController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*AgentHealthController) Name

func (c *AgentHealthController) Name() string

Name returns the controller name.

func (*AgentHealthController) Reconcile

func (c *AgentHealthController) Reconcile(ctx context.Context) (int, error)

Reconcile checks agent health and marks stale agents as offline. It uses the repository's MarkStaleAgentsOffline method, which also updates the last_offline_at timestamp.

type AgentHealthControllerConfig

type AgentHealthControllerConfig struct {
	// Interval is how often to run the health check.
	// Default: 30 seconds.
	Interval time.Duration

	// StaleTimeout is how long since last heartbeat before marking an agent as offline.
	// Default: 90 seconds (1.5x the typical heartbeat interval of 60s).
	StaleTimeout time.Duration

	// Logger for logging.
	Logger *logger.Logger
}

AgentHealthControllerConfig configures the AgentHealthController.

type AuditRetentionController

type AuditRetentionController struct {
	// contains filtered or unexported fields
}

AuditRetentionController manages audit log retention. This is a compliance-critical controller that:

  1. Deletes audit logs older than the retention period
  2. Logs deletion activities for meta-audit purposes

The retention period should be configured based on compliance requirements:

  - GDPR: no fixed period is mandated; typically 2-7 years depending on data type
  - SOC 2: at least 1 year
  - PCI DSS: at least 1 year
  - HIPAA: 6 years

IMPORTANT: Back up audit logs before running this controller; deleted logs cannot be recovered.

func NewAuditRetentionController

func NewAuditRetentionController(
	auditRepo admin.AuditLogRepository,
	config *AuditRetentionControllerConfig,
) *AuditRetentionController

NewAuditRetentionController creates a new AuditRetentionController.

func (*AuditRetentionController) Interval

func (c *AuditRetentionController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*AuditRetentionController) Name

func (c *AuditRetentionController) Name() string

Name returns the controller name.

func (*AuditRetentionController) Reconcile

func (c *AuditRetentionController) Reconcile(ctx context.Context) (int, error)

Reconcile deletes audit logs older than the retention period.

type AuditRetentionControllerConfig

type AuditRetentionControllerConfig struct {
	// Interval is how often to run the retention check.
	// Default: 24 hours (once a day).
	Interval time.Duration

	// RetentionDays is how long to keep audit logs.
	// Logs older than this will be deleted.
	// Default: 365 days (1 year).
	RetentionDays int

	// BatchSize is the maximum number of logs to delete in one batch.
	// This prevents long-running transactions.
	// Default: 10000.
	BatchSize int

	// DryRun, if true, only counts the logs that would be deleted without deleting them.
	// Useful for testing retention policies.
	// Default: false.
	DryRun bool

	// Logger for logging.
	Logger *logger.Logger
}

AuditRetentionControllerConfig configures the AuditRetentionController.

type Controller

type Controller interface {
	// Name returns the unique name of this controller.
	Name() string

	// Interval returns how often this controller should run.
	Interval() time.Duration

	// Reconcile performs the reconciliation logic.
	// It should be idempotent: running it multiple times should have the same effect as running it once.
	// Returns the number of items processed and any error encountered.
	Reconcile(ctx context.Context) (int, error)
}

Controller defines the interface for a reconciliation loop controller. Controllers are responsible for maintaining a specific aspect of system state.

type JobRecoveryController

type JobRecoveryController struct {
	// contains filtered or unexported fields
}

JobRecoveryController recovers stuck jobs and re-queues them. This is a K8s-style controller that ensures jobs don't get lost if an agent goes offline or fails to complete them.

The controller performs three main tasks:

  1. Recover stuck jobs: return jobs to the queue if they have been assigned but have not progressed (for example, because the agent went offline or crashed)
  2. Expire old jobs: mark jobs as expired if they have waited in the queue too long
  3. Clean up: mark orphaned jobs as failed if they exceed the retry limit

func NewJobRecoveryController

func NewJobRecoveryController(
	commandRepo command.Repository,
	config *JobRecoveryControllerConfig,
) *JobRecoveryController

NewJobRecoveryController creates a new JobRecoveryController.

func (*JobRecoveryController) Interval

func (c *JobRecoveryController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*JobRecoveryController) Name

func (c *JobRecoveryController) Name() string

Name returns the controller name.

func (*JobRecoveryController) Reconcile

func (c *JobRecoveryController) Reconcile(ctx context.Context) (int, error)

Reconcile recovers stuck jobs, expires jobs that have waited in the queue too long, and marks jobs that exceed the retry limit as failed.

type JobRecoveryControllerConfig

type JobRecoveryControllerConfig struct {
	// Interval is how often to run the job recovery check.
	// Default: 60 seconds.
	Interval time.Duration

	// StuckThresholdMinutes is how long a job can be in the acknowledged or running state
	// without progress before being considered stuck.
	// Default: 30 minutes.
	StuckThresholdMinutes int

	// TenantStuckThresholdMinutes is how long a tenant command can be assigned
	// to an agent without being picked up before being reassigned.
	// Default: 10 minutes (shorter than platform jobs as tenant agents poll more frequently).
	TenantStuckThresholdMinutes int

	// MaxRetries is the maximum number of retry attempts for a job.
	// After this many retries, the job will be marked as failed.
	// Default: 3.
	MaxRetries int

	// MaxQueueMinutes is how long a job can wait in the queue before expiring.
	// Default: 60 minutes.
	MaxQueueMinutes int

	// Logger for logging.
	Logger *logger.Logger
}

JobRecoveryControllerConfig configures the JobRecoveryController.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager manages multiple controllers, running them in parallel goroutines.

func NewManager

func NewManager(cfg *ManagerConfig) *Manager

NewManager creates a new controller manager.

func (*Manager) ControllerCount

func (m *Manager) ControllerCount() int

ControllerCount returns the number of registered controllers.

func (*Manager) ControllerNames

func (m *Manager) ControllerNames() []string

ControllerNames returns the names of all registered controllers.

func (*Manager) IsRunning

func (m *Manager) IsRunning() bool

IsRunning reports whether the manager is running.

func (*Manager) Register

func (m *Manager) Register(c Controller)

Register adds a controller to the manager.

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

Start starts all registered controllers.

func (*Manager) Stop

func (m *Manager) Stop() error

Stop stops all controllers gracefully.

type ManagerConfig

type ManagerConfig struct {
	// Metrics collector (optional)
	Metrics Metrics

	// Logger (required)
	Logger *logger.Logger
}

ManagerConfig configures the controller manager.

type Metrics

type Metrics interface {
	// RecordReconcile records a reconciliation run.
	RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)

	// SetControllerRunning sets whether a controller is running.
	SetControllerRunning(controller string, running bool)

	// IncrementReconcileErrors increments the error counter.
	IncrementReconcileErrors(controller string)

	// SetLastReconcileTime sets the last reconcile timestamp.
	SetLastReconcileTime(controller string, t time.Time)
}

Metrics defines the interface for controller metrics collection.

type NoopMetrics

type NoopMetrics struct{}

NoopMetrics is a no-op implementation of Metrics for testing.

func (*NoopMetrics) IncrementReconcileErrors

func (m *NoopMetrics) IncrementReconcileErrors(controller string)

IncrementReconcileErrors does nothing.

func (*NoopMetrics) RecordReconcile

func (m *NoopMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)

RecordReconcile does nothing.

func (*NoopMetrics) SetControllerRunning

func (m *NoopMetrics) SetControllerRunning(controller string, running bool)

SetControllerRunning does nothing.

func (*NoopMetrics) SetLastReconcileTime

func (m *NoopMetrics) SetLastReconcileTime(controller string, t time.Time)

SetLastReconcileTime does nothing.

type PrometheusMetrics

type PrometheusMetrics struct {
	// contains filtered or unexported fields
}

PrometheusMetrics implements the Metrics interface using Prometheus.

func NewPrometheusMetrics

func NewPrometheusMetrics(namespace string) *PrometheusMetrics

NewPrometheusMetrics creates a new PrometheusMetrics.

func (*PrometheusMetrics) IncrementReconcileErrors

func (m *PrometheusMetrics) IncrementReconcileErrors(controller string)

IncrementReconcileErrors increments the error counter.

func (*PrometheusMetrics) RecordReconcile

func (m *PrometheusMetrics) RecordReconcile(controller string, itemsProcessed int, duration time.Duration, err error)

RecordReconcile records a reconciliation run.

func (*PrometheusMetrics) SetControllerRunning

func (m *PrometheusMetrics) SetControllerRunning(controller string, running bool)

SetControllerRunning sets whether a controller is running.

func (*PrometheusMetrics) SetLastReconcileTime

func (m *PrometheusMetrics) SetLastReconcileTime(controller string, t time.Time)

SetLastReconcileTime sets the last reconcile timestamp.

type QueuePriorityController

type QueuePriorityController struct {
	// contains filtered or unexported fields
}

QueuePriorityController periodically recalculates queue priorities for platform jobs. This ensures fair scheduling across tenants by adjusting priorities based on:

  - the tenant's plan tier (higher tiers get a base priority boost)
  - job age (older jobs get a priority bonus to prevent starvation)
  - the tenant's current queue depth (tenants with fewer jobs get a slight boost)

The priority calculation is done in the database for efficiency:

	new_priority = plan_base_priority + (wait_time_minutes * age_bonus_per_minute)

This is a soft-priority system: higher-priority jobs are processed first, but the age bonus ensures that no tenant can completely starve the others.
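A worked example of the documented formula and the resulting dispatch order. The plan base priorities and the bonus of 1 point per minute are made-up values, and `queuedJob`, `priority`, and `dispatchOrder` are hypothetical; the real calculation runs in the database. The queue-depth factor listed above does not appear in the formula, so it is omitted here.

```go
package main

import (
	"fmt"
	"sort"
)

// queuedJob is a hypothetical pending platform job.
type queuedJob struct {
	ID               string
	PlanBasePriority int
	WaitMinutes      int
}

// priority applies the documented formula:
// new_priority = plan_base_priority + (wait_time_minutes * age_bonus_per_minute)
func priority(j queuedJob, ageBonusPerMinute int) int {
	return j.PlanBasePriority + j.WaitMinutes*ageBonusPerMinute
}

// dispatchOrder returns job IDs by descending priority; ties keep input order.
func dispatchOrder(jobs []queuedJob, ageBonusPerMinute int) []string {
	sorted := append([]queuedJob(nil), jobs...)
	sort.SliceStable(sorted, func(a, b int) bool {
		return priority(sorted[a], ageBonusPerMinute) > priority(sorted[b], ageBonusPerMinute)
	})
	ids := make([]string, len(sorted))
	for i, j := range sorted {
		ids[i] = j.ID
	}
	return ids
}

func main() {
	jobs := []queuedJob{
		{ID: "enterprise-new", PlanBasePriority: 100, WaitMinutes: 0},
		{ID: "free-old", PlanBasePriority: 10, WaitMinutes: 120},
	}
	// With 1 point per minute, the old free-tier job scores 130 versus 100.
	fmt.Println(dispatchOrder(jobs, 1)) // [free-old enterprise-new]
}
```

With any positive age bonus, a low-tier job's priority keeps rising while it waits, which is what keeps the scheme from starving lower tiers.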

func NewQueuePriorityController

func NewQueuePriorityController(
	commandRepo command.Repository,
	config *QueuePriorityControllerConfig,
) *QueuePriorityController

NewQueuePriorityController creates a new QueuePriorityController.

func (*QueuePriorityController) Interval

func (c *QueuePriorityController) Interval() time.Duration

Interval returns the reconciliation interval.

func (*QueuePriorityController) Name

func (c *QueuePriorityController) Name() string

Name returns the controller name.

func (*QueuePriorityController) Reconcile

func (c *QueuePriorityController) Reconcile(ctx context.Context) (int, error)

Reconcile recalculates queue priorities for all pending platform jobs.

type QueuePriorityControllerConfig

type QueuePriorityControllerConfig struct {
	// Interval is how often to recalculate queue priorities.
	// Default: 60 seconds.
	Interval time.Duration

	// Logger for logging.
	Logger *logger.Logger
}

QueuePriorityControllerConfig configures the QueuePriorityController.
