consumer

package
v0.0.0-...-d55e4f4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2026 License: Apache-2.0 Imports: 39 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrClosed = errors.New("consumer closed")
View Source
var ErrMaxRetries = errors.New("max retries reached")

Functions

func ParseHeaders

func ParseHeaders(msg *kafka.Message) (map[string]string, error)

func ParseMessage

func ParseMessage(msg []byte, operationType string) (*model.TupleEvent, error)

func ParseMessageKey

func ParseMessageKey(msg []byte) (string, error)

Types

type CompletedConfig

type CompletedConfig struct {
	// contains filtered or unexported fields
}

type Config

type Config struct {
	*Options
	KafkaConfig *kafka.ConfigMap

	RetryConfig *retry.Config
	AuthConfig  *auth.Config
}

func NewConfig

func NewConfig(o *Options) *Config

func (*Config) Complete

func (c *Config) Complete() (CompletedConfig, []error)

type Consumer

type Consumer interface {
	CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)
	SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) (err error)
	Poll(timeoutMs int) (event kafka.Event)
	IsClosed() bool
	Close() error
	AssignmentLost() bool
}

type InventoryConsumer

type InventoryConsumer struct {
	Consumer         Consumer
	OffsetStorage    []kafka.TopicPartition
	Config           CompletedConfig
	DB               *gorm.DB
	AuthzConfig      authz.CompletedConfig
	Authorizer       api.Authorizer
	Errors           chan error
	MetricsCollector *metricscollector.MetricsCollector
	Logger           *log.Helper
	AuthOptions      *auth.Options
	RetryOptions     *retry.Options
	Notifier         pubsub.Notifier
	SchemaService    *usecase_resources.SchemaUsecase

	ResourceRepository data.ResourceRepository
	// contains filtered or unexported fields
}

InventoryConsumer defines a Consumer with the clients and configs required to call the Relations API and update the Inventory DB with consistency tokens.

func New

func New(config CompletedConfig, db *gorm.DB, schemaRepository schema.Repository, authz authz.CompletedConfig, authorizer api.Authorizer, notifier pubsub.Notifier, logger *log.Helper, consumer Consumer) (InventoryConsumer, error)

New instantiates a new InventoryConsumer

func (*InventoryConsumer) Consume

func (i *InventoryConsumer) Consume() error

Consume begins the consumption loop for the Consumer

func (*InventoryConsumer) CreateTuple

func (i *InventoryConsumer) CreateTuple(ctx context.Context, tuples *[]model.RelationsTuple) (string, error)

CreateTuple calls the Relations API to create a tuple from the message payload received and returns the consistency token

func (*InventoryConsumer) DeleteTuple

func (i *InventoryConsumer) DeleteTuple(ctx context.Context, tuples []model.RelationsTuple) (string, error)

DeleteTuple calls the Relations API to delete tuples from the RelationsTuple slice and returns the consistency token

func (*InventoryConsumer) Errs

func (i *InventoryConsumer) Errs() <-chan error

Errs returns a receive-only channel that delivers any errors put on the error channel, to ensure proper shutdown of services.

func (*InventoryConsumer) ProcessMessage

func (i *InventoryConsumer) ProcessMessage(headers map[string]string, relationsEnabled bool, msg *kafka.Message) (string, error)

func (*InventoryConsumer) RebalanceCallback

func (i *InventoryConsumer) RebalanceCallback(consumer *kafka.Consumer, event kafka.Event) error

RebalanceCallback logs when rebalance events occur and ensures any stored offsets are committed before losing the partition assignment. It is registered through the Kafka 'SubscribeTopics' call and is invoked automatically whenever a rebalance occurs. Note that a RebalanceCb must have the function type func(*kafka.Consumer, kafka.Event) error. This function satisfies that type, but it uses the consumer held by the InventoryConsumer rather than the one passed in; both refer to the same consumer.

func (*InventoryConsumer) Retry

func (i *InventoryConsumer) Retry(operation func() (string, error), metricCounter metric.Int64Counter) (string, error)

Retry executes the given function and will retry on failure with backoff until max retries is reached

func (*InventoryConsumer) Shutdown

func (i *InventoryConsumer) Shutdown() error

Shutdown ensures the consumer is properly shut down, whether triggered by the server or by a rebalance.

func (*InventoryConsumer) UpdateConsistencyToken

func (i *InventoryConsumer) UpdateConsistencyToken(resourceId, token string) error

UpdateConsistencyToken updates the resource in the inventory DB to add the consistency token

func (*InventoryConsumer) UpdateConsistencyTokenIfPresent

func (i *InventoryConsumer) UpdateConsistencyTokenIfPresent(resourceId, token string) error

UpdateConsistencyTokenIfPresent updates the consistency token in the DB only if the token is non-empty. This prevents clearing existing tokens when operations don't generate new tokens (e.g., when there are no tuples to create or delete).

func (*InventoryConsumer) UpdateTuple

func (i *InventoryConsumer) UpdateTuple(ctx context.Context, tuplesToCreate *[]model.RelationsTuple, tuplesToDelete *[]model.RelationsTuple) (string, error)

UpdateTuple calls the Relations API to create and delete tuples from the message payload received and returns the consistency token

type KeyPayload

type KeyPayload struct {
	MessageSchema map[string]interface{} `json:"schema"`
	InventoryID   string                 `json:"payload"`
}

KeyPayload stores the event message key captured from the topic as emitted by Debezium

type MessagePayload

type MessagePayload struct {
	MessageSchema    map[string]interface{} `json:"schema"`
	RelationsRequest interface{}            `json:"payload"`
}

MessagePayload stores the event message value captured from the topic as emitted by Debezium

type Options

type Options struct {
	Enabled                 bool           `mapstructure:"enabled"`
	BootstrapServers        []string       `mapstructure:"bootstrap-servers"`
	ConsumerGroupID         string         `mapstructure:"consumer-group-id"`
	Topic                   string         `mapstructure:"topic"`
	CommitModulo            int            `mapstructure:"commit-modulo"`
	SessionTimeout          string         `mapstructure:"session-timeout"`
	HeartbeatInterval       string         `mapstructure:"heartbeat-interval"`
	MaxPollInterval         string         `mapstructure:"max-poll-interval"`
	EnableAutoCommit        string         `mapstructure:"enable-auto-commit"`
	AutoOffsetReset         string         `mapstructure:"auto-offset-reset"`
	StatisticsInterval      string         `mapstructure:"statistics-interval-ms"`
	Debug                   string         `mapstructure:"debug"`
	RetryOptions            *retry.Options `mapstructure:"retry-options"`
	AuthOptions             *auth.Options  `mapstructure:"auth"`
	ReadAfterWriteEnabled   bool           `mapstructure:"read-after-write-enabled"`
	ReadAfterWriteAllowlist []string       `mapstructure:"read-after-write-allowlist"`
}

func NewOptions

func NewOptions() *Options

func (*Options) AddFlags

func (o *Options) AddFlags(fs *pflag.FlagSet, prefix string)

func (*Options) Complete

func (o *Options) Complete() []error

func (*Options) Validate

func (o *Options) Validate() []error

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL