Automation Workflows with Python
You can interact with the Fivetran REST API using Python to facilitate automation workflows. Test a few of the example use cases in this document and determine the best way to automate and monitor your Fivetran activity.
TIP: Use these examples as a jumping-off point for custom solutions that your organization may require.
Overview
The atlas function is a general-purpose function that interacts with our REST API. It supports the following parameters:
- The
method
parameter determines the HTTP method to use (GET, POST, PATCH, DELETE) - The
endpoint
parameter specifies the API endpoint to interact with - The
payload
parameter is used to send the payload data in the case of POST or PATCH requests
The function constructs the full URL for the API request, sets up the headers including the authorization, and makes the request using the requests
library. If the request is successful, it returns the JSON response. If the request fails, it prints an error message and returns None
. The function uses exception handling to catch any errors that occur during the request and to raise an exception if the HTTP status code indicates an error.
How it works
To interact with our REST API using Python, you must perform the following steps:
- Install the following libraries:
requests
json
colorama
- Set the values of the
api_key
andapi_secret
variables to your REST API credentials. - Run the script with the desired method, endpoint, and payload.
Use cases
- Create new user
- Create new team
- Create new group webhook
- Certificate management
- Create new connector
- Re-sync table
- Create new connector in multiple destinations
- Create new destinations
- Update connector settings
- Delete connector
- Get connector status
- Get connector status using pagination
- Get a list of connectors in a specific status
- Manage schema configurations
- Pause or sync connectors and log the action(s)
- Rewrite SQL files
Example: api_interact_connection_status.py
This Python script interacts with the Fivetran REST API to retrieve and display the status of connectors. It uses the requests
library to send HTTP requests and the colorama
library to colorize the output.
Import necessary libraries
The script begins by importing the following necessary Python libraries:
requests
for making HTTP requestsjson
for handling JSON datacolorama
for colorizing the terminal output
import requests
from requests.auth import HTTPBasicAuth
import json
import colorama
from colorama import Fore, Back, Style
Define the atlas function
This function is used to send HTTP requests to the Fivetran REST API.
NOTE: The Fivetran REST API uses API Key authentication. For each request to the API, provide an
Authorization
HTTP header with the following value:Basic {api_key}:{api_secret}
. Basic authentication requires the key and secret to be base64-encoded. See the REST API Authentication documentation section for details.
For example:
method
- the HTTP methodendpoint
- the API endpointpayload
- the request body for POST and PATCH requests
It constructs the request, sends it, and returns the response as a JSON object.
def atlas(method, endpoint, payload):
base_url = 'https://api.fivetran.com/v1'
credentials = f'{api_key}:{api_secret}'.encode('utf-8')
b64_credentials = base64.b64encode(credentials).decode('utf-8')
h = {
'Authorization': f'Basic {b64_credentials}'
}
url = f'{base_url}/{endpoint}'
...
Specify the request parameters
The script then specifies the parameters for the API request. In this case, it's making a GET request to the groups/{group_id}/connectors
endpoint.
group_id = ''
method = 'GET'
endpoint = 'groups/' + group_id + '/connectors'
payload = ''
Call the atlas function
The script calls the atlas function with the specified parameters and stores the response.
response = atlas(method, endpoint, payload)
Process and display the response
Finally, the script checks that the response is not None
, prints the request and response details, and iterates over the items in the response data. It prints the service
, sync_state
, and sync_frequency
for each item.
if response is not None:
print(Fore.CYAN + 'Call: ' + method + ' ' + endpoint + ' ' + str(payload))
print(Fore.GREEN + 'Response: ' + response['code'])
cdata_list = response['data']
ctimeline = cdata_list['items']
for c in ctimeline:
print(Fore.MAGENTA + 'Type:' + c['service'] + Fore.BLUE + ' Status:' + c['status']['sync_state'] + Fore.YELLOW + ' Frequency:' + str(c['sync_frequency']))
Example: api_interact_main_log.py
This Python script interacts with the Fivetran REST API to pause a given connector and log the actions. It uses the requests
library to send HTTP requests and the colorama
library to colorize the output.
Import necessary modules
import requests
from requests.auth import HTTPBasicAuth
import json
import colorama
from colorama import Fore
import os
import logging
from logging.handlers import RotatingFileHandler
Define the atlas function
This function is used to send HTTP requests to the Fivetran REST API.
NOTE: The Fivetran REST API uses API Key authentication. For each request to the API, provide an
Authorization
HTTP header with the following value:Basic {api_key}:{api_secret}
. Basic authentication requires the key and secret to be base64-encoded. See the REST API Authentication documentation section for details.
For example:
- HTTP method (GET, POST, PATCH, DELETE)
- API endpoint
- Payload (data to send with the request)
The function constructs the request, sends it, and logs the result.
def atlas(method, endpoint, payload):
base_url = 'https://api.fivetran.com/v1'
credentials = f'{api_key}:{api_secret}'.encode('utf-8')
b64_credentials = base64.b64encode(credentials).decode('utf-8')
h = {
'Authorization': f'Basic {b64_credentials}'
}
url = f'{base_url}/{endpoint}'
try:
if method == 'GET':
response = requests.get(url, headers=h, json=payload)
elif method == 'POST':
response = requests.post(url, headers=h, json=payload)
elif method == 'PATCH':
response = requests.patch(url, headers=h, json=payload)
elif method == 'DELETE':
response = requests.delete(url, headers=h, json=payload)
else:
raise ValueError('Invalid request method.')
response.raise_for_status() # Raise exception
logger.info(f'Successful {method} request to {url}')
return response.json()
except requests.exceptions.RequestException as e:
logger.error(f'Request failed: {e}')
print(f'Request failed: {e}')
return None
Set up logging
The script sets up a logger that writes to a file (api_framework.log
). Once the log file exceeds 10MB, it is overwritten. The logger is set to log the INFO-level messages and higher. A rotating file handler is added to the logger, which keeps the last three log files when the current log file reaches 10MB.
log_file = "/api_framework.log"
log_size = 10 * 1024 * 1024 # 10 MB
#Check if the log file size exceeds 10MB
if os.path.exists(log_file) and os.path.getsize(log_file) >= log_size:
# If it does, overwrite the file
open(log_file, 'w').close()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
#Add a rotating handler
handler = RotatingFileHandler(log_file, maxBytes=log_size, backupCount=3)
logger.addHandler(handler)
Make a request
The script constructs a request to the Fivetran API to pause a connector (identified by the connector_id
). The HTTP method is PATCH, the endpoint is connectors/{connector_id}
, and the payload is {"paused": True}
.
connector_id = ''
method = 'PATCH' #'POST' 'PATCH' 'DELETE' 'GET'
endpoint = 'connectors/' + connector_id
payload = {"paused": True}
#Submit
response = atlas(method, endpoint, payload)
Handle the response
The script calls the atlas function to send the request and get the response. If the response is not None
, it prints the request details and response in different colors. In this example, we successfully pause a connector and log the activity.
if response is not None:
print(Fore.CYAN + 'Call: ' + method + ' ' + endpoint + ' ' + str(payload))
print(Fore.GREEN + 'Response: ' + response['code'])
print(Fore.MAGENTA + str(response))