bsp

package
v0.0.0-...-e97be17
Published: Oct 21, 2022 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package bsp implements the Bulk Synchronous Parallel (BSP) computing model (https://en.wikipedia.org/wiki/Bulk_synchronous_parallel) to aid in processing graph data.

Index

Constants

This section is empty.

Variables

var (
	ErrUnknownEdgeSource         = xerrors.New("source vertex is not part of the graph")
	ErrDestinationIsLocal        = xerrors.New("message destination is assigned to the local graph")
	ErrInvalidMessageDestination = xerrors.New("invalid message destination")
)

Functions

This section is empty.

Types

type Aggregator

type Aggregator interface {
	Type() string
	Set(val any)
	Get() any
	// Aggregate updates the aggregator's value based on the provided value.
	Aggregate(val any)

	// Delta returns the change in the aggregator's value since the last
	// call to Delta. The delta values can be used in distributed
	// aggregator use-cases to reduce local, partially-aggregated values
	// into a single value by feeding them into the Aggregate method
	// of a top-level aggregator.
	//
	// For example, in a distributed counter scenario, each node maintains
	// its own local counter instance. At the end of each step, the master
	// node calls Delta on each local counter and aggregates the values
	// to obtain the correct total which is then pushed back to the workers.
	Delta() any
}
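The distributed counter scenario described in the Delta comment can be sketched as follows. This is a minimal illustration, not code from the package: IntCounter is a hypothetical type, the Aggregator interface is copied locally so the sketch compiles on its own, and the choice to have Set reset the Delta baseline is an assumption of this sketch.

```go
package main

import "fmt"

// Aggregator mirrors the interface documented above so that this sketch
// compiles on its own; real code would use the one from the bsp package.
type Aggregator interface {
	Type() string
	Set(val any)
	Get() any
	Aggregate(val any)
	Delta() any
}

// IntCounter is a hypothetical aggregator that sums int values. It is not
// safe for concurrent use and panics if it is handed a non-int value.
type IntCounter struct {
	value int // running total
	prev  int // total observed at the previous Delta (or Set) call
}

func (c *IntCounter) Type() string { return "IntCounter" }
func (c *IntCounter) Get() any     { return c.value }

// Set overwrites the total and resets the Delta baseline (an assumption of
// this sketch), so the next Delta reports only changes made after Set.
func (c *IntCounter) Set(val any) { c.value, c.prev = val.(int), val.(int) }

// Aggregate adds val to the running total.
func (c *IntCounter) Aggregate(val any) { c.value += val.(int) }

// Delta returns the change in the total since the last call to Delta.
func (c *IntCounter) Delta() any {
	d := c.value - c.prev
	c.prev = c.value
	return d
}

var _ Aggregator = (*IntCounter)(nil)

func main() {
	workerA, workerB, global := &IntCounter{}, &IntCounter{}, &IntCounter{}

	// Superstep 1: each worker counts locally, the master merges the deltas.
	workerA.Aggregate(3)
	workerB.Aggregate(4)
	global.Aggregate(workerA.Delta())
	global.Aggregate(workerB.Delta())
	fmt.Println(global.Get()) // 7

	// Superstep 2: only workerA changed, so workerB contributes a delta of 0.
	workerA.Aggregate(2)
	global.Aggregate(workerA.Delta())
	global.Aggregate(workerB.Delta())
	fmt.Println(global.Get()) // 9
}
```

Feeding deltas rather than totals into the top-level aggregator avoids counting the same local contribution twice across supersteps.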

type ComputeFunc

type ComputeFunc[VT, ET any] func(g *Graph[VT, ET], v *Vertex[VT, ET], msgIt message.Iterator) error

ComputeFunc is a function that a graph instance invokes on each vertex when executing a superstep.

type Edge

type Edge[ET any] struct {
	// contains filtered or unexported fields
}

func (*Edge[ET]) DstID

func (e *Edge[ET]) DstID() string

func (*Edge[ET]) SetValue

func (e *Edge[ET]) SetValue(val ET)

func (*Edge[ET]) Value

func (e *Edge[ET]) Value() ET

type Executor

type Executor[VT, ET any] struct {
	// contains filtered or unexported fields
}

Executor wraps a Graph instance and provides an orchestration layer for executing supersteps until an error occurs or an exit condition is met. Users can provide an optional set of hooks to be executed before and after each superstep.

func NewExecutor

func NewExecutor[VT, ET any](g *Graph[VT, ET], cb ExecutorHooks[VT, ET]) *Executor[VT, ET]

NewExecutor returns an Executor instance for graph g that invokes the provided hooks on each iteration of its execution loop.

func (*Executor[VT, ET]) Graph

func (ex *Executor[VT, ET]) Graph() *Graph[VT, ET]

Graph returns the graph instance associated with this executor.

func (*Executor[VT, ET]) RunSteps

func (ex *Executor[VT, ET]) RunSteps(ctx context.Context, numSteps int) error

RunSteps executes at most numSteps supersteps unless the context expires, an error occurs, or the PostStepKeepRunning hook specified at configuration time returns false.

func (*Executor[VT, ET]) RunToCompletion

func (ex *Executor[VT, ET]) RunToCompletion(ctx context.Context) error

RunToCompletion keeps executing supersteps until the context expires, an error occurs, or the PostStepKeepRunning hook specified at configuration time returns false.

func (*Executor[VT, ET]) Superstep

func (ex *Executor[VT, ET]) Superstep() int

Superstep returns the current graph superstep.

type ExecutorFactory

type ExecutorFactory[VT, ET any] func(*Graph[VT, ET], ExecutorHooks[VT, ET]) *Executor[VT, ET]

ExecutorFactory is a function that creates new Executor instances.

type ExecutorHooks

type ExecutorHooks[VT, ET any] struct {
	// PreStep, if defined, is invoked before running the next superstep.
	// This is a good place to initialize variables, aggregators etc. that
	// will be used for the next superstep.
	PreStep func(ctx context.Context, g *Graph[VT, ET]) error

	// PostStep, if defined, is invoked after running a superstep.
	PostStep func(ctx context.Context, g *Graph[VT, ET], activeInStep int) error

	// PostStepKeepRunning, if defined, is invoked after running a superstep
	// to decide whether the stop condition for terminating the run has
	// been met. The number of vertices that were active in the last step
	// is passed as the activeInStep argument.
	PostStepKeepRunning func(ctx context.Context, g *Graph[VT, ET], activeInStep int) (bool, error)
}

ExecutorHooks encapsulates a series of hooks that are invoked by an Executor instance on a graph. All hooks are optional and will be ignored if not specified.

type Graph

type Graph[VT, ET any] struct {
	// contains filtered or unexported fields
}

Graph implements a parallel graph processor based on the concepts described in the Pregel paper (https://15799.courses.cs.cmu.edu/fall2013/static/papers/p135-malewicz.pdf).

func NewGraph

func NewGraph[VT, ET any](cfg GraphConfig[VT, ET]) (*Graph[VT, ET], error)

NewGraph creates a new Graph instance using the specified configuration. It is important for callers to invoke Close() on the returned graph instance when they are done using it.

func (*Graph[VT, ET]) AddEdge

func (g *Graph[VT, ET]) AddEdge(srcID, dstID string, initValue ET) error

AddEdge inserts a directed edge from srcID to dstID and annotates it with the specified initValue. By design, edges are owned by the source vertex and therefore srcID must resolve to a local vertex. Otherwise, AddEdge returns an error.

func (*Graph[VT, ET]) AddVertex

func (g *Graph[VT, ET]) AddVertex(id string, initValue VT)

AddVertex inserts a new vertex with the specified id and initial value into the graph. If the vertex already exists, AddVertex will just overwrite its value with the provided initValue.

func (*Graph[VT, ET]) Aggregator

func (g *Graph[VT, ET]) Aggregator(name string) Aggregator

func (*Graph[VT, ET]) Aggregators

func (g *Graph[VT, ET]) Aggregators() map[string]Aggregator

func (*Graph[VT, ET]) BroadcastToNeighbors

func (g *Graph[VT, ET]) BroadcastToNeighbors(v *Vertex[VT, ET], msg message.Message) error

BroadcastToNeighbors broadcasts the provided message to the neighboring vertices (local or remote). Neighbors will receive the message in the next superstep.

func (*Graph[VT, ET]) Close

func (g *Graph[VT, ET]) Close() error

Close releases any resources associated with the graph.

func (*Graph[VT, ET]) RegisterAggregator

func (g *Graph[VT, ET]) RegisterAggregator(name string, aggregator Aggregator)

func (*Graph[VT, ET]) RegisterRelayer

func (g *Graph[VT, ET]) RegisterRelayer(relayer Relayer)

RegisterRelayer configures a Relayer that the graph will invoke when attempting to deliver a message to a vertex that is not known locally but could potentially be owned by a remote graph instance.

func (*Graph[VT, ET]) Reset

func (g *Graph[VT, ET]) Reset() error

Reset resets the state of the graph by removing any existing vertices and aggregators and resetting the superstep counter.

func (*Graph[VT, ET]) SendMessage

func (g *Graph[VT, ET]) SendMessage(dst string, msg message.Message) error

SendMessage attempts to deliver a message to the vertex with the specified destination ID. Messages are queued for delivery and will be processed by recipients in the next superstep.

If the destination ID is not known by this graph, it might still be a valid ID for a vertex that is owned by a remote graph instance. If the client has provided a Relayer when configuring the graph, SendMessage will delegate message delivery to it.

On the other hand, if no Relayer is defined or the configured Relayer returns an ErrDestinationIsLocal error, SendMessage will first check whether an UnknownVertexHandler has been provided at configuration time and invoke it. Otherwise, an ErrInvalidMessageDestination error is returned to the caller.

func (*Graph[VT, ET]) Superstep

func (g *Graph[VT, ET]) Superstep() int

func (*Graph[VT, ET]) Vertices

func (g *Graph[VT, ET]) Vertices() map[string]*Vertex[VT, ET]

type GraphConfig

type GraphConfig[VT, ET any] struct {
	// QueueFactory is used by the graph to create message queue instances
	// for each vertex that is added to the graph. If not specified, the
	// default in-memory queue will be used instead.
	QueueFactory message.QueueFactory

	// ComputeFn is the compute function that will be invoked for each graph
	// vertex when executing a superstep. A valid ComputeFunc instance is
	// required for the config to be valid.
	ComputeFn ComputeFunc[VT, ET]

	// ComputeWorkers specifies the number of workers to use for invoking
	// the registered ComputeFunc when executing each superstep. If not
	// specified, a single worker will be used.
	ComputeWorkers int
}

GraphConfig encapsulates the configuration options for creating graphs.

type Relayer

type Relayer interface {
	// Relay relays a message to a vertex that is not known locally. It
	// returns ErrDestinationIsLocal if the provided destination is not a
	// valid remote vertex.
	Relay(dst string, msg message.Message) error
}

Relayer is implemented by types that can relay messages to vertices that are managed by a remote graph instance.

type RelayerFunc

type RelayerFunc func(dst string, msg message.Message) error

RelayerFunc is an adapter that allows ordinary functions to be used as a Relayer.

func (RelayerFunc) Relay

func (r RelayerFunc) Relay(dst string, msg message.Message) error
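The function-adapter pattern behind RelayerFunc can be illustrated with a self-contained sketch. The Relayer and RelayerFunc declarations mirror the documented signatures, but the Message interface here is only a placeholder for message.Message (its Type method is an assumption), the ping type is hypothetical, and the body of Relay is a guess at the obvious implementation.

```go
package main

import "fmt"

// Message is a placeholder for message.Message; the Type method is an
// assumption made so that the sketch has something to print.
type Message interface{ Type() string }

// Relayer and RelayerFunc mirror the documented declarations.
type Relayer interface {
	Relay(dst string, msg Message) error
}

type RelayerFunc func(dst string, msg Message) error

// Relay calls the wrapped function, which is what lets a plain function
// satisfy the Relayer interface.
func (r RelayerFunc) Relay(dst string, msg Message) error { return r(dst, msg) }

// ping is a hypothetical message type used only for this example.
type ping struct{}

func (ping) Type() string { return "ping" }

func main() {
	var sent []string

	// Wrap a closure so that it can be passed wherever a Relayer is expected.
	var r Relayer = RelayerFunc(func(dst string, msg Message) error {
		sent = append(sent, dst+":"+msg.Type())
		return nil
	})

	if err := r.Relay("remote-1", ping{}); err != nil {
		fmt.Println("relay failed:", err)
		return
	}
	fmt.Println(sent) // [remote-1:ping]
}
```

This is the same idea as http.HandlerFunc in the standard library: small relayers can be written inline without declaring a named type.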

type Vertex

type Vertex[VT any, ET any] struct {
	// contains filtered or unexported fields
}

func (*Vertex[VT, ET]) Edges

func (v *Vertex[VT, ET]) Edges() []*Edge[ET]

func (*Vertex[VT, ET]) Freeze

func (v *Vertex[VT, ET]) Freeze()

Freeze marks the vertex as inactive. Inactive vertices will not be processed in subsequent supersteps unless they receive a message, in which case they are re-activated.
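The freeze and re-activation behaviour can be modelled with a toy, self-contained sketch. The vertex type and the superstep function below are hypothetical and far simpler than the package's Graph; they only illustrate the rule that a frozen vertex is skipped until a message arrives for it.

```go
package main

import "fmt"

// vertex is a toy stand-in for Vertex with string messages.
type vertex struct {
	id     string
	active bool
	inbox  []string // messages waiting to be consumed in the next superstep
}

// Freeze marks the vertex as inactive.
func (v *vertex) Freeze() { v.active = false }

// superstep invokes compute on every vertex that is active or has pending
// messages, and returns the number of vertices it processed.
func superstep(vs []*vertex, compute func(v *vertex, msgs []string)) int {
	processed := 0
	for _, v := range vs {
		if len(v.inbox) > 0 {
			v.active = true // an incoming message re-activates a frozen vertex
		}
		if !v.active {
			continue
		}
		msgs := v.inbox
		v.inbox = nil
		compute(v, msgs)
		processed++
	}
	return processed
}

func main() {
	vs := []*vertex{{id: "a", active: true}, {id: "b", active: true}}

	// This compute function does no work and immediately votes to halt.
	freeze := func(v *vertex, _ []string) { v.Freeze() }

	fmt.Println(superstep(vs, freeze)) // 2: both vertices start out active
	fmt.Println(superstep(vs, freeze)) // 0: both are frozen with empty inboxes

	vs[1].inbox = append(vs[1].inbox, "wake up")
	fmt.Println(superstep(vs, freeze)) // 1: only "b" was re-activated
}
```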

func (*Vertex[VT, ET]) ID

func (v *Vertex[VT, ET]) ID() string

func (*Vertex[VT, ET]) SetValue

func (v *Vertex[VT, ET]) SetValue(val VT)

func (*Vertex[VT, ET]) Value

func (v *Vertex[VT, ET]) Value() VT

Directories

Path Synopsis
Message queue
Message queue
