Apply Schema Updates Across Multiple Connections
When you have multiple connections that share the same schema, you may need to propagate a single schema change to all of them. A common example is a sharded database setup, where multiple database instances split data horizontally. Each shard holds a distinct subset of rows, typically partitioned by a key such as user_id or region. All shards share the same table structure, and each shard has its own Fivetran connection.
When your source database changes, for example, when your product ships a new REFUNDS table or adds a new currency column to the existing TRANSACTIONS table, you need to propagate that change across every connection. With 10, 50, or more connections, making the change manually in the Fivetran dashboard for each connection is time-consuming and error-prone.
This tutorial shows you how to use the Fivetran REST API to automate that update across all connections in a group with a single script.
The same pattern applies to other bulk operations, such as:
- Disabling a table across all connections, for example, to exclude a PII table.
- Changing the schema_change_handling policy across all connections.
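For the two bulk operations listed above, only the request body changes; the loop over connections stays the same. The following is a minimal sketch of the two payloads, not a definitive implementation. The table name `customers_pii` is hypothetical, and the sketch assumes that `schema_change_handling` is set through `PATCH /v1/connections/{connection_id}/schemas` with one of the values shown; check the API reference for the exact endpoint and values before relying on it.

```python
def disable_table_payload(table_name):
    """Body for PATCH /v1/connections/{connection_id}/schemas/{schema_name}."""
    return {"tables": {table_name: {"enabled": False}}}


def schema_change_handling_payload(policy):
    """Body for PATCH /v1/connections/{connection_id}/schemas (assumed endpoint)."""
    allowed = {"ALLOW_ALL", "ALLOW_COLUMNS", "BLOCK_ALL"}  # assumed policy values
    if policy not in allowed:
        raise ValueError(f"Unknown policy {policy!r}; expected one of {sorted(allowed)}")
    return {"schema_change_handling": policy}


# "customers_pii" is a hypothetical table name used only for illustration.
print(disable_table_payload("customers_pii"))
print(schema_change_handling_payload("BLOCK_ALL"))
```

Validating the policy locally keeps a typo from being sent to every connection in the group.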
Prerequisites
Before you begin, ensure you have:
- A Standard, Enterprise, or Business Critical Fivetran account with admin access.
- Your API key and secret.
- A Fivetran group containing multiple connections. Make a note of the group_id; you can retrieve it by calling the List all groups endpoint.
- Destination credentials with permission to create schemas and tables, if your destination requires these permissions.
If you need to create connections first, see Set up connections programmatically.
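The group_id lookup mentioned in the prerequisites can be sketched as a small helper that searches a List all groups response by group name. This is an illustration under assumptions: it presumes the response has the same `data.items` shape as the connection list shown later in this tutorial, with an `id` and a `name` on each item, and the sample group names and IDs are hypothetical.

```python
def find_group_id(groups_response, group_name):
    """Return the ID of the group called group_name, or None if it is absent."""
    for group in groups_response["data"]["items"]:
        if group.get("name") == group_name:
            return group["id"]
    return None


# Hypothetical response body, for illustration only.
sample_groups = {
    "data": {
        "items": [
            {"id": "group_abc", "name": "Shards"},
            {"id": "group_def", "name": "Staging"},
        ],
        "next_cursor": None,
    }
}

print(find_group_id(sample_groups, "Shards"))
```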
Endpoints used
| Endpoint | Description |
|---|---|
| GET /v1/connections | List all connections in a group |
| POST /v1/connections/{connection_id}/schemas/reload | Reload the schema to discover new tables |
| PATCH /v1/connections/{connection_id}/schemas/{schema_name} | Enable or disable tables on a connection |
| POST /v1/connections/{connection_id}/sync | Trigger a sync |
| GET /v1/connections/{connection_id} | Retrieve connection details |
How to apply schema updates across multiple connections
The following steps walk you through enabling a new table and a new column across all your connections.
List all connections in your group
Use the List All Connections endpoint to retrieve every connection in your group. Use the group_id query parameter to filter by group.
Endpoint
GET https://api.fivetran.com/v1/connections?group_id={group_id}
Sample response
{
  "data": {
    "items": [
      { "id": "conn_abc123", "schema": "shard_001" },
      { "id": "conn_def456", "schema": "shard_002" },
      { "id": "conn_ghi789", "schema": "shard_003" }
    ],
    "next_cursor": null
  }
}
If your group has more than 100 connections, the response includes a next_cursor value. Pass it as a cursor query parameter in a follow-up request to retrieve the next page.
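The cursor loop described above can be exercised without network access by injecting the page-fetching function. In this sketch, `fetch_page` is a hypothetical callable that takes a cursor (`None` for the first page) and returns the parsed JSON body of one response; the fake pages and the cursor value `page2` are made up for illustration.

```python
def iter_connection_ids(fetch_page):
    """Yield connection IDs from every page.

    fetch_page(cursor) must return the parsed JSON body of one
    GET /v1/connections response; cursor is None for the first page.
    """
    cursor = None
    while True:
        data = fetch_page(cursor)["data"]
        for item in data["items"]:
            yield item["id"]
        cursor = data.get("next_cursor")
        if not cursor:  # null or missing cursor means this was the last page
            break


# Stand-in for the real HTTP call: two fake pages keyed by cursor.
FAKE_PAGES = {
    None: {"data": {"items": [{"id": "conn_abc123"}, {"id": "conn_def456"}],
                    "next_cursor": "page2"}},
    "page2": {"data": {"items": [{"id": "conn_ghi789"}], "next_cursor": None}},
}

print(list(iter_connection_ids(FAKE_PAGES.get)))
```

In a real script, `fetch_page` would send the request with the `group_id` and `cursor` query parameters and return `response.json()`.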
Reload schemas to discover the new table
If the new table has never been synced before, it may not yet appear in the connection's schema config in Fivetran. Use the Reload a Connection Schema Config endpoint on each connection to fetch the latest schema from the source.
Endpoint
POST https://api.fivetran.com/v1/connections/{connection_id}/schemas/reload
Sample response
{
  "code": "Success",
  "message": "Schema configuration has been reloaded",
  "data": {
    "schemas": {
      "public": {
        "enabled": true,
        "tables": {
          "refunds": {
            "enabled": false
          }
        }
      }
    }
  }
}
You can skip this step if the table is already visible in the connection's schema config in Fivetran, even if it is currently disabled.
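To decide whether the reload can be skipped, a script can check whether the table already appears in a schema config response. The helper below is a sketch that assumes the response has the `data.schemas.<schema>.tables` shape shown in the sample above; how you obtain the current schema config without reloading it is left to your script.

```python
def table_is_visible(schema_config, schema_name, table_name):
    """Return True if table_name is listed under schema_name, enabled or not."""
    schemas = schema_config.get("data", {}).get("schemas", {})
    return table_name in schemas.get(schema_name, {}).get("tables", {})


# Trimmed copy of the sample response above.
sample_config = {
    "code": "Success",
    "data": {
        "schemas": {
            "public": {"enabled": True, "tables": {"refunds": {"enabled": False}}}
        }
    },
}

print(table_is_visible(sample_config, "public", "refunds"))
print(table_is_visible(sample_config, "public", "invoices"))
```

A disabled table still counts as visible, which matches the rule that the reload is only needed when the table is missing from the config.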
Enable the new table and column on each connection
Use the Update a Connection Database Schema Config endpoint to enable the new REFUNDS table and the new currency column on the existing TRANSACTIONS table in a single request per connection.
The request body includes "enabled": true at the schema level in addition to the table level. If the schema itself is disabled, the table won't sync even if it is individually enabled.
Endpoint
PATCH https://api.fivetran.com/v1/connections/{connection_id}/schemas/{schema_name}
Sample request
{
  "enabled": true,
  "tables": {
    "refunds": {
      "enabled": true
    },
    "transactions": {
      "enabled": true,
      "columns": {
        "currency": {
          "enabled": true
        }
      }
    }
  }
}
Sample response
{
  "code": "Success",
  "message": "Schema has been updated",
  "data": {
    "schemas": {
      "public": {
        "enabled": true,
        "tables": {
          "refunds": {
            "enabled": true
          },
          "transactions": {
            "enabled": true,
            "columns": {
              "currency": { "enabled": true }
            }
          }
        }
      }
    }
  }
}
Repeat this call for each connection ID retrieved in the List all connections step.
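When repeating a call across many connections, it can help to record failures and continue rather than stop at the first error. The following sketch shows one way to do that; `apply_to_all` and the `fake_update` action are hypothetical names, and in a real script the action would send the PATCH request for one connection.

```python
def apply_to_all(conn_ids, action):
    """Run action(conn_id) for every connection and collect failures."""
    succeeded, failed = [], {}
    for conn_id in conn_ids:
        try:
            action(conn_id)
            succeeded.append(conn_id)
        except Exception as exc:  # broad on purpose: record the failure and continue
            failed[conn_id] = str(exc)
    return succeeded, failed


def fake_update(conn_id):
    """Stand-in for the real PATCH call; fails for one connection."""
    if conn_id == "conn_def456":
        raise RuntimeError("HTTP 404")


done, errors = apply_to_all(["conn_abc123", "conn_def456", "conn_ghi789"], fake_update)
print(done)
print(errors)
```

The returned `failed` mapping gives you a list of connections to retry or inspect after the batch finishes.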
Trigger syncs
Use the Sync Connection Data endpoint to trigger a sync for each connection so the newly enabled table starts syncing to the destination.
If a sync is already running on a connection, this endpoint returns HTTP 409 Conflict. The example script in this tutorial does not handle this error — it raises an exception and stops. For production use, check each connection's sync_state before calling this endpoint, or add 409 error handling to your script.
Endpoint
POST https://api.fivetran.com/v1/connections/{connection_id}/sync
Sample request
{
  "force": false
}
Set "force" to false to trigger a sync only if no sync is already running. Set it to true to stop any in-progress sync and start a new one immediately.
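The 409 handling suggested above could look like the sketch below. It treats a conflict as "a sync is already running" and lets every other error propagate. `post` is a hypothetical helper that sends the POST request and returns an object with `status_code` and `raise_for_status()`, such as a `requests.Response`; `FakeResponse` exists only so the example runs without network access.

```python
def trigger_sync_safely(conn_id, post):
    """Trigger a sync; return "triggered", or "already_running" on HTTP 409."""
    response = post(f"/connections/{conn_id}/sync", {"force": False})
    if response.status_code == 409:
        return "already_running"  # a sync is in progress; nothing to do
    response.raise_for_status()  # any other 4xx/5xx is still an error
    return "triggered"


class FakeResponse:
    """Minimal stand-in for an HTTP response object."""

    def __init__(self, status_code):
        self.status_code = status_code

    def raise_for_status(self):
        if self.status_code >= 400:
            raise RuntimeError(f"HTTP {self.status_code}")


print(trigger_sync_safely("conn_abc123", lambda path, body: FakeResponse(200)))
print(trigger_sync_safely("conn_def456", lambda path, body: FakeResponse(409)))
```

Because the request keeps `"force": false`, an in-progress sync is left to finish instead of being interrupted.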
Monitor sync status
Use the Retrieve Connection Details endpoint to check the sync status of each connection.
Endpoint
GET https://api.fivetran.com/v1/connections/{connection_id}
Sample response
{
  "data": {
    "id": "conn_abc123",
    "status": {
      "sync_state": "syncing"
    }
  }
}
Poll this endpoint until sync_state is no longer "syncing" or "rescheduled".
Alternatively, set up a webhook to receive a notification when each sync completes instead of polling.
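With many connections, polling them together in one loop avoids waiting for one slow shard before checking the next. The sketch below illustrates that idea under assumptions: `get_state` is a hypothetical helper that returns the `sync_state` string for a connection, and the timeout and polling interval are arbitrary defaults, not recommended values.

```python
import time


def wait_for_all(conn_ids, get_state, timeout_seconds=1800, poll_seconds=15,
                 sleep=time.sleep):
    """Poll all pending connections each round until they finish or time runs out.

    Returns a dict of conn_id -> final sync_state, or "timeout" for stragglers.
    """
    pending = set(conn_ids)
    results = {}
    deadline = time.monotonic() + timeout_seconds
    while pending and time.monotonic() < deadline:
        for conn_id in sorted(pending):  # sorted() copies, so discarding is safe
            state = get_state(conn_id)
            if state not in ("syncing", "rescheduled"):
                results[conn_id] = state
                pending.discard(conn_id)
        if pending:
            sleep(poll_seconds)
    for conn_id in pending:
        results[conn_id] = "timeout"
    return results


# Scripted states stand in for real API calls; sleeping is disabled for the demo.
scripted = {
    "conn_abc123": iter(["syncing", "scheduled"]),
    "conn_def456": iter(["scheduled"]),
}
final = wait_for_all(scripted, lambda conn_id: next(scripted[conn_id]),
                     sleep=lambda seconds: None)
print(final)
```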
Example Python script
The following Python script automates all steps above for every connection in a group.
Before running the script, install the required library:
pip install requests
Set your API credentials as environment variables:
On macOS or Linux:
export FIVETRAN_API_KEY=your_api_key
export FIVETRAN_API_SECRET=your_api_secret
On Windows (Command Prompt):
set FIVETRAN_API_KEY=your_api_key
set FIVETRAN_API_SECRET=your_api_secret
import os
import time

import requests
from requests.auth import HTTPBasicAuth

BASE_URL = "https://api.fivetran.com/v1"
AUTH = HTTPBasicAuth(os.environ["FIVETRAN_API_KEY"], os.environ["FIVETRAN_API_SECRET"])
HEADERS = {"Accept": "application/json;version=2", "Content-Type": "application/json"}

GROUP_ID = "<your_group_id>"
SCHEMA_NAME = "public"  # replace with your source schema name if different
NEW_TABLE = "refunds"  # replace with the new table you want to enable
EXISTING_TABLE = "transactions"  # replace with the existing table you want to update
NEW_COLUMN = "currency"  # replace with the new column you want to enable


def api(method, path, payload=None, params=None):
    url = f"{BASE_URL}{path}"
    response = requests.request(method, url, json=payload, params=params, headers=HEADERS, auth=AUTH, timeout=60)
    response.raise_for_status()  # raises HTTPError on 4xx/5xx; check response body for details
    return response.json()


def list_connections(group_id):
    conn_ids, cursor = [], None
    while True:
        params = {"group_id": group_id, "limit": 100}
        if cursor:
            params["cursor"] = cursor
        response = api("GET", "/connections", params=params)
        conn_ids += [item["id"] for item in response["data"]["items"]]
        cursor = response["data"].get("next_cursor")
        if not cursor:
            break
    print(f"Found {len(conn_ids)} connections in group '{group_id}'")
    return conn_ids


def reload_schema(conn_id):
    api("POST", f"/connections/{conn_id}/schemas/reload")
    print(f"  Reloaded schema for {conn_id}")


def enable_table_and_column(conn_id, schema_name, new_table, existing_table, new_column):
    api("PATCH", f"/connections/{conn_id}/schemas/{schema_name}", {
        "enabled": True,
        "tables": {
            new_table: {"enabled": True},
            existing_table: {
                "enabled": True,
                "columns": {new_column: {"enabled": True}},
            },
        },
    })
    print(f"  Enabled {schema_name}.{new_table} and {schema_name}.{existing_table}.{new_column} on {conn_id}")


def trigger_sync(conn_id):
    api("POST", f"/connections/{conn_id}/sync", {"force": False})
    print(f"  Triggered sync for {conn_id}")


def wait_for_sync(conn_id, timeout_minutes=30):  # increase timeout for large shards
    deadline = time.time() + timeout_minutes * 60
    while time.time() < deadline:
        state = api("GET", f"/connections/{conn_id}")["data"]["status"]["sync_state"]
        print(f"  {conn_id}: {state}")
        if state not in ("syncing", "rescheduled"):
            return state
        time.sleep(15)
    return "timeout"


if __name__ == "__main__":
    # Step 1: List all connections in the group
    connection_ids = list_connections(GROUP_ID)

    # Steps 2-4 run sequentially across all connections before moving to the next step.
    # Step 2: Reload schemas to discover the new table
    for conn_id in connection_ids:
        reload_schema(conn_id)

    # Step 3: Enable the new table and column on each connection
    for conn_id in connection_ids:
        enable_table_and_column(conn_id, SCHEMA_NAME, NEW_TABLE, EXISTING_TABLE, NEW_COLUMN)

    # Step 4: Trigger syncs
    for conn_id in connection_ids:
        trigger_sync(conn_id)

    # Step 5: Wait for syncs to complete
    for conn_id in connection_ids:
        final_state = wait_for_sync(conn_id)
        print(f"{conn_id} finished with state: {final_state}")
Never hardcode credentials in source code.
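One way to follow this advice is to read both values from the environment and fail early with a clear message when either is missing, instead of surfacing a bare `KeyError`. The helper name `load_credentials` is hypothetical; the variable names match the ones used in this tutorial.

```python
import os


def load_credentials(environ=os.environ):
    """Return (api_key, api_secret) from the environment or raise a clear error."""
    names = ("FIVETRAN_API_KEY", "FIVETRAN_API_SECRET")
    missing = [name for name in names if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variable(s): " + ", ".join(missing))
    return environ["FIVETRAN_API_KEY"], environ["FIVETRAN_API_SECRET"]


# A plain dict stands in for os.environ so the example runs anywhere.
demo_env = {"FIVETRAN_API_KEY": "demo_key", "FIVETRAN_API_SECRET": "demo_secret"}
key, secret = load_credentials(demo_env)
print(key)
```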