Orchestrating ELT in Airflow: Catching every sync with XCOM

Eliminate hanging sensors with the new Fivetran Airflow provider release.
July 6, 2022

The Fivetran Airflow provider includes a FivetranOperator, which starts a Fivetran sync, and a FivetranSensor, which monitors a connector until it has finished syncing data. With Airflow, it’s easy to configure a FivetranSensor to start only after a FivetranOperator has run successfully for a connector. However, Airflow does not guarantee that a FivetranSensor will run immediately after a FivetranOperator, which can cause issues. In this post, we’ll describe those issues and show how the new version of the Fivetran Airflow provider resolves them.

Sensors missing syncs in Airflow 

Well over 600,000 Fivetran data syncs have run in Airflow since the Fivetran Airflow provider launched in early 2021. In rare cases, we have seen a FivetranSensor keep running and eventually time out even though its connector had already finished moving data. We’ve identified two situations in which this can happen.

The first is short-running syncs. Some Fivetran connectors take just seconds to move data from source to destination. In that case, a FivetranSensor may not start until after the connector it is monitoring has already finished syncing data.

The other has to do with Airflow’s limited resources. Some Airflow deployments have a finite number of workers, which are containers that perform the tasks that make up DAGs. If many DAGs are running in parallel, every worker slot may be occupied, and Airflow’s scheduler will queue upcoming tasks until a worker slot becomes available. If the scheduler places a FivetranSensor in a queued state, the connector’s sync may finish before the FivetranSensor begins running.

In either case, the data sync that a FivetranSensor is supposed to monitor finishes before the FivetranSensor can start. One of the very first things a FivetranSensor does is read the timestamp of the connector’s previously completed data sync from the connector’s metadata. The FivetranSensor then monitors this field until Fivetran updates it, which signals the completion of a new data sync. If the connector’s metadata has already been updated before the FivetranSensor starts, as in both cases described above, the sensor treats the sync it was meant to monitor as the previous one. It will then either time out or hang until the connector’s next data sync completes. Both situations can be avoided by using another Airflow mechanism: XCOM.
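To make the race concrete, here is a minimal sketch in plain Python. It does not use Airflow or the Fivetran provider; FakeConnector and NaiveSensor are hypothetical stand-ins we made up to model the behavior described above, and the integer timestamps are arbitrary.

```python
from dataclasses import dataclass


@dataclass
class FakeConnector:
    """Hypothetical stand-in for a connector's metadata (not a provider class)."""
    last_sync_completed_at: int  # timestamp of the most recently completed sync

    def finish_sync(self, now: int) -> None:
        # Fivetran updates the completion timestamp when a sync finishes.
        self.last_sync_completed_at = now


class NaiveSensor:
    """Models a sensor that reads its baseline timestamp when it starts running."""

    def __init__(self, connector: FakeConnector) -> None:
        self.connector = connector
        # The baseline is captured at sensor start, not when the sync was triggered.
        self.baseline = connector.last_sync_completed_at

    def poke(self) -> bool:
        # The sensor succeeds only once the timestamp moves past its baseline.
        return self.connector.last_sync_completed_at > self.baseline


# Case 1: the sensor starts while the sync is still running.
connector = FakeConnector(last_sync_completed_at=100)
on_time_sensor = NaiveSensor(connector)   # baseline is 100
connector.finish_sync(now=105)
on_time_result = on_time_sensor.poke()    # True: 105 > 100

# Case 2: a short sync (or a queued sensor) means the sync finishes first.
connector = FakeConnector(last_sync_completed_at=100)
connector.finish_sync(now=105)
late_sensor = NaiveSensor(connector)      # baseline is already 105
late_result = late_sensor.poke()          # False: the sensor missed the sync
```

In the second case the sensor keeps returning False until some later sync moves the timestamp again, which is the hang or timeout described above.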

Ensuring successful sensors with XCOM

By default, every task in Airflow is entirely independent and isolated. This allows separate tasks to be composed easily into data pipelines and ensures that any task can run on any machine, at any time, in any order.

XCOMs in Airflow do the opposite: they tie tasks together by letting them pass key-value pairs to one another.

By default, Airflow operators store their return values in XCOM under a key named return_value, but other key-value pairs can be added with the function xcom_push. In previous versions of the Fivetran Airflow provider, a FivetranOperator would store the API response generated by sending a request to Fivetran to start a connector sync (illustrated in the figure below).
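As a rough illustration of the key-value mechanics, the sketch below models XCOM with an in-memory dictionary. InMemoryXCom is a hypothetical class written for this post, its method signatures are simplified compared with Airflow's own, and the stored payload is a placeholder rather than a real Fivetran API response.

```python
from typing import Any, Dict, Tuple

# The key under which an operator's return value is stored.
RETURN_KEY = "return_value"


class InMemoryXCom:
    """Hypothetical stand-in for XCOM storage, keyed by (task_id, key)."""

    def __init__(self) -> None:
        self._store: Dict[Tuple[str, str], Any] = {}

    def xcom_push(self, task_id: str, key: str, value: Any) -> None:
        # Store a value that other tasks can read later.
        self._store[(task_id, key)] = value

    def xcom_pull(self, task_id: str, key: str = RETURN_KEY) -> Any:
        # Read a value written by another task; returns None if nothing was pushed.
        return self._store.get((task_id, key))


xcom = InMemoryXCom()

# An operator's return value lands under the default key.
xcom.xcom_push("start_sync", RETURN_KEY, {"message": "placeholder response"})
# Extra key-value pairs can be pushed explicitly under their own keys.
xcom.xcom_push("start_sync", "attempt", 1)

pulled_default = xcom.xcom_pull("start_sync")
pulled_custom = xcom.xcom_pull("start_sync", key="attempt")
pulled_missing = xcom.xcom_pull("start_sync", key="not_pushed")
```

The task name start_sync and the attempt key are made up for the example.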

Upper left, FivetranOperator’s old XCOM value. Lower left, FivetranOperator’s new XCOM value. Blue, delaying the start of a FivetranSensor until after a sync is finished, which might have caused issues in previous versions of the Fivetran Airflow provider. Green, pulling the XCOM value from FivetranOperator to FivetranSensor to ensure those issues do not happen.

In the new version of the Fivetran Airflow provider, the FivetranOperator stores the timestamp of its connector’s previously completed sync in XCOM, and a FivetranSensor can retrieve it by calling xcom_pull. This ensures that the sensor’s baseline timestamp is correct and that the FivetranSensor returns at the appropriate time, even if it does not start until after the sync it is supposed to monitor has already completed.
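The effect of the fix can be sketched in plain Python as well. This is a model of the idea, not the provider's implementation: FakeConnector, run_operator, and sensor_poke are hypothetical names, a plain dict stands in for XCOM, and we assume the timestamp is stored under the return_value key.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class FakeConnector:
    """Hypothetical stand-in for a connector's metadata (not a provider class)."""
    last_sync_completed_at: int

    def finish_sync(self, now: int) -> None:
        self.last_sync_completed_at = now


def run_operator(connector: FakeConnector, xcom: Dict[str, int]) -> None:
    # Record the previously completed sync timestamp at trigger time, so the
    # baseline no longer depends on when the sensor happens to start.
    xcom["return_value"] = connector.last_sync_completed_at


def sensor_poke(connector: FakeConnector, xcom: Dict[str, int]) -> bool:
    # The sensor pulls the operator's baseline instead of reading its own.
    baseline = xcom["return_value"]
    return connector.last_sync_completed_at > baseline


connector = FakeConnector(last_sync_completed_at=100)
xcom: Dict[str, int] = {}

run_operator(connector, xcom)      # baseline of 100 is stored
connector.finish_sync(now=105)     # the sync ends before the sensor starts
late_sensor_done = sensor_poke(connector, xcom)  # True: 105 > 100
```

Because the baseline is captured by the task that starts the sync, a late sensor still sees the timestamp change and returns on its first check.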

Downloading the new Fivetran Airflow provider

The latest version of the Fivetran Airflow provider can be added to your Airflow environment with the command pip install -U airflow-provider-fivetran. The provider version should be 1.1.1 or later. Once it is installed, this example DAG, called example_fivetran_xcom.py, demonstrates how to use the newly added functionality to ensure that a FivetranSensor always monitors the sync started by the preceding FivetranOperator. Any DAGs written for previous versions of the provider will continue to run without changes.
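If you want to confirm the installed version from Python, something like the following sketch may help. It uses only the standard library (importlib.metadata, Python 3.8+); the helper names are ours, and the version parsing is deliberately simple, so pre-release strings such as 1.1.1rc1 are treated conservatively.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional, Tuple

PACKAGE = "airflow-provider-fivetran"
MINIMUM = "1.1.1"


def parse(v: str) -> Tuple[int, ...]:
    # Keep only the leading all-numeric components, e.g. "1.1.1" -> (1, 1, 1).
    # Parsing stops at the first non-numeric piece, so "1.1.1rc1" -> (1, 1).
    parts = []
    for piece in v.split("."):
        if not piece.isdigit():
            break
        parts.append(int(piece))
    return tuple(parts)


def meets_minimum(installed: str, minimum: str = MINIMUM) -> bool:
    # Tuples compare element by element, which matches simple version ordering.
    return parse(installed) >= parse(minimum)


def installed_version(package: str = PACKAGE) -> Optional[str]:
    # Returns None instead of raising when the package is not installed.
    try:
        return version(package)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    found = installed_version()
    if found is None:
        print(f"{PACKAGE} is not installed")
    else:
        print(f"{PACKAGE} {found}, new enough: {meets_minimum(found)}")
```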

If you have any questions about the new version of the Fivetran Airflow provider, please email us at devrel@fivetran.com.

