Composing Pipelines

23 May 2023

Placing Nodes

To compose a pipeline, drag and drop nodes onto the canvas and connect them by dragging the output port of one node to the input port of the next node.
Clicking a node's output port and releasing will instantly create a connection with the closest available input port.

Canvas Navigation:

  • Left-click and hold on any node to drag it around the canvas.
  • Right-click and hold on the canvas to drag the entire canvas.

Pipeline Starting/Entrance Point

The starting icon will appear next to the first node you place on the canvas. This icon can be dragged and placed on any node to mark it as the starting point of the pipeline.

When triggering data into a pipeline (for example, from the dataset-browser), the data enters the pipeline at the node set with the starting point.

Connecting Nodes

Connections can only be made between compatible nodes:

An image item, for example, cannot be passed to a function node that deals with annotations. In such a case, the item should be passed to a function that extracts the annotations, which can then be passed to the function that deals with annotations.

The type of output being passed by a node is determined by the event that triggers the action – for example, if an asset is triggered by item.completed event/status, the asset will be of type item. If the asset is triggered by annotation.created, it will be of type annotation.

The Dataloop default nodes have their own asset type presets, whereas functions inherit their asset type from function input/output parameters.
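To illustrate, a function node that bridges items to annotations could be a small FaaS or Code node whose input is an item and whose output is the item's annotations. The sketch below is only an illustration assuming the dtlpy SDK and its standard service-runner pattern; the class and function names are placeholders.

```python
import dtlpy as dl


class ServiceRunner(dl.BaseServiceRunner):
    """Illustrative pipeline node: item in, annotations out."""

    def extract_annotations(self, item: dl.Item):
        # List every annotation attached to the incoming item.
        annotations = item.annotations.list()
        # Returning the annotations lets this node's output (type: annotation)
        # connect to a downstream node that expects annotations.
        return annotations
```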

Undo/Redo Pipeline Editing Steps

While structuring the pipeline and making adjustments, you can use Undo/Redo to step back and forth through your editing steps as needed, without having to manually revert the pipeline configuration. This includes nodes added or removed, node connectivity, and node settings.

Undo/Redo does not provide traceability to code changes in code-nodes. Such manual changes to the code are currently not versioned.

Filtering Data between Nodes

Hover over a connection between nodes and click the + sign to add a filter to it. Adding a filter means that only data assets (i.e., items or annotations) that comply with the filter condition will be passed onto the next pipeline node.

Filters can be selected from previously saved filters (created in the Dataset Browser) or written directly into the DQL editor.

For example, the following filter will only pass items whose "size" attribute (under "system" in the item "metadata") is less than 1MB:
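As a rough sketch, such a filter could look like the following in the pipeline DQL editor (the field path metadata.system.size and the byte value 1048576 for 1MB are assumptions based on the description above):

```json
{
    "metadata.system.size": {
        "$lt": 1048576
    }
}
```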

Notice that unlike the DQL editor in the Dataset Browser, in the pipeline DQL editor you do not need to include the attributes you filter by within a "filter" property.

The DQL property JOIN is not supported in the pipeline DQL filter.

Remove a filter by hovering over a connection and clicking the X icon (this will sever the connection between the output/input points, and you will need to reconnect the nodes). Alternatively, you can remove the filter by setting it to an empty JSON object { }.


Triggering Data in Nodes

Triggers affect the node in which they are set and initiate the flow of data from one node to another. Triggers can be of type Cron or Event.

  • Cron triggers initiate the flow of data based on a time interval (e.g., every hour, day, etc.) or at a specific time set by the cron expression (see the example after this list). Read more about setting cron expressions.

    Cron triggers work with nodes that either do not receive input or have a default input value.

    An example of using a cron trigger in a node is a FaaS node that trains a model once a day; this node does not have inputs but simply runs a service once a day to train the model. Similarly, you may create a Code node that will run as defined by the cron trigger, provided that the Code node either has a default input value or does not receive an input.

  • Event triggers refer to events in the data source and can be set to Created, Updated, and/or Deleted.  For example, if the trigger in a task node is set to Created and Updated, data will flow into the task node whenever an item is added (created) or updated in the source dataset.
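For example, assuming standard five-field cron syntax, the expression 0 7 * * * fires once a day at 07:00, and 0 * * * * fires every hour on the hour.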

The pipeline’s composition must be complete (all nodes properly connected) before you run the pipeline.

Be sure to click the save icon to save your changes before navigating to another page.

Trigger Execution Mode

Some events, such as item updates, can happen more than once on the same entity. The trigger execution mode defines whether such repeating events trigger the service every time they happen or only the first time they happen.

  • Once - the function will run only once when triggered. For instance, for an "item" resource and an "Updated" action, the function will run only on the first update of an item.

  • Always - the function will run each time it is triggered. For instance, for an "item" resource and an "Updated" action, the function will run for every update of an item. Note that in this case, every such update will cause the pipeline to run again.
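For readers working with the SDK rather than the pipeline canvas, the execution mode is the same concept used when creating a FaaS trigger. The following is a rough sketch assuming the dtlpy SDK; the project, service, and trigger names are placeholders, and the exact parameters may vary by SDK version.

```python
import dtlpy as dl

# Placeholders: replace with your own project and service names.
project = dl.projects.get(project_name='my-project')
service = project.services.get(service_name='my-service')

trigger = service.triggers.create(
    name='run-once-per-item',
    resource=dl.TriggerResource.ITEM,
    actions=dl.TriggerAction.UPDATED,
    # ONCE fires only on the first update of each item;
    # use dl.TriggerExecutionMode.ALWAYS to fire on every update.
    execution_mode=dl.TriggerExecutionMode.ONCE,
)
```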


Checkpoint for Task/Assignment Completion

You can create a temporary checkpoint between a Workflow node (task) and its next node, which remains in place until the task or assignment managed by the node is completed. Execution of the subsequent node in the cycle is put on hold until the completion event occurs.

To use this functionality, follow these steps:

  1. Add a Workflow node to the canvas and configure the task as desired.

  2. Create another output on the Workflow node you just added, specifying its type as either a task or an assignment (according to the checkpoint you wish to create).

  3. Connect the Task/Assignment output to a Task/Assignment input on any node whose execution you wish to delay until the Task/Assignment is complete. You will see a dotted line indicating the connection between the nodes.
    Task/Assignment inputs can be set on Code snippet nodes and FaaS nodes.

  4. By setting the above, every cycle created in the pipeline will pause at the Workflow node, waiting for the completion of the Task/Assignment before executing the subsequent node. The cycle's status will remain as "In Progress".

  5. Keep in mind that this functionality alters the pipeline context from running on your data (items) to running on the task/assignment. The items can be extracted from the task/assignment context, as sketched below.
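For illustration, a Code snippet or FaaS node with a task input could extract the completed task's items roughly as follows. This is a sketch assuming the dtlpy SDK; the class and function names are placeholders.

```python
import dtlpy as dl


class ServiceRunner(dl.BaseServiceRunner):
    """Illustrative node that runs only after the upstream task completes."""

    def on_task_complete(self, task: dl.Task):
        # The checkpoint passes the completed task as this node's input;
        # list the items that were worked on in that task.
        pages = task.get_items()
        for item in pages.all():
            # Post-process each completed item here.
            print(f'Completed item: {item.id} ({item.name})')
```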

Dotted Line Connection
The dotted line connecting the nodes also indicates that a new cycle will be initiated with a task/assignment context once the subsequent node is triggered by the completion event.

Checkpoint Example

In the following example, the initial node (Workflow) receives items as inputs, and a pipeline cycle is generated for each item. Upon node execution, the item is passed to the task created by the node. The Code snippet node is triggered only after all of the task items have been given a status by the task assignees and the completion event is fired. A new pipeline cycle is then created to indicate this special flow with its new context.


There are several approaches to sending data into a pipeline. The right approach for you may depend on your production/research needs.
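One common approach, for example, is to execute the pipeline programmatically and pass an item into its starting node. The sketch below assumes the dtlpy SDK; the project, pipeline, and item identifiers are placeholders, and the exact execute() parameters may differ by SDK version.

```python
import dtlpy as dl

# Placeholders: replace with your own project, pipeline, and item.
project = dl.projects.get(project_name='my-project')
pipeline = project.pipelines.get(pipeline_name='my-pipeline')
item = dl.items.get(item_id='my-item-id')

# Start a pipeline cycle; the item enters at the node marked
# as the pipeline's starting point.
execution = pipeline.execute(execution_input={'item': item.id})
```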


What's Next