Composing Pipelines
  • 01 Jan 2024

Placing Nodes

To compose a pipeline, drag and drop nodes onto the canvas and connect them by dragging the output port of one node to the input port of the next node.
Clicking a node's output port and releasing it creates an instant connection to the closest available input port.

Canvas Navigation:

  • Left-click and hold any node to drag it around the canvas.
  • Right-click and hold the canvas to drag the entire canvas.

Pipeline Starting/Entrance Point

The starting icon will appear next to the first node you place on the canvas. This icon can be dragged and placed on any node to mark it as the starting point of the pipeline.

When data is triggered into a pipeline (for example, from the Dataset Browser), it enters the pipeline at the node set as the starting point.


Node Inputs

There are three methods for providing input to a pipeline node:

  1. Node-to-Node Connection
    Connect two nodes so that the output of the predecessor node becomes the input of the subsequent node. The input is transferred and processed during runtime.
  2. Static Input
    Assign a fixed value directly to the node input. This value remains immutable during runtime, ensuring consistency.
  3. Dynamic Input (Variables)
    Assign a variable directly to the node input. The variable can be updated at any point during runtime, allowing processing to adapt to real-time conditions or changes in the data.

Node-to-Node Connection

Connections can only be made between compatible nodes:

  • An image item, for example, cannot be passed to a function node that deals with annotations. In such a case, the item should be passed to a function that extracts the annotations, which then can be passed to the function that deals with annotations.
  • The type of output passed by a node is determined by the event that triggers the action. For example, if an asset is triggered by an item.completed event/status, the asset will be of type item; if it is triggered by annotation.created, it will be of type annotation.
  • The Dataloop default nodes have preset asset types, whereas functions inherit their asset type from the function's input/output parameters.

Static Input

Static inputs are useful for setting a predetermined or constant value for a pipeline node. The value will remain constant throughout the runtime of the pipeline, providing a consistent input for the node's executions.

To set a static input for a pipeline node:

  1. Select Node: Open the Config tab in the side panel of the node.
  2. Set Parameter: In the relevant input, click on the Set Parameter button.
  3. Choose Fixed Value: In the dialog that opens, choose Fixed value.
  4. Input Value: Type the desired value. For entity type inputs, set the entity ID as the value (for example, dataset.id, task.id, etc.).


Notes

  • Incompatibility with Connections: A fixed value and a node-to-node connection cannot be assigned simultaneously to the same input.
  • Visual Confirmation: Once the static input value is set, an indicator is displayed on the canvas confirming the successful setup.

Dynamic Input: Variables

Pipeline variables enable setting and managing dynamic parameters within a pipeline. The variables can be accessed in multiple pipeline nodes simultaneously and can be changed during runtime.


Setting Variables as Node Input

To set a variable as a default input for a pipeline node:

  1. Select Node: Open the Config tab in the side panel of the node.
  2. Set Parameter: In the relevant input, click on the Set parameter button.
  3. Choose Variable: In the dialog that opens, choose Variable.
  4. Set Variable: Select the desired variable from the dropdown menu. Only variables whose type matches the input type appear in the list.


Adding New Variable

To create a new variable:

  1. Click on the Variables button on the pipeline header.
  2. Click on Add New Variable or Manage Variables.
  3. The Manage Variables dialog opens.
  4. Fill in the fields for the variable: Name, Type, Value, and Description.
    • All fields are mandatory except the description.
    • The variable type should match the input type.
  5. Save changes.
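As a sketch, the fields above map to a variable definition like the following (the name, value, and description here are illustrative, not taken from the product):

```json
{
  "name": "confidence_threshold",
  "type": "number",
  "value": 0.75,
  "description": "Minimum confidence required to pass an item downstream"
}
```

Because the variable's type must match the node input type, a "number" variable such as this one would only appear in the dropdown for numeric inputs.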


Updating Variable Values

Unlike fixed parameters, the values of pipeline variables can be updated during the execution of the pipeline. When a variable's value is updated, it is automatically propagated to all nodes that use the variable.

Variables can be updated via:

  1. SDK: using a FaaS or a Code node.
  2. Using the Update Variable node.

Updating Variables for Existing Executions

Only pipeline executions with the status "Created" are affected when a variable value is updated. Executions with the status "In Progress" remain unchanged.


Editing/Deleting Variable

  • Variables that are used as a pipeline node input cannot be deleted.
  • The name and type of existing variables cannot be edited.

Undo/Redo Pipeline Editing Steps

While structuring the pipeline and making adjustments, you can use Undo/Redo to step backward and forward through editing steps as needed, without having to manually revert the pipeline configuration. This includes adding or removing nodes, node connectivity, and node settings.

Undo/Redo does not track code changes in Code nodes; such manual changes to the code are currently not versioned.


Filtering Data between Nodes

Hover over a connection between nodes and click the + sign to add a filter. Adding a filter means that only data assets (i.e., items or annotations) that comply with the filter condition will be passed onto the next pipeline node.

Filters can be selected from previously saved filters (saved in the Dataset Browser) or written directly into the DQL editor.

For example, the following filter will only pass items whose "size" attribute (under "system" in the "metadata") is less than 1 MB:
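Assuming the item's size is stored in bytes under metadata.system (1 MB = 1,048,576 bytes), such a filter could be written in the pipeline DQL editor as:

```json
{
    "metadata.system.size": {
        "$lt": 1048576
    }
}
```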

Notice that unlike the DQL editor in the Dataset Browser, in the pipeline DQL editor you do not need to include the attributes you filter by within a "filter" property.

The DQL property JOIN is not supported in the pipeline DQL filter.

Remove a filter by hovering over a connection and clicking the X icon (this severs the connection between the output/input points, and you will need to reconnect the nodes). Alternatively, you can remove the filter by setting it to an empty JSON { }.


Triggering Data in Nodes

Triggers affect the node in which they are set and initiate the flow of data from one node to another. Triggers can be of type Cron or Event.

  • Cron triggers initiate the flow of data based on a time interval (e.g., every hour, day, etc.) or at a specific time set by the cron expression. Read more about setting cron expressions.

    Cron triggers work with nodes that either do not receive input or have a default input value.

    An example of using a cron trigger in a node is a FaaS node that trains a model once a day; this node does not have inputs, but simply runs a service once a day to train the model. Similarly, you may create a Code node that will run as defined by the cron trigger, provided that the Code node either has a default input value or does not receive input.

  • Event triggers refer to events in the data source and can be set to Created, Updated, and/or Deleted. For example, if the trigger in a task node is set to Created and Updated, data will flow into the task node whenever an item is added (created) or updated in the source dataset.
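Cron expressions use the standard five-field syntax (minute, hour, day of month, month, day of week). For example, illustrative expressions for the cron triggers described above could look like:

```
0 * * * *    every hour, on the hour
0 0 * * *    every day at midnight (e.g., the daily model-training node above)
```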

The pipeline's composition must be complete, with all nodes properly connected, before running the pipeline.

Be sure to click the save icon to save your changes before navigating to another page.


Trigger Execution Mode

Some events, such as item updates, can happen more than once on the same entity. The trigger execution mode defines whether such repeating events trigger the service every time they happen or only the first time.

  • Once - the function runs only the first time the trigger fires. For instance, for an "item" resource and an "Updated" action, the function will run only on the first update of an item.

  • Always - the function runs every time the trigger fires. For instance, for an "item" resource and an "Updated" action, the function will run for every item update. Note that in this case, every such event will cause the pipeline to run.

