Many companies use Tableau and Tableau extracts for data visualization and analytics, and use dbt for the data transformation and orchestration that supports those visualizations. Tableau and dbt are separate systems with no built-in way to trigger a Tableau refresh once a dbt job has completed. Timing dbt jobs and Tableau refreshes by guesswork can lead to errors and outdated visualizations: if the dbt job runs long or returns an error, the Tableau dashboard connected to the extract shows outdated data or, even worse, breaks altogether. We need a solution that ensures the data source won’t refresh unless the dbt job finishes successfully, removing any doubt about the quality of the data behind the dashboard. An automated and safe way to trigger Tableau extract refreshes when dbt jobs complete is therefore an important integration task between the two systems.
To automate Tableau refreshes once a dbt job is complete, we can use the dbt Cloud API, the Tableau Server Client, and Python to create a simple program that triggers a dbt job and, once it completes, immediately triggers a data source/workbook refresh. This program lives inside an AWS Lambda function and can be scheduled to run every hour. Some may wonder why Python was used; the main reason is that the Tableau Server Client package is only available in Python. If you prefer to build this program in another language, you will need to use the Tableau REST API directly. The Tableau Server Client documentation links to the equivalent REST API call for each method.
The Program:
1) Import Packages
We need to import requests to make calls to the dbt Cloud API; it is also a dependency of the Tableau Server Client package. We also import time, enum, and os: time for waiting between status checks, enum for naming the dbt job statuses, and os for reading credentials from environment variables. Lastly, we import the Tableau Server Client (tableauserverclient), which runs all the actions we need in Tableau.
2) Connect to dbt via the Cloud API
To make any calls to the API, we need two things: the account ID and a personal API key. Both can be found in dbt Cloud: the account ID is in the URL, and the API key is in your dbt account settings. More information on authentication can be found in the dbt Cloud API documentation. Once you have these two pieces, you are ready to make dbt API requests. For example, a GET request to the account endpoint (https://cloud.getdbt.com/api/v2/accounts/<account_id>/) with the API key in the Authorization header returns data on your dbt account.
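A minimal sketch of such a request, assuming the v2 account endpoint on cloud.getdbt.com and the Token authorization header used in the full code. The helper names dbt_account_request and fetch_account are hypothetical, and fetch_account only works with real credentials and network access.

```python
from typing import Dict, Tuple

# Assumed host; accounts hosted elsewhere may use a different base URL.
DBT_BASE_URL = "https://cloud.getdbt.com/api/v2"


def dbt_account_request(account_id: str, api_key: str) -> Tuple[str, Dict[str, str]]:
    """Build the URL and headers for a GET on the dbt Cloud account endpoint."""
    url = f"{DBT_BASE_URL}/accounts/{account_id}/"
    headers = {"Authorization": f"Token {api_key}"}
    return url, headers


def fetch_account(account_id: str, api_key: str) -> dict:
    """Send the request and return the decoded JSON payload."""
    # Imported here so the builder above works without third-party packages.
    import requests

    url, headers = dbt_account_request(account_id, api_key)
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()
    return response.json()
```

Keeping the URL and header construction separate from the network call makes it easy to check the request shape before spending an API call on it.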
3) Trigger a specific dbt job to run
To keep this example simple, we will trigger a specific job from within the program using the Trigger job POST request. This request needs the unique job ID, which can be found either by making a GET request that returns all jobs of the account or by opening the job in dbt Cloud and pulling the ID from the URL. Next, we build a function that takes the job ID as a parameter and returns the unique run ID that dbt generates, which we use to keep track of the run’s status. This function is _dbt_run_job in the full code below.
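If you would rather look the job ID up by name than copy it from the URL, something like the following could work on the payload of the list-jobs GET request. This is a sketch under assumptions: find_job_id is a hypothetical helper, and it assumes the payload keeps its jobs under a "data" key with "id" and "name" fields, mirroring the "data" wrapper used by the run responses in the full code.

```python
from typing import Any, Dict, Optional


def find_job_id(jobs_payload: Dict[str, Any], job_name: str) -> Optional[int]:
    """Return the ID of the first job whose name matches, or None if absent."""
    for job in jobs_payload.get("data", []):
        if job.get("name") == job_name:
            return job["id"]
    return None


# Hand-written payload for illustration only, not real API output.
sample_payload = {
    "data": [
        {"id": 101, "name": "hourly_build"},
        {"id": 102, "name": "nightly_full_refresh"},
    ]
}
print(find_job_id(sample_payload, "nightly_full_refresh"))  # prints 102
```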
4) Track the status of the job until it is finished
Once we have triggered the job to run, we need to keep track of its status. This can be done with a helper function that is called inside a while loop. The helper function takes the run ID as a parameter and fetches the current status of the dbt job using the Get run request. This request returns the status as a numeric enum: 1 (Queued), 2 (Starting), 3 (Running), 10 (Success), 20 (Error), 30 (Canceled).
To know when the job has finished, we call this function periodically in a while loop that pulls the status and checks whether it equals Success: if yes, break; if not, wait 5 seconds and repeat. The loop should also stop when the status is Error or Canceled, otherwise it would never exit. The run function in the full code below does this.
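Because a Lambda invocation can run for at most 15 minutes, it may be worth bounding the loop with a deadline as well. The following is one possible sketch, not the author's implementation: wait_for_dbt_run and its parameters are hypothetical, the 840-second budget is an assumption chosen to sit just under that limit, and the status check is passed in as a callable so the loop can be exercised without calling dbt.

```python
import time
from typing import Callable

SUCCESS, ERROR, CANCELED = 10, 20, 30  # dbt run status codes listed above


def wait_for_dbt_run(
    get_status: Callable[[], int],
    poll_seconds: float = 5.0,
    timeout_seconds: float = 840.0,  # assumed budget, under Lambda's 15 minutes
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Block until the run succeeds; raise on error, cancellation, or timeout."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        if status == SUCCESS:
            return
        if status == ERROR:
            raise RuntimeError("dbt run ended with an error")
        if status == CANCELED:
            raise RuntimeError("dbt run was canceled")
        if time.monotonic() >= deadline:
            raise TimeoutError("dbt run did not finish in time")
        sleep(poll_seconds)
```

With the helpers from the full code, this could be called as wait_for_dbt_run(lambda: _dbt_get_job_status(run_id)), followed by the Tableau refresh once it returns.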
5) Connect to Tableau via the Tableau server client
When we get a success status for our dbt job, we can sign in through the Tableau Server Client and start refreshing data sources and workbooks. To sign in, we need either a Tableau username and password or a personal access token, plus the site ID and the server URL. A personal access token can be created in your Tableau Online account settings, and the site ID can be found in the URL (https://<server_url>/#/site/<site_id>/projects).
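To avoid copying the two URL pieces by hand, a small helper could pull them out of a browser URL of the shape shown above. This is only a sketch: split_tableau_url is a hypothetical name, and treating a URL without a /site/ segment as the default site (an empty site ID) is an assumption you should confirm for your server.

```python
from typing import Tuple
from urllib.parse import urlsplit


def split_tableau_url(url: str) -> Tuple[str, str]:
    """Return (server_url, site_id) from a Tableau browser URL."""
    parts = urlsplit(url)
    server_url = f"{parts.scheme}://{parts.netloc}"
    # The site lives in the fragment, e.g. "/site/MarketingTeam/projects".
    segments = [segment for segment in parts.fragment.split("/") if segment]
    site_id = ""  # assumed to mean the default site
    if len(segments) >= 2 and segments[0] == "site":
        site_id = segments[1]
    return server_url, site_id
```

The two values map onto the server URL given to tsc.Server and the site_id argument of tsc.PersonalAccessTokenAuth in the full code.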
6) Refresh the datasource/workbook
Once signed in, refreshing a data source or workbook only requires its ID. We can get these IDs by running TAB_SERVER.workbooks.get(), which returns a list of workbook objects on the Tableau site together with a pagination item, one page of results at a time. We place these function calls in the while loop from above, after the success condition is met. The full code below shows an example of refreshing a workbook and its data sources.
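Looking a workbook up by name is one way to get its ID. The sketch below assumes only that each workbook object exposes name and id attributes, as Tableau Server Client workbook items do; find_workbook_id is a hypothetical helper, and the stand-in objects exist purely so the example runs without a Tableau connection.

```python
from types import SimpleNamespace
from typing import Any, Iterable, Optional


def find_workbook_id(workbooks: Iterable[Any], name: str) -> Optional[str]:
    """Return the ID of the first workbook with the given name, or None."""
    for workbook in workbooks:
        if workbook.name == name:
            return workbook.id
    return None


# Stand-in objects for illustration; real items would come from Tableau.
fake_workbooks = [
    SimpleNamespace(id="wb-1", name="Sales Overview"),
    SimpleNamespace(id="wb-2", name="Marketing Funnel"),
]
print(find_workbook_id(fake_workbooks, "Marketing Funnel"))  # prints wb-2
```

After signing in, passing tsc.Pager(TAB_SERVER.workbooks) as the first argument would iterate over every page of workbooks rather than only the first.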
Now that we have a program that can trigger a dbt job, check its status, and trigger a Tableau data source/workbook refresh, we can create a new Lambda function in AWS and put it on a schedule to run the dbt job automatically and then refresh data sources or run extract schedules. The credentials used to connect to dbt and Tableau can be stored as environment variables, so that when tokens expire we only need to change the variable value and not the code itself. This format can easily be extended to handle many kinds of orchestration tasks between dbt and Tableau, and I hope it provides a great starting point.
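One way to wire this into Lambda is to validate the environment variables up front so that a missing credential fails with a clear message. This is a sketch, not a definitive setup: load_config is a hypothetical helper, the variable names are the ones read in the full code below, and lambda_handler follows the usual (event, context) handler signature with the orchestration itself left as a comment.

```python
import os
from typing import Dict, Mapping

# Environment variable names read by the full code below.
REQUIRED_VARS = (
    "TAB_TOKEN_NAME",
    "TAB_TOKEN_VALUE",
    "TAB_SITE_URL",
    "TAB_SERVER_URL",
    "DBT_ACCOUNT_ID",
    "DBT_CLOUD_API_TOKEN",
)


def load_config(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return the required settings, or raise naming every missing variable."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name] for name in REQUIRED_VARS}


def lambda_handler(event, context):
    """Entry point invoked by Lambda; event and context are unused here."""
    load_config()
    # The run() function from the full code would be called here:
    # trigger the dbt job, wait for it, then refresh Tableau.
    return {"status": "ok"}
```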
Full Code
import enum
import os
import time
import requests
import tableauserverclient as tsc
### TABLEAU SERVER CLIENT CONFIG ###
# Lambda function environment variables:
TAB_TOKEN_NAME = os.environ['TAB_TOKEN_NAME']
TAB_TOKEN_VALUE = os.environ['TAB_TOKEN_VALUE']
# TAB_SITE_URL holds the site ID, which corresponds to the contentUrl attribute in the Tableau REST API.
# The site ID is the portion of the URL that follows /site/.
# For example, "MarketingTeam" is the site ID in the URL MyServer/#/site/MarketingTeam/projects.
TAB_SITE_ID = os.environ['TAB_SITE_URL']
TAB_SERVER_URL = os.environ['TAB_SERVER_URL']
TAB_AUTH = tsc.PersonalAccessTokenAuth(TAB_TOKEN_NAME, TAB_TOKEN_VALUE, site_id=TAB_SITE_ID)
TAB_SERVER = tsc.Server(TAB_SERVER_URL, use_server_version=True)
### DBT CONFIG ###
# Lambda function environment variables:
DBT_ACCOUNT_ID = os.environ['DBT_ACCOUNT_ID']
DBT_API_KEY = os.environ['DBT_CLOUD_API_TOKEN']
# Status Enum, pulled from dbt api docs
class DbtJobStatus(enum.IntEnum):
    QUEUED = 1
    STARTING = 2
    RUNNING = 3
    SUCCESS = 10
    ERROR = 20
    CANCELED = 30
def _dbt_run_job(job_id) -> int:
    response = requests.post(
        url=f'https://cloud.getdbt.com/api/v2/accounts/{DBT_ACCOUNT_ID}/jobs/{job_id}/run/',
        headers={'Authorization': f'Token {DBT_API_KEY}'},
        json={'cause': 'job triggered by dbt-tableau-extract-refresh'},
    )
    try:
        response.raise_for_status()
    except requests.exceptions.RequestException:
        print('FAILED TO RUN')
        raise
    response_payload = response.json()
    return response_payload['data']['id']
def _dbt_get_job_status(run_id) -> int:
    response = requests.get(
        url=f'https://cloud.getdbt.com/api/v2/accounts/{DBT_ACCOUNT_ID}/runs/{run_id}/',
        headers={'Authorization': f'Token {DBT_API_KEY}'},
    )
    # Raises requests.exceptions.HTTPError on a 4xx/5xx response.
    response.raise_for_status()
    response_payload = response.json()
    return response_payload['data']['status']
def _tab_get_workbook_by_id(w_id) -> object:
    workbook = TAB_SERVER.workbooks.get_by_id(w_id)
    return workbook

def _tab_run_workbook(workbook):
    TAB_SERVER.workbooks.refresh(workbook)

def _tab_sign_in():
    TAB_SERVER.auth.sign_in(TAB_AUTH)

def _tab_sign_out():
    TAB_SERVER.auth.sign_out()
def run():
    job_id = ''  # Fill in with job id
    run_id = _dbt_run_job(job_id)
    print(f'run_id = {run_id}')
    while True:
        print('loading...')
        time.sleep(5)
        status = _dbt_get_job_status(run_id)
        print(DbtJobStatus(status))
        if status == DbtJobStatus.SUCCESS:
            _tab_sign_in()
            # Do some datasource/workbook refreshes or schedule runs
            # Example:
            w_id = ''  # Fill in with workbook id
            workbook = _tab_get_workbook_by_id(w_id)
            _tab_run_workbook(workbook)
            _tab_sign_out()
            break
        elif status == DbtJobStatus.ERROR:
            raise Exception('Job ran into an error')
        elif status == DbtJobStatus.CANCELED:
            raise Exception('Job was canceled')

if __name__ == '__main__':
    run()