Automate Connector Management With Python
Use Case
You want to start writing Python scripts that interact with the Fivetran REST API. A common use case is automating the creation, updating, or running of connectors on a custom schedule. You are looking for an example that shows how to get started.
Recommendation
Fivetran recommends starting with a simple example to familiarize yourself with how to interact with our API using Python. The code example below walks through manually syncing a connector using the API, following the process shown in this diagram.
Code example
import requests
from requests.auth import HTTPBasicAuth
import json
import colorama
from colorama import Fore, Back, Style
def atlas(config_path, group_id=None):
    """Trigger a manual sync for the last connector of a Fivetran group.

    The function reads API credentials from a JSON file, picks a group
    (the last one returned by ``GET /groups`` unless ``group_id`` is given),
    picks the last connector returned for that group, and posts a sync
    request for it. Progress and errors are printed to stdout.

    Args:
        config_path: Path to a JSON file containing
            ``{"fivetran": {"api_key": ..., "api_secret": ...}}``.
        group_id: Optional ID of the group whose connectors should be
            listed. When omitted or empty, the last group returned by the
            API is used.

    Returns:
        None. Results are reported through printed messages only.

    Raises:
        OSError: If the configuration file cannot be opened.
        json.JSONDecodeError: If the configuration file is not valid JSON.
        KeyError: If the configuration lacks the ``fivetran`` credentials.
    """
    # Load configuration
    with open(config_path, "r", encoding="utf-8") as config_file:
        config = json.load(config_file)

    credentials = config['fivetran']
    # The API key/secret pair is sent as HTTP Basic credentials. No manual
    # "Authorization" header is built: requests derives it from `auth`, and a
    # hand-written header would be overwritten anyway.
    auth = HTTPBasicAuth(credentials['api_key'], credentials['api_secret'])
    base_url = 'https://api.fivetran.com/v1'
    # Without a timeout a stalled connection would block the script forever.
    timeout_seconds = 30

    def say(color, text):
        """Print ``text`` in ``color`` and reset the terminal style afterwards."""
        # Resetting prevents the colour from bleeding into later output.
        print(color + text + Style.RESET_ALL)

    def make_request(method, endpoint, payload=None):
        """Call the API and return the decoded JSON body, or None on failure.

        Raises ValueError for any method other than GET or POST.
        """
        if method not in ('GET', 'POST'):
            raise ValueError('Invalid request method.')
        url = f'{base_url}/{endpoint}'
        try:
            response = requests.request(
                method,
                url,
                # Only POST requests carry a JSON body.
                json=payload if method == 'POST' else None,
                auth=auth,
                timeout=timeout_seconds,
            )
            response.raise_for_status()
            return response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a non-JSON body on requests versions whose
            # decode error is not a RequestException subclass.
            print(f'Request failed: {e}')
            return None

    def items_of(response):
        """Return the ``data.items`` list of a response, or [] if absent."""
        if not isinstance(response, dict):
            return []
        data = response.get('data')
        if not isinstance(data, dict):
            return []
        return data.get('items') or []

    # NOTE(review): only the items of the first response page are inspected;
    # confirm whether pagination matters for accounts with many groups or
    # connectors.
    if not group_id:
        # No explicit group requested: fall back to the last group listed.
        groups = items_of(make_request('GET', 'groups'))
        if not groups:
            say(Fore.RED, "No groups found or unable to fetch groups.")
            return
        last_group = groups[-1]
        group_id = last_group['id']
        say(Fore.BLUE, f"Selected last group: {last_group.get('name')} (ID: {group_id})")

    # Get connectors for the selected group
    connectors = items_of(make_request('GET', f'groups/{group_id}/connectors'))
    if not connectors:
        say(Fore.RED, f"No connectors found in group {group_id} or unable to fetch connectors.")
        return

    # Get the last connector
    last_connector = connectors[-1]
    last_connector_id = last_connector['id']
    say(
        Fore.BLUE,
        f"Selected last connector: {last_connector.get('schema')} (ID: {last_connector_id})",
    )

    # Trigger sync for the last connector
    sync_result = make_request('POST', f'connectors/{last_connector_id}/sync')
    if sync_result is None:
        say(Fore.RED, f"Failed to trigger sync for connector {last_connector_id}")
        return

    # Report the group the connector was actually taken from.
    say(Fore.GREEN, f"Sync triggered for connector {last_connector_id} in group {group_id}")
    message = sync_result.get('message') if isinstance(sync_result, dict) else sync_result
    say(Fore.CYAN, f"Response: {message}")
if __name__ == "__main__":
    # Script entry point: run the sync workflow with the credentials file.
    # The path below is a placeholder; point it at your own JSON config.
    atlas('/config.json')