Use simple.DAG
Let's take a workflow that contains only two tasks and is executed serially as an example to introduce how to use simple.DAG.
Scope of application
simple.DAG is well-suited for:
Dividing a task into multiple subtasks and executing them concurrently.
Sequentially executing multiple processes that constitute a task.
Handling non-complex subtasks that require minimal resources and time.
Embedding sub-workflows within a transit.
Dynamically orchestrating workflows as required during runtime.
Terminating the entire execution if an error occurs.
Typically generating no errors, let alone panic().
Definition
Workflow: The overall progression of the work being accomplished.
Transit: The most granular execution unit, processing incoming data and delivering it to the designated channel.
Channel: The pathway for transferring data, also outlining dependencies between transits.
Execution: The process from input to output as per the workflow's progression.
Context: The environment associated with each execution, carried throughout the process.
Workflow
Here is the workflow:
t:input receives data transmitted by c:input. Upon execution, the outcome is relayed to the c:t11 channel.
Subsequently, t:output executes promptly upon receiving data from the c:t11 channel. The result is then dispatched to c:output.
Import package
The workflow resides within the code package "github.com/rhosocial/go-dag/workflow/simple".
Prior to utilization, it's essential to import the code package. For instance:
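The import uses the package path quoted above:

```go
import "github.com/rhosocial/go-dag/workflow/simple"
```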
Declare a simple.DAG instance
Here is the definition of the NewWorkflow() function:
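The authoritative definition is in the package source; based on the description below, its shape is approximately:

```go
// Approximate shape only (sketch); consult the package source for the exact definition.
func NewWorkflow[TInput, TOutput any](options ...Option[TInput, TOutput]) (*Workflow[TInput, TOutput], error)
```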
In the definition, TInput and TOutput specify the input and output data types. The input and output types can be any, but this is strongly discouraged.
The return value of this function is a pointer to Workflow[TInput, TOutput].
For example:
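A minimal sketch, assuming the constructor can be called without any options:

```go
workflow, err := simple.NewWorkflow[string, string]()
if err != nil {
	// handle the instantiation error
}
_ = workflow
```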
This defines a Workflow whose input and output data types are both strings. At this stage, however, the Workflow is empty and cannot be executed as a valid workflow.
If no errors were encountered when instantiating the workflow, err is nil.
Option[TInput, TOutput] contains the following:
WithChannels(): Informs the workflow of the list of channel names it should include.
WithChannelInput(): Specifies the name of the initial input channel for the workflow.
WithChannelOutput(): Specifies the name of the final output channel for the workflow.
WithDefaultChannels(): Specifies that the workflow should use default channel names, where the initial input channel is named "input" and the final output channel is named "output".
WithTransits(): Specifies the list of transits the workflow should include.
WithLoggers(): Specifies the list of loggers the workflow should include.
For Workflow to work properly, you also need to declare channels, transits, and loggers.
Initialize workflow with the channels
This step is optional; you can skip directly to the next section.
First, you need to specify the channel name associated with each transit in the workflow, as well as define the input and output channels of the workflow.
Here is the definition:
You can pass the return value of this function as a parameter to simple.NewWorkflow(). For example:
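A sketch of passing the channel options to simple.NewWorkflow(). The option names are taken from this document; the explicit type parameters and exact signatures are assumptions and may differ in the actual package:

```go
workflow, err := simple.NewWorkflow[string, string](
	simple.WithChannels[string, string]("input", "t11", "output"),
	simple.WithChannelInput[string, string]("input"),
	simple.WithChannelOutput[string, string]("output"),
)
```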
When initializing the channel names, each name must be unique. The order of names is irrelevant, but the configuration should include at least two channels: the first for input and the last for output. The workflow will transmit data to the specified channel and await output from the designated output channel.
simple.WithChannelInput() and simple.WithChannelOutput() are used to specify the input and output channel names of the entire workflow.
In particular, we provide WithDefaultChannels(), which can directly declare the "input" and "output" channels and specify them as the input and output of the workflow respectively. Then the above code can be simplified to:
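A sketch of the simplified form, under the same assumptions as above:

```go
workflow, err := simple.NewWorkflow[string, string](
	simple.WithDefaultChannels[string, string](),
	simple.WithChannels[string, string]("t11"),
)
```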
At this time, WithChannels() no longer needs to specify the "input" and "output" channels. WithDefaultChannels() and WithChannels() are order dependent. If WithChannels() comes after WithDefaultChannels(), "input" or "output" cannot be declared again; otherwise, ErrChannelNameExisted will be reported.
Note: do not use WithDefaultChannels() if you want to give other names to the workflow's input and output channels.
Initialize workflow with the transits
You can use simple.WithTransits() to define all the transits. Here is the definition:
You can pass the return value of these functions as a parameter to simple.NewWorkflow(). For example:
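A sketch of the two-transit workflow described earlier, assembled with WithTransits(). NewTransit(), WithInputs(), WithOutputs(), and WithWorker() are explained in the next section; the explicit type parameters and exact signatures are assumptions:

```go
workflow, err := simple.NewWorkflow[string, string](
	simple.WithDefaultChannels[string, string](),
	simple.WithChannels[string, string]("t11"),
	simple.WithTransits[string, string](
		simple.NewTransit("input",
			simple.WithInputs("input"),
			simple.WithOutputs("t11"),
			simple.WithWorker(func(ctx context.Context, a ...any) (any, error) {
				return a[0], nil // pass the value through to c:t11
			}),
		),
		simple.NewTransit("output",
			simple.WithInputs("t11"),
			simple.WithOutputs("output"),
			simple.WithWorker(func(ctx context.Context, a ...any) (any, error) {
				return a[0], nil // pass the value through to c:output
			}),
		),
	),
)
```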
In general, you don't need to specify the channel names as described in the previous section, because the WithTransits() method will register the input and output channel names for each transit. However, you must ensure that each channel is only used for input and output once; otherwise, unpredictable consequences may occur.
simple.WithTransits() can be executed multiple times, regardless of the order. However, the content executed later will merge with and overwrite the content executed first.
Initialize the transit
The transits parameter of simple.WithTransits() can be defined as follows:
In NewTransit(), name is the name of the transit, and options represents the input channel list, output channel list, and worker of the transit. For example:
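A sketch of a single transit, assuming the option constructors named below:

```go
transit := simple.NewTransit("input",
	simple.WithInputs("input"), // channels this transit listens to
	simple.WithOutputs("t11"),  // channels this transit sends its result to
	simple.WithWorker(func(ctx context.Context, a ...any) (any, error) {
		return a[0], nil
	}),
)
```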
simple.WithInputs() and simple.WithOutputs() are defined as follows:
The names parameter is the channel name. You need to ensure that each channel name appears only once across the input lists and output lists of all transits.
Those functions can be executed multiple times, regardless of the order. However, the content executed later will merge and overwrite the content executed first.
WithWorker() is used to specify the worker function for the current transit. Here is the definition:
The definition of the worker function is fixed to:
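A sketch of that form, based on the parameter and return-value descriptions below:

```go
func(ctx context.Context, a ...any) (any, error)
```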
The parameter ctx is a sub-context of the context.Context passed in each time the workflow is executed. Therefore, when the parent Context stops, the ctx of the current worker will receive a stop signal.
The number of parameters a of the worker function and their actual types must correspond to the monitored channels. Therefore, the content of WithInputs() is order dependent. The parameter types of worker functions are all any, so you need to verify whether the parameters passed in meet expectations, unless the parameter type does not matter.
Note that the order of parameters received by WithInputs() determines the order of parameters sent to the worker. Therefore, calling WithInputs() multiple times requires extra care that the channel names match the parameters received by the worker. The worker also needs to verify the number of parameters received and the type of each parameter.
The return value types of worker functions are specified as any and error. If an error needs to be reported during execution, the second return value should be set to a specific error. If the error is not nil, the entire execution of the flow will be aborted.
The output of the worker function is fed to all specified output channels simultaneously, so the order of the output channel names does not matter.
The Go language allows the same channel to be monitored in multiple places, but we did not use this approach. Instead, we use a one-to-one correspondence between channels and subsequent workers. The purpose is to facilitate building the directed acyclic graph of the entire execution flow, topological sorting, sequential execution, logging, and debugging.
If you think that a worker's error or panic will not affect subsequent execution, you can specify the allowFailure attribute for the transit to which it belongs. For example:
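A sketch, assuming WithAllowFailure() accepts a boolean:

```go
transit := simple.NewTransit("input",
	simple.WithInputs("input"),
	simple.WithOutputs("t11"),
	simple.WithAllowFailure(true), // errors or panics in this worker do not abort the workflow
	simple.WithWorker(func(ctx context.Context, a ...any) (any, error) {
		return nil, errors.New("this error will not terminate the whole execution")
	}),
)
```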
When the first transit worker encounters an error, its error will not terminate the execution of the entire process. Its return value will also be injected into the specified channel. If the worker panics, nil will be injected into the specified channel.
Note that WithAllowFailure() can be executed multiple times, but only the last one will take effect.
Note: try to avoid executing panic() in the worker. Otherwise, the entire workflow will be terminated if failures are not allowed.
Distribute and import worker of transit
If you want to distribute your transit worker, you can declare the worker function as an exported variable.
If the transit you are referencing comes from someone else, you can combine the imported workers into your own NewTransit() call.
If you feel that simple.Transit does not meet your needs, you can customize the Transit structure; you only need to implement all the methods specified by simple.TransitInterface.
For example:
When distributing, you need to document the list of input parameters, their respective types and valid ranges, and the corresponding output format.
Since it is impossible to know whether the input channel provided by the user meets the requirements during development, your worker needs to verify each input.
In addition, you should tell the user which errors may be returned and which panics are possible, so that the user can decide how to handle these errors, including whether they can be skipped.
If your worker needs to be put into production or distributed to others, it is strongly recommended to test the worker and disclose the test results to the user.
Best Practices
transit channel:
Each transit should have at least one input channel; otherwise, the transit will never be called when the workflow executes.
A transit does not need to specify an output channel, which means that the execution result of that transit will not be used as input to any other transit. However, an error in the transit's execution will still affect the process.
In a workflow, each channel can only be used as input and output once, and is listed in different transits and cannot be reused. Otherwise, unpredictable consequences may occur.
transit worker:
You need to determine whether the number and types of the parameters received by the worker are as expected. If they do not match, ErrValueTypeMismatch should be returned. It is better to check all parameter types at once and use errors.Join() to return them (see the sketch after this list).
If your worker is time-consuming or supports cancellation, you should receive the incoming ctx and listen for cancellation notifications.
The parameters processed by the worker are limited to those passed in through the channels. It is strongly discouraged to use global variables or to pass values in through ctx, except for parameters limited to distinguishing requests (such as a request ID) and collecting logs generated during execution.
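A sketch of a worker that follows these points. The worker signature is the one assumed earlier; the placeholder work and error messages are illustrative only (in practice you would return the package's ErrValueTypeMismatch for type mismatches):

```go
worker := func(ctx context.Context, a ...any) (any, error) {
	// 1. Verify the number and types of the received parameters, collecting all problems at once.
	var errs []error
	if len(a) != 1 {
		errs = append(errs, fmt.Errorf("expected 1 parameter, got %d", len(a)))
	} else if _, ok := a[0].(string); !ok {
		errs = append(errs, fmt.Errorf("expected string, got %T", a[0]))
	}
	if len(errs) > 0 {
		return nil, errors.Join(errs...)
	}
	// 2. For time-consuming work, listen for cancellation on ctx.
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	case <-time.After(100 * time.Millisecond): // placeholder for the real work
	}
	// 3. Use only the values passed in through the channels.
	return a[0].(string) + " processed", nil
}
```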
Execute the workflow
If you want to be able to execute the workflow repeatedly, call the Execute() method. For example:
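A sketch, assuming Execute() takes a context and a pointer to the input, and returns a pointer to the output, as described below:

```go
input := "data"
output := workflow.Execute(context.Background(), &input)
if output != nil {
	fmt.Println(*output)
}
```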
The first parameter is the context and supports CancelCause; that is, during execution, the context can signal that execution should abort.
The second parameter is the pointer to the initial input of the workflow. The type of this parameter must be consistent with the definition.
The function has only one return value, which is the execution result. Its type is consistent with the definition. In particular, nil is returned if execution aborts. Therefore, it is not recommended to define the return type as any, nor is it recommended for the final return value to be nil.
Running only once
If you do not want the workflow to accept further input after executing once, you can call the RunOnce() method. For example:
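A sketch, under the same assumptions as for Execute():

```go
input := "data"
output := workflow.RunOnce(context.Background(), &input)
```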
After destruction, an error will be reported if Execute() or RunOnce() is called again.
Cancel execution
If you want to cancel during execution, you can call the following method:
If you don't want to provide a reason, you can pass nil, but this is strongly discouraged.
Note that after calling this method, all transits being executed will receive a termination signal, but Execute() and RunOnce() can still be called.
When canceled actively, or when execution is terminated due to an error or panic(), the return value of Execute() and RunOnce() is nil.
In addition, you can use context's cancellation function, for example:
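For example, a timeout context from the standard library (the Execute() call follows the shape assumed earlier):

```go
ctx, cancel := context.WithTimeout(context.Background(), 1500*time.Millisecond)
defer cancel()

input := "data"
output := workflow.Execute(ctx, &input)
```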
The above code declares a context that expires in 1500 milliseconds. If the workflow execution does not end for more than 1500 milliseconds, it will be notified to stop immediately.
In fact, the workflow supports any context that implements the Done() method. For details, please see https://go.dev/blog/context.
Nested workflow
You can embed another workflow into the worker function of a transit. For example:
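A sketch of a worker that runs a nested workflow, reusing the signatures assumed in the previous sections; the inner workflow here is a trivial one-transit pass-through:

```go
worker := func(ctx context.Context, a ...any) (any, error) {
	inner, err := simple.NewWorkflow[string, string](
		simple.WithDefaultChannels[string, string](),
		simple.WithTransits[string, string](
			simple.NewTransit("inner",
				simple.WithInputs("input"),
				simple.WithOutputs("output"),
				simple.WithWorker(func(ctx context.Context, a ...any) (any, error) {
					return a[0], nil
				}),
			),
		),
	)
	if err != nil {
		return nil, err
	}
	if len(a) != 1 {
		return nil, fmt.Errorf("expected 1 parameter, got %d", len(a))
	}
	input, ok := a[0].(string)
	if !ok {
		return nil, fmt.Errorf("expected string, got %T", a[0])
	}
	// Pass the current worker's ctx (or a sub-context of it) to the nested workflow.
	result := inner.Execute(ctx, &input)
	if result == nil {
		return nil, errors.New("nested workflow aborted")
	}
	return *result, nil
}
```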
Best Practices
executing workflow:
It is not recommended to execute the same workflow multiple times asynchronously unless the workflow is linear. Otherwise, an execution started later may return its result before an earlier one.
If you want to ensure sequential execution, you need to wait for one execution to complete before continuing.
If you want concurrency without destroying the order, you should use a worker pool and add a certain number of workflows for waiting calls.
nested workflow:
The context of the nested workflow must be the context passed in by the current worker, or its sub-context.
The nested workflow should either produce no errors, or the worker should be able to capture its errors and return them.
The resources consumed by the nested workflow can be estimated, including time, CPU, memory, etc.
Use logger
Workflow comes with logger support. Here is the definition:
WithLoggers() can be passed in multiple times. All loggers will be executed, so please do not pass in too many to avoid affecting performance.
Note: if you want to test the performance of each transit in the workflow, please do not turn on log output in the benchmark. If logging is necessary, ensure a limited amount of log output; otherwise, the logs will explode.
We have prepared a default logger and error collector.
Log event and predefined events
Currently, the following log events will be encountered during workflow execution:
LogEventWorkflowStart: when the workflow starts.
LogEventWorkflowEnd: when the workflow ends.
LogEventTransitInputReady: when data is injected into the channel.
LogEventTransitOutputReady: when the channel receives data.
LogEventTransitStart: when the transit worker starts.
LogEventTransitEnd: when the transit worker ends.
LogEventTransitCanceled: when the transit is notified of cancellation.
LogEventTransitError: when the transit worker reports an error.
LogEventTransitWorkerPanicked: when the transit worker panicked.
LogEventErrorValueTypeMismatch: when a parameter received by the transit worker does not match expectations, or the final output result type does not match expectations.
Default logger
We have prepared a default logger for debugging and tracing purposes: simple.Logger
Here is the definition:
You can get a logger instance using the NewLogger() method. The default method covers most situations.
After instantiation is completed, it can be passed in as an option when initializing the Workflow. For example:
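A sketch, assuming the logger is attached through the WithLoggers() option listed earlier and that NewLogger() takes no arguments:

```go
logger := simple.NewLogger()
workflow, err := simple.NewWorkflow[string, string](
	simple.WithDefaultChannels[string, string](),
	simple.WithLoggers[string, string](logger),
)
```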
This default logger specifies four logging levels:
By default, only LevelWarning and LevelError are displayed. If you want to display LevelDebug and above, you need to set the flag after declaring the logger:
Currently, LevelDebug is tracked before the worker starts and after successful execution. If the worker execution returns an error, or the execution is notified to be canceled, LevelWarning is tracked. If the worker panics, LevelError is tracked. When the log level is warning, the background color is yellow; when the log level is error, the background color is red. For example:
Error collector
Workflows may produce errors, but those errors are not ultimately returned. We provide an error collector to collect errors during execution.
You can instantiate an error collector using simple.NewErrorCollector(). For example:
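A sketch, assuming the collector is created with no arguments and Listen() takes a context (as noted below):

```go
collector := simple.NewErrorCollector()
go collector.Listen(context.Background())
```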
The second statement of the above code starts a listener.
Note that the listener must be started asynchronously; otherwise, it will block forever at this point. The context of Listen() is independent of the context of Workflow.Execute() or Workflow.RunOnce(); the two can be the same or different.
Next, pass in the error collector when instantiating the Workflow. For example:
You can get all collected errors using Get(). For example:
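A sketch, assuming Get() returns the collected errors as a slice:

```go
for _, err := range collector.Get() {
	fmt.Println(err)
}
```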
The return value is an error list, and the order of elements is based on the time order in which the errors were collected.
Customize Logger
If you want to implement the logger yourself, you can declare a struct and implement all methods of LoggerInterface. Here is the definition of LoggerInterface:
The events parameter of the Log() method receives log events. Here is the definition of LogEventInterface:
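An approximate shape based only on the method names described below; the actual interface in the package may differ, and the LogLevel return type name is an assumption:

```go
type LogEventInterface interface {
	Name() string       // typically the name of the transit where the event occurred
	LogLevel() LogLevel // the level of the event
	Message() string    // the log message; should not be too long
}
```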
Among them, the return value of Name() is recommended to be the name of the transit where the log event occurred. LogLevel() returns the log level. Message() returns the log message, which should not be too long.
Benchmark
Two parallel transits
Source: https://github.com/rhosocial/go-dag/blob/v1.0.0/workflow/simple/dag_test.go#L185
Here is the workflow:
The above benchmark results are only those reported by GitHub, and there is no guarantee that the same efficiency can be achieved in any other environment.
Best Practices
Dividing a complex task into subtasks requires careful consideration to ensure clear input and output, accurate resource estimation, and error impact assessment. Here's a breakdown of the process:
Task Identification: Identify the overarching complex task and break it down into smaller, manageable subtasks.
Subtask Definition:
Define each subtask with clear input and output parameters. Ensure that inputs and outputs are well-defined and unambiguous.
Estimate the resource consumption (such as CPU, memory, disk space, etc.) for each subtask accurately to allocate resources efficiently.
Subtask Process:
Clearly define the process for each subtask, including all necessary steps and operations.
Consider the potential impact of errors on the workflow for each subtask. Evaluate how errors may propagate through the workflow and affect subsequent subtasks.
Execution Order:
If the workflow is non-linear, it's recommended to complete each execution before proceeding to the next one. This helps prevent delays caused by time-consuming executions.
If execution order is not critical and subtasks do not generate exceptions or cancel the execution process, concurrent execution may commence before a previous execution completes. However, ensure that execution contexts are managed properly to avoid data corruption or conflicts.
Consider creating a new workflow instance for each execution to ensure isolation and prevent context overwriting, especially for publicly published transits.
Context Management:
Utilize context.Context to pass global values if necessary. However, exercise caution, especially for transits intended for public use; over-reliance on the global context may lead to unexpected behavior and dependencies.
By following these steps, you can effectively divide a complex task into clear and manageable subtasks, ensuring efficient execution and error handling within the workflow.
Test worker and workflow
Testing should include:
The worker needs to verify each input, so that illegal input is reported as an error in time.
The worker needs to verify the output of typical input.
The worker needs to count the resource consumption of typical inputs, including time, CPU, memory, network (if any), etc.
The workflow needs to test a variety of typical inputs and count resource consumption, critical paths during execution, etc. If necessary, the worker developers on the critical path can be notified to improve efficiency or use other solutions.
Troubleshooting
This section describes possible behaviors when some parameters are abnormal. You can raise issues on the GitHub issues page; problems that are encountered frequently will be added here.