# Use simple.DAG

## Scope of application

`simple.DAG` is well-suited for:

* Dividing a task into multiple subtasks and executing them concurrently.
* Sequentially executing multiple processes that constitute a task.
* Handling non-complex subtasks that require minimal resources and time.
* Embedding sub-workflows within a transit.
* Dynamically orchestrating workflows as required during runtime.
* Terminating the entire execution if an error occurs.
* Typically generating no errors, let alone `panic()`.

## Definition

* **Workflow**: The advancement or pace of work being accomplished.
* **Transit**: The most granular execution unit, processing incoming data and delivering it to the designated channel.
* **Channel**: The pathway for transferring data, also outlining dependencies between transits.
* **Execution**: The process from input to output as per the workflow's progression.
* **Context**: The environment associated with each execution, carried throughout the process.

## Workflow

Here is the workflow:

<figure><img src="https://1486875518-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2Foyezdgt4RCgef0Y2fFzx%2Fuploads%2FIYuqIYUhJfR3jNHYpeOm%2Fimage.png?alt=media&#x26;token=188c8b4c-c7bb-440c-9c94-8a3b0fbbfd9e" alt=""><figcaption><p>Workflow: a simplest example</p></figcaption></figure>

* **t:input** receives data transmitted by **c:input**. Upon execution, the outcome is relayed to the **c:t11** channel.
* Subsequently, **t:output** executes promptly upon receiving data from the **c:t11** channel. The result is then dispatched to **c:output**.

## Import package

The workflow resides within the code package `"github.com/rhosocial/go-dag/workflow/simple"`.

Prior to utilization, it's essential to import the code package. For instance:

```go
import "github.com/rhosocial/go-dag/workflow/simple"
```

## Declare a simple.DAG instance

Here is the definition of `NewWorkflow()` function:

```go
func NewWorkflow[TInput, TOutput any](options ...Option[TInput, TOutput]) *Workflow[TInput, TOutput]
```

In the definition, `TInput` and `TOutput` specify the input and output data types. The input and output types can be `any`, but this is strongly discouraged.

The return value of this function is a pointer to `Workflow[TInput, TOutput]`.

For example:

```go
f, err := simple.NewWorkflow[string, string]()
```

So, to clarify, you're defining a Workflow where both the input and output data types are strings. However, at this stage, the Workflow is empty and cannot be executed as a valid workflow.

If no errors were encountered when instantiating the workflow, `err` is `nil`.

`Option[TInput, TOutput]` contains the following:

* `WithChannels()`: Informs the workflow of the list of channel names it should include.
* `WithChannelInput()`: Specifies the name of the initial input channel for the workflow.
* `WithChannelOutput()`: Specifies the name of the final output channel for the workflow.
* `WithDefaultChannels()`: Specifies that the workflow should use default channel names, where the initial input channel is named "input" and the final output channel is named "output".
* `WithTransits()`: Specifies the list of transits the workflow should include.
* `WithLoggers()`: Specifies the list of loggers the workflow should include.

For `Workflow` to work properly, you also need to declare channels, transits and loggers.

### Initialize workflow with the channels

> This step is not necessary. You can refer directly to the next section.

First, you need to specify the channel name associated with each transit in the workflow, as well as define the input and output channels of the workflow.

Here is the definition:

```go
func WithChannels[TInput, TOutput any](names ...string) Option[TInput, TOutput]
func WithChannelInput[TInput, TOutput any](name string) Option[TInput, TOutput]
func WithChannelOutput[TInput, TOutput any](name string) Option[TInput, TOutput]
```

You can pass the return value of this function as a parameter to the `simple.NewWorkflow()`. For example:

```go
f := simple.NewWorkflow[string, string](
    simple.WithChannels[string, string]("input", "output", "t11"),
    simple.WithChannelInput[string, string]("input"),
    simple.WithChannelOutput[string, string]("output"),
)
```

When initializing the channel names, each name must be unique. The order of names is irrelevant, but the configuration should include at least two channels: the first for input and the last for output. The workflow will transmit data to the specified channel and await output from the designated output channel.

`simple.WithChannelInput()` and`simple.WithChannelOutput()` are used to specify the input and output channel names of the entire workflow.

In particular, we provide the `WithDefaultChannels()`, which can directly declare "input" and "output" channels and specify them as the input and output of the workflow respectively. Then the above code can be simplified to:

```go
f := simple.NewWorkflow[string, string](
    simple.WithDefaultChannels[string, string](),
    simple.WithChannels[string, string]("t11"),
)
```

At this time, `WithChannels()` no longer needs to specify the "input" and "output" channels. `WithDefaultChannels()` and `WithChannels()` are order dependent. If `WithChannels()` comes after `WithDefaultChannels()`, "input" or "output" cannot be declared again. Otherwise, `ErrChannelNameExisted` will be reported.

> Note that do not use `WithDefaultChannels()` if you want to give other names to the workflow's input and output channels.

### Initialize workflow with the transits

You can use	`simple.WithTransits()` to define all the transits. Here is the definition:

```go
func WithTransits[TInput, TOutput any](transits ...*Transit) Option[TInput, TOutput]
```

You can pass the return value of those functions as a parameter to the `simple.NewWorkflow()`. For example:

```go
f := simple.NewWorkflow[string, string](
    simple.WithDefaultChannels[string, string](),
    simple.WithTransits[string, string](transits...),
)
```

In general, you don't need to specify the channel names as described in the previous section because the `WithTransits()` method will register the input and output channel names for each transit. However, you must ensure that each channel is only used for input and output once; otherwise, unpredictable consequences may occur.

`simple.WithTransits()` can be executed multiple times, regardless of the order. However, the content executed later will merge and overwrite the content executed first.

#### Initialize the transit

The parameter transits of the `simple.WithTransits()` can be defined as follows:

```go
func NewTransit(name string, options ...TransitOption) *Transit
```

In the `NewTransit()`, `name` is the name of the transit, `options` represents the input channel list, output channel list and worker of the transit. For example:

```go
worker := func(ctx context.Context, a ...any) (any, error) {
    log.Println("started at", time.Now())
    time.Sleep(time.Duration(a[0].(int)) * time.Second)
    log.Println("ended at", time.Now())
    return a[0], nil
}

transits := []*simple.Transit{
    simple.NewTransit("input",
       simple.WithInputs("input"),
       simple.WithOutputs("t11"),
       simple.WithWorker(worker)),
    simple.NewTransit("output", 
       simple.WithInputs("t11"),
       simple.WithOutputs("output"),
       simple.WithWorker(worker)),
}
```

`simple.WithInputs()` and `simple.WithOutputs()` are defined as follows:

```go
func WithInputs(names ...string) TransitOption
func WithOutputs(names ...string) TransitOption
```

The `names` parameter is the channel name. You need to ensure that each channel name only appears once in the input list and output list of all transits.

Those functions can be executed multiple times, regardless of the order. However, the content executed later will merge and overwrite the content executed first.

`WithWorker()` is used to specify the worker function for the current transit. Here is the definiton:

```go
func WithWorker(worker func(context.Context, ...any) (any, error)) TransitOption
```

The definition of the worker function is fixed to:

```go
func(ctx context.Context, a ...any) (any, error) {
    return ..., ...
}
```

The parameter `ctx` is the sub-Context of `context.Context` passed in each time it is executed. Therefore, when the superior `Context` stops, the `ctx` of the current worker will receive a stop signal.

The number of parameters `a` of the `worker` function and their actual types must correspond to the monitored channels. Therefore, the content of `WithInputs()` is order dependent. The parameter types of worker functions are all `any`, so you need to verify whether the parameters passed in meet expectations, unless the parameter type does not matter.

> Note that the order of parameters received by the `WithInputs()` determines the order of parameters sent to the worker. Therefore, calling the `WithInputs()` multiple times requires extra care that the channel name matches the parameters received by the worker. The `worker` also needs to verify the number of parameters received and the type of each parameter.

The return value types of `worker` functions are specified as `any` and `error`. If an error needs to be reported during execution, the second parameter should be set to a specific error. If the error is not `nil`, the entire execution of flow will be aborted.

The output of the `worker` function is fed to all specified output channels simultaneously, so the order of the output channel names does not matter.

> Go language supports the same channel to be monitored by multiple places. But we did not use this method. Instead, we used a one-to-one correspondence between channels and subsequent workers. The purpose of this is to facilitate the directed acyclic graph of the entire execution flow, topological sorting, sequential execution, logging and debugging.

If you think that a worker's error or panic will not affect subsequent execution, you can specify the `allowFailure` attribute for the transit to which it belongs. For example:

```go
worker := func(ctx context.Context, a ...any) (any, error) {
    log.Println("started at", time.Now())
    time.Sleep(time.Duration(a[0].(int)) * time.Second)
    log.Println("ended at", time.Now())
    return a[0], nil
}

transits := []*simple.Transit{
    simple.NewTransit("input",
       simple.WithInputs("input"),
       simple.WithOutputs("t11"),
       simple.WithWorker(worker)),
       simple.WithAllowFailure(true),
    simple.NewTransit("output", 
       simple.WithInputs("t11"),
       simple.WithOutputs("output"),
       simple.WithWorker(worker)),
}
```

When the first transit worker encounters an error, its error will not terminate the execution of the entire process. Its return value will also be injected into the specified channel. If the worker panics, `nil` will be injected into the specified channel.

> Note that the `WithAllowFailure()` can be executed multiple times, but only the last one will take effect.

> Note, try to avoid executing `panic()` in the worker. Otherwise, the entire workflow will be terminated if failures not allowed.

#### Distribute and import worker of transit

If you want to distribute your transit worker, you can declare the worker function as an exportable variable.

If the transit you are referencing comes from someone else, you can combine the imported workers into your own `NewTransit()` method.&#x20;

If you feel that `simple.Transit` does not meet your needs, you can customize the `Transit` structure and only need to implement all the methods specified by the `simple.TransitInterface`.

For example:

<pre class="language-go"><code class="lang-go">package main

import (
    "context"

    "github.com/rhosocial/go-dag/workflow/simple"
)

var ExternalTransitWorker = func(ctx context.Context, a ...any) (any, error) {
    return a[0], nil
}

func NewTransit(name string) simple.TransitInterface {
    return simple.NewTransit(name,
<strong>        simple.WithInputs("input"),
</strong><strong>        simple.WithOutputs("output"),
</strong>        simple.WithWorker(ExternalTransitWorker),
    )
}

type ExternalTransit struct {
    simple.TransitInterface
}
</code></pre>

When distributing, you need to indicate the input parameter list and their respective types and legal ranges, and indicate the corresponding output format.

Since it is impossible to know whether the input channel provided by the user meets the requirements during development, your worker needs to verify each input.

In addition, it is also necessary to tell the user what errors may be returned and the possible panic, so that the user can judge how to handle these errors, including whether they can be skipped.

If your worker needs to be put into production or distributed to others, it is strongly recommended to test the worker and disclose the test results to the user.

### Best Practices

transit channel:

* Each transit should have more than one input channel, otherwise the transit will never be called when the workflow executes.
* Each Transit does not need to specify an output channel, which means that the execution results of this transit will not be used as input to any other transits. However, the transit execution error will still affect the process.
* In a workflow, each channel can only be used as input and output once, and is listed in different transits and cannot be reused. Otherwise, unpredictable consequences may occur.

transit worker:

* You need to determine whether the number and type of parameters received by the worker are as expected. If it does not match, `ErrValueTypeMismatch` should be returned. It is better to check all parameter types at once and use `errors.Join()` to return.
* If your worker is time-consuming or supports cancellation, you should receive the incoming `ctx` and listen for cancellation notifications.
* The parameters processed by the worker are limited to those passed in through the channel. It is strongly not recommended to use global variables or pass them in through `ctx`, except for parameters that are limited to distinguishing requests (such as request id) and collecting logs generated during execution.

## Execute the workflow

If you want to execute the workflow repeatedly after executing it once, you need to call the `Execute()` method. For example:

```go
input := "input"
result := f.Execute(context.Background(), &input))
```

The first parameter is the context and supports `CancelCause`. That is, during the execution process, the context can notify execution abort.

The second parameter is the pointer to the initial input of the workflow. The type of this parameter must be consistent with the definition.

There is only one output of the function, which is the execution result. The type is consistent with the definition. In particular, `nil` is returned if execution aborts. Therefore, it is not recommended to define the return type as `any`, nor is it recommended to have a final return value of `nil`.

### Running only once

If you want to not to accept input after executing it once, you can call the `RunOnce()` method. For example:

```go
input := "input"
result := f.RunOnce(context.Background(), &input))
```

After destruction, an error will be reported if `Execute()` or `RunOnce()` is executed again.

### Cancel execution

If you want to cancel during execution, you can call the following method:

```go
f.Cancel(cause error)
```

If you don't want to provide a reason, you can pass `nil`, but this is strongly discouraged.

Note that after calling this method, all transits being executed will receive a termination signal. But `Execute()` and `RunOnce()` can still be called.

When canceled actively, or when execution is terminated due to an error or `panic()`, the return value of `Execute()` and `RunOnce()` is `nil`.

In addition, you can use context's cancellation function, for example:

```go
ctx1, cancelFunc := context.WithTimeoutCause(context.Background(), time.Millisecond*1500, errors.New("canceled by parent ctx"))
output := f.Execute(ctx1, &input)
```

The above code declares a context that expires in 1500 milliseconds. If the workflow execution does not end for more than 1500 milliseconds, it will be notified to stop immediately.

In fact, workflow supports any context that implements the `Done()` method. For details, please see <https://go.dev/blog/context>.

### Nested workflow

You can embed other workflow into worker function of a transit. For example:

```go
worker1 := func(ctx context.Context, a ...any) (any, error) {
	log.Println("started at", time.Now())
	time.Sleep(time.Duration(a[0].(int)) * time.Second)
	log.Println("ended at", time.Now())
	return a[0], nil
}
worker2 := func(ctx context.Context, a ...any) (any, error) {
	channelInputs1 := []string{"input"}
	channelOutputs1 := []string{"t11"}
	channelOutputs2 := []string{"output"}
	transits := []*simple.Transit{
		simple.NewTransit("i:input", simple.WithInputs(channelInputs1...), simple.WithOutputs(channelOutputs1...), simple.WithWorker(worker1)),
		simple.NewTransit("i:output", simple.WithInputs(channelOutputs1...), simple.WithOutputs(channelOutputs2...), simple.WithWorker(worker1)),
	}
	f1 := simple.NewWorkflow[int, int](
		simple.WithDefaultChannels[int, int](),
		simple.WithTransits[int, int](transits...),
		simple.WithLogger[int, int](simple.NewLogger()),
	)
	input := a[0].(int)
	output := f1.Execute(ctx, &input)
	return *output, nil
}
channelInputs1 := []string{"input"}
channelOutputs1 := []string{"t11"}
channelOutputs2 := []string{"t12"}
channelOutputs3 := []string{"output"}
transits := []*simple.Transit{
	simple.NewTransit("input",
		simple.WithInputs(channelInputs1...),
		simple.WithOutputs(channelOutputs1...),
		simple.WithWorker(worker1)),
	simple.NewTransit("transit",
		simple.WithInputs(channelOutputs1...),
		simple.WithOutputs(channelOutputs2...),
		simple.WithWorker(worker2)),
	simple.NewTransit("output",
		simple.WithInputs(channelOutputs2...),
		simple.WithOutputs(channelOutputs3...),
		simple.WithWorker(worker1)),
}
f := simple.NewWorkflow[int, int](
	simple.WithDefaultChannels[int, int](),
	simple.WithTransits[int, int](transits...),
	simple.WithLogger[int, int](simple.NewLogger()),
)
input := 1
output := f.Execute(context.Background(), &input)
log.Println(*output)
```

### Best Practices

executing workflow:

* It is not recommended to execute the same workflow multiple times asynchronously unless the workflow is linear. Otherwise, it may happen that the same transition is executed later but the result is returned first.
* If you want to ensure sequential execution, you need to wait for one execution to complete before continuing.
* If you want concurrency without destroying the order, you should use a worker pool and add a certain number of workflows for waiting calls.

nested workflow:

* The context of the nested workflow must be the context passed in by the current worker, or its sub-context.
* Will not report errors, or be able to capture errors and return them.
* Resources consumed can be estimated, including time, CPU, memory, etc.

## Use logger

`Workflow` support comes with a logger. Here is the definition:

```go
func WithLoggers[TInput, TOutput any](logger LoggerInterface) Option[TInput, TOutput]
```

`WithLogger()` can be passed in multiple times. All will be executed once, so please do not pass in too many to avoid affecting performance.

> Note, if you want to test the performance of each transit in the workflow, please do not turn on log output in the benchmark. If necessary, a limited amount of log output should be ensured, otherwise the log will explode.

We have prepared a default logger and error collector.

#### Log event and predefined events

Currently, the following log events will be encountered during workflow execution:

* **LogEventWorkflowStart**: when the workflow starts.
* **LogEventWorkflowEnd**:  when the workflow ends.
* **LogEventTransitInputReady**: when data is injected into the channel.
* **LogEventTransitOutputReady**: when the channel receives data.
* **LogEventTransitStart**: when the transit worker starts.
* **LogEventTransitEnd**: when the transit worker ends.
* **LogEventTransitCanceled**: when the transit is notified of cancellation.
* **LogEventTransitError**: when the transit worker reports an error.
* **LogEventTransitWorkerPanicked**: when the transit worker panicked.
* **LogEventErrorValueTypeMistmatch**: when a parameter received by the transit worker does not match expectations, or the final output result type does not match expectation.

### Default logger

We have prepared a default logger for debugging and tracing purposes: `simple.Logger`

Here is the definition:

```go
type Logger struct {
    // contains filtered or unexported fields
}
```

You can get a logger instance using the `NewLogger()` method. The default method can cover most situations.

```
logger := simple.NewLogger()
```

After the instantiation is completed, it can be passed in as a option for initializing the `Workflow`. For example:

<pre class="language-go"><code class="lang-go">logger := simple.NewLogger()
f := simple.NewWorkflow[string, string](
<strong>    simple.WithChannels[string, string]("input", "output", "t11"),
</strong>    simple.WithChannelInput[string, string]("input"),
    simple.WithChannelOutput[string, string]("output"),
    simple.WithTransits[string, string](transits...),
    simple.WithLoggers[string, string](logger),
)
</code></pre>

This default logger specifies four logging levels:

```go
const (
    LevelDebug LogLevel = iota
    LevelInfo
    LevelWarning
    LevelError
)
```

By default, only `LevelWarning` and `LeverError` are displayed. If you want to display `LevelDebug` and above, you need to set the flag after declaring the logger:

```go
logger.SetFlags(simple.LDebugEnabled)
```

Currently, `LevelDebug` is tracked before the worker starts and after successful execution. If an error occurs in the worker execution returned, or the execution is notified to be canceled, `LevelWarning` is tracked.  If the worker is panicked, `LevelError` is tracked. When the log level is warning, the background color is yellow; when the log level is error, the background color is red. For example:

<figure><img src="https://1486875518-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2Foyezdgt4RCgef0Y2fFzx%2Fuploads%2FNuEInANpOBNGeL7ueIES%2Fimage.png?alt=media&#x26;token=26a566e2-4506-4a3d-acdc-663a7667c399" alt=""><figcaption><p>Use Logger</p></figcaption></figure>

### Error collector

Workflows may cause errors, but the errors will not eventually return. We provide an error collector to collect errors during execution.

You can instantiate an error collector using `simple.NewErrorCollector()`. For example:

```go
errorCollector := simple.NewErrorCollector()
go errorCollector.Listen(context.Background())
```

The second statement of the above code starts a listener.

> Note that
>
> 1. the listener must be started in asynchronous mode, otherwise it will always be blocked here.
> 2. The context of the `Listen()` are independent of the context of the `Workflow.Execute()` or `Workflow.RunOnce()`. The two can be the same or different.

Next, pass in the error collector when instantiating the `Workflow`. For example:

```go
f := simple.NewWorkflow[string, string](
    simple.WithChannels[string, string]("input", "output", "t11"),
    simple.WithChannelInput[string, string]("input"),
    simple.WithChannelOutput[string, string]("output"),
    simple.WithTransits[string, string](transits...),
    simple.WithLoggers[string, string](logger, errorCollector),
)
```

You can get all collected errors using the `Get()`. For example:

<pre class="language-go"><code class="lang-go"><strong>errors := errorCollector.Get()
</strong></code></pre>

The return value is an error list, and the order of elements is based on the time order in which the errors were collected.

### Customize Logger

If you want to implement the logger yourself, you can declare a struct and implement all methods of `LoggerInterface`. Here is the definition of `LoggerInterface`:

```go
type LoggerInterface interface {
    // Log an event.
    Log(ctx context.Context, events ...LogEventInterface)
    SetFlags(uint)
}
```

The `Log()` method parameter events is passed in log events. Here is the definition of `LogEventInterface`:

```go
type LogEventInterface interface {
    Name() string
    Level() LogLevel
    Message() string
}
```

Among them, the return value of `Name()` is recommended to be the name of the transit where the log occurs. `LogLevel()` returns the log level. `Message()` returns the log message, but it is not recommended to be too long.

## Benchmark

```bash
go test -race -v ./... -bench . -run ^$ -count 10 -benchmem
goos: linux
goarch: amd64
pkg: github.com/rhosocial/go-dag/workflow
cpu: AMD EPYC 7763 64-Core Processor       
```

### Two parallel transits

Source: <https://github.com/rhosocial/go-dag/blob/v1.0.0/workflow/simple/dag_test.go#L185>

Here is the workflow:

<figure><img src="https://1486875518-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2Foyezdgt4RCgef0Y2fFzx%2Fuploads%2F3ifA40P3QhSJcFG37jhn%2Fimage.png?alt=media&#x26;token=2d842ba9-b733-4104-bdd9-6ede317376f7" alt=""><figcaption></figcaption></figure>

```
2024-01-27T06:05:00.4489512Z BenchmarkWorkflowTwoParallelTransits
2024-01-27T06:05:00.4493938Z BenchmarkWorkflowTwoParallelTransits/run_successfully
2024-01-27T06:05:00.4571447Z BenchmarkWorkflowTwoParallelTransits/run_successfully-4         	1000000000	         0.0007029 ns/op	       0 B/op	       0 allocs/op
2024-01-27T06:05:00.4633008Z BenchmarkWorkflowTwoParallelTransits/run_successfully-4         	1000000000	         0.0007023 ns/op	       0 B/op	       0 allocs/op
2024-01-27T06:05:00.4693155Z BenchmarkWorkflowTwoParallelTransits/run_successfully-4         	1000000000	         0.0006692 ns/op	       0 B/op	       0 allocs/op
2024-01-27T06:05:00.4753693Z BenchmarkWorkflowTwoParallelTransits/run_successfully-4         	1000000000	         0.0007548 ns/op	       0 B/op	       0 allocs/op
2024-01-27T06:05:00.4811466Z BenchmarkWorkflowTwoParallelTransits/run_successfully-4         	1000000000	         0.0006532 ns/op	       0 B/op	       0 allocs/op
2024-01-27T06:05:00.4868911Z BenchmarkWorkflowTwoParallelTransits/run_successfully-4         	1000000000	         0.0005953 ns/op	       0 B/op	       0 allocs/op
2024-01-27T06:05:00.4930640Z BenchmarkWorkflowTwoParallelTransits/run_successfully-4         	1000000000	         0.0007030 ns/op	       0 B/op	       0 allocs/op
2024-01-27T06:05:00.4990657Z BenchmarkWorkflowTwoParallelTransits/run_successfully-4         	1000000000	         0.0005967 ns/op	       0 B/op	       0 allocs/op
2024-01-27T06:05:00.5043634Z BenchmarkWorkflowTwoParallelTransits/run_successfully-4         	1000000000	         0.0005634 ns/op	       0 B/op	       0 allocs/op
2024-01-27T06:05:00.5097907Z BenchmarkWorkflowTwoParallelTransits/run_successfully-4         	1000000000	         0.0005825 ns/op	       0 B/op	       0 allocs/op
```

> The above benchmark test results are only the results given by GitHub, and there is no guarantee that any environment can achieve the same efficiency.

## Best Practises

Dividing a complex task into subtasks requires careful consideration to ensure clear input and output, accurate resource estimation, and error impact assessment. Here's a breakdown of the process:

1. **Task Identification**: Identify the overarching complex task and break it down into smaller, manageable subtasks.
2. **Subtask Definition**:
   * Define each subtask with clear input and output parameters. Ensure that inputs and outputs are well-defined and unambiguous.
   * Estimate the resource consumption (such as CPU, memory, disk space, etc.) for each subtask accurately to allocate resources efficiently.
3. **Subtask Process**:
   * Clearly define the process for each subtask, including all necessary steps and operations.
   * Consider the potential impact of errors on the workflow for each subtask. Evaluate how errors may propagate through the workflow and affect subsequent subtasks.
4. **Execution Order**:
   * If the workflow is non-linear, it's recommended to complete each execution before proceeding to the next one. This helps prevent delays caused by time-consuming executions.
   * If execution order is not critical and subtasks do not generate exceptions or cancel the execution process, concurrent execution may commence before a previous execution completes. However, ensure that execution contexts are managed properly to avoid data corruption or conflicts.
   * Consider creating a new workflow instance for each execution to ensure isolation and prevent context overwriting, especially for publicly published transits.
5. **Context Management**:
   * Utilize `context.Context` to pass global values if necessary. However, exercise caution, especially for transits intended for public use. Over-reliance on global context may lead to unexpected behavior and dependencies.

By following these steps, you can effectively divide a complex task into clear and manageable subtasks, ensuring efficient execution and error handling within the workflow.

### Test worker and workflow

Testing should include:

1. The worker needs to verify each input. Illegal input can report an error in time.
2. The worker needs to verify the output of typical input.
3. The worker needs to count the resource consumption of typical inputs, including time, CPU, memory, network (if any), etc.
4. The workflow needs to test a variety of typical inputs and count resource consumption, critical paths during execution, etc. If necessary, the worker developers on the critical path can be notified to improve efficiency or use other solutions.

## Troubleshooting

#### Possible behavior when some parameters are abnormal. You can raise issues on the GitHub issues page. If a problem is encountered frequently, it will be updated here.

<https://github.com/rhosocial/go-dag/issues>
