Python SDK
Overview
The Bigeye Python SDK is a collection of protobuf-generated code, functions, and models used to interact programmatically with the Bigeye API.
Installation
The bigeye-sdk package is hosted on PyPI and can be installed with the following command. If you have already installed our CLI, the SDK is already installed as a dependency.
pip install bigeye-sdk
Authentication
The main entry point of the Bigeye SDK is the DatawatchClient abstraction; this core package includes a basic auth client implementation. The abstract base class provides all core functionality and methods for interacting with the Bigeye API.
Here are some examples of how to authenticate with the DatawatchClient in the Bigeye SDK.
Using a credentials file
The CLI uses a credentials.ini file for authentication, saved by default at ~/.bigeye/credentials. You can generate this file by running the CLI's bigeye configure command. Here is a sample of what that file should look like for a basic setup.
Sample credentials file:
[DEFAULT]
base_url = https://app.bigeye.com
user = [email protected]
password = fakepassword1234
You can then use that same file to authenticate with the SDK.
from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.client.datawatch_client import datawatch_client_factory
api_auth = ApiAuth.load(auth_file="/Users/cmacdonald/.bigeye/credentials.ini")
client = datawatch_client_factory(api_auth, workspace_id=153)
Workspaces
The workspace ID only needs to be passed to the DatawatchClient if your Bigeye user has access to more than one workspace. If not, the client automatically defaults to the only workspace you have access to, so workspace_id can be omitted as shown below.
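A minimal sketch, assuming the api_auth object from the example above and a user with access to exactly one workspace:
# workspace_id omitted; the client defaults to the user's only workspace
client = datawatch_client_factory(api_auth)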
Using AWS secrets
To support production scenarios, you can also authenticate via AWS Secrets Manager. First, create your AWS secret with the following properties.
{
"base_url": "https://app.bigeye.com",
"user": "fakeuser",
"password": "fakepassword123"
}
Once that secret is saved in AWS Secrets Manager, you can use it to authenticate with the SDK. This requires the AWS CLI to be configured and authenticated in the same environment where you run the script.
from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.client.datawatch_client import datawatch_client_factory
api_auth = ApiAuth.load_cred_from_aws_secret(region_name="us-west-2", secret_name="bigeye-docs-sdk-credentials")
client = datawatch_client_factory(api_auth, workspace_id=153)
Using base64-encoded credentials
As another alternative, you can base64-encode a JSON string containing the necessary properties and pass it to the SDK to authenticate. See the example below.
import base64
import json
from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.client.datawatch_client import datawatch_client_factory
creds = json.dumps({
"base_url": "https://app.bigeye.com",
"user": "fakeusername",
"password": "fakepassword123"
})
encoded_creds = base64.b64encode(creds.encode('utf-8'))
api_auth = ApiAuth.load_from_base64(encoded_creds)
client = datawatch_client_factory(api_auth, workspace_id=153)
Examples
Once authenticated, the Bigeye SDK can access every API call available in Bigeye. Here are some common use cases and examples of how to use the SDK to automate workflows in Bigeye.
Batch run metrics
A common use case is running a set of Bigeye metrics at a particular time. You can use the SDK to trigger runs for a specific set of metrics and then take action based on the outcomes of those runs. For example, if metrics are run and found to be unhealthy, you could implement a circuit-breaking process (see the sketch after the snippet). In the code snippet below, we run the metrics grouped together in a collection.
from typing import List
from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.client.datawatch_client import datawatch_client_factory
from bigeye_sdk.generated.com.bigeye.models.generated import MetricInfo, MetricStatus
from bigeye_sdk.log import get_logger
log = get_logger(__file__)
COLLECTION_NAME = 'Demo collection'
def batch_run_metrics():
# authenticate with Bigeye
api_auth = ApiAuth.load(auth_file="/Users/cmacdonald/.bigeye/credentials.ini")
client = datawatch_client_factory(api_auth, workspace_id=153)
# get all metrics ids from a particular collection
metric_ids = [
c.metric_ids for c in client.get_collections().collections
if c.name == COLLECTION_NAME
][0]
# run metrics async
run_metrics: List[MetricInfo] = client.run_metric_batch_async(metric_ids=metric_ids)
# Check metric run results
for m in run_metrics:
if m.metric_metadata.current_metric_status != MetricStatus.METRIC_STATUS_HEALTHY:
log.info(f"metric {m.metric_configuration.id} is unhealthy")
if __name__ == '__main__':
batch_run_metrics()
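The snippet above only logs unhealthy metrics. As a minimal sketch of the circuit-breaking process mentioned earlier, you could instead collect the unhealthy metric IDs and raise an exception to halt a downstream job; the RuntimeError here is just a placeholder for whatever failure behavior your orchestrator expects.
# inside batch_run_metrics, after the metrics have been run
unhealthy_ids = [
    m.metric_configuration.id for m in run_metrics
    if m.metric_metadata.current_metric_status != MetricStatus.METRIC_STATUS_HEALTHY
]
if unhealthy_ids:
    # trip the circuit breaker so downstream jobs do not proceed
    raise RuntimeError(f"Unhealthy metrics detected: {unhealthy_ids}")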
Fetch schema changes
Bigeye automatically indexes all connected data sources every 24 hours. With this data, Bigeye can list all schema changes (additions, deletions, and data type changes) detected on those sources. In some scenarios, you may want to trigger notifications or downstream tasks based on the schema changes detected; a small sketch of that follows the example.
import datetime
import dateutil.relativedelta
from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.client.datawatch_client import datawatch_client_factory
from bigeye_sdk.log import get_logger
log = get_logger(__file__)
def fetch_schema_changes():
# authenticate with Bigeye
api_auth = ApiAuth.load(auth_file="/Users/cmacdonald/.bigeye/credentials.ini")
client = datawatch_client_factory(api_auth, workspace_id=153)
# get date exactly 1 month ago
one_month_ago = (
datetime.datetime.now() +
dateutil.relativedelta.relativedelta(months=-1)
).timestamp()
# fetch schema changes
schema_changes = [
sc for sc in client.fetch_schema_changes(source_id=299)
if sc.detected_at > one_month_ago
]
# take action based on all schema changes in last month
for change in schema_changes:
log.debug(change)
if __name__ == '__main__':
fetch_schema_changes()
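As a minimal sketch of triggering a notification from the changes collected inside fetch_schema_changes, you could pass the filtered list to a helper like the one below; send_alert is a hypothetical placeholder for your own notification mechanism (Slack, email, etc.).
def send_alert(message: str) -> None:
    # hypothetical notification hook; replace with Slack, email, PagerDuty, etc.
    log.info(message)

def notify_on_changes(schema_changes: list) -> None:
    # call this from fetch_schema_changes with the list built above
    if schema_changes:
        send_alert(f"{len(schema_changes)} schema changes detected in the last month")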
Get all metrics
In some cases, you just want to get a list of all the metrics that are currently configured in your Bigeye workspace. Here is an example that gets all metrics that a user has access to.
from typing import List
from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.client.datawatch_client import datawatch_client_factory
from bigeye_sdk.generated.com.bigeye.models.generated import MetricInfo
from bigeye_sdk.log import get_logger
log = get_logger(__file__)
def get_all_metrics():
# authenticate with Bigeye
api_auth = ApiAuth.load(auth_file="/Users/cmacdonald/.bigeye/credentials.ini")
client = datawatch_client_factory(api_auth, workspace_id=153)
# get list of source Ids that user has access to
source_ids = [s.id for s in client.get_sources().sources]
# get all metrics for all sources
metrics: List[MetricInfo] = client.get_metric_info_batch_post(warehouse_ids=source_ids).metrics
for m in metrics:
log.info(m)
if __name__ == '__main__':
get_all_metrics()
Manage custom rules
You can also manage all of your custom rules via our SDK using the examples below.
Create new rules
from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.client.datawatch_client import datawatch_client_factory
from bigeye_sdk.generated.com.bigeye.models.generated import CustomRulesThresholdType
def create_rule():
# authenticate with Bigeye
api_auth = ApiAuth.load()
client = datawatch_client_factory(api_auth, workspace_id=153)
client.upsert_custom_rule(
warehouse_id=1801,
name="test sdk rule 3",
sql="select * from tooy_demo.prod_repl.orders where price_per_unit <= 0",
threshold_type=CustomRulesThresholdType.CUSTOM_RULES_THRESHOLD_TYPE_COUNT,
upper_threshold=0
)
if __name__ == '__main__':
create_rule()
Bulk run rules for a source
from operator import attrgetter
from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.client.datawatch_client import datawatch_client_factory
from bigeye_sdk.generated.com.bigeye.models.generated import GetCustomRuleListResponse, MetricStatus
from bigeye_sdk.log import get_logger
log = get_logger(__file__)
def bulk_run_rules():
# authenticate with Bigeye either via credential file or via aws secret
api_auth = ApiAuth.load()
client = datawatch_client_factory(api_auth, workspace_id=153)
rules: GetCustomRuleListResponse = client.get_rules(warehouse_id=1801)
run_rules = client.bulk_run_rules(rule_ids=[r.id for r in rules.custom_rules])
for r in run_rules.failed_updates:
log.error(f"Rule id {r} failed to update.")
for r in run_rules.not_run_ids:
log.warning(f"Rule id {r} was not run.")
successfully_run_rules = [r for r in client.get_rules(warehouse_id=1801).custom_rules if r.id in run_rules.successful_ids]
for r in successfully_run_rules:
latest_run = max(r.latest_runs, key=attrgetter('run_at_epoch_seconds'))
if latest_run.summary_status != MetricStatus.METRIC_STATUS_HEALTHY:
log.info(f"Rule is unhealthy with status of {latest_run.summary_status.name}")
else:
log.info(f"Rule refresh completed with healthy status.")
if __name__ == '__main__':
bulk_run_rules()
Rebuild sources
Another common use case is triggering Bigeye's indexing process after certain jobs complete, or prior to metric runs. This example triggers indexing for every source the user has access to; it can also be done for particular sources, as shown after the example.
from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.client.datawatch_client import datawatch_client_factory
from bigeye_sdk.generated.com.bigeye.models.generated import GetSourceListResponse
def rebuild_sources():
# authenticate with Bigeye
api_auth = ApiAuth.load_cred_from_aws_secret(region_name="us-west-2",
secret_name="bigeye-docs-sdk-credentials")
client = datawatch_client_factory(api_auth, workspace_id=153)
# rebuild all sources
sources_response: GetSourceListResponse = client.get_sources()
for source in sources_response.sources:
client.rebuild_source(source_id=source.id)
if __name__ == '__main__':
rebuild_sources()
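To rebuild only particular sources, call rebuild_source with the specific IDs instead of looping over every source; the source ID below is hypothetical.
# rebuild a single source by ID (the ID shown here is hypothetical)
client.rebuild_source(source_id=299)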
Dynamic column level lineage
Bigeye has many features that automatically detect column-level lineage between sources. In edge cases where that is not possible, these scripts can help automate generating column-level lineage. The first example requires that the columns in the upstream and downstream tables have exactly the same names (not case sensitive).
from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.controller.lineage_controller import LineageController
from bigeye_sdk.client.datawatch_client import datawatch_client_factory
UPSTREAM_FQ_TABLE_NAME = 'Postgres.tooy_demo_db.prod.customer'
DOWNSTREAM_FQ_TABLE_NAME = 'PROD_WAREHOUSE.TOOY_DEMO.PROD_REPL.CUSTOMER'
def cll_from_fq_table_name():
# authenticate with Bigeye
api_auth = ApiAuth.load(auth_file="/Users/cmacdonald/.bigeye/credentials.ini")
client = datawatch_client_factory(api_auth, workspace_id=153)
controller = LineageController(client)
# get bigeye tables based on fully qualified table names
upstream_table = controller.get_tables_from_selector(selector=UPSTREAM_FQ_TABLE_NAME)[0]
downstream_table = controller.get_tables_from_selector(selector=DOWNSTREAM_FQ_TABLE_NAME)[0]
# column names must match across tables
controller.infer_column_level_lineage_from_tables(tables=[(upstream_table, downstream_table)])
if __name__ == '__main__':
cll_from_fq_table_name()
In this next example, the column names do not match between the upstream and downstream tables, but we can still generate the column-level lineage by providing explicit column mappings.
from bigeye_sdk.authentication.api_authentication import ApiAuth
from bigeye_sdk.controller.lineage_controller import LineageController
from bigeye_sdk.client.datawatch_client import datawatch_client_factory
from bigeye_sdk.generated.com.bigeye.models.generated import ColumnNamePair, DataNodeType
UPSTREAM_FQ_TABLE_NAME = 'Postgres.tooy_demo_db.prod.orders'
DOWNSTREAM_FQ_TABLE_NAME = 'PROD_WAREHOUSE.TOOY_DEMO.PROD_REPL.ORDERS'
COLUMN_MAPPINGS = [
ColumnNamePair(
source_column_name="id",
target_column_name="ORDER_ID"
),
ColumnNamePair(
source_column_name="c_id",
target_column_name="CUSTOMER_ID"
)
]
def cll_custom_column_mappings():
# authenticate with Bigeye
api_auth = ApiAuth.load(auth_file="/Users/cmacdonald/.bigeye/credentials.ini")
client = datawatch_client_factory(api_auth, workspace_id=153)
controller = LineageController(client)
# get bigeye tables based on fully qualified table names
upstream_table = controller.get_tables_from_selector(selector=UPSTREAM_FQ_TABLE_NAME)[0]
downstream_table = controller.get_tables_from_selector(selector=DOWNSTREAM_FQ_TABLE_NAME)[0]
# get upstream and downstream columns indexed by name
upstream_columns_ix_by_name = {
c.column.name: c.column
for c in client.get_columns(column_ids=[c.id for c in upstream_table.columns]).columns
}
downstream_columns_ix_by_name = {
c.column.name: c.column
for c in client.get_columns(column_ids=[c.id for c in downstream_table.columns]).columns
}
# establish column level lineage
for cm in COLUMN_MAPPINGS:
controller.create_edges(
upstream=upstream_columns_ix_by_name[cm.source_column_name],
downstream=downstream_columns_ix_by_name[cm.target_column_name],
node_type=DataNodeType.DATA_NODE_TYPE_COLUMN
)
if __name__ == '__main__':
cll_custom_column_mappings()