Supported Operations
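The Fivetran Connector SDK supports the following data delivery operations for use within your connector's update() method: upsert(), update(), delete(), and truncate(). The checkpoint() operation, described at the end of this section, saves the sync state.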
Each call to upsert(), update(), delete(), or truncate() sends exactly one operation through the SDK's gRPC stream. For upsert(), update(), and delete(), your connector is responsible for iterating over the records returned by the source system, deciding which operation makes sense for each record, and building the dictionary of column values that should be written. The SDK formats the data and forwards it to Fivetran. The truncate() operation requires only the table name — no row data is needed.
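The per-record loop described above might be sketched as follows. This is a minimal sketch under stated assumptions, not the SDK's own code: the change_type field on each source record, the deliver() helper, and the RecordingOps stand-in are all hypothetical. In a real connector you would pass the SDK's operations object instead of the stand-in.

```python
class RecordingOps:
    """Hypothetical stand-in for the SDK's operations object; it only records calls."""

    def __init__(self):
        self.calls = []

    def upsert(self, table, data):
        self.calls.append(("upsert", table, data))

    def update(self, table, modified):
        self.calls.append(("update", table, modified))

    def delete(self, table, keys):
        self.calls.append(("delete", table, keys))


def deliver(op, table, records, key_columns=("id",)):
    """Send one operation per source record, chosen from an assumed change_type field."""
    for record in records:
        # Build the dictionary of column values, dropping the routing field.
        row = {k: v for k, v in record.items() if k != "change_type"}
        change = record.get("change_type", "upsert")
        if change == "delete":
            # A delete only needs the primary key columns.
            op.delete(table=table, keys={k: row[k] for k in key_columns})
        elif change == "update":
            op.update(table=table, modified=row)
        else:
            op.upsert(table=table, data=row)


source_records = [
    {"id": 1, "name": "John Doe"},
    {"id": 2, "status": "inactive", "change_type": "update"},
    {"id": 3, "change_type": "delete"},
]
ops = RecordingOps()
deliver(ops, "users", source_records)
```

Passing the operations object as a parameter keeps the routing logic testable without a running sync.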
Destination naming and normalization when using upsert() and update()
The values you provide for the table and column keys are renamed based on our renaming rules so that the corresponding names in the destination may differ from how they are set in your code.
If you want the table and column names in the destination to exactly match the names you set in your code, we recommend choosing names that already follow the renaming rules, that is, names that fit the pattern and character set of transformed names. As a result, the names of the tables and columns in your source may not exactly match the corresponding names in the destination.
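One way to spot names that are likely to be renamed is to check them against the expected pattern before a sync. The sketch below is illustrative only: the pattern it uses (lowercase letters, digits, and underscores, not starting with a digit) is an assumption, and the helper name is hypothetical. Consult the renaming rules for the authoritative pattern.

```python
import re

# ASSUMPTION: transformed names use only lowercase letters, digits, and
# underscores, and do not start with a digit. This is not taken from the
# renaming rules themselves; adjust the pattern to match them.
ASSUMED_NAME_PATTERN = re.compile(r"[a-z_][a-z0-9_]*")


def names_likely_to_change(names):
    """Return the names that do not fit the assumed pattern."""
    return [name for name in names if not ASSUMED_NAME_PATTERN.fullmatch(name)]


flagged = names_likely_to_change(["users", "firstName", "order total", "created_at"])
```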
upsert()
Writes a single new or updated record to the target table by primary key.
Signature
op.upsert(table="name", data=data)
Parameters
| Name | Type | Description |
|---|---|---|
| table | str | Name of the target table. |
| data | dict | Dictionary of key-value pairs mapping target table column names to their values. Values can be of any supported data type. |
Example
data = {"id": 1, "name": "John Doe"}
op.upsert(table="users", data=data)
Notes
- Columns that exist in the target table but are missing from the data passed to the operation are set to NULL.
- If the passed row contains a column that does not exist in the target table, the value is written for the passed row, and all existing rows in the target table get NULL for that column.
- Data types must match the schema or be inferable by the SDK.
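The first two notes can be illustrated with a small in-memory model. This is only a sketch of the behavior described above, not SDK code: simulate_upsert() and the sample table are hypothetical, and None stands in for NULL.

```python
def simulate_upsert(rows, columns, key, data):
    """Model of the notes above, not the SDK: rows maps key value -> row dict."""
    # A column not yet in the table is added; existing rows get None (NULL).
    for column in data:
        if column not in columns:
            columns.append(column)
            for existing in rows.values():
                existing[column] = None
    # The written row takes the passed values; columns missing from data become None.
    rows[data[key]] = {column: data.get(column) for column in columns}


columns = ["id", "name", "email"]
rows = {1: {"id": 1, "name": "John Doe", "email": "john@example.com"}}

# email is omitted, so it becomes None for row 1.
simulate_upsert(rows, columns, "id", {"id": 1, "name": "John D."})
# plan is a new column, so row 1 gets None for it.
simulate_upsert(rows, columns, "id", {"id": 2, "name": "Jane", "plan": "pro"})
```

If a column's previous value must survive, send it again in data.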
update()
Updates an existing row by primary key without creating a new record.
Signature
op.update(table="name", modified=data)
Parameters
| Name | Type | Description |
|---|---|---|
| table | str | Name of the target table. |
| modified | dict | Dictionary of key-value pairs mapping target table column names to their values. Values can be of any supported data type. |
Example
modified = {"id": 1, "status": "inactive"}
op.update(table="users", modified=modified)
Notes
- All columns of a composite primary key must be included in the modified dictionary for the row to be updated correctly.
- This operation does not write data with new primary keys.
- Columns that exist in the target table but are missing from the data passed to this method are left unchanged.
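The notes above can be modeled in memory as follows. This is a sketch of the described behavior, not SDK code: simulate_update() and the order_lines-style sample data are hypothetical, and raising an error on an incomplete key is a choice made for this sketch rather than documented SDK behavior.

```python
def simulate_update(rows, key_columns, modified):
    """Model of the notes above, not the SDK: rows maps key tuple -> row dict."""
    missing = [c for c in key_columns if c not in modified]
    if missing:
        # Sketch-only choice: reject dictionaries with an incomplete composite key.
        raise ValueError(f"modified is missing primary key columns: {missing}")
    key = tuple(modified[c] for c in key_columns)
    if key not in rows:
        return False            # no row with this key, so nothing is written
    rows[key].update(modified)  # columns absent from modified stay as they were
    return True


key_columns = ["order_id", "line_number"]
rows = {(10, 1): {"order_id": 10, "line_number": 1, "status": "open", "qty": 3}}

changed = simulate_update(
    rows, key_columns, {"order_id": 10, "line_number": 1, "status": "shipped"}
)
ignored = simulate_update(
    rows, key_columns, {"order_id": 99, "line_number": 1, "status": "shipped"}
)
```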
delete()
Marks rows as deleted by setting _fivetran_deleted = TRUE in the target table.
Signature
op.delete(table="name", keys=data)
Parameters
| Name | Type | Description |
|---|---|---|
| table | str | Name of the target table. |
| keys | dict | Dictionary of key-value pairs mapping target table column names to their values. All columns of a composite primary key must be included in the dictionary. Values can be of any supported data type. |
Example
op.delete(table="users", keys={"id": 1})
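For a table with a composite primary key, the keys dictionary can be derived from a full source record. The helper below is a hypothetical sketch; the order_lines table and its key columns are made up for illustration.

```python
def keys_for_delete(record, primary_key):
    """Build the keys dictionary from a full record, keeping only key columns."""
    missing = [column for column in primary_key if column not in record]
    if missing:
        raise KeyError(f"record is missing primary key columns: {missing}")
    return {column: record[column] for column in primary_key}


record = {"order_id": 10, "line_number": 2, "status": "cancelled"}
keys = keys_for_delete(record, ["order_id", "line_number"])
# In a connector, this would be followed by:
# op.delete(table="order_lines", keys=keys)
```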
truncate()
Marks all existing rows in the table as deleted by setting _fivetran_deleted = TRUE.
Signature
op.truncate(table="name")
Parameters
| Name | Type | Description |
|---|---|---|
| table | str | Name of the target table to truncate. |
Example
op.truncate(table="staging_data")
Notes
- Unlike delete(), which marks specific rows by primary key, truncate() marks as deleted all rows whose _fivetran_synced timestamp is earlier than the time of the truncate() call. Rows inserted after the truncate() call are not affected.
- This is a soft delete operation: rows are marked with _fivetran_deleted = TRUE rather than being physically removed.
- The table structure (columns, schema) is preserved.
- This operation is buffered and executed at the next pipeline flush, which occurs at each checkpoint() call.
- Useful when you need to mark all previously synced rows in a table as deleted without iterating through each row individually.
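Taken together, these notes suggest a full-refresh pattern: truncate the table, write the current snapshot, then checkpoint. The sketch below only shows the call order under that reading; full_refresh(), the state key, and the RecordingOps stand-in are hypothetical, and a real connector would pass the SDK's operations object instead.

```python
class RecordingOps:
    """Hypothetical stand-in for the SDK's operations object; it only records calls."""

    def __init__(self):
        self.calls = []

    def truncate(self, table):
        self.calls.append(("truncate", table))

    def upsert(self, table, data):
        self.calls.append(("upsert", table, data))

    def checkpoint(self, state):
        self.calls.append(("checkpoint", state))


def full_refresh(op, table, snapshot, state):
    """Soft-delete previously synced rows, then write the current snapshot."""
    op.truncate(table=table)              # buffered until the next checkpoint()
    for row in snapshot:
        op.upsert(table=table, data=row)  # sent after truncate(), so not marked deleted
    op.checkpoint(state=state)            # triggers the pipeline flush


ops = RecordingOps()
full_refresh(ops, "staging_data", [{"id": 1}, {"id": 2}], {"staging_data_refreshed": True})
call_names = [call[0] for call in ops.calls]
```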
checkpoint()
Saves the current sync state by updating state: dict with new_state and signals that the data sent up to this point can be safely written to the destination.
Signature
op.checkpoint(state=new_state)
Example
new_state = {"table1_cursor": "2024-08-14T02:01:00Z"}
op.checkpoint(state=new_state)
Notes
- You must design the structure of your connector's state to suit your connector's needs and use it to prevent data reprocessing in failed/long syncs.
- To achieve incremental syncs, you must identify a column to use as a cursor for each table and store the latest value of the cursor for each table in your connector's state to keep track of what data has been sent to the destination. At the start of a sync, the connector needs to read the cursor for each table from the state and use it to fetch only data the connector hasn't previously synced.
- State values are not stored automatically; only the contents of new_state are applied as they are passed.
- State must be passed as a JSON string representing a single JSON object (i.e., the decoded result must be a dictionary, not an array, string, or number).
- All but the simplest connectors need to use checkpoint() so that the connection does not reprocess data, especially when a long sync fails for any underlying reason. See our recommendations on checkpointing for large data sets.
- A full connection re-sync can be run in the Fivetran dashboard. To re-sync only the affected table(s), use the Modify the Connection's State endpoint. The state object implementation in your connector must support table-level granularity to enable single-table re-syncs.
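The cursor-based approach in the notes above might look like the sketch below. Everything beyond op.upsert() and op.checkpoint() is an assumption made for illustration: the sync_table() helper, the per-table "<table>_cursor" state key, the updated_at cursor column, the fetch_since() source query, and the RecordingOps stand-in. Following the example above, the state is passed as a dictionary.

```python
class RecordingOps:
    """Hypothetical stand-in for the SDK's operations object; it only records calls."""

    def __init__(self):
        self.calls = []

    def upsert(self, table, data):
        self.calls.append(("upsert", table, data))

    def checkpoint(self, state):
        self.calls.append(("checkpoint", state))


def sync_table(op, table, fetch_since, state, cursor_column="updated_at", checkpoint_every=2):
    """Upsert rows newer than the saved cursor and checkpoint periodically."""
    cursor_key = f"{table}_cursor"           # one cursor per table in the state
    pending = 0
    # fetch_since is assumed to return rows ordered by cursor_column.
    for row in fetch_since(state.get(cursor_key)):
        op.upsert(table=table, data=row)
        state[cursor_key] = row[cursor_column]
        pending += 1
        if pending == checkpoint_every:
            op.checkpoint(state=dict(state))  # copy so later changes do not alter it
            pending = 0
    if pending:
        op.checkpoint(state=dict(state))      # save progress for the final partial batch
    return state


SOURCE = [
    {"id": 1, "updated_at": "2024-08-14T01:00:00Z"},
    {"id": 2, "updated_at": "2024-08-14T02:00:00Z"},
    {"id": 3, "updated_at": "2024-08-14T03:00:00Z"},
]


def fetch_since(cursor):
    """Hypothetical source query; same-format ISO 8601 strings compare chronologically."""
    return [r for r in SOURCE if cursor is None or r["updated_at"] > cursor]


ops = RecordingOps()
state = sync_table(ops, "table1", fetch_since, {"table1_cursor": "2024-08-14T01:00:00Z"})
```

Keeping one cursor key per table gives the table-level granularity that single-table re-syncs rely on: removing a table's key from the state makes the next sync fetch that table from the beginning.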