Documentation
Overview ¶
This package implements the Bulk Synchronous Parallel (BSP) computing model (https://en.wikipedia.org/wiki/Bulk_synchronous_parallel) to aid in processing graph data.
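The BSP cycle can be illustrated in plain Go, independently of this package's API. The sketch below is a hypothetical, single-process example: the `vertex` type, integer messages, the max-value propagation logic and the "halt when no messages are sent" rule are all assumptions made for illustration. It shows the core BSP property: messages sent during a superstep are only visible to their recipients after the barrier, in the next superstep.

```go
package main

import "fmt"

// vertex is a hypothetical, minimal stand-in for a graph vertex.
type vertex struct {
	value     int
	neighbors []string
}

// runBSP propagates the maximum vertex value through the graph and returns
// the number of supersteps executed. It halts once a superstep sends no
// messages (an assumed stop condition for this sketch).
func runBSP(vertices map[string]*vertex) int {
	inbox := make(map[string][]int) // messages readable in the current superstep
	step := 0
	for {
		outbox := make(map[string][]int) // messages for the next superstep
		sent := 0
		for id, v := range vertices {
			changed := step == 0 // every vertex broadcasts in the first superstep
			for _, m := range inbox[id] {
				if m > v.value {
					v.value = m
					changed = true
				}
			}
			if changed {
				for _, n := range v.neighbors {
					outbox[n] = append(outbox[n], v.value)
					sent++
				}
			}
		}
		step++
		// Barrier: messages sent in this superstep become visible only now.
		inbox = outbox
		if sent == 0 {
			return step
		}
	}
}

func main() {
	g := map[string]*vertex{
		"a": {value: 3, neighbors: []string{"b"}},
		"b": {value: 6, neighbors: []string{"a", "c"}},
		"c": {value: 2, neighbors: []string{"b"}},
	}
	steps := runBSP(g)
	fmt.Println(g["a"].value, g["b"].value, g["c"].value, steps) // 6 6 6 3
}
```

Because outgoing messages are buffered until the barrier, the result does not depend on the (random) order in which Go iterates over the vertex map.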
Index ¶
- Variables
- type Aggregator
- type ComputeFunc
- type Edge
- type Executor
- type ExecutorFactory
- type ExecutorHooks
- type Graph
- func (g *Graph[VT, ET]) AddEdge(srcID, dstID string, initValue ET) error
- func (g *Graph[VT, ET]) AddVertex(id string, initValue VT)
- func (g *Graph[VT, ET]) Aggregator(name string) Aggregator
- func (g *Graph[VT, ET]) Aggregators() map[string]Aggregator
- func (g *Graph[VT, ET]) BroadcastToNeighbors(v *Vertex[VT, ET], msg message.Message) error
- func (g *Graph[VT, ET]) Close() error
- func (g *Graph[VT, ET]) RegisterAggregator(name string, aggregator Aggregator)
- func (g *Graph[VT, ET]) RegisterRelayer(relayer Relayer)
- func (g *Graph[VT, ET]) Reset() error
- func (g *Graph[VT, ET]) SendMessage(dst string, msg message.Message) error
- func (g *Graph[VT, ET]) Superstep() int
- func (g *Graph[VT, ET]) Vertices() map[string]*Vertex[VT, ET]
- type GraphConfig
- type Relayer
- type RelayerFunc
- type Vertex
Constants ¶
This section is empty.
Variables ¶
Functions ¶
This section is empty.
Types ¶
type Aggregator ¶
type Aggregator interface {
	Type() string
	Set(val any)
	Get() any
	// Aggregate updates the aggregator's value by combining the provided
	// value with the current one.
	Aggregate(val any)
	// Delta returns the change in the aggregator's value since the last
	// call to Delta. The delta values can be used in distributed
	// aggregator use-cases to reduce local, partially-aggregated values
	// into a single value by feeding them into the Aggregate method
	// of a top-level aggregator.
	//
	// For example, in a distributed counter scenario, each node maintains
	// its own local counter instance. At the end of each step, the master
	// node calls Delta on each local counter and aggregates the values
	// to obtain the correct total, which is then pushed back to the workers.
	Delta() any
}
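The distributed-counter scenario from the Delta documentation can be sketched with a hypothetical `IntCounter` that satisfies a local copy of the Aggregator method set. One assumption is worth flagging: in this sketch, Set also resets the Delta baseline, so that pushing the global total back to a worker does not make the next Delta count that total again. The real package may handle this differently.

```go
package main

import "fmt"

// Aggregator mirrors the method set of the interface documented above.
type Aggregator interface {
	Type() string
	Set(val any)
	Get() any
	Aggregate(val any)
	Delta() any
}

// IntCounter is a hypothetical Aggregator that sums int values.
type IntCounter struct {
	value    int
	baseline int // value at the previous Delta (or Set) call
}

func (c *IntCounter) Type() string { return "IntCounter" }

// Set overwrites the value and (by assumption) resets the Delta baseline.
func (c *IntCounter) Set(val any) {
	c.value = val.(int)
	c.baseline = c.value
}

func (c *IntCounter) Get() any { return c.value }

func (c *IntCounter) Aggregate(val any) { c.value += val.(int) }

// Delta reports how much the counter changed since the last call.
func (c *IntCounter) Delta() any {
	d := c.value - c.baseline
	c.baseline = c.value
	return d
}

func main() {
	workers := []Aggregator{&IntCounter{}, &IntCounter{}}
	master := &IntCounter{}
	for step := 1; step <= 2; step++ {
		// Each worker counts some local events during the superstep.
		workers[0].Aggregate(2)
		workers[1].Aggregate(3)
		// The master folds each worker's delta into the global total...
		for _, w := range workers {
			master.Aggregate(w.Delta())
		}
		// ...and pushes the total back to the workers.
		for _, w := range workers {
			w.Set(master.Get())
		}
		fmt.Println("step", step, "total", master.Get()) // 5, then 10
	}
}
```

Summing the workers' raw values in step 2 would yield 7 + 8 = 15 instead of the correct 10; using deltas avoids counting the pushed-back total twice.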
type ComputeFunc ¶
type ComputeFunc[VT, ET any] func(g *Graph[VT, ET], v *Vertex[VT, ET], msgIt message.Iterator) error
ComputeFunc is a function that a graph instance invokes on each vertex when executing a superstep.
type Executor ¶
type Executor[VT, ET any] struct {
	// contains filtered or unexported fields
}
Executor wraps a Graph instance and provides an orchestration layer for executing supersteps until an error occurs or an exit condition is met. Users can provide an optional set of hooks to be executed before and after each superstep.
func NewExecutor ¶
func NewExecutor[VT, ET any](g *Graph[VT, ET], cb ExecutorHooks[VT, ET]) *Executor[VT, ET]
NewExecutor returns an Executor instance for graph g that invokes the provided hooks on each iteration of its execution loop.
func (*Executor[VT, ET]) RunSteps ¶
RunSteps executes at most numStep supersteps unless the context expires, an error occurs or one of the Pre/PostStepKeepRunning callbacks specified at configuration time returns false.
func (*Executor[VT, ET]) RunToCompletion ¶
RunToCompletion keeps executing supersteps until the context expires, an error occurs or one of the Pre/PostStepKeepRunning callbacks specified at configuration time returns false.
type ExecutorFactory ¶
type ExecutorFactory[VT, ET any] func(*Graph[VT, ET], ExecutorHooks[VT, ET]) *Executor[VT, ET]
ExecutorFactory is a function that creates new Executor instances.
type ExecutorHooks ¶
type ExecutorHooks[VT, ET any] struct {
	// PreStep, if defined, is invoked before running the next superstep.
	// This is a good place to initialize variables, aggregators etc. that
	// will be used for the next superstep.
	PreStep func(ctx context.Context, g *Graph[VT, ET]) error
	// PostStep, if defined, is invoked after running a superstep.
	PostStep func(ctx context.Context, g *Graph[VT, ET], activeInStep int) error
	// PostStepKeepRunning, if defined, is invoked after running a superstep
	// to decide whether the stop condition for terminating the run has
	// been met. The number of active vertices in the last step is
	// passed as the activeInStep argument.
	PostStepKeepRunning func(ctx context.Context, g *Graph[VT, ET], activeInStep int) (bool, error)
}
ExecutorHooks encapsulates a series of hooks that are invoked by an Executor instance on a graph. All hooks are optional and will be ignored if not specified.
type Graph ¶
type Graph[VT, ET any] struct {
	// contains filtered or unexported fields
}
Graph implements a parallel graph processor based on the concepts described in the Pregel paper (https://15799.courses.cs.cmu.edu/fall2013/static/papers/p135-malewicz.pdf).
func NewGraph ¶
func NewGraph[VT, ET any](cfg GraphConfig[VT, ET]) (*Graph[VT, ET], error)
NewGraph creates a new Graph instance using the specified configuration. It is important for callers to invoke Close() on the returned graph instance when they are done using it.
func (*Graph[VT, ET]) AddEdge ¶
AddEdge inserts a directed edge from srcID to dstID and annotates it with the specified initValue. By design, edges are owned by their source vertex, so srcID must resolve to a local vertex; otherwise, AddEdge returns an error.
func (*Graph[VT, ET]) AddVertex ¶
AddVertex inserts a new vertex with the specified id and initial value into the graph. If the vertex already exists, AddVertex will just overwrite its value with the provided initValue.
func (*Graph[VT, ET]) Aggregator ¶
func (g *Graph[VT, ET]) Aggregator(name string) Aggregator
func (*Graph[VT, ET]) Aggregators ¶
func (g *Graph[VT, ET]) Aggregators() map[string]Aggregator
func (*Graph[VT, ET]) BroadcastToNeighbors ¶
BroadcastToNeighbors broadcasts the provided message to the neighboring vertices (local or remote) of v. Neighbors will receive the message in the next superstep.
func (*Graph[VT, ET]) RegisterAggregator ¶
func (g *Graph[VT, ET]) RegisterAggregator(name string, aggregator Aggregator)
func (*Graph[VT, ET]) RegisterRelayer ¶
RegisterRelayer configures a Relayer that the graph will invoke when attempting to deliver a message to a vertex that is not known locally but could potentially be owned by a remote graph instance.
func (*Graph[VT, ET]) Reset ¶
Reset resets the state of the graph by removing any existing vertices or aggregators and resetting the superstep counter.
func (*Graph[VT, ET]) SendMessage ¶
SendMessage attempts to deliver a message to the vertex with the specified destination ID. Messages are queued for delivery and will be processed by recipients in the next superstep.
If the destination ID is not known by this graph, it might still be a valid ID for a vertex that is owned by a remote graph instance. If the client has provided a Relayer when configuring the graph, SendMessage will delegate message delivery to it.
On the other hand, if no Relayer is defined, or the configured Relayer returns an ErrDestinationIsLocal error, SendMessage will check whether an UnknownVertexHandler has been provided at configuration time and, if so, invoke it. Otherwise, an ErrInvalidMessageDestination error is returned to the caller.
type GraphConfig ¶
type GraphConfig[VT, ET any] struct {
	// QueueFactory is used by the graph to create message queue instances
	// for each vertex that is added to the graph. If not specified, the
	// default in-memory queue will be used instead.
	QueueFactory message.QueueFactory
	// ComputeFn is the compute function that will be invoked for each graph
	// vertex when executing a superstep. A valid ComputeFunc instance is
	// required for the config to be valid.
	ComputeFn ComputeFunc[VT, ET]
	// ComputeWorkers specifies the number of workers to use for invoking
	// the registered ComputeFunc when executing each superstep. If not
	// specified, a single worker will be used.
	ComputeWorkers int
}
GraphConfig encapsulates the configuration options for creating graphs.
type Relayer ¶
type Relayer interface {
	// Relay relays a message to a vertex that is not known locally. It returns
	// ErrDestinationIsLocal if the provided destination is not a valid remote vertex.
	Relay(dst string, msg message.Message) error
}
Relayer is implemented by types that relay messages to vertices managed by a remote graph instance.
type RelayerFunc ¶
RelayerFunc is a function adapter for the Relayer interface.
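A function adapter like this usually follows the same pattern as net/http's HandlerFunc: a named function type whose method simply calls the function itself. The signature of RelayerFunc is not shown above, so the sketch assumes it matches the Relay method; `Message` is a hypothetical stand-in for message.Message.

```go
package main

import "fmt"

type Message any // stand-in for message.Message

type Relayer interface {
	Relay(dst string, msg Message) error
}

// RelayerFunc lets an ordinary function satisfy Relayer (assumed signature).
type RelayerFunc func(dst string, msg Message) error

// Relay calls f(dst, msg).
func (f RelayerFunc) Relay(dst string, msg Message) error {
	return f(dst, msg)
}

func main() {
	var r Relayer = RelayerFunc(func(dst string, msg Message) error {
		fmt.Printf("relaying %v to %s\n", msg, dst)
		return nil
	})
	_ = r.Relay("remote-1", "ping") // prints: relaying ping to remote-1
}
```

The adapter spares callers from declaring a dedicated struct type when a closure (for example, one that captures an RPC client) is all that is needed.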