Fivetran Platform Connector Sample Queries
In the provided sample queries, the <destination_db> and <destination_schema> placeholders refer to different entities depending on your destination: they correspond to your destination database name/project ID and to your Fivetran Platform connection name, respectively.
The following table shows the mapping between the placeholder and the destination entities along with usage examples:
Destination | <destination_db> | <destination_schema> | Example |
---|---|---|---|
Snowflake | Snowflake database name | Schema name | analytics.platform.log |
BigQuery | GCP project ID | Dataset name | `my-project.my_dataset.log` |
Redshift | (optional) | Schema name | public.log |
Use this mapping to replace the placeholders in your queries.
NOTE: In BigQuery, the full table reference must be enclosed in backticks.
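For example, using the sample names from the mapping table above (analytics/platform for Snowflake, my-project/my_dataset for BigQuery; substitute your own values), a minimal reference to the log table looks like this:
-- Snowflake: <destination_db> = analytics, <destination_schema> = platform
SELECT COUNT(*) FROM analytics.platform.log;

-- BigQuery: <destination_db> = my-project, <destination_schema> = my_dataset (backticks required)
SELECT COUNT(*) FROM `my-project.my_dataset.log`;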
NOTE: For an automatically created Fivetran Platform connection (when you create a destination), the default name is fivetran_metadata. For a manually created Fivetran Platform connection, the default name is fivetran_log. For S3 Data Lake and Managed Data Lake Services, the default name is fivetran_metadata_<group_id>.
Calculate monthly active rows (MAR) per connection
The Monthly Active Rows (MAR) count sent through the Fivetran Platform Connector includes the active rows recorded up to the current date. The count resets at the beginning of each month. For example, if your MAR count for schema_A.table_A is 5500 on January 1st but 5800 on January 31st, your January MAR for schema_A.table_A is 5800. Your MAR count then drops to 0 on February 1st. To learn more about MAR, see our pricing documentation.
NOTE: If you use these queries during your free trial, we recommend that you add every data source that you plan to use with Fivetran and let each connection run for 7 days under a typical load. This will give you a more accurate idea of how well Fivetran meets your business needs.
Calculate MAR grouped by schema (connection), destination, and month
Redshift and Snowflake MAR
Expand for Redshift and Snowflake query
We have tested the following query for Redshift and Snowflake destinations:
SELECT
schema_name,
destination_id,
date_trunc('month', measured_date) AS measured_month,
SUM(incremental_rows) AS MAR
FROM <destination_db>.<destination_schema>.incremental_mar
WHERE free_type = 'PAID'
GROUP BY schema_name, destination_id, measured_month
ORDER BY measured_month, schema_name
This query depends on the INCREMENTAL_MAR table.
BigQuery MAR
Expand for BigQuery query
We have tested the following query for BigQuery destinations:
SELECT
schema_name,
destination_id,
date_trunc(measured_date, month) AS measured_month,
SUM(incremental_rows) AS MAR
FROM <destination_db>.<destination_schema>.incremental_mar
WHERE free_type = 'PAID'
GROUP BY schema_name, destination_id, measured_month
ORDER BY measured_month, schema_name
This query depends on the INCREMENTAL_MAR table.
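If you also want a single account-wide MAR figure per month, the grouped queries above can be rolled up. The following is a minimal sketch in Redshift/Snowflake syntax (use date_trunc(measured_date, month) in BigQuery), assuming the same INCREMENTAL_MAR table and paid-rows filter:
SELECT
  date_trunc('month', measured_date) AS measured_month,
  SUM(incremental_rows) AS total_mar
FROM <destination_db>.<destination_schema>.incremental_mar
WHERE free_type = 'PAID'
GROUP BY measured_month
ORDER BY measured_month;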
Calculate MAR by table
Redshift and Snowflake MAR by table
Expand for Redshift and Snowflake query
Run the following query with a Redshift or Snowflake destination:
SELECT
schema_name,
destination_id,
table_name,
connection_name,
date_trunc('month', measured_date) AS measured_month,
SUM(incremental_rows) AS incremental_rows
FROM <destination_db>.<destination_schema>.incremental_mar
WHERE free_type = 'PAID'
GROUP BY schema_name, destination_id, measured_month, table_name, connection_name
ORDER BY measured_month, schema_name, table_name;
This query depends on the INCREMENTAL_MAR table.
BigQuery MAR by table
Expand for BigQuery query
Run the following query with a BigQuery destination:
SELECT
schema_name,
destination_id,
table_name,
connection_name,
date_trunc(measured_date, month) AS measured_month,
SUM(incremental_rows) AS incremental_rows
FROM <destination_db>.<destination_schema>.incremental_mar
WHERE free_type = 'PAID'
GROUP BY schema_name, destination_id, measured_month, table_name, connection_name
ORDER BY measured_month, schema_name, table_name;
This query depends on the INCREMENTAL_MAR table.
Calculate monthly transformation model runs
The Fivetran Platform Connector logs the number of times transformation models have run each month. This count includes all successful model runs up to the current date and resets to zero at the beginning of each month. For example, if your count for the Unified RAG job is 5500 on January 21 and 5800 on January 31, then the total number of successful model runs for January is 5800. On February 1st, the count resets to 0.
Calculate model runs grouped by month, destination, and job name
Redshift and Snowflake
Expand for Redshift and Snowflake query
Run the following query with a Redshift or Snowflake destination:
SELECT
date_trunc('month', measured_date) AS measured_month,
destination_id,
job_name,
SUM(model_runs) AS model_runs
FROM <destination_db>.<destination_schema>.transformation_runs
WHERE free_type = 'PAID'
GROUP BY measured_month, destination_id, job_name
ORDER BY measured_month, destination_id, job_name;
This query depends on the TRANSFORMATION_RUNS table.
BigQuery
Expand for BigQuery query
Run the following query with a BigQuery destination:
SELECT
date_trunc(measured_date, month) AS measured_month,
destination_id,
job_name,
SUM(model_runs) AS model_runs
FROM <destination_db>.<destination_schema>.transformation_runs
WHERE free_type = 'PAID'
GROUP BY measured_month, destination_id, job_name
ORDER BY measured_month, destination_id, job_name;
This query depends on the TRANSFORMATION_RUNS table.
Calculate model runs grouped by destination, project type, and month
BigQuery
Expand for BigQuery query
Run the following query with a BigQuery destination:
SELECT
destination_id,
project_type,
date_trunc(measured_date, month) AS measured_month,
SUM(model_runs) AS model_runs
FROM <destination_db>.<destination_schema>.transformation_runs
WHERE free_type = 'PAID'
GROUP BY destination_id, project_type, measured_month
ORDER BY destination_id, project_type, measured_month;
This query depends on the TRANSFORMATION_RUNS table.
Redshift and Snowflake
Expand for Redshift and Snowflake query
Run the following query with a Redshift or Snowflake destination:
SELECT
destination_id,
project_type,
date_trunc('month', measured_date) AS measured_month,
SUM(model_runs) AS model_runs
FROM <destination_db>.<destination_schema>.transformation_runs
WHERE free_type = 'PAID'
GROUP BY destination_id, project_type, measured_month
ORDER BY destination_id, project_type, measured_month;
This query depends on the TRANSFORMATION_RUNS table.
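Similarly, if you only need one monthly total across all jobs and destinations, the queries above can be collapsed further. A minimal Redshift/Snowflake-syntax sketch (use date_trunc(measured_date, month) in BigQuery), assuming the same TRANSFORMATION_RUNS table:
SELECT
  date_trunc('month', measured_date) AS measured_month,
  SUM(model_runs) AS total_model_runs
FROM <destination_db>.<destination_schema>.transformation_runs
WHERE free_type = 'PAID'
GROUP BY measured_month
ORDER BY measured_month;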
Check connection status
Check sync start and end times
Expand for universal query
Run the following query with a BigQuery, Redshift, Databricks, or Snowflake destination:
SELECT
connection_id,
message_event,
time_stamp AS process_start_time
FROM <destination_db>.<destination_schema>.log
WHERE message_event = 'sync_start' OR message_event = 'sync_end'
ORDER BY time_stamp DESC;
This query depends on the LOG table.
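To estimate how long each sync took, you can pair the sync_start and sync_end events by sync_id. The following is a minimal Snowflake-syntax sketch, assuming the LOG table's sync_id column (used elsewhere in this guide) is populated for these events:
SELECT
  connection_id,
  sync_id,
  MIN(CASE WHEN message_event = 'sync_start' THEN time_stamp END) AS sync_started_at,
  MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp END) AS sync_ended_at,
  DATEDIFF(
    'second',
    MIN(CASE WHEN message_event = 'sync_start' THEN time_stamp END),
    MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp END)
  ) AS sync_duration_seconds
FROM <destination_db>.<destination_schema>.log
WHERE message_event IN ('sync_start', 'sync_end')
GROUP BY connection_id, sync_id
ORDER BY sync_started_at DESC;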
Troubleshoot errors and warnings
Expand for universal query
Run the following query with a BigQuery, Redshift, Databricks, or Snowflake destination:
SELECT connection_id, time_stamp, event, message_data
FROM <destination_db>.<destination_schema>.log
WHERE event = 'WARNING' OR event = 'SEVERE'
ORDER BY time_stamp DESC;
This query depends on the LOG table.
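If you prefer a daily summary instead of individual log lines, the same LOG table can be aggregated. A minimal sketch in Redshift/Snowflake syntax that counts warnings and errors per connection per day:
SELECT
  DATE_TRUNC('day', time_stamp) AS date_day,
  connection_id,
  event,
  COUNT(*) AS event_count
FROM <destination_db>.<destination_schema>.log
WHERE event = 'WARNING' OR event = 'SEVERE'
GROUP BY date_day, connection_id, event
ORDER BY date_day DESC, event_count DESC;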
Check records modified since last sync
The sample queries below return the volume of data that has been inserted, updated, or deleted since your last successful sync. They also return the timestamp of your connection's last record modification. Query results are at the connection level.
Use the sample query for your destination:
NOTE: If you want to filter your results based on data modification type (for example, view inserts only), use the operationType field in the message_data JSON object.
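For example, in a Snowflake destination you could add an operationType filter like the following sketch; <operation_type> is a placeholder, so inspect your own records_modified events to see which operationType values your connections emit:
SELECT *
FROM <destination_db>.<destination_schema>.log
WHERE message_event = 'records_modified'
  AND PARSE_JSON(message_data):operationType::string = '<operation_type>' -- placeholder; e.g. an insert-type value from your own log data
ORDER BY time_stamp DESC;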
BigQuery modified records since last sync
Expand for BigQuery query
WITH parse_json AS (
SELECT
time_stamp,
JSON_EXTRACT(message_data, '$.schema') AS connection_schema,
CAST(JSON_EXTRACT(message_data, '$.count') AS int64) AS row_volume,
message_event,
MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp ELSE NULL END) OVER(PARTITION BY connection_id) AS last_sync_completed_at
FROM <destination_db>.<destination_schema>.log
WHERE message_event = 'records_modified'
OR message_event = 'sync_end'
)
SELECT
connection_schema,
MAX(time_stamp) AS last_records_modified_at,
SUM(CASE WHEN time_stamp > last_sync_completed_at OR last_sync_completed_at IS NULL THEN row_volume ELSE 0 END) AS row_volume_since_last_sync
FROM parse_json
WHERE message_event = 'records_modified'
GROUP BY connection_schema
ORDER BY row_volume_since_last_sync DESC
;
This query depends on the LOG table.
Redshift modified records since last sync
Expand for Redshift query
WITH parse_json AS (
SELECT
time_stamp,
json_extract_path_text(message_data, 'schema') AS connection_schema,
json_extract_path_text(message_data, 'count')::integer AS row_volume,
message_event,
MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp ELSE NULL END) OVER(PARTITION BY connection_id) AS last_sync_completed_at
FROM <destination_db>.<destination_schema>.log
WHERE message_event = 'records_modified'
OR message_event = 'sync_end'
)
SELECT
connection_schema,
MAX(time_stamp) AS last_records_modified_at,
SUM(CASE WHEN time_stamp > last_sync_completed_at OR last_sync_completed_at IS NULL THEN row_volume ELSE 0 END) AS row_volume_since_last_sync
FROM parse_json
WHERE message_event = 'records_modified'
GROUP BY connection_schema
ORDER BY row_volume_since_last_sync DESC
;
This query depends on the LOG table.
Snowflake modified records since last sync
Expand for Snowflake query
WITH parse_json AS (
SELECT
time_stamp,
PARSE_JSON(message_data) AS message_data,
message_event,
MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp ELSE NULL END) OVER(PARTITION BY connection_id) AS last_sync_completed_at
FROM <destination_db>.<destination_schema>.log
WHERE message_event = 'records_modified'
OR message_event = 'sync_end'
)
SELECT
message_data:schema AS connection_schema,
MAX(time_stamp) AS last_records_modified_at,
SUM(CASE WHEN time_stamp > last_sync_completed_at OR last_sync_completed_at IS NULL THEN message_data:count::integer ELSE 0 END) AS row_volume_since_last_sync
FROM parse_json
WHERE message_event = 'records_modified'
GROUP BY connection_schema
ORDER BY row_volume_since_last_sync DESC
;
This query depends on the LOG table.
Check daily modified records
The sample queries below return the volume of data that has been inserted, updated, or deleted each day. Query results are at the table level.
Use the sample query for your destination:
NOTE: If you want to filter your results based on data modification type (for example, view inserts only), use the operationType field in the message_data JSON object.
BigQuery daily records
Expand for BigQuery query
SELECT
DATE_TRUNC(CAST(time_stamp AS date), day) AS date_day,
JSON_EXTRACT(message_data, '$.schema') AS schema,
JSON_EXTRACT(message_data, '$.table') AS table,
SUM(CAST(JSON_EXTRACT(message_data, '$.count') AS int64)) AS row_volume
FROM <destination_db>.<destination_schema>.log
WHERE DATE_DIFF(CAST(CURRENT_DATE() AS date), CAST(time_stamp AS date), DAY) < 30
AND message_event = 'records_modified'
GROUP BY date_day, schema, table
ORDER BY date_day DESC;
This query depends on the LOG table.
Redshift daily records
Expand for Redshift query
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
JSON_EXTRACT_PATH_TEXT(message_data, 'schema') AS schema,
JSON_EXTRACT_PATH_TEXT(message_data, 'table') AS "table",
SUM(JSON_EXTRACT_PATH_TEXT(message_data, 'count')::integer) AS row_volume
FROM <destination_db>.<destination_schema>.log
WHERE
DATEDIFF(day, time_stamp, current_date) < 30
AND message_event = 'records_modified'
GROUP BY date_day, schema, "table"
ORDER BY date_day DESC;
This query depends on the LOG table.
Snowflake daily records
Expand for Snowflake query
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
PARSE_JSON(message_data) AS message_data
FROM <destination_db>.<destination_schema>.log
WHERE DATEDIFF(DAY, time_stamp, current_date) < 30
AND message_event = 'records_modified'
)
SELECT
date_day,
message_data:schema AS "schema",
message_data:table AS "table",
SUM(message_data:count::integer) AS row_volume
FROM parse_json
GROUP BY date_day, "schema", "table"
ORDER BY date_day DESC;
This query depends on the LOG table.
Audit user actions within connection
The following sample queries return user actions made within a connection for audit-trail purposes. This is helpful when tracing a user action to a log event such as a schema change, sync frequency update, manual update, or broken connection.
Use the sample query for your destination:
BigQuery user actions within connection
Expand for BigQuery query
WITH parsed AS (
SELECT
DATE_TRUNC(DATE(time_stamp), DAY) AS date_day,
FORMAT_DATE('%A', DATE(time_stamp)) AS dn, -- DAYNAME equivalent
EXTRACT(DAYOFWEEK FROM DATE(time_stamp)) AS dow, -- DAYOFWEEK
id,
time_stamp AS event_time,
message_event,
connection_id,
JSON_EXTRACT_SCALAR(message_data, '$.actor') AS acting_user
FROM
`<project>.<dataset>.log`
)
SELECT *
FROM parsed
WHERE acting_user IS NOT NULL;
This query depends on the LOG table.
Redshift user actions within connection
Expand for Redshift query
WITH parsed AS (
SELECT
DATE_TRUNC('day', time_stamp) AS date_day,
TO_CHAR(time_stamp, 'Day') AS dn, -- DAYNAME equivalent
EXTRACT(DOW FROM time_stamp) AS dow, -- 0=Sunday, 6=Saturday
id,
time_stamp AS event_time,
message_event,
connection_id,
JSON_EXTRACT_PATH_TEXT(message_data, 'actor') AS acting_user
FROM
<schema>.log
)
SELECT *
FROM parsed
WHERE acting_user IS NOT NULL;
This query depends on the LOG table.
Snowflake user actions within connection
Expand for Snowflake query
with parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
DAYNAME(time_stamp) AS dn,
DAYOFWEEK(time_stamp) AS dow,
id,
time_stamp AS event_time,
message_event,
connection_id,
PARSE_JSON(message_data) AS message_data
FROM
<destination_db>.<destination_schema>.log
),
t AS (
SELECT
id,
event_time,
dn AS weekday,
dow,
message_event,
connection_id,
message_data:actor AS acting_user
FROM parse_json
)
SELECT
*
FROM t
WHERE acting_user IS NOT NULL
This query depends on the LOG table.
Overview of event averages by day
Use the sample query for your destination:
BigQuery event averages by day
Expand for BigQuery query
WITH parsed AS (
SELECT
DATE_TRUNC(DATE(time_stamp), DAY) AS date_day,
FORMAT_DATE('%A', DATE(time_stamp)) AS dn, -- DAYNAME equivalent
EXTRACT(DAYOFWEEK FROM DATE(time_stamp)) AS dow, -- Sunday = 1
id,
time_stamp AS event_time,
message_event,
connection_id,
JSON_EXTRACT_SCALAR(message_data, '$.event') AS extracted_event
FROM `<project>.<dataset>.log`
),
t AS (
SELECT
date_day,
dn,
dow,
message_event,
connection_id,
COUNT(id) AS event_count
FROM parsed
GROUP BY date_day, dn, dow, connection_id, message_event
),
ev AS (
SELECT
connection_id,
message_event,
dn AS weekday,
ROUND(AVG(event_count)) AS av_event_count,
ROUND(ROUND(AVG(event_count)) + ROUND(AVG(event_count)) * 0.2) AS high_event_variance_value,
ROUND(ROUND(AVG(event_count)) - ROUND(AVG(event_count)) * 0.2) AS low_event_variance_value,
ROUND(AVG(event_count) * 0.2) AS event_var_increment,
ROUND(STDDEV_POP(event_count)) AS standard_deviation
FROM t
GROUP BY connection_id, message_event, dow, dn
ORDER BY connection_id, message_event, dow
)
SELECT * FROM ev;
This query depends on the LOG table.
Redshift event averages by day
Expand for Redshift query
WITH parsed AS (
SELECT
DATE_TRUNC('day', time_stamp) AS date_day,
TO_CHAR(time_stamp, 'Day') AS dn, -- DAYNAME equivalent
EXTRACT(DOW FROM time_stamp) AS dow, -- 0 = Sunday
id,
time_stamp AS event_time,
message_event,
connection_id,
JSON_EXTRACT_PATH_TEXT(message_data, 'event') AS extracted_event
FROM <schema>.log
),
t AS (
SELECT
date_day,
dn,
dow,
message_event,
connection_id,
COUNT(id) AS event_count
FROM parsed
GROUP BY date_day, dn, dow, connection_id, message_event
),
ev AS (
SELECT
connection_id,
message_event,
dn AS weekday,
ROUND(AVG(event_count)) AS av_event_count,
ROUND(ROUND(AVG(event_count)) + ROUND(AVG(event_count)) * 0.2) AS high_event_variance_value,
ROUND(ROUND(AVG(event_count)) - ROUND(AVG(event_count)) * 0.2) AS low_event_variance_value,
ROUND(AVG(event_count) * 0.2) AS event_var_increment,
ROUND(STDDEV_SAMP(event_count)) AS standard_deviation
FROM t
GROUP BY connection_id, message_event, dow, dn
ORDER BY connection_id, message_event, dow
)
SELECT * FROM ev;
This query depends on the LOG table.
Snowflake event averages by day
Expand for Snowflake query
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
DAYNAME(time_stamp) AS dn,
DAYOFWEEK(time_stamp) AS dow,
id,
time_stamp AS event_time,
message_event,
connection_id,
PARSE_JSON(message_data) AS message_data
FROM <destination_db>.<destination_schema>.log
),
t AS (
SELECT
date_day,
dn,
dow,
message_event,
connection_id,
COUNT(id) AS event_count
FROM parse_json
GROUP BY date_day, dn, dow, connection_id, message_event
),
ev AS (
SELECT
t.connection_id,
t.message_event,
t.dn AS weekday,
ROUND(AVG(t.event_count)) AS av_event_count,
ROUND(ROUND(AVG(t.event_count)) + ROUND(AVG(t.event_count)) * .2) AS high_event_variance_value,
ROUND(ROUND(AVG(t.event_count)) - ROUND(AVG(t.event_count)) * .2) AS low_event_variance_value,
ROUND(ROUND(AVG(t.event_count)) * .2) AS event_var_increment,
ROUND(STDDEV(t.event_count)) AS standard_deviation
FROM t
GROUP BY t.connection_id, t.message_event,t.dow, t.dn
ORDER BY t.connection_id, t.message_event,t.dow
)
SELECT * FROM ev
This query depends on the LOG table.
Assign your own variance logic and monitor your environment at event level
BigQuery monitor environment at event level
Expand for instructions in BigQuery
NOTE: This query is run against the fivetran_log_event_averages table. A few example implementations to create the fivetran_log_event_averages table are provided in the following sections:
WITH parsed AS (
SELECT
DATE_TRUNC(DATE(time_stamp), DAY) AS date_day,
FORMAT_DATE('%A', DATE(time_stamp)) AS dn,
EXTRACT(DAYOFWEEK FROM DATE(time_stamp)) AS dow,
id,
time_stamp AS event_time,
message_event,
connection_id
FROM `<project>.<dataset>.log`
),
ec AS (
SELECT
COUNT(id) AS event_count,
date_day,
dn,
dow,
message_event,
connection_id
FROM parsed
GROUP BY date_day, dn, dow, connection_id, message_event
),
av AS (
SELECT *
FROM `<project>.<dataset>.fivetran_log_event_averages`
)
SELECT
ec.date_day,
ec.dn AS weekday,
ec.connection_id,
ec.message_event,
ec.event_count AS total_events,
av.av_event_count,
av.high_event_variance_value,
av.low_event_variance_value,
av.standard_deviation,
av.event_var_increment,
CASE
WHEN ec.event_count > av.high_event_variance_value THEN 'Event_Variance'
WHEN ec.event_count < av.low_event_variance_value THEN 'Event_Variance'
ELSE 'Standard'
END AS event_variance_flag
FROM ec
JOIN av
ON ec.connection_id = av.connection_id
AND ec.message_event = av.message_event
AND ec.dn = av.weekday
ORDER BY ec.date_day, ec.dow, ec.connection_id, ec.message_event;
This query depends on the LOG table.
BigQuery fivetran_log_event_averages implementation options
Option 1: dbt implementation
Create the model file models/<destination_schema>/fivetran_log_event_averages.sql:
{{ config(
materialized = 'table',
schema = '<destination_schema>' -- Replace <destination_schema> with your actual schema name
) }}
WITH parsed AS (
SELECT
DATE_TRUNC(DATE(time_stamp), DAY) AS date_day,
FORMAT_DATE('%A', DATE(time_stamp)) AS dn,
EXTRACT(DAYOFWEEK FROM DATE(time_stamp)) AS dow,
id,
time_stamp AS event_time,
message_event,
connection_id
FROM `<project>.<dataset>.log`
),
t AS (
SELECT
date_day,
dn,
dow,
message_event,
connection_id,
COUNT(id) AS event_count
FROM parsed
GROUP BY date_day, dn, dow, connection_id, message_event
),
ev AS (
SELECT
connection_id,
message_event,
dn AS weekday,
ROUND(AVG(event_count)) AS av_event_count,
ROUND(ROUND(AVG(event_count)) + ROUND(AVG(event_count)) * 0.2) AS high_event_variance_value,
ROUND(ROUND(AVG(event_count)) - ROUND(AVG(event_count)) * 0.2) AS low_event_variance_value,
ROUND(AVG(event_count) * 0.2) AS event_var_increment,
ROUND(STDDEV_POP(event_count)) AS standard_deviation
FROM t
GROUP BY connection_id, message_event, dow, dn
)
SELECT * FROM ev;
Update models/sources.yml:
version: 2
sources:
- name: fivetran
database: "{{ var('fivetran_database') }}"
schema: "{{ var('fivetran_schema') }}"
tables:
- name: log
Option 2: Direct BigQuery create table as...
CREATE OR REPLACE TABLE <destination_db>.<destination_schema>.fivetran_log_event_averages AS
WITH parsed AS (
SELECT
DATE_TRUNC(DATE(time_stamp), DAY) AS date_day,
FORMAT_DATE('%A', DATE(time_stamp)) AS dn,
EXTRACT(DAYOFWEEK FROM DATE(time_stamp)) AS dow,
id,
time_stamp AS event_time,
message_event,
connection_id
FROM `<project>.<dataset>.log`
),
t AS (
SELECT
date_day,
dn,
dow,
message_event,
connection_id,
COUNT(id) AS event_count
FROM parsed
GROUP BY date_day, dn, dow, connection_id, message_event
),
ev AS (
SELECT
connection_id,
message_event,
dn AS weekday,
ROUND(AVG(event_count)) AS av_event_count,
ROUND(ROUND(AVG(event_count)) + ROUND(AVG(event_count)) * 0.2) AS high_event_variance_value,
ROUND(ROUND(AVG(event_count)) - ROUND(AVG(event_count)) * 0.2) AS low_event_variance_value,
ROUND(AVG(event_count) * 0.2) AS event_var_increment,
ROUND(STDDEV_POP(event_count)) AS standard_deviation
FROM t
GROUP BY connection_id, message_event, dow, dn
)
SELECT * FROM ev;
Snowflake monitor environment at event level
Expand for instructions in Snowflake
NOTE: This query is run against the fivetran_log_event_averages table. A few example implementations to create the fivetran_log_event_averages table are provided in the following sections:
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
DAYNAME(time_stamp) AS dn,
DAYOFWEEK(time_stamp) AS dow,
id,
time_stamp AS event_time,
message_event,
connection_id,
PARSE_JSON(message_data) AS message_data
FROM <destination_db>.<destination_schema>.log
),
ec AS (
SELECT
COUNT(id) AS event_count,
date_day,
dn,
dow,
message_event,
connection_id
FROM parse_json
GROUP BY date_day, dn, dow, connection_id, message_event
ORDER BY connection_id, message_event ASC
)
, av AS (
SELECT
ev.connection_id,
ev.weekday,
ev.message_event,
ev.av_event_count,
ev.high_event_variance_value,
ev.low_event_variance_value,
ev.event_var_increment,
ev.standard_deviation
FROM <destination_db>.<destination_schema>.fivetran_log_event_averages ev
)
SELECT
ec.date_day,
ec.dn AS weekday,
ec.connection_id,
ec.message_event,
ec.event_count AS total_events,
av.av_event_count,
av.high_event_variance_value,
av.low_event_variance_value,
av.standard_deviation,
av.event_var_increment,
CASE WHEN ec.event_count > av.high_event_variance_value THEN 'Event_Variance'
WHEN ec.event_count < av.low_event_variance_value THEN 'Event_Variance'
else 'Standard'
END AS event_variance_flag
FROM ec
INNER JOIN av ON av.connection_id = ec.connection_id AND av.message_event = ec.message_event AND av.weekday = ec.dn
ORDER BY
ec.date_day,
ec.dow,
ec.connection_id,
ec.message_event
This query depends on the LOG table.
Snowflake fivetran_log_event_averages implementation options
Option 1: dbt implementation
Create the model file models/<destination_schema>/fivetran_log_event_averages.sql:
{{ config(
materialized = 'table',
schema = '<destination_schema>' -- Replace <destination_schema> with your actual schema name
) }}
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
DAYNAME(time_stamp) AS dn,
DAYOFWEEK(time_stamp) AS dow,
id,
time_stamp AS event_time,
message_event,
connection_id,
PARSE_JSON(message_data) AS message_data
FROM {{ source('fivetran', 'log') }}
),
t AS (
SELECT
date_day,
dn,
dow,
message_event,
connection_id,
COUNT(id) AS event_count
FROM parse_json
GROUP BY date_day, dn, dow, connection_id, message_event
),
ev AS (
SELECT
t.connection_id,
t.message_event,
t.dn AS weekday,
ROUND(AVG(t.event_count)) AS av_event_count,
ROUND(ROUND(AVG(t.event_count)) + ROUND(AVG(t.event_count)) * .2) AS high_event_variance_value,
ROUND(ROUND(AVG(t.event_count)) - ROUND(AVG(t.event_count)) * .2) AS low_event_variance_value,
ROUND(ROUND(AVG(t.event_count)) * .2) AS event_var_increment,
ROUND(STDDEV(t.event_count)) AS standard_deviation
FROM t
GROUP BY t.connection_id, t.message_event, t.dow, t.dn
ORDER BY t.connection_id, t.message_event, t.dow
)
SELECT * FROM ev
Update models/sources.yml:
version: 2
sources:
- name: fivetran
database: "{{ var('fivetran_database') }}"
schema: "{{ var('fivetran_schema') }}"
tables:
- name: log
Option 2: Direct Snowflake create table as...
CREATE OR REPLACE TABLE <destination_db>.<destination_schema>.fivetran_log_event_averages AS
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
DAYNAME(time_stamp) AS dn,
DAYOFWEEK(time_stamp) AS dow,
id,
time_stamp AS event_time,
message_event,
connection_id,
PARSE_JSON(message_data) AS message_data
FROM <destination_db>.<destination_schema>.log
),
t AS (
SELECT
date_day,
dn,
dow,
message_event,
connection_id,
COUNT(id) AS event_count
FROM parse_json
GROUP BY date_day, dn, dow, connection_id, message_event
),
ev AS (
SELECT
t.connection_id,
t.message_event,
t.dn AS weekday,
ROUND(AVG(t.event_count)) AS av_event_count,
ROUND(ROUND(AVG(t.event_count)) + ROUND(AVG(t.event_count)) * .2) AS high_event_variance_value,
ROUND(ROUND(AVG(t.event_count)) - ROUND(AVG(t.event_count)) * .2) AS low_event_variance_value,
ROUND(ROUND(AVG(t.event_count)) * .2) AS event_var_increment,
ROUND(STDDEV(t.event_count)) AS standard_deviation
FROM t
GROUP BY t.connection_id, t.message_event, t.dow, t.dn
ORDER BY t.connection_id, t.message_event, t.dow
)
SELECT * FROM ev
Redshift monitor environment at event level
Expand for instructions in Redshift
NOTE: This query is run against the fivetran_log_event_averages table. A few example implementations to create the fivetran_log_event_averages table are provided in the following sections:
WITH parsed AS (
SELECT
DATE_TRUNC('day', time_stamp) AS date_day,
TO_CHAR(time_stamp, 'Day') AS dn,
EXTRACT(DOW FROM time_stamp) AS dow,
id,
time_stamp AS event_time,
message_event,
connection_id
FROM <schema>.log
),
ec AS (
SELECT
COUNT(id) AS event_count,
date_day,
dn,
dow,
message_event,
connection_id
FROM parsed
GROUP BY date_day, dn, dow, connection_id, message_event
),
av AS (
SELECT *
FROM <schema>.fivetran_log_event_averages
)
SELECT
ec.date_day,
ec.dn AS weekday,
ec.connection_id,
ec.message_event,
ec.event_count AS total_events,
av.av_event_count,
av.high_event_variance_value,
av.low_event_variance_value,
av.standard_deviation,
av.event_var_increment,
CASE
WHEN ec.event_count > av.high_event_variance_value THEN 'Event_Variance'
WHEN ec.event_count < av.low_event_variance_value THEN 'Event_Variance'
ELSE 'Standard'
END AS event_variance_flag
FROM ec
JOIN av
ON ec.connection_id = av.connection_id
AND ec.message_event = av.message_event
AND ec.dn = av.weekday
ORDER BY ec.date_day, ec.dow, ec.connection_id, ec.message_event;
This query depends on the LOG table.
Redshift fivetran_log_event_averages implementation options
Option 1: dbt implementation
Create the model file models/<destination_schema>/fivetran_log_event_averages.sql:
{{ config(
materialized = 'table',
schema = '<destination_schema>' -- Replace <destination_schema> with your actual schema name
) }}
WITH parsed AS (
SELECT
DATE_TRUNC('day', time_stamp) AS date_day,
TO_CHAR(time_stamp, 'Day') AS dn,
EXTRACT(DOW FROM time_stamp) AS dow,
id,
time_stamp AS event_time,
message_event,
connection_id
FROM <schema>.log
),
t AS (
SELECT
date_day,
dn,
dow,
message_event,
connection_id,
COUNT(id) AS event_count
FROM parsed
GROUP BY date_day, dn, dow, connection_id, message_event
),
ev AS (
SELECT
connection_id,
message_event,
dn AS weekday,
ROUND(AVG(event_count)) AS av_event_count,
ROUND(ROUND(AVG(event_count)) + ROUND(AVG(event_count)) * 0.2) AS high_event_variance_value,
ROUND(ROUND(AVG(event_count)) - ROUND(AVG(event_count)) * 0.2) AS low_event_variance_value,
ROUND(AVG(event_count) * 0.2) AS event_var_increment,
ROUND(STDDEV_SAMP(event_count)) AS standard_deviation
FROM t
GROUP BY connection_id, message_event, dow, dn
)
SELECT * FROM ev;
Update models/sources.yml:
version: 2
sources:
- name: fivetran
database: "{{ var('fivetran_database') }}"
schema: "{{ var('fivetran_schema') }}"
tables:
- name: log
Option 2: Direct Redshift create table as...
DROP TABLE IF EXISTS <destination_db>.<destination_schema>.fivetran_log_event_averages;
CREATE TABLE <destination_db>.<destination_schema>.fivetran_log_event_averages AS
WITH parsed AS (
SELECT
DATE_TRUNC('day', time_stamp) AS date_day,
TO_CHAR(time_stamp, 'Day') AS dn,
EXTRACT(DOW FROM time_stamp) AS dow,
id,
time_stamp AS event_time,
message_event,
connection_id
FROM <schema>.log
),
t AS (
SELECT
date_day,
dn,
dow,
message_event,
connection_id,
COUNT(id) AS event_count
FROM parsed
GROUP BY date_day, dn, dow, connection_id, message_event
),
ev AS (
SELECT
connection_id,
message_event,
dn AS weekday,
ROUND(AVG(event_count)) AS av_event_count,
ROUND(ROUND(AVG(event_count)) + ROUND(AVG(event_count)) * 0.2) AS high_event_variance_value,
ROUND(ROUND(AVG(event_count)) - ROUND(AVG(event_count)) * 0.2) AS low_event_variance_value,
ROUND(AVG(event_count) * 0.2) AS event_var_increment,
ROUND(STDDEV_SAMP(event_count)) AS standard_deviation
FROM t
GROUP BY connection_id, message_event, dow, dn
)
SELECT * FROM ev;
Usage
To use the fivetran_log_event_averages table, you can either run the dbt model or create the table directly via SQL.
dbt
dbt run --select fivetran_log_event_averages
Direct SQL
Run the appropriate CREATE TABLE AS query for your destination and replace the placeholder values in <destination_db>.<destination_schema>.fivetran_log_event_averages:
Destination | <destination_db> | <destination_schema> | Example |
---|---|---|---|
Snowflake | Snowflake database name | Schema name | analytics.platform.fivetran_log_event_averages |
BigQuery | GCP project ID | Dataset name | `my-project.my_dataset.fivetran_log_event_averages` |
Redshift | (optional) | Schema name | public.fivetran_log_event_averages |
NOTE: In BigQuery, the full table reference must be enclosed in backticks.
Output Schema
Column | Description |
---|---|
connection_id | Fivetran connection identifier |
message_event | Type of event logged |
weekday | Day of the week |
av_event_count | Average event count |
high_event_variance_value | Upper threshold (120% of average) |
low_event_variance_value | Lower threshold (80% of average) |
event_var_increment | 20% of average event count |
standard_deviation | Standard deviation of event counts |
Review difference in seconds between write_to_table_start and write_to_table_end events
Use the sample query for your destination:
BigQuery review difference in seconds
Expand for BigQuery query
WITH parsed AS (
SELECT
DATE_TRUNC(DATE(time_stamp), DAY) AS date_day,
id,
time_stamp AS event_time,
message_event,
connection_id,
JSON_EXTRACT_SCALAR(message_data, '$.table') AS table_name
FROM `<project>.<dataset>.log`
WHERE connection_id = ''
),
t AS (
SELECT
id,
event_time,
message_event,
connection_id,
table_name,
RANK() OVER (PARTITION BY connection_id, table_name ORDER BY event_time ASC) AS rn,
TIMESTAMP_DIFF(event_time, LAG(event_time) OVER (PARTITION BY connection_id, table_name ORDER BY event_time), SECOND) AS seconds_diff
FROM parsed
WHERE message_event IN ('write_to_table_start', 'write_to_table_end')
)
SELECT
id,
event_time,
message_event,
connection_id,
table_name AS `table`,
CASE
WHEN message_event = 'write_to_table_start' AND seconds_diff > 0 THEN 0
ELSE seconds_diff
END AS diff
FROM t
ORDER BY connection_id, table_name, event_time;
This query depends on the LOG table.
Redshift review difference in seconds
Expand for Redshift query
WITH parsed AS (
SELECT
DATE_TRUNC('day', time_stamp) AS date_day,
id,
time_stamp AS event_time,
message_event,
connection_id,
JSON_EXTRACT_PATH_TEXT(message_data, 'table') AS table_name
FROM <schema>.log
WHERE connection_id = ''
),
t AS (
SELECT
id,
event_time,
message_event,
connection_id,
table_name,
RANK() OVER (PARTITION BY connection_id, table_name ORDER BY event_time ASC) AS rn,
DATEDIFF(second, LAG(event_time) OVER (PARTITION BY connection_id, table_name ORDER BY event_time), event_time) AS seconds_diff
FROM parsed
WHERE message_event IN ('write_to_table_start', 'write_to_table_end')
)
SELECT
id,
event_time,
message_event,
connection_id,
table_name AS "table",
CASE
WHEN message_event = 'write_to_table_start' AND seconds_diff > 0 THEN 0
ELSE seconds_diff
END AS diff
FROM t
ORDER BY connection_id, table_name, event_time;
This query depends on the LOG table.
Snowflake review difference in seconds
Expand for Snowflake query
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
id,
time_stamp AS event_time,
message_event,
connection_id,
PARSE_JSON(message_data) AS message_data
FROM <destination_db>.<destination_schema>.log
WHERE connection_id = ''
), t AS (
SELECT
id,
event_time,
message_event,
connection_id,
message_data:table AS "table",
RANK() OVER ( ORDER BY connection_id,"table",event_time ASC) AS rn ,
DATEDIFF(second,lag(event_time,1) OVER (ORDER BY connection_id,"table",event_time ASC),event_time) AS seconds_diff
FROM parse_json
WHERE message_event IN ('write_to_table_start','write_to_table_end')
GROUP BY id,connection_id,event_time,message_event,"table"
ORDER BY connection_id,"table",event_time ASC
)
SELECT
t.id,
t.event_time,
t.message_event,
t.connection_id,
t."table",
CASE WHEN t.message_event = 'write_to_table_start'
AND t.seconds_diff > 0
THEN 0 ELSE t.seconds_diff
END AS diff
FROM t
;
This query depends on the LOG table.
Review modified record count data by table
Expand for Snowflake query
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
DAYNAME(time_stamp) AS dn,
DAYOFWEEK(time_stamp) AS dow,
id,
time_stamp AS event_time,
message_event,
connection_id,
PARSE_JSON(message_data) AS message_data
FROM <destination_db>.<destination_schema>.log
), t AS (
SELECT
id,
event_time,
dn AS weekday,
dow,
message_event,
connection_id,
message_data:operationType AS optype,
message_data:table AS logtable,
message_data:count AS rowsimpacted
FROM parse_json
WHERE message_event = 'records_modified'
AND logtable <> 'fivetran_audit'
GROUP BY id,connection_id,event_time,dow,weekday,message_event,logtable,optype,rowsimpacted
ORDER BY connection_id,logtable ASC
)
SELECT
connection_id,
message_event,
weekday,
optype,
logtable AS avtable,
CAST(ROUND(AVG(t.rowsimpacted)) AS int) AS avgrow,
ROUND(ROUND(AVG(t.rowsimpacted)) + ROUND(AVG(t.rowsimpacted)) * .2) AS high_variance_value,
ROUND(ROUND(AVG(t.rowsimpacted)) - ROUND(AVG(t.rowsimpacted)) * .2) AS low_variance_value,
ROUND(ROUND(AVG(t.rowsimpacted)) * .2 ) AS var_increment,
IFNULL(ROUND(stddev(t.rowsimpacted)),0) AS standard_deviation
FROM t
GROUP BY connection_id,message_event,dow,weekday,avtable,optype
ORDER BY connection_id,avtable,dow,optype
This query depends on the LOG table.
Assign your own variance logic and monitor your environment at table level
BigQuery monitor environment at table level
Expand for instructions in BigQuery
NOTE: This query is run against the fivetran_records_modified_averages table. A few example implementations to create the fivetran_records_modified_averages table are provided in the following sections.
WITH parsed AS (
SELECT
DATE_TRUNC(DATE(time_stamp), DAY) AS date_day,
FORMAT_DATE('%A', DATE(time_stamp)) AS dn,
EXTRACT(DAYOFWEEK FROM DATE(time_stamp)) AS dow,
id,
time_stamp AS event_time,
message_event,
connection_id,
JSON_EXTRACT_SCALAR(message_data, '$.operationType') AS optype,
JSON_EXTRACT_SCALAR(message_data, '$.table') AS logtable,
CAST(JSON_EXTRACT_SCALAR(message_data, '$.count') AS INT64) AS rowsimpacted
FROM `<destination_db>.<destination_schema>.log`
WHERE message_event = 'records_modified'
AND JSON_EXTRACT_SCALAR(message_data, '$.table') <> 'fivetran_audit'
),
av AS (
SELECT *
FROM `<destination_db>.<destination_schema>.fivetran_records_modified_averages`
)
SELECT
parsed.id,
parsed.dn AS weekday,
parsed.date_day,
parsed.event_time,
parsed.message_event,
parsed.optype,
parsed.connection_id,
parsed.logtable,
parsed.rowsimpacted,
av.avgrow,
av.high_variance_value,
av.low_variance_value,
av.var_increment,
av.standard_deviation,
CASE
WHEN parsed.rowsimpacted > av.high_variance_value THEN 'Variance'
WHEN parsed.rowsimpacted < av.low_variance_value THEN 'Variance'
ELSE 'Standard'
END AS varianceflag
FROM parsed
JOIN av ON
parsed.connection_id = av.connection_id AND
parsed.dn = av.weekday AND
parsed.logtable = av.avtable AND
parsed.optype = av.optype
ORDER BY parsed.connection_id, parsed.dow, parsed.event_time, parsed.logtable;
This query depends on the LOG table.
BigQuery fivetran_records_modified_averages implementation options
Option 1: dbt implementation
Create the model file models/<destination_schema>/fivetran_records_modified_averages.sql:
{{ config(
materialized = 'table',
schema = '<destination_schema>' -- Replace <destination_schema> with your actual schema name
) }}
WITH parsed AS (
SELECT
DATE_TRUNC(DATE(time_stamp), DAY) AS date_day,
FORMAT_DATE('%A', DATE(time_stamp)) AS dn,
EXTRACT(DAYOFWEEK FROM DATE(time_stamp)) AS dow,
id,
time_stamp,
message_event,
connection_id,
JSON_EXTRACT_SCALAR(message_data, '$.operationType') AS optype,
JSON_EXTRACT_SCALAR(message_data, '$.table') AS logtable,
CAST(JSON_EXTRACT_SCALAR(message_data, '$.count') AS INT64) AS rowsimpacted
FROM {{ source('fivetran', 'log') }}
WHERE message_event = 'records_modified'
AND JSON_EXTRACT_SCALAR(message_data, '$.table') <> 'fivetran_audit'
),
t AS (
SELECT
connection_id,
message_event,
dn AS weekday,
dow,
logtable AS avtable,
optype,
rowsimpacted
FROM parsed
),
ev AS (
SELECT
connection_id,
message_event,
weekday,
optype,
avtable,
CAST(ROUND(AVG(rowsimpacted)) AS INT64) AS avgrow,
ROUND(AVG(rowsimpacted) + AVG(rowsimpacted) * 0.2) AS high_variance_value,
ROUND(AVG(rowsimpacted) - AVG(rowsimpacted) * 0.2) AS low_variance_value,
ROUND(AVG(rowsimpacted) * 0.2) AS var_increment,
IFNULL(ROUND(STDDEV_POP(rowsimpacted)), 0) AS standard_deviation
FROM t
GROUP BY connection_id, message_event, weekday, dow, avtable, optype
)
SELECT * FROM ev;
Update models/sources.yml:
version: 2
sources:
- name: fivetran
database: "{{ var('fivetran_database') }}"
schema: "{{ var('fivetran_schema') }}"
tables:
- name: log
Option 2: Direct BigQuery create table as...
CREATE OR REPLACE TABLE <destination_db>.<destination_schema>.fivetran_records_modified_averages AS
WITH parsed AS (
SELECT
DATE_TRUNC(DATE(time_stamp), DAY) AS date_day,
FORMAT_DATE('%A', DATE(time_stamp)) AS dn,
EXTRACT(DAYOFWEEK FROM DATE(time_stamp)) AS dow,
id,
time_stamp,
message_event,
connection_id,
JSON_EXTRACT_SCALAR(message_data, '$.operationType') AS optype,
JSON_EXTRACT_SCALAR(message_data, '$.table') AS logtable,
CAST(JSON_EXTRACT_SCALAR(message_data, '$.count') AS INT64) AS rowsimpacted
FROM `<destination_db>.<destination_schema>.log`
WHERE message_event = 'records_modified'
AND JSON_EXTRACT_SCALAR(message_data, '$.table') <> 'fivetran_audit'
),
t AS (
SELECT
connection_id,
message_event,
dn AS weekday,
dow,
logtable AS avtable,
optype,
rowsimpacted
FROM parsed
),
ev AS (
SELECT
connection_id,
message_event,
weekday,
optype,
avtable,
CAST(ROUND(AVG(rowsimpacted)) AS INT64) AS avgrow,
ROUND(AVG(rowsimpacted) + AVG(rowsimpacted) * 0.2) AS high_variance_value,
ROUND(AVG(rowsimpacted) - AVG(rowsimpacted) * 0.2) AS low_variance_value,
ROUND(AVG(rowsimpacted) * 0.2) AS var_increment,
IFNULL(ROUND(STDDEV_POP(rowsimpacted)), 0) AS standard_deviation
FROM t
GROUP BY connection_id, message_event, weekday, dow, avtable, optype
)
SELECT * FROM ev;
Snowflake monitor environment at table level
Expand for instructions in Snowflake
NOTE: This query is run against the fivetran_records_modified_averages table. A few example implementations to create the fivetran_records_modified_averages table are provided in the following sections.
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
DAYNAME(time_stamp) AS dn,
DAYOFWEEK(time_stamp) AS dow,
id,
time_stamp AS event_time,
message_event,
connection_id,
PARSE_JSON(message_data) AS message_data
FROM <destination_db>.<destination_schema>.log
WHERE message_event = 'records_modified'
),
t AS (
SELECT
id,
date_day,
dn,
dow,
event_time,
message_event,
connection_id,
message_data:operationType AS optype,
message_data:table AS logtable,
CAST(message_data:count AS INT) AS rowsimpacted
FROM parse_json
WHERE message_data:table <> 'fivetran_audit'
),
av AS (
SELECT *
FROM <destination_db>.<destination_schema>.fivetran_records_modified_averages
)
SELECT
t.id,
t.dn AS weekday,
t.date_day,
t.event_time,
t.message_event,
t.optype,
t.connection_id,
t.logtable,
t.rowsimpacted,
av.avgrow,
av.high_variance_value,
av.low_variance_value,
av.var_increment,
av.standard_deviation,
CASE
WHEN t.rowsimpacted > av.high_variance_value THEN 'Variance'
WHEN t.rowsimpacted < av.low_variance_value THEN 'Variance'
ELSE 'Standard'
END AS varianceflag
FROM t
JOIN av ON
t.connection_id = av.connection_id AND
t.dn = av.weekday AND
t.logtable = av.avtable AND
t.optype = av.optype
ORDER BY t.connection_id, t.dow, t.event_time, t.logtable;
This query depends on the LOG table.
Snowflake fivetran_records_modified_averages implementation options
Option 1: dbt implementation
Create the model file models/<destination_schema>/fivetran_records_modified_averages.sql:
{{ config(
materialized = 'table',
schema = '<destination_schema>' -- Replace <destination_schema> with your actual schema name
) }}
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
DAYNAME(time_stamp) AS dn,
DAYOFWEEK(time_stamp) AS dow,
id,
time_stamp,
message_event,
connection_id,
PARSE_JSON(message_data) AS message_data
FROM {{ source('fivetran', 'log') }}
WHERE message_event = 'records_modified'
),
t AS (
SELECT
connection_id,
message_event,
dn AS weekday,
dow,
message_data:table AS avtable,
message_data:operationType AS optype,
CAST(message_data:count AS INT) AS rowsimpacted
FROM parse_json
WHERE message_data:table <> 'fivetran_audit'
),
ev AS (
SELECT
connection_id,
message_event,
weekday,
optype,
avtable,
CAST(ROUND(AVG(rowsimpacted)) AS INT) AS avgrow,
ROUND(ROUND(AVG(rowsimpacted)) + ROUND(AVG(rowsimpacted)) * 0.2) AS high_variance_value,
ROUND(ROUND(AVG(rowsimpacted)) - ROUND(AVG(rowsimpacted)) * 0.2) AS low_variance_value,
ROUND(ROUND(AVG(rowsimpacted)) * 0.2) AS var_increment,
IFNULL(ROUND(STDDEV(rowsimpacted)), 0) AS standard_deviation
FROM t
GROUP BY connection_id, message_event, dow, weekday, avtable, optype
)
SELECT * FROM ev;
Update models/sources.yml:
version: 2
sources:
- name: fivetran
database: "{{ var('fivetran_database') }}"
schema: "{{ var('fivetran_schema') }}"
tables:
- name: log
Option 2: Direct Snowflake create table as...
CREATE OR REPLACE TABLE <destination_db>.<destination_schema>.fivetran_records_modified_averages AS
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
DAYNAME(time_stamp) AS dn,
DAYOFWEEK(time_stamp) AS dow,
id,
time_stamp,
message_event,
connection_id,
PARSE_JSON(message_data) AS message_data
FROM <destination_db>.<destination_schema>.log
WHERE message_event = 'records_modified'
),
t AS (
SELECT
connection_id,
message_event,
dn AS weekday,
dow,
message_data:table AS avtable,
message_data:operationType AS optype,
CAST(message_data:count AS INT) AS rowsimpacted
FROM parse_json
WHERE message_data:table <> 'fivetran_audit'
),
ev AS (
SELECT
connection_id,
message_event,
weekday,
optype,
avtable,
CAST(ROUND(AVG(rowsimpacted)) AS INT) AS avgrow,
ROUND(ROUND(AVG(rowsimpacted)) + ROUND(AVG(rowsimpacted)) * 0.2) AS high_variance_value,
ROUND(ROUND(AVG(rowsimpacted)) - ROUND(AVG(rowsimpacted)) * 0.2) AS low_variance_value,
ROUND(ROUND(AVG(rowsimpacted)) * 0.2) AS var_increment,
IFNULL(ROUND(STDDEV(rowsimpacted)), 0) AS standard_deviation
FROM t
GROUP BY connection_id, message_event, dow, weekday, avtable, optype
)
SELECT * FROM ev;
Redshift monitor environment at table level
Expand for instructions in Redshift
NOTE: This query is run against the fivetran_records_modified_averages table. A few example implementations to create the fivetran_records_modified_averages table are provided in the following sections.
WITH parsed AS (
SELECT
DATE_TRUNC('day', time_stamp) AS date_day,
TO_CHAR(time_stamp, 'Day') AS dn,
EXTRACT(DOW FROM time_stamp) AS dow,
id,
time_stamp AS event_time,
message_event,
connection_id,
JSON_EXTRACT_PATH_TEXT(message_data, 'operationType') AS optype,
JSON_EXTRACT_PATH_TEXT(message_data, 'table') AS logtable,
CAST(JSON_EXTRACT_PATH_TEXT(message_data, 'count') AS INT) AS rowsimpacted
FROM <schema>.log
WHERE message_event = 'records_modified'
AND JSON_EXTRACT_PATH_TEXT(message_data, 'table') <> 'fivetran_audit'
),
av AS (
SELECT *
FROM <schema>.fivetran_records_modified_averages
)
SELECT
parsed.id,
parsed.dn AS weekday,
parsed.date_day,
parsed.event_time,
parsed.message_event,
parsed.optype,
parsed.connection_id,
parsed.logtable,
parsed.rowsimpacted,
av.avgrow,
av.high_variance_value,
av.low_variance_value,
av.var_increment,
av.standard_deviation,
CASE
WHEN parsed.rowsimpacted > av.high_variance_value THEN 'Variance'
WHEN parsed.rowsimpacted < av.low_variance_value THEN 'Variance'
ELSE 'Standard'
END AS varianceflag
FROM parsed
JOIN av ON
parsed.connection_id = av.connection_id AND
parsed.dn = av.weekday AND
parsed.logtable = av.avtable AND
parsed.optype = av.optype
ORDER BY parsed.connection_id, parsed.dow, parsed.event_time, parsed.logtable;
This query depends on the LOG table.
Redshift fivetran_records_modified_averages implementation options
Option 1: dbt implementation
Create the model file models/<destination_schema>/fivetran_records_modified_averages.sql:
{{ config(
materialized = 'table',
schema = '<destination_schema>' -- Replace <destination_schema> with your actual schema name
) }}
WITH parsed AS (
SELECT
DATE_TRUNC('day', time_stamp) AS date_day,
TO_CHAR(time_stamp, 'Day') AS dn,
EXTRACT(DOW FROM time_stamp) AS dow,
id,
time_stamp,
message_event,
connection_id,
JSON_EXTRACT_PATH_TEXT(message_data, 'operationType') AS optype,
JSON_EXTRACT_PATH_TEXT(message_data, 'table') AS logtable,
CAST(JSON_EXTRACT_PATH_TEXT(message_data, 'count') AS INT) AS rowsimpacted
FROM {{ source('fivetran', 'log') }}
WHERE message_event = 'records_modified'
AND JSON_EXTRACT_PATH_TEXT(message_data, 'table') <> 'fivetran_audit'
),
t AS (
SELECT
connection_id,
message_event,
dn AS weekday,
dow,
logtable AS avtable,
optype,
rowsimpacted
FROM parsed
),
ev AS (
SELECT
connection_id,
message_event,
weekday,
optype,
avtable,
ROUND(AVG(rowsimpacted)) AS avgrow,
ROUND(AVG(rowsimpacted) + AVG(rowsimpacted) * 0.2) AS high_variance_value,
ROUND(AVG(rowsimpacted) - AVG(rowsimpacted) * 0.2) AS low_variance_value,
ROUND(AVG(rowsimpacted) * 0.2) AS var_increment,
COALESCE(ROUND(STDDEV_SAMP(rowsimpacted)), 0) AS standard_deviation
FROM t
GROUP BY connection_id, message_event, dow, weekday, avtable, optype
)
SELECT * FROM ev;
Update models/sources.yml:
version: 2
sources:
- name: fivetran
database: "{{ var('fivetran_database') }}"
schema: "{{ var('fivetran_schema') }}"
tables:
- name: log
Option 2: Direct Redshift create table as...
DROP TABLE IF EXISTS <destination_db>.<destination_schema>.fivetran_records_modified_averages;
CREATE TABLE <destination_db>.<destination_schema>.fivetran_records_modified_averages AS
WITH parsed AS (
SELECT
DATE_TRUNC('day', time_stamp) AS date_day,
TO_CHAR(time_stamp, 'Day') AS dn,
EXTRACT(DOW FROM time_stamp) AS dow,
id,
time_stamp,
message_event,
connection_id,
JSON_EXTRACT_PATH_TEXT(message_data, 'operationType') AS optype,
JSON_EXTRACT_PATH_TEXT(message_data, 'table') AS logtable,
CAST(JSON_EXTRACT_PATH_TEXT(message_data, 'count') AS INT) AS rowsimpacted
FROM <schema>.log
WHERE message_event = 'records_modified'
AND JSON_EXTRACT_PATH_TEXT(message_data, 'table') <> 'fivetran_audit'
),
t AS (
SELECT
connection_id,
message_event,
dn AS weekday,
dow,
logtable AS avtable,
optype,
rowsimpacted
FROM parsed
),
ev AS (
SELECT
connection_id,
message_event,
weekday,
optype,
avtable,
ROUND(AVG(rowsimpacted)) AS avgrow,
ROUND(AVG(rowsimpacted) + AVG(rowsimpacted) * 0.2) AS high_variance_value,
ROUND(AVG(rowsimpacted) - AVG(rowsimpacted) * 0.2) AS low_variance_value,
ROUND(AVG(rowsimpacted) * 0.2) AS var_increment,
COALESCE(ROUND(STDDEV_SAMP(rowsimpacted)), 0) AS standard_deviation
FROM t
GROUP BY connection_id, message_event, dow, weekday, avtable, optype
)
SELECT * FROM ev;
Usage
To use the fivetran_records_modified_averages table, you can either run the dbt model or create the table directly via SQL.
dbt
dbt run --select fivetran_records_modified_averages
Direct SQL
Run the appropriate CREATE TABLE AS query for your destination and replace the placeholder values in <destination_db>.<destination_schema>.fivetran_records_modified_averages:
Destination | <destination_db> | <destination_schema> | Example |
---|---|---|---|
Snowflake | Snowflake database name | Schema name | analytics.platform.fivetran_records_modified_averages |
BigQuery | GCP project ID | Dataset name | `my-project.my_dataset.fivetran_records_modified_averages` |
Redshift | (optional) | Schema name | public.fivetran_records_modified_averages |
NOTE: In BigQuery, the full table reference must be enclosed in backticks.
Output Schema
Column | Description |
---|---|
connection_id | Fivetran connection identifier |
message_event | Type of event (always records_modified in this model) |
weekday | Day of the week |
optype | Operation type (insert, update, delete) |
avtable | Table name affected by the operation |
avgrow | Average number of rows impacted |
high_variance_value | Upper threshold (120% of average row count) |
low_variance_value | Lower threshold (80% of average row count) |
var_increment | 20% of average row count |
standard_deviation | Standard deviation of row counts |
dbt transformation data
Expand for Snowflake query
The following sample query returns dbt transformation data for a given event.
WITH a AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
id,
sync_id,
time_stamp AS event_time,
message_event,
connection_id,
PARSE_JSON(message_data) AS message_data
FROM <destination_db>.<destination_schema>.log
WHERE message_event IN ('transformation_start','transformation_succeeded','transformation_failed')
)
SELECT
message_data:dbtJobId::string AS dbtJobId,
message_data:dbtJobName::string AS dbtJobName,
message_data:dbtJobType::string AS dbtJobType,
message_data:startTime::timestamp AS startTime,
message_data:endTime::timestamp AS endTime,
message_data:result:stepResults[0]:success::boolean AS success,
message_data:models AS models,
message_data:result:stepResults AS stepResults,
message_data:startupDetails AS startupDetails,
message_data:result:stepResults[0]:knownFailedModels AS knownFailedModels,
message_data:result:stepResults[0]:knownSuccessfulModels AS knownSuccessfulModels,
message_data
FROM a
This query depends on the LOG table.
Transformation data
Expand for Snowflake query
The following sample query returns transformation data for a given event.
WITH a AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
id,
sync_id,
time_stamp AS event_time,
message_event,
connection_id,
PARSE_JSON(message_data) AS message_data
FROM <destination_db>.<destination_schema>.log
WHERE message_event IN ('transformation_start','transformation_succeeded','transformation_failed')
)
SELECT
message_data:id::string AS jobId,
message_data:name::string AS jobName,
message_data:transformationType::string AS transformationType,
message_data:startTime::timestamp AS startTime,
message_data:endTime::timestamp AS endTime,
message_data:schedule AS schedule,
message_data:result:stepResults AS stepResults,
message_data:description AS resultsSummary,
message_data
FROM a
This query depends on the LOG table.
Sync events
Expand for Snowflake query
The following sample query returns all events from a given sync.
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
id,
sync_id,
time_stamp AS event_time,
message_event,
connection_id,
PARSE_JSON(message_data) AS message_data
FROM <destination_db>.<destination_schema>.log
WHERE sync_id = ''
), t AS (
SELECT
id,
sync_id,
event_time,
message_event,
connection_id,
message_data,
message_data:table AS "table",
message_data:query AS query,
RANK() OVER ( ORDER BY sync_id, connection_id,event_time ASC) AS rn ,
DATEDIFF(second,lag(event_time,1) over (ORDER BY sync_id, connection_id,event_time ASC),event_time) AS seconds_diff
FROM parse_json
GROUP BY id,sync_id,connection_id,event_time,message_event,message_data,"table"
ORDER BY connection_id,"table",event_time ASC
)
SELECT
t.id,
t.sync_id,
t.event_time,
t.message_event,
t.message_data,
t.connection_id,
t.query,
t."table",
CASE WHEN t.message_event = 'write_to_table_start'
AND t.seconds_diff > 0
THEN 0 else t.seconds_diff
END AS diff,
t.rn
FROM t
ORDER BY t.sync_id, t.event_time ASC
This query depends on the LOG table.
Sync statistics
Expand for Snowflake query
The following sample query returns sync statistics for PostgreSQL, Oracle, MySQL, and SQL Server connections.
WITH parse_json AS (
SELECT
DATE_TRUNC('DAY', time_stamp) AS date_day,
DAYNAME(time_stamp) AS dn,
DAYOFWEEK(time_stamp) AS dow,
id,
time_stamp AS event_time,
message_event,
connection_id,
PARSE_JSON(message_data) AS message_data
FROM <destination_db>.<destination_schema>.log
WHERE message_event = 'sync_stats'
),t AS (
SELECT
id,
event_time,
dn AS weekday,
dow,
message_event,
connection_id,
message_data:extract_time_s AS extract_time_s,
message_data:extract_volume_mb AS extract_volume_mb,
message_data:load_time_s AS load_time_s,
message_data:load_volume_mb AS load_volume_mb,
message_data:process_time_s AS process_time_s,
message_data:process_volume_mb AS process_volume_mb,
message_data:total_time_s AS total_time_s
FROM parse_json
)
SELECT * FROM t ORDER BY extract_time_s DESC
This query depends on the LOG table.
API extract_summary data
Expand for Snowflake query
The following sample query returns extract summary log data.
NOTE: If the query fails when using INNER JOIN <destination_schema>.connection c on l.connection_id = c.connection_id, re-sync your Fivetran Platform connection.
WITH es as(
SELECT
c.connection_name,
l.time_stamp,
PARSE_JSON(message_data) as md
FROM <destination_db>.<destination_schema>.log l
INNER JOIN <destination_schema>.connection c on l.connection_id = c.connection_id
WHERE message_event = 'extract_summary'
ORDER BY l._fivetran_synced DESC
)
SELECT
connection_name,
time_stamp,
md:status,
md:total_queries,
md:total_rows,
md:total_size,
md:rounded_total_size,
md:objects
FROM es
This query depends on the LOG table.
API extract_summary object data
Expand for Snowflake query
The following sample query returns extract summary log data for API objects.
NOTE: If the query fails when using INNER JOIN <destination_schema>.connection c on l.connection_id = c.connection_id, re-sync your Fivetran Platform connection.
WITH es as(
SELECT
c.connection_name,
l.time_stamp,
PARSE_JSON(message_data) as md
FROM <destination_db>.<destination_schema>.log l
INNER JOIN <destination_schema>.connection c on l.connection_id = c.connection_id
WHERE message_event = 'extract_summary'
ORDER BY l._fivetran_synced DESC
)
,eso
AS(
SELECT
connection_name,
time_stamp,
md:status,
md:total_queries,
md:total_rows,
md:total_size,
md:rounded_total_size,
PARSE_JSON(md:objects) as o
FROM es
)
SELECT
eso.connection_name,
value:name AS name,
value:queries AS queries
FROM eso,
LATERAL FLATTEN(input => PARSE_JSON(o))
This query depends on the LOG table.
Check metadata
IMPORTANT: You must have an Enterprise plan or higher to query metadata.
The Fivetran Platform Connector provides access to metadata for data synced by Fivetran, which helps you understand the mapping between the source and destination. The data retrieved can be easily consumed in BI tools, data catalogs, or through direct SQL queries.
The data retrieved helps organizations:
- Understand data synced by Fivetran
- Audit and enforce access control
- Retrieve metadata changes
The following queries return normalized tables with information on source and destination connections, schemas, tables, and columns.
Check which data moved through Fivetran
Expand for universal query
The query includes source/destination mapping so that it can be filtered by source.connectionId.
SELECT * FROM <destination_db>.<destination_schema>.connection c
JOIN <destination_db>.<destination_schema>.source_schema_metadata ssm
ON c.connection_id = ssm.connection_id
JOIN <destination_db>.<destination_schema>.source_table_metadata stm
ON stm.schema_id = ssm.id
JOIN <destination_db>.<destination_schema>.source_column_metadata scm
ON scm.table_id = stm.id
JOIN <destination_db>.<destination_schema>.schema_lineage sl
ON ssm.id = sl.source_schema_id
JOIN <destination_db>.<destination_schema>.table_lineage tl
ON stm.id = tl.source_table_id
JOIN <destination_db>.<destination_schema>.column_lineage cl
ON scm.id = cl.source_column_id
JOIN <destination_db>.<destination_schema>.destination_schema_metadata dsm
ON sl.destination_schema_id = dsm.id
JOIN <destination_db>.<destination_schema>.destination_table_metadata dtm
ON tl.destination_table_id = dtm.id
JOIN <destination_db>.<destination_schema>.destination_column_metadata dcm
ON cl.destination_column_id = dcm.id
LEFT JOIN <destination_db>.<destination_schema>.source_foreign_key_metadata sfkm
ON sfkm.column_id = scm.id
This query depends on the following tables: CONNECTION, SOURCE_SCHEMA_METADATA, SOURCE_TABLE_METADATA, SOURCE_COLUMN_METADATA, SCHEMA_LINEAGE, TABLE_LINEAGE, COLUMN_LINEAGE, DESTINATION_SCHEMA_METADATA, DESTINATION_TABLE_METADATA, DESTINATION_COLUMN_METADATA, and SOURCE_FOREIGN_KEY_METADATA.
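To scope the result to a single connection, you can filter on the connection table's connection_id column; a minimal addition, with <connection_id> as a placeholder:
-- Append to the end of the query above, replacing <connection_id> with the connection you want to inspect
WHERE c.connection_id = '<connection_id>'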
What is this data a reference to?
Expand for universal query
SELECT * FROM <destination_db>.<destination_schema>.source_column_metadata scm
JOIN <destination_db>.<destination_schema>.column_lineage cl
ON scm.id = cl.source_column_id
WHERE cl.destination_column_id = %column_id%
This query depends on the SOURCE_COLUMN_METADATA and COLUMN_LINEAGE tables.
What downstream assets are impacted by this data?
Expand for universal query
SELECT * FROM <destination_db>.<destination_schema>.destination_column_metadata dcm
JOIN <destination_db>.<destination_schema>.column_lineage cl
ON dcm.id = cl.destination_column_id
WHERE cl.source_column_id = %column_id%
This query depends on the DESTINATION_COLUMN_METADATA and COLUMN_LINEAGE tables.
Check severe messages of SDK connections
Expand for universal query
SELECT message, event_time, connection_id, sync_id
FROM <destination_db>.<destination_schema>.connector_sdk_log
WHERE level = 'SEVERE' and message_origin = 'connector_sdk'
ORDER BY event_time DESC;
This query depends on the CONNECTOR_SDK_LOG table.