Documentation
¶
Index ¶
- Variables
- func ParseHeaders(msg *kafka.Message) (map[string]string, error)
- func ParseMessage(msg []byte, operationType string) (*model.TupleEvent, error)
- func ParseMessageKey(msg []byte) (string, error)
- type CompletedConfig
- type Config
- type Consumer
- type InventoryConsumer
- func (i *InventoryConsumer) Consume() error
- func (i *InventoryConsumer) CreateTuple(ctx context.Context, tuples *[]model.RelationsTuple) (string, error)
- func (i *InventoryConsumer) DeleteTuple(ctx context.Context, tuples []model.RelationsTuple) (string, error)
- func (i *InventoryConsumer) Errs() <-chan error
- func (i *InventoryConsumer) ProcessMessage(headers map[string]string, relationsEnabled bool, msg *kafka.Message) (string, error)
- func (i *InventoryConsumer) RebalanceCallback(consumer *kafka.Consumer, event kafka.Event) error
- func (i *InventoryConsumer) Retry(operation func() (string, error), metricCounter metric.Int64Counter) (string, error)
- func (i *InventoryConsumer) Shutdown() error
- func (i *InventoryConsumer) UpdateConsistencyToken(resourceId, token string) error
- func (i *InventoryConsumer) UpdateConsistencyTokenIfPresent(resourceId, token string) error
- func (i *InventoryConsumer) UpdateTuple(ctx context.Context, tuplesToCreate *[]model.RelationsTuple, ...) (string, error)
- type KeyPayload
- type MessagePayload
- type Options
Constants ¶
This section is empty.
Variables ¶
var ErrClosed = errors.New("consumer closed")
var ErrMaxRetries = errors.New("max retries reached")
Functions ¶
func ParseMessage ¶
func ParseMessage(msg []byte, operationType string) (*model.TupleEvent, error)
func ParseMessageKey ¶
Types ¶
type CompletedConfig ¶
type CompletedConfig struct {
// contains filtered or unexported fields
}
type Config ¶
type Config struct {
*Options
KafkaConfig *kafka.ConfigMap
RetryConfig *retry.Config
AuthConfig *auth.Config
}
func (*Config) Complete ¶
func (c *Config) Complete() (CompletedConfig, []error)
type Consumer ¶
type Consumer interface {
CommitOffsets(offsets []kafka.TopicPartition) ([]kafka.TopicPartition, error)
SubscribeTopics(topics []string, rebalanceCb kafka.RebalanceCb) (err error)
Poll(timeoutMs int) (event kafka.Event)
IsClosed() bool
Close() error
AssignmentLost() bool
}
type InventoryConsumer ¶
type InventoryConsumer struct {
Consumer Consumer
OffsetStorage []kafka.TopicPartition
Config CompletedConfig
DB *gorm.DB
AuthzConfig authz.CompletedConfig
Authorizer api.Authorizer
Errors chan error
MetricsCollector *metricscollector.MetricsCollector
Logger *log.Helper
AuthOptions *auth.Options
RetryOptions *retry.Options
Notifier pubsub.Notifier
SchemaService *usecase_resources.SchemaUsecase
ResourceRepository data.ResourceRepository
// contains filtered or unexported fields
}
InventoryConsumer defines a Consumer with required clients and configs to call Relations API and update the Inventory DB with consistency tokens
func New ¶
func New(config CompletedConfig, db *gorm.DB, schemaRepository schema.Repository, authz authz.CompletedConfig, authorizer api.Authorizer, notifier pubsub.Notifier, logger *log.Helper, consumer Consumer) (InventoryConsumer, error)
New instantiates a new InventoryConsumer
func (*InventoryConsumer) Consume ¶
func (i *InventoryConsumer) Consume() error
Consume begins the consumption loop for the Consumer
func (*InventoryConsumer) CreateTuple ¶
func (i *InventoryConsumer) CreateTuple(ctx context.Context, tuples *[]model.RelationsTuple) (string, error)
CreateTuple calls the Relations API to create a tuple from the message payload received and returns the consistency token
func (*InventoryConsumer) DeleteTuple ¶
func (i *InventoryConsumer) DeleteTuple(ctx context.Context, tuples []model.RelationsTuple) (string, error)
DeleteTuple calls the Relations API to delete tuples from the RelationsTuple slice and returns the consistency token
func (*InventoryConsumer) Errs ¶
func (i *InventoryConsumer) Errs() <-chan error
Errs returns any errors put on the error channel to ensure proper shutdown of services
func (*InventoryConsumer) ProcessMessage ¶
func (*InventoryConsumer) RebalanceCallback ¶
RebalanceCallback logs when rebalance events occur and ensures any stored offsets are committed before losing the partition assignment. It is registered to the kafka 'SubscribeTopics' call and is invoked automatically whenever rebalances occurs. Note, the RebalanceCb function must satisfy the function type func(*Consumer, Event). This function does so, but the consumer embedded in the InventoryConsumer is used versus the passed one which is the same consumer in either case.
func (*InventoryConsumer) Retry ¶
func (i *InventoryConsumer) Retry(operation func() (string, error), metricCounter metric.Int64Counter) (string, error)
Retry executes the given function and will retry on failure with backoff until max retries is reached
func (*InventoryConsumer) Shutdown ¶
func (i *InventoryConsumer) Shutdown() error
Shutdown ensures the consumer is properly shutdown, whether by server or due to rebalance
func (*InventoryConsumer) UpdateConsistencyToken ¶
func (i *InventoryConsumer) UpdateConsistencyToken(resourceId, token string) error
UpdateConsistencyToken updates the resource in the inventory DB to add the consistency token
func (*InventoryConsumer) UpdateConsistencyTokenIfPresent ¶
func (i *InventoryConsumer) UpdateConsistencyTokenIfPresent(resourceId, token string) error
updateConsistencyTokenIfPresent updates the consistency token in the DB only if the token is non-empty. This prevents clearing existing tokens when operations don't generate new tokens (e.g., when there are no tuples to create or delete).
func (*InventoryConsumer) UpdateTuple ¶
func (i *InventoryConsumer) UpdateTuple(ctx context.Context, tuplesToCreate *[]model.RelationsTuple, tuplesToDelete *[]model.RelationsTuple) (string, error)
UpdateTuple calls the Relations API to create and delete tuples from the message payload received and returns the consistency token
type KeyPayload ¶
type KeyPayload struct {
MessageSchema map[string]interface{} `json:"schema"`
InventoryID string `json:"payload"`
}
KeyPayload stores the event message key captured from the topic as emitted by Debezium
type MessagePayload ¶
type MessagePayload struct {
MessageSchema map[string]interface{} `json:"schema"`
RelationsRequest interface{} `json:"payload"`
}
MessagePayload stores the event message value captured from the topic as emitted by Debezium
type Options ¶
type Options struct {
Enabled bool `mapstructure:"enabled"`
BootstrapServers []string `mapstructure:"bootstrap-servers"`
ConsumerGroupID string `mapstructure:"consumer-group-id"`
Topic string `mapstructure:"topic"`
CommitModulo int `mapstructure:"commit-modulo"`
SessionTimeout string `mapstructure:"session-timeout"`
HeartbeatInterval string `mapstructure:"heartbeat-interval"`
MaxPollInterval string `mapstructure:"max-poll-interval"`
EnableAutoCommit string `mapstructure:"enable-auto-commit"`
AutoOffsetReset string `mapstructure:"auto-offset-reset"`
StatisticsInterval string `mapstructure:"statistics-interval-ms"`
Debug string `mapstructure:"debug"`
RetryOptions *retry.Options `mapstructure:"retry-options"`
AuthOptions *auth.Options `mapstructure:"auth"`
ReadAfterWriteEnabled bool `mapstructure:"read-after-write-enabled"`
ReadAfterWriteAllowlist []string `mapstructure:"read-after-write-allowlist"`
}
func NewOptions ¶
func NewOptions() *Options