Python SDK

Overview

The Bigeye python SDK is a collection of protobuf generated code, functions, and models used to interact programmatically with the Bigeye API.

Installation

The bigeye-sdk package is hosted on PyPi and can be installed via the following command. If you have already installed our CLI, then the SDK will already be installed as a dependency.

pip install bigeye-sdk

Authentication

The main entry point of the Bigeye SDK is the DatawatchClient abstraction and, in this core package, a basic auth client has been implemented. The abstract base class includes all core functionality and methods to interact with the Bigeye API.

Here are some examples on how to authenticate with the DatawatchClient within the Bigeye SDK.

Using a credential file

The CLI utilizes a credentials.ini file for authentication that by default is saved at ~/.bigeye/credentials. This can be easily generated by running the bigeye configure command of the CLI. Here is a sample of what that file should look like for basic setups.

Sample credentials file:

[DEFAULT]
base_url = https://app.bigeye.com
user = [email protected]
password = fakepassword1234

You can then use that same file to authenticate with the SDK.

from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.client.datawatch_client import datawatch_client_factory


api_auth = ApiAuth.load(auth_file="/Users/cmacdonald/.bigeye/credentials.ini")
client = datawatch_client_factory(api_auth, workspace_id=153)

📘

Workspaces

The workspace ID is only required to be passed to the DatawatchClient if your Bigeye user has access to more than 1 workspace. If not, then we will automatically default to the only workspace that you have access to.

Using AWS secrets

To help with more production scenarios, we also support the ability to authenticate via AWS secrets manager. First, create your AWS secret with the following properties.

{
    "base_url": "https://app.bigeye.com",
    "user": "fakeuser",
    "password": "fakepassword123"
}

Once that secret is saved in AWS secrets manager you can then utilize it in the SDK to authenticate. This also requires that you have the AWS CLI configured and authenticated in the same environment you are running this script.

from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.client.datawatch_client import datawatch_client_factory

api_auth = ApiAuth.load_cred_from_aws_secret(region_name="us-west-2", secret_name="bigeye-docs-sdk-credentials")
client = datawatch_client_factory(api_auth, workspace_id=153)

Using base64 encoded

As another alternative, you can also encode a JSON string containing the necessary properties and pass that into the SDK to authenticate. See example below.

import base64
import json

from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.client.datawatch_client import datawatch_client_factory

creds = json.dumps({
    "base_url": "https://app.bigeye.com",
    "user": "fakeusername",
    "password": "fakepassword123"
})

encoded_creds = base64.b64encode(creds.encode('utf-8'))
api_auth = ApiAuth.load_from_base64(encoded_creds)
client = datawatch_client_factory(api_auth, workspace_id=153)

Examples

Once authenticated, the Bigeye SDK has access to all API calls available within Bigeye. Here are some common use cases and examples of how to leverage the Bigeye SDK to help automate certain workflows within Bigeye.

Batch run metrics

One common use case is when customers have a set of Bigeye metrics that they would like to be run at a certain time. They can use the SDK to trigger metric runs for a particular set of metrics and then perform subsequent actions based on the outcomes of those metric runs. For example, if metrics are run and deemed to be unhealthy you could implement a circuit breaking process. In the code snippet below, we run metrics that are grouped together in a collection.

from typing import List
from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.client.datawatch_client import datawatch_client_factory
from bigeye_sdk.generated.com.bigeye.models.generated import MetricInfo, MetricStatus
from bigeye_sdk.log import get_logger

log = get_logger(__file__)

COLLECTION_NAME = 'Demo collection'

def batch_run_metrics():
    
    # authenticate with Bigeye
    api_auth = ApiAuth.load(auth_file="/Users/cmacdonald/.bigeye/credentials.ini")
    client = datawatch_client_factory(api_auth, workspace_id=153)

    # get all metrics ids from a particular collection
    metric_ids = [
      c.metric_ids for c in client.get_collections().collections 
      if c.name == COLLECTION_NAME
    ][0]
    
    # run metrics async
    run_metrics: List[MetricInfo] = client.run_metric_batch_async(metric_ids=metric_ids)

    # Check metric run results
    for m in run_metrics:
        if m.metric_metadata.current_metric_status != MetricStatus.METRIC_STATUS_HEALTHY:
            log.info(f"metric {m.metric_configuration.id} is unhealthy")


if __name__ == '__main__':
    batch_run_metrics()

Fetch schema changes

Bigeye automatically indexes all connected data sources every 24 hours. With this data, Bigeye is able to list out all schema changes (additions, deletions, and data type changes) that have been detected on these data sources. In some scenarios, you may want to schedule subsequent notifications or tasks based on the schema changes detected.

import datetime 
import dateutil.relativedelta

from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.client.datawatch_client import datawatch_client_factory
from bigeye_sdk.log import get_logger

log = get_logger(__file__)


def fetch_schema_changes():
    
    # authenticate with Bigeye
    api_auth = ApiAuth.load(auth_file="/Users/cmacdonald/.bigeye/credentials.ini")
    client = datawatch_client_factory(api_auth, workspace_id=153)

    # get date exactly 1 month ago
    one_month_ago = (
      datetime.datetime.now() + 
      dateutil.relativedelta.relativedelta(months=-1)
    ).timestamp()

    # fetch schema changes
    schema_changes = [
      sc for sc in client.fetch_schema_changes(source_id=299) 
      if sc.detected_at > one_month_ago
    ]
    
    # take action based on all schema changes in last month
    for change in schema_changes:
        log.debug(change)
        

if __name__ == '__main__':
    fetch_schema_changes()

Get all metrics

In some cases, you just want to get a list of all the metrics that are currently configured in your Bigeye workspace. Here is an example that gets all metrics that a user has access to.

from typing import List

from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.client.datawatch_client import datawatch_client_factory
from bigeye_sdk.generated.com.bigeye.models.generated import MetricInfo
from bigeye_sdk.log import get_logger

log = get_logger(__file__)


def get_all_metrics():
    
    # authenticate with Bigeye
    api_auth = ApiAuth.load(auth_file="/Users/cmacdonald/.bigeye/credentials.ini")
    client = datawatch_client_factory(api_auth, workspace_id=153)

    # get list of source Ids that user has access to
    source_ids = [s.id for s in client.get_sources().sources]

    # get all metrics for all sources
    metrics: List[MetricInfo] = client.get_metric_info_batch_post(warehouse_ids=source_ids).metrics

    for m in metrics:
        log.info(m)


if __name__ == '__main__':
    get_all_metrics()

Manage Custom Rules

You can also manage all of your custom rules via our SDK using the examples below.

Create new rules

from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.client.datawatch_client import datawatch_client_factory
from bigeye_sdk.generated.com.bigeye.models.generated import CustomRulesThresholdType


def create_rule():
    # authenticate with Bigeye
    api_auth = ApiAuth.load()
    client = datawatch_client_factory(api_auth, workspace_id=153)

    client.upsert_custom_rule(
        warehouse_id=1801,
        name="test sdk rule 3",
        sql="select * from tooy_demo.prod_repl.orders where price_per_unit <= 0",
        threshold_type=CustomRulesThresholdType.CUSTOM_RULES_THRESHOLD_TYPE_COUNT,
        upper_threshold=0
    )


if __name__ == '__main__':
    create_rule()

Bulk run rules for a source

from operator import attrgetter

from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.client.datawatch_client import datawatch_client_factory
from bigeye_sdk.generated.com.bigeye.models.generated import GetCustomRuleListResponse, MetricStatus
from bigeye_sdk.log import get_logger

log = get_logger(__file__)


def bulk_run_rules():
    # authenticate with Bigeye either via credential file or via aws secret
    api_auth = ApiAuth.load()
    client = datawatch_client_factory(api_auth, workspace_id=153)

    rules: GetCustomRuleListResponse = client.get_rules(warehouse_id=1801)

    run_rules = client.bulk_run_rules(rule_ids=[r.id for r in rules.custom_rules])

    for r in run_rules.failed_updates:
        log.error(f"Rule id {r} failed to update.")

    for r in run_rules.not_run_ids:
        log.warning(f"Rule id {r} was not run.")

    successfully_run_rules = [r for r in client.get_rules(warehouse_id=1801).custom_rules if r.id in run_rules.successful_ids]
    for r in successfully_run_rules:
        latest_run = max(r.latest_runs, key=attrgetter('run_at_epoch_seconds'))
        if latest_run.summary_status != MetricStatus.METRIC_STATUS_HEALTHY:
            log.info(f"Rule is unhealthy with status of {latest_run.summary_status.name}")
        else:
            log.info(f"Rule refresh completed with healthy status.")


if __name__ == '__main__':
    bulk_run_rules()

Rebuild Sources

Another common use case is to trigger Bigeye's indexing process after certain jobs complete, or prior to metric runs. This example will trigger indexing for every source that the user has access to. This can also be done for particular sources.

from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.client.datawatch_client import datawatch_client_factory
from bigeye_sdk.generated.com.bigeye.models.generated import GetSourceListResponse

def rebuild_sources():
    
    # authenticate with Bigeye
    api_auth = ApiAuth.load_cred_from_aws_secret(region_name="us-west-2", 
                                                 secret_name="bigeye-docs-sdk-credentials")
    client = datawatch_client_factory(api_auth, workspace_id=153)

    # rebuild all sources
    sources_response: GetSourceListResponse = client.get_sources()
    for source in sources_response.sources:
        client.rebuild_source(source_id=source.id)

if __name__ == '__main__':
    rebuild_sources()

Dynamic column level lineage

Bigeye has a lot of features to automatically detect column level lineage between sources. In edge cases where this is not possible these scripts can help automate the generating of column level lineage. The first example requires that the columns in the upstream and downstream tables have the exact same names (not case sensitive).

from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.controller.lineage_controller import LineageController
from bigeye_sdk.client.datawatch_client import datawatch_client_factory


UPSTREAM_FQ_TABLE_NAME = 'Postgres.tooy_demo_db.prod.customer'
DOWNSTREAM_FQ_TABLE_NAME = 'PROD_WAREHOUSE.TOOY_DEMO.PROD_REPL.CUSTOMER'


def cll_from_fq_table_name():
    
    # authenticate with Bigeye
    api_auth = ApiAuth.load(auth_file="/Users/cmacdonald/.bigeye/credentials.ini")
    client = datawatch_client_factory(api_auth, workspace_id=153)
    controller = LineageController(client)

    # get bigeye tables based on fully qualified table names
    upstream_table = controller.get_tables_from_selector(selector=UPSTREAM_FQ_TABLE_NAME)[0]
    downstream_table = controller.get_tables_from_selector(selector=DOWNSTREAM_FQ_TABLE_NAME)[0]

    # column names must match across tables
    controller.infer_column_level_lineage_from_tables(tables=[(upstream_table, downstream_table)])


if __name__ == '__main__':
    cll_from_fq_table_name()

In this next example, the column names do not match up between the upstream and downstream tables, but we are still able to generate the column level lineage.

from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.controller.lineage_controller import LineageController
from bigeye_sdk.client.datawatch_client import datawatch_client_factory
from bigeye_sdk.generated.com.bigeye.models.generated import ColumnNamePair, DataNodeType


UPSTREAM_FQ_TABLE_NAME = 'Postgres.tooy_demo_db.prod.orders'
DOWNSTREAM_FQ_TABLE_NAME = 'PROD_WAREHOUSE.TOOY_DEMO.PROD_REPL.ORDERS'

COLUMN_MAPPINGS = [
    ColumnNamePair(
        source_column_name="id",
        target_column_name="ORDER_ID"
    ),
    ColumnNamePair(
        source_column_name="c_id",
        target_column_name="CUSTOMER_ID"
    )
]


def cll_custom_column_mappings():
    
    # authenticate with Bigeye
    api_auth = ApiAuth.load(auth_file="/Users/cmacdonald/.bigeye/credentials.ini")
    client = datawatch_client_factory(api_auth, workspace_id=153)
    controller = LineageController(client)

    # get bigeye tables based on fully qualified table names
    upstream_table = controller.get_tables_from_selector(selector=UPSTREAM_FQ_TABLE_NAME)[0]
    downstream_table = controller.get_tables_from_selector(selector=DOWNSTREAM_FQ_TABLE_NAME)[0]

    # get upstream and downstream columns indexed by name
    upstream_columns_ix_by_name = {
        c.column.name: c.column 
        for c in client.get_columns(column_ids=[c.id for c in upstream_table.columns]).columns
    }
    downstream_columns_ix_by_name = {
        c.column.name: c.column 
        for c in client.get_columns(column_ids=[c.id for c in downstream_table.columns]).columns
    }

    # establish column level lineage
    for cm in COLUMN_MAPPINGS:
        controller.create_edges(
            upstream=upstream_columns_ix_by_name[cm.source_column_name],
            downstream=downstream_columns_ix_by_name[cm.target_column_name],
            node_type=DataNodeType.DATA_NODE_TYPE_COLUMN
        )


if __name__ == '__main__':
    cll_custom_column_mappings()