Use simple.DAG

This guide introduces simple.DAG using, as an example, a workflow that contains only two tasks executed serially.

Scope of application

simple.DAG is well-suited for:

  • Dividing a task into multiple subtasks and executing them concurrently.

  • Sequentially executing multiple processes that constitute a task.

  • Handling non-complex subtasks that require minimal resources and time.

  • Embedding sub-workflows within a transit.

  • Dynamically orchestrating workflows as required during runtime.

  • Terminating the entire execution if an error occurs.

  • Running subtasks that typically generate no errors, let alone call panic().

Definition

  • Workflow: The definition of how work progresses, composed of transits connected by channels.

  • Transit: The most granular execution unit, processing incoming data and delivering it to the designated channel.

  • Channel: The pathway for transferring data, which also defines the dependencies between transits.

  • Execution: The process from input to output as per the workflow's progression.

  • Context: The environment associated with each execution, carried throughout the process.

Workflow

Here is the workflow:

  • t:input receives data transmitted by c:input. Upon execution, the outcome is relayed to the c:t11 channel.

  • Subsequently, t:output executes promptly upon receiving data from the c:t11 channel. The result is then dispatched to c:output.

Import package

The workflow resides within the code package "github.com/rhosocial/go-dag/workflow/simple".

Prior to utilization, it's essential to import the code package. For instance:

import "github.com/rhosocial/go-dag/workflow/simple"

Declare a simple.DAG instance

Here is the definition of the NewWorkflow() function:

func NewWorkflow[TInput, TOutput any](options ...Option[TInput, TOutput]) *Workflow[TInput, TOutput]

In the definition, TInput and TOutput specify the input and output data types. They can be set to any, but this is strongly discouraged.

The return value of this function is a pointer to Workflow[TInput, TOutput].

For example:

f, err := simple.NewWorkflow[string, string]()

This defines a Workflow whose input and output data types are both string. At this stage, however, the Workflow is empty and cannot be executed as a valid workflow.

If no errors were encountered when instantiating the workflow, err is nil.

The following Option[TInput, TOutput] constructors are available:

  • WithChannels(): Informs the workflow of the list of channel names it should include.

  • WithChannelInput(): Specifies the name of the initial input channel for the workflow.

  • WithChannelOutput(): Specifies the name of the final output channel for the workflow.

  • WithDefaultChannels(): Specifies that the workflow should use default channel names, where the initial input channel is named "input" and the final output channel is named "output".

  • WithTransits(): Specifies the list of transits the workflow should include.

  • WithLoggers(): Specifies the list of loggers the workflow should include.
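These options follow Go's functional-options pattern. As a rough, hypothetical sketch of that pattern with type parameters (workflow, option, withChannels, withChannelInput and newWorkflow are illustrative names, not simple.DAG's internals), each option is a function that mutates the workflow under construction, and in this sketch an option may reject an invalid setting by returning an error:

```go
package main

import (
	"errors"
	"fmt"
)

// workflow is a hypothetical, stripped-down stand-in for Workflow[TInput, TOutput].
type workflow[TInput, TOutput any] struct {
	channels []string
	input    string
}

// option mutates a workflow and may reject the configuration.
type option[TInput, TOutput any] func(w *workflow[TInput, TOutput]) error

func withChannels[TInput, TOutput any](names ...string) option[TInput, TOutput] {
	return func(w *workflow[TInput, TOutput]) error {
		w.channels = append(w.channels, names...)
		return nil
	}
}

func withChannelInput[TInput, TOutput any](name string) option[TInput, TOutput] {
	return func(w *workflow[TInput, TOutput]) error {
		if name == "" {
			return errors.New("empty input channel name")
		}
		w.input = name
		return nil
	}
}

// newWorkflow applies the options in the order given and stops at the first error.
func newWorkflow[TInput, TOutput any](opts ...option[TInput, TOutput]) (*workflow[TInput, TOutput], error) {
	w := &workflow[TInput, TOutput]{}
	for _, opt := range opts {
		if err := opt(w); err != nil {
			return nil, err
		}
	}
	return w, nil
}

func main() {
	w, err := newWorkflow[string, string](
		withChannels[string, string]("input", "output", "t11"),
		withChannelInput[string, string]("input"),
	)
	fmt.Println(w.channels, w.input, err) // [input output t11] input <nil>
}
```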

For Workflow to work properly, you also need to declare channels, transits and loggers.

Initialize workflow with the channels

This step is optional; you can skip directly to the next section.

Here you specify the names of the channels associated with the transits in the workflow, and define the workflow's input and output channels.

Here are the definitions:

func WithChannels[TInput, TOutput any](names ...string) Option[TInput, TOutput]
func WithChannelInput[TInput, TOutput any](name string) Option[TInput, TOutput]
func WithChannelOutput[TInput, TOutput any](name string) Option[TInput, TOutput]

You can pass the return values of these functions as parameters to simple.NewWorkflow(). For example:

f := simple.NewWorkflow[string, string](
    simple.WithChannels[string, string]("input", "output", "t11"),
    simple.WithChannelInput[string, string]("input"),
    simple.WithChannelOutput[string, string]("output"),
)

When initializing the channel names, each name must be unique. The order of names does not matter, but the configuration must include at least two channels: one for input and one for output. The workflow sends the input data to the designated input channel and waits for the result on the designated output channel.

simple.WithChannelInput() and simple.WithChannelOutput() specify the input and output channel names of the entire workflow.

In particular, WithDefaultChannels() declares the "input" and "output" channels and designates them as the workflow's input and output channels respectively. The code above can then be simplified to:

f := simple.NewWorkflow[string, string](
    simple.WithDefaultChannels[string, string](),
    simple.WithChannels[string, string]("t11"),
)

In this case, WithChannels() no longer needs to declare the "input" and "output" channels. WithDefaultChannels() and WithChannels() are order dependent: if WithChannels() comes after WithDefaultChannels(), it must not declare "input" or "output" again; otherwise, ErrChannelNameExisted is reported.

Note: do not use WithDefaultChannels() if you want to give the workflow's input and output channels other names.
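The duplicate-name rule can be modeled as a set that rejects a name declared a second time. This sketch only covers the case described above (WithChannels() after WithDefaultChannels()); errChannelNameExisted, channelSet and isDuplicate are hypothetical stand-ins, not the package's own identifiers.

```go
package main

import (
	"errors"
	"fmt"
)

// errChannelNameExisted is a hypothetical local counterpart of ErrChannelNameExisted.
var errChannelNameExisted = errors.New("channel name existed")

// channelSet records declared channel names and rejects duplicates.
type channelSet map[string]struct{}

func (s channelSet) declare(names ...string) error {
	for _, n := range names {
		if _, ok := s[n]; ok {
			return fmt.Errorf("%w: %q", errChannelNameExisted, n)
		}
		s[n] = struct{}{}
	}
	return nil
}

// declareDefaults mimics WithDefaultChannels(): it declares "input" and "output".
func (s channelSet) declareDefaults() error {
	return s.declare("input", "output")
}

// isDuplicate reports whether err was caused by a repeated channel name.
func isDuplicate(err error) bool {
	return errors.Is(err, errChannelNameExisted)
}

func main() {
	s := channelSet{}
	fmt.Println(s.declareDefaults()) // <nil>
	fmt.Println(s.declare("t11"))    // <nil>
	fmt.Println(s.declare("input"))  // channel name existed: "input"
}
```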

Initialize workflow with the transits

You can use simple.WithTransits() to define all the transits. Here is the definition:

func WithTransits[TInput, TOutput any](transits ...*Transit) Option[TInput, TOutput]

You can pass the return value of this function as a parameter to simple.NewWorkflow(). For example:

f := simple.NewWorkflow[string, string](
    simple.WithDefaultChannels[string, string](),
    simple.WithTransits[string, string](transits...),
)

In general, you don't need to specify the channel names as described in the previous section, because WithTransits() registers the input and output channel names of each transit. However, you must ensure that each channel is used at most once as an input and at most once as an output; otherwise, unpredictable consequences may occur.

simple.WithTransits() can be called multiple times, in any order; content from later calls is merged with, and overwrites, content from earlier calls.
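Because reusing a channel leads to unpredictable behavior, it can be worth checking the wiring before building the workflow. The sketch below assumes a hypothetical transitSpec description of each transit (it does not read simple.Transit) and reports every channel that is read, or written, by more than one transit.

```go
package main

import "fmt"

// transitSpec is a hypothetical description of a transit's channel wiring.
type transitSpec struct {
	name    string
	inputs  []string
	outputs []string
}

// validateChannels reports channels consumed or produced by more than one transit.
func validateChannels(transits []transitSpec) []string {
	consumers := map[string]string{}
	producers := map[string]string{}
	var problems []string
	for _, t := range transits {
		for _, c := range t.inputs {
			if prev, ok := consumers[c]; ok {
				problems = append(problems, fmt.Sprintf("channel %q read by both %q and %q", c, prev, t.name))
				continue
			}
			consumers[c] = t.name
		}
		for _, c := range t.outputs {
			if prev, ok := producers[c]; ok {
				problems = append(problems, fmt.Sprintf("channel %q written by both %q and %q", c, prev, t.name))
				continue
			}
			producers[c] = t.name
		}
	}
	return problems
}

func main() {
	good := []transitSpec{
		{name: "input", inputs: []string{"input"}, outputs: []string{"t11"}},
		{name: "output", inputs: []string{"t11"}, outputs: []string{"output"}},
	}
	fmt.Println(len(validateChannels(good))) // 0

	bad := append(good, transitSpec{name: "extra", inputs: []string{"t11"}})
	fmt.Println(validateChannels(bad)) // [channel "t11" read by both "output" and "extra"]
}
```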

Initialize the transit

Each element of the transits parameter of simple.WithTransits() is created with NewTransit():

func NewTransit(name string, options ...TransitOption) *Transit

In NewTransit(), name is the name of the transit, and options specify the transit's input channel list, output channel list and worker. For example:

worker := func(ctx context.Context, a ...any) (any, error) {
    log.Println("started at", time.Now())
    time.Sleep(time.Duration(a[0].(int)) * time.Second)
    log.Println("ended at", time.Now())
    return a[0], nil
}

transits := []*simple.Transit{
    simple.NewTransit("input",
       simple.WithInputs("input"),
       simple.WithOutputs("t11"),
       simple.WithWorker(worker)),
    simple.NewTransit("output", 
       simple.WithInputs("t11"),
       simple.WithOutputs("output"),
       simple.WithWorker(worker)),
}

simple.WithInputs() and simple.WithOutputs() are defined as follows:

func WithInputs(names ...string) TransitOption
func WithOutputs(names ...string) TransitOption

The names parameter lists channel names. Each channel name must appear at most once among the inputs, and at most once among the outputs, of all transits.

These functions can be called multiple times, in any order relative to one another; content from later calls is merged with, and overwrites, content from earlier calls.

WithWorker() specifies the worker function of the current transit. Here is the definition:

func WithWorker(worker func(context.Context, ...any) (any, error)) TransitOption

The definition of the worker function is fixed to:

func(ctx context.Context, a ...any) (any, error) {
    return ..., ...
}

The parameter ctx is a child of the context passed in for each execution. Therefore, when the parent context is canceled, the worker's ctx also receives the cancellation signal.

The number of arguments in a and their actual types must correspond to the input channels the transit listens on, so the order of channel names given to WithInputs() matters. Because every parameter of a worker is of type any, the worker needs to verify that the arguments passed in meet its expectations, unless their types do not matter.

Note that the order of channel names passed to WithInputs() determines the order of arguments passed to the worker. Therefore, when calling WithInputs() multiple times, take extra care that the channel names match the parameters the worker expects. The worker also needs to verify the number of arguments it receives and the type of each one.

The return types of a worker function are any and error. If an error needs to be reported during execution, set the second return value to a specific error. If the error is not nil, the entire execution of the workflow is aborted.

The output of the worker function is fed to all specified output channels simultaneously, so the order of the output channel names does not matter.
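A sketch of that fan-out, assuming each output channel is a plain chan any (the channel types inside simple.DAG may differ, and fanOut is a hypothetical helper): the same value is sent to every output channel concurrently, so a slow consumer on one channel does not hold up delivery to the others.

```go
package main

import (
	"fmt"
	"sync"
)

// fanOut sends the same result to every output channel concurrently and waits
// until all sends have completed. Since every channel receives an identical
// value, the order of the output channels is irrelevant.
func fanOut(result any, outputs ...chan<- any) {
	var wg sync.WaitGroup
	for _, ch := range outputs {
		wg.Add(1)
		go func(ch chan<- any) {
			defer wg.Done()
			ch <- result
		}(ch)
	}
	wg.Wait()
}

func main() {
	a := make(chan any, 1)
	b := make(chan any, 1)
	fanOut(42, a, b)
	fmt.Println(<-a, <-b) // 42 42
}
```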

In Go, the same channel can be received from in multiple places, but we do not use this approach. Instead, each channel corresponds one-to-one to the worker that consumes it. This makes it easier to build the directed acyclic graph of the entire execution flow, sort it topologically, execute it in order, and log and debug it.

If a worker's error or panic should not affect subsequent execution, you can enable the allowFailure attribute on the transit it belongs to with WithAllowFailure(). For example:

worker := func(ctx context.Context, a ...any) (any, error) {
    log.Println("started at", time.Now())
    time.Sleep(time.Duration(a[0].(int)) * time.Second)
    log.Println("ended at", time.Now())
    return a[0], nil
}

transits := []*simple.Transit{
    simple.NewTransit("input",
       simple.WithInputs("input"),
       simple.WithOutputs("t11"),
       simple.WithWorker(worker),
       simple.WithAllowFailure(true)),
    simple.NewTransit("output", 
       simple.WithInputs("t11"),
       simple.WithOutputs("output"),
       simple.WithWorker(worker)),
}

When the worker of the first transit returns an error, the error does not terminate the entire execution, and its return value is still sent to the specified output channel. If the worker panics, nil is sent to the specified output channel instead.

Note that WithAllowFailure() can be called multiple times, but only the last call takes effect.

Note: avoid calling panic() in a worker. If failures are not allowed, a panic terminates the entire workflow.

Distribute and import worker of transit

If you want to distribute your transit worker, you can declare the worker function as an exportable variable.

If the transit you are referencing comes from someone else, you can pass the imported worker to your own NewTransit() call.

If simple.Transit does not meet your needs, you can define your own transit type; it only needs to implement all the methods specified by simple.TransitInterface.

For example:

package main

import (
    "context"

    "github.com/rhosocial/go-dag/workflow/simple"
)

var ExternalTransitWorker = func(ctx context.Context, a ...any) (any, error) {
    return a[0], nil
}

func NewTransit(name string) simple.TransitInterface {
    return simple.NewTransit(name,
        simple.WithInputs("input"),
        simple.WithOutputs("output"),
        simple.WithWorker(ExternalTransitWorker),
    )
}

type ExternalTransit struct {
    simple.TransitInterface
}

When distributing a worker, document its input parameters, including their types and valid ranges, as well as the format of its output.

Since you cannot know during development whether the input channels provided by users meet your requirements, your worker needs to validate every input.

In addition, tell users which errors may be returned and which panics may occur, so that they can decide how to handle them, including whether they can be skipped.

If your worker needs to be put into production or distributed to others, it is strongly recommended to test the worker and disclose the test results to the user.

Best Practices

transit channel:

  • Each transit should have at least one input channel; otherwise, the transit will never be called when the workflow executes.

  • A transit does not need to specify an output channel. In that case, its result is not used as input by any other transit. However, an error from that transit still affects the workflow.

  • In a workflow, each channel can be used at most once as an input and at most once as an output, by different transits, and cannot be reused. Otherwise, unpredictable consequences may occur.

transit worker:

  • Check whether the number and types of arguments received by the worker are as expected. If they do not match, return ErrValueTypeMismatch. It is better to check all arguments at once and combine the errors with errors.Join().

  • If your worker is time-consuming or supports cancellation, it should use the incoming ctx and listen for cancellation notifications.

  • A worker should only process the arguments passed in through its channels. Using global variables, or passing values in through ctx, is strongly discouraged, except for values that only identify the request (such as a request ID) and for collecting logs generated during execution.

Execute the workflow

To execute the workflow, call the Execute() method; the workflow can be executed again afterwards. For example:

input := "input"
result := f.Execute(context.Background(), &input)

The first parameter is the context, which supports cancellation with a cause (CancelCause); that is, the context can notify the workflow to abort while it is executing.

The second parameter is a pointer to the initial input of the workflow. Its type must match the declared input type.

The function returns a single value, the execution result, whose type matches the declared output type. In particular, nil is returned if the execution is aborted. Therefore, it is not recommended to declare the output type as any, nor to design a workflow whose legitimate final result is nil.

Running only once

If the workflow should not accept any more input after it has executed once, call the RunOnce() method instead. For example:

input := "input"
result := f.RunOnce(context.Background(), &input)

RunOnce() destroys the workflow after execution. After that, calling Execute() or RunOnce() again reports an error.

Cancel execution

If you want to cancel during execution, you can call the following method:

f.Cancel(cause error)

If you don't want to provide a reason, you can pass nil, but this is strongly discouraged.

Note that after this method is called, all transits that are executing receive a termination signal, but Execute() and RunOnce() can still be called afterwards.

When execution is canceled explicitly, or terminated because of an error or panic(), Execute() and RunOnce() return nil.

You can also cancel execution through the context, for example:

ctx1, cancelFunc := context.WithTimeoutCause(context.Background(), time.Millisecond*1500, errors.New("canceled by parent ctx"))
defer cancelFunc() // release the context's resources once execution ends
output := f.Execute(ctx1, &input)

The code above declares a context that expires after 1500 milliseconds. If the workflow has not finished by then, it is notified to stop immediately.

In fact, workflow supports any context that implements the Done() method. For details, please see https://go.dev/blog/context.

Nested workflow

You can embed another workflow in the worker function of a transit. For example:

worker1 := func(ctx context.Context, a ...any) (any, error) {
	log.Println("started at", time.Now())
	time.Sleep(time.Duration(a[0].(int)) * time.Second)
	log.Println("ended at", time.Now())
	return a[0], nil
}
worker2 := func(ctx context.Context, a ...any) (any, error) {
	channelInputs1 := []string{"input"}
	channelOutputs1 := []string{"t11"}
	channelOutputs2 := []string{"output"}
	transits := []*simple.Transit{
		simple.NewTransit("i:input", simple.WithInputs(channelInputs1...), simple.WithOutputs(channelOutputs1...), simple.WithWorker(worker1)),
		simple.NewTransit("i:output", simple.WithInputs(channelOutputs1...), simple.WithOutputs(channelOutputs2...), simple.WithWorker(worker1)),
	}
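	f1 := simple.NewWorkflow[int, int](
		simple.WithDefaultChannels[int, int](),
		simple.WithTransits[int, int](transits...),
		simple.WithLoggers[int, int](simple.NewLogger()),
	)
	input, ok := a[0].(int)
	if !ok {
		return nil, errors.New("expected an int input")
	}
	// Pass the worker's ctx so that canceling the outer workflow also cancels the nested one.
	output := f1.Execute(ctx, &input)
	if output == nil {
		// Execute() returns nil if the nested workflow was canceled or failed.
		return nil, errors.New("nested workflow aborted")
	}
	return *output, nil
}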
channelInputs1 := []string{"input"}
channelOutputs1 := []string{"t11"}
channelOutputs2 := []string{"t12"}
channelOutputs3 := []string{"output"}
transits := []*simple.Transit{
	simple.NewTransit("input",
		simple.WithInputs(channelInputs1...),
		simple.WithOutputs(channelOutputs1...),
		simple.WithWorker(worker1)),
	simple.NewTransit("transit",
		simple.WithInputs(channelOutputs1...),
		simple.WithOutputs(channelOutputs2...),
		simple.WithWorker(worker2)),
	simple.NewTransit("output",
		simple.WithInputs(channelOutputs2...),
		simple.WithOutputs(channelOutputs3...),
		simple.WithWorker(worker1)),
}
f := simple.NewWorkflow[int, int](
	simple.WithDefaultChannels[int, int](),
	simple.WithTransits[int, int](transits...),
	simple.WithLoggers[int, int](simple.NewLogger()),
)
input := 1
output := f.Execute(context.Background(), &input)
if output == nil {
	log.Fatal("workflow aborted")
}
log.Println(*output)

Best Practices

executing workflow:

  • It is not recommended to execute the same workflow multiple times concurrently unless the workflow is linear. Otherwise, an execution that started later may return its result first.

  • If you need sequential execution, wait for each execution to complete before starting the next one.

  • If you need concurrency while preserving order, use a worker pool that holds a fixed number of workflow instances waiting to be called.

nested workflow:

  • The context of the nested workflow must be the context passed in by the current worker, or its sub-context.

  • It should not report errors, or it should capture errors and return them.

  • Its resource consumption, including time, CPU and memory, should be predictable.

Use logger

Workflows support loggers. Here is the definition of the option:

func WithLoggers[TInput, TOutput any](logger LoggerInterface) Option[TInput, TOutput]

WithLoggers() can be passed in multiple times. Every logger passed in is invoked for each event, so avoid passing in too many, as this affects performance.

Note: if you want to benchmark the transits in a workflow, do not enable log output during the benchmark. If logging is necessary, keep its volume limited; otherwise, the log output will grow explosively.

The package provides a default logger and an error collector.

Log event and predefined events

Currently, the following log events will be encountered during workflow execution:

  • LogEventWorkflowStart: when the workflow starts.

  • LogEventWorkflowEnd: when the workflow ends.

  • LogEventTransitInputReady: when data is injected into the channel.

  • LogEventTransitOutputReady: when the channel receives data.

  • LogEventTransitStart: when the transit worker starts.

  • LogEventTransitEnd: when the transit worker ends.

  • LogEventTransitCanceled: when the transit is notified of cancellation.

  • LogEventTransitError: when the transit worker reports an error.

  • LogEventTransitWorkerPanicked: when the transit worker panicked.

  • LogEventErrorValueTypeMistmatch: when a parameter received by the transit worker does not match expectations, or the final output result type does not match expectation.

Default logger

We have prepared a default logger for debugging and tracing purposes: simple.Logger

Here is the definition:

type Logger struct {
    // contains filtered or unexported fields
}

You can get a logger instance using the NewLogger() method. Its default settings cover most situations.

logger := simple.NewLogger()

After instantiation, the logger can be passed in as an option when initializing the Workflow. For example:

logger := simple.NewLogger()
f := simple.NewWorkflow[string, string](
    simple.WithChannels[string, string]("input", "output", "t11"),
    simple.WithChannelInput[string, string]("input"),
    simple.WithChannelOutput[string, string]("output"),
    simple.WithTransits[string, string](transits...),
    simple.WithLoggers[string, string](logger),
)

This default logger specifies four logging levels:

const (
    LevelDebug LogLevel = iota
    LevelInfo
    LevelWarning
    LevelError
)

By default, only LevelWarning and LevelError are displayed. To display LevelDebug and above, set the flag after declaring the logger:

logger.SetFlags(simple.LDebugEnabled)

Currently, LevelDebug events are logged before a worker starts and after it finishes successfully. If a worker returns an error, or its execution is notified to cancel, a LevelWarning event is logged. If a worker panics, a LevelError event is logged. Warnings are shown with a yellow background and errors with a red background.
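The default filtering rule (warnings and errors always shown, lower levels only with the debug flag) can be expressed as a bit-flag check. This is an illustrative sketch, not simple.Logger's code; levelLogger and flagDebugEnabled are hypothetical counterparts of simple.Logger and simple.LDebugEnabled.

```go
package main

import "fmt"

type logLevel int

const (
	levelDebug logLevel = iota
	levelInfo
	levelWarning
	levelError
)

// flagDebugEnabled is a hypothetical bit flag playing the role of simple.LDebugEnabled.
const flagDebugEnabled uint = 1 << 0

type levelLogger struct{ flags uint }

func (l *levelLogger) SetFlags(f uint) { l.flags = f }

// enabled applies the default policy: warnings and errors are always shown;
// debug and info only when the debug flag is set.
func (l *levelLogger) enabled(level logLevel) bool {
	if level >= levelWarning {
		return true
	}
	return l.flags&flagDebugEnabled != 0
}

func main() {
	l := &levelLogger{}
	fmt.Println(l.enabled(levelDebug), l.enabled(levelError)) // false true
	l.SetFlags(flagDebugEnabled)
	fmt.Println(l.enabled(levelDebug)) // true
}
```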

Error collector

Errors may occur while a workflow is executing, but they are not returned to the caller. The error collector gathers the errors raised during execution.

You can instantiate an error collector using simple.NewErrorCollector(). For example:

errorCollector := simple.NewErrorCollector()
go errorCollector.Listen(context.Background())

The second statement of the above code starts a listener.

Note that:

  1. The listener must be started asynchronously (for example with go); otherwise, the call blocks at that point.

  2. The context passed to Listen() is independent of the context passed to Workflow.Execute() or Workflow.RunOnce(). The two can be the same or different.

Next, pass in the error collector when instantiating the Workflow. For example:

f := simple.NewWorkflow[string, string](
    simple.WithChannels[string, string]("input", "output", "t11"),
    simple.WithChannelInput[string, string]("input"),
    simple.WithChannelOutput[string, string]("output"),
    simple.WithTransits[string, string](transits...),
    simple.WithLoggers[string, string](logger, errorCollector),
)

You can get all collected errors using Get(). For example:

errs := errorCollector.Get()

The return value is an error list, and the order of elements is based on the time order in which the errors were collected.

Customize Logger

If you want to implement the logger yourself, you can declare a struct and implement all methods of LoggerInterface. Here is the definition of LoggerInterface:

type LoggerInterface interface {
    // Log an event.
    Log(ctx context.Context, events ...LogEventInterface)
    SetFlags(uint)
}

The events parameter of Log() receives the log events. Here is the definition of LogEventInterface:

type LogEventInterface interface {
    Name() string
    Level() LogLevel
    Message() string
}

The return value of Name() should preferably be the name of the transit where the event occurs. Level() returns the log level. Message() returns the log message, which should not be too long.

Benchmark

go test -race -v ./... -bench . -run ^$ -count 10 -benchmem
goos: linux
goarch: amd64
pkg: github.com/rhosocial/go-dag/workflow
cpu: AMD EPYC 7763 64-Core Processor       

Two parallel transits

Source: https://github.com/rhosocial/go-dag/blob/v1.0.0/workflow/simple/dag_test.go#L185

Here is the workflow:

2024-01-27T06:05:00.4489512Z BenchmarkWorkflowTwoParallelTransits
2024-01-27T06:05:00.4493938Z BenchmarkWorkflowTwoParallelTransits/run_successfully
2024-01-27T06:05:00.4571447Z BenchmarkWorkflowTwoParallelTransits/run_successfully-4         	1000000000	         0.0007029 ns/op	       0 B/op	       0 allocs/op
2024-01-27T06:05:00.4633008Z BenchmarkWorkflowTwoParallelTransits/run_successfully-4         	1000000000	         0.0007023 ns/op	       0 B/op	       0 allocs/op
2024-01-27T06:05:00.4693155Z BenchmarkWorkflowTwoParallelTransits/run_successfully-4         	1000000000	         0.0006692 ns/op	       0 B/op	       0 allocs/op
2024-01-27T06:05:00.4753693Z BenchmarkWorkflowTwoParallelTransits/run_successfully-4         	1000000000	         0.0007548 ns/op	       0 B/op	       0 allocs/op
2024-01-27T06:05:00.4811466Z BenchmarkWorkflowTwoParallelTransits/run_successfully-4         	1000000000	         0.0006532 ns/op	       0 B/op	       0 allocs/op
2024-01-27T06:05:00.4868911Z BenchmarkWorkflowTwoParallelTransits/run_successfully-4         	1000000000	         0.0005953 ns/op	       0 B/op	       0 allocs/op
2024-01-27T06:05:00.4930640Z BenchmarkWorkflowTwoParallelTransits/run_successfully-4         	1000000000	         0.0007030 ns/op	       0 B/op	       0 allocs/op
2024-01-27T06:05:00.4990657Z BenchmarkWorkflowTwoParallelTransits/run_successfully-4         	1000000000	         0.0005967 ns/op	       0 B/op	       0 allocs/op
2024-01-27T06:05:00.5043634Z BenchmarkWorkflowTwoParallelTransits/run_successfully-4         	1000000000	         0.0005634 ns/op	       0 B/op	       0 allocs/op
2024-01-27T06:05:00.5097907Z BenchmarkWorkflowTwoParallelTransits/run_successfully-4         	1000000000	         0.0005825 ns/op	       0 B/op	       0 allocs/op

The benchmark results above were produced on GitHub, and there is no guarantee that other environments will achieve the same performance.

Best Practices

Dividing a complex task into subtasks requires careful consideration to ensure clear input and output, accurate resource estimation, and error impact assessment. Here's a breakdown of the process:

  1. Task Identification: Identify the overarching complex task and break it down into smaller, manageable subtasks.

  2. Subtask Definition:

    • Define each subtask with clear input and output parameters. Ensure that inputs and outputs are well-defined and unambiguous.

    • Estimate the resource consumption (such as CPU, memory, disk space, etc.) for each subtask accurately to allocate resources efficiently.

  3. Subtask Process:

    • Clearly define the process for each subtask, including all necessary steps and operations.

    • Consider the potential impact of errors on the workflow for each subtask. Evaluate how errors may propagate through the workflow and affect subsequent subtasks.

  4. Execution Order:

    • If the workflow is non-linear, it's recommended to complete each execution before proceeding to the next one. This helps prevent delays caused by time-consuming executions.

    • If execution order is not critical and subtasks do not generate exceptions or cancel the execution process, concurrent execution may commence before a previous execution completes. However, ensure that execution contexts are managed properly to avoid data corruption or conflicts.

    • Consider creating a new workflow instance for each execution to ensure isolation and prevent context overwriting, especially for publicly published transits.

  5. Context Management:

    • Utilize context.Context to pass global values if necessary. However, exercise caution, especially for transits intended for public use. Over-reliance on global context may lead to unexpected behavior and dependencies.

By following these steps, you can effectively divide a complex task into clear and manageable subtasks, ensuring efficient execution and error handling within the workflow.

Test worker and workflow

Testing should include:

  1. The worker needs to validate each input, so that illegal input is reported as an error promptly.

  2. The worker needs to verify its output for typical inputs.

  3. The worker needs to count the resource consumption of typical inputs, including time, CPU, memory, network (if any), etc.

  4. The workflow needs to test a variety of typical inputs and count resource consumption, critical paths during execution, etc. If necessary, the worker developers on the critical path can be notified to improve efficiency or use other solutions.
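A sketch of the first three points for a single worker, written as a plain program so that it runs with go run; in a real project the same table would normally live in a _test.go file using the testing package, with resource usage measured by benchmarks (for example go test -bench . -benchmem). doubleWorker and the test cases are hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// doubleWorker is a hypothetical worker under test: it expects exactly one int.
func doubleWorker(ctx context.Context, a ...any) (any, error) {
	if len(a) != 1 {
		return nil, fmt.Errorf("expected 1 argument, got %d", len(a))
	}
	n, ok := a[0].(int)
	if !ok {
		return nil, fmt.Errorf("expected int, got %T", a[0])
	}
	return 2 * n, nil
}

type testCase struct {
	name    string
	args    []any
	want    any
	wantErr bool
}

// runCases checks typical and illegal inputs, prints how long each call took,
// and returns a description of every failed case.
func runCases(cases []testCase) []string {
	var failures []string
	for _, tc := range cases {
		start := time.Now()
		got, err := doubleWorker(context.Background(), tc.args...)
		elapsed := time.Since(start)
		switch {
		case tc.wantErr && err == nil:
			failures = append(failures, tc.name+": expected an error")
		case !tc.wantErr && err != nil:
			failures = append(failures, tc.name+": unexpected error: "+err.Error())
		case !tc.wantErr && got != tc.want:
			failures = append(failures, fmt.Sprintf("%s: got %v, want %v", tc.name, got, tc.want))
		}
		fmt.Printf("%-12s %v\n", tc.name, elapsed)
	}
	return failures
}

func main() {
	cases := []testCase{
		{name: "typical", args: []any{21}, want: 42},
		{name: "zero", args: []any{0}, want: 0},
		{name: "wrong type", args: []any{"21"}, wantErr: true},
		{name: "no args", args: nil, wantErr: true},
	}
	fmt.Println("failures:", runCases(cases))
}
```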

Troubleshooting

The workflow may behave unexpectedly when some parameters are abnormal. You can raise issues on the GitHub issues page. If a problem is encountered frequently, it will be documented here.

https://github.com/rhosocial/go-dag/issues
