Airflow
Bigeye metrics can be run as part of your Airflow DAG by adding a task, after a table has been loaded, that runs all the checks on that table. If you want to include your Bigeye metrics as part of your ETL definition, you can also use the operators to create or update metric definitions.
Getting started
You can use our plugin by referencing our PyPI project in your requirements.txt, adding the following line:
bigeye-airflow
In order to communicate with Bigeye, you will have to define an HTTP connection in Airflow (under Admin > Connections in the Airflow UI); the login and password are the same credentials you use to log into the Bigeye UI.
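If you prefer to create the connection programmatically instead of through the UI, a minimal sketch using Airflow's Connection model is shown below; the host assumes Bigeye's standard SaaS URL, so substitute your own instance and credentials:
from airflow import settings
from airflow.models import Connection

# HTTP connection that the Bigeye operators reference by its conn_id.
conn = Connection(
    conn_id="bigeye_connection",
    conn_type="http",
    host="https://app.bigeye.com",  # assumption: standard Bigeye SaaS URL
    login="you@example.com",        # your Bigeye UI login
    password="your_password",       # your Bigeye UI password
)

session = settings.Session()
session.add(conn)
session.commit()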
All operations need to be run against a specific warehouse, and you will need to provide the warehouse ID. The easiest way to find the warehouse ID is to log into the Bigeye UI and navigate to the warehouse of interest; the examples below use a warehouse with ID 446.
Run metrics on a table
You can use the RunMetricsOperator to run all the metrics defined on a table. This operator will raise a ValueError if any metric runs fail, and it will log how many metrics failed and which ones.
In order to run all the metrics on the users table in the analytics schema of the warehouse seen above, using the bigeye_connection connection that we've defined, you can build the operator as follows:
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
....
run_metrics = RunMetricsOperator(
    task_id='run_bigeye_metrics',
    connection_id='bigeye_connection',
    warehouse_id=446,
    schema_name="analytics",
    table_name="users",
    dag=dag
)
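In your DAG, chain this task after the load step so the checks run as soon as the table is populated; load_users here is a hypothetical upstream task standing in for whatever populates the users table:
load_users >> run_metrics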
Run metrics in a collection
In order to run all the metrics in a collection (for example, a collection of business-critical metrics), pass a collection_id and workspace_id instead of the warehouse, schema, and table. Using the bigeye_connection connection that we've defined, you can build the operator as follows:
from bigeye_airflow.operators.run_metrics_operator import RunMetricsOperator
....
run_metrics_by_collection = RunMetricsOperator(
    task_id='run_metrics_by_collection',
    connection_id=BIGEYE_CONNECTION,
    workspace_id=BIGEYE_WORKSPACE_ID,
    collection_id=3602,
    circuit_breaker_mode=True,
    dag=dag
)
Circuit breaker option
Notice that in the example above, the circuit_breaker_mode option has been set to True. Doing so will raise an exception and stop the DAG if any of the metric runs return an alerting status. By default, this option is set to False, and all metrics and their statuses will be returned.
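Used this way, the task acts as a quality gate between loading and consumption: downstream tasks only run if every metric in the collection passes. A sketch, where load_tables and publish_reports are hypothetical stand-ins for your own tasks:
load_tables >> run_metrics_by_collection >> publish_reports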
Send dbt Core job run information
You can use the BigeyeDbtCoreOperator to send information to Bigeye about the results of dbt Core job runs. Add the following to your DAG. The URL properties (job_run_url, job_url, and project_url) are optional; when the DAG runs, any URLs that are present will be added to the catalog pages for the tables affected by the dbt Core job.
from bigeye_airflow.operators.bigeye_dbt_operator import BigeyeDbtCoreOperator
....
dbt_core_sync = BigeyeDbtCoreOperator(
    task_id="dbt_core_sync",
    connection_id="bigeye_connection_in_airflow",
    workspace_id="<bigeye_workspace_id>",
    target_path="path_to/target",
    job_run_url="<url_for_your_job_run>",
    job_url="<url_for_your_job>",
    project_url="<url_for_your_project>",
    dag=dag
)
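Because the operator reads dbt's artifacts from target_path, it should run only after the dbt invocation has completed. A minimal sketch, assuming dbt Core is invoked with Airflow's BashOperator and that the project path is a placeholder for your own:
from airflow.operators.bash import BashOperator

# Hypothetical task that runs the dbt project and produces the target/ artifacts.
dbt_run = BashOperator(
    task_id="dbt_run",
    bash_command="cd /path_to/dbt_project && dbt run",
    dag=dag,
)

# Send results to Bigeye only after dbt has finished.
dbt_run >> dbt_core_sync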