Historical & Incremental Pipelines - Snowflake to BigQuery

Remya Raj
Published in Searce · 9 min read · Feb 16, 2022

Snowflake and BigQuery are both leading cloud data warehouses and fully managed services, well suited to enterprises adopting modern cloud platforms. Each has its own strengths, from architecture to pricing; which one is the better fit depends on the organization’s use cases and its analytics and machine learning requirements.

As data strategies evolve, companies often migrate from one data warehouse to another, and that is where a sound, time-bound data migration methodology becomes crucial.

Here in this blog, we will cover the data migration approach from Snowflake to BigQuery — Historical & Incremental.

Business Case:

The use case is to move to Google Cloud Platform and make the most of its cloud services, particularly machine learning. Based on Forrester’s analysis, BigQuery has an edge over Snowflake for AI use cases on massive datasets. Streaming, external and federated sources, and dynamic scaling are also available in BigQuery. In addition, BigQuery’s query-based (on-demand) pricing, compared with Snowflake’s time-based warehouse pricing, can make BigQuery the slightly cheaper option for this workload.

Also, BigQuery has been optimized and tuned by Google over many years, so the platform feels more mature and more fully managed than Snowflake.

Solution Approach:

Data migration to any data warehouse or data lake has two parts:

  1. Data Migration — Data migration is a one-time historical data movement from source to destination, Snowflake to BigQuery in our case.
  2. Data Integration — Data integration, on the other hand, is continuous data pipelines or batch processes that would load the data to the target system.

Prerequisites:

  1. Snowflake Account with running warehouse
  2. GCP account with editor/owner access to BigQuery, GCS
  3. Google Cloud Storage bucket
  4. BigQuery dataset and table
  5. Access to Cloud Shell
  6. Access to Data Transfer Service
  7. Virtual Machine with Apache Airflow installed in it
  8. snowflake-connector-python 2.4.1 (installable via pip, as shown after this list)
  9. snowflake-sqlalchemy 1.2.4
  10. pandas 1.4.0
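
The Python packages in items 8–10 can be installed on the Airflow VM with pip, pinned to the versions listed above:

pip install snowflake-connector-python==2.4.1 snowflake-sqlalchemy==1.2.4 pandas==1.4.0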

Historical Data Migration:

Since the data in Snowflake is already deduplicated and cleansed, it can be copied directly to a Google Cloud Storage bucket. Snowflake’s COPY INTO <location> command is used to unload the data.

Pipeline to unload data from Snowflake (one-time load):

1. Unload data from Snowflake directly into Google Cloud Storage (recommended), or download the data and copy it to Cloud Storage using gsutil or the Cloud Storage client libraries.

2. Use the BigQuery Data Transfer Service for Cloud Storage to load the data into BigQuery (a CLI sketch of a transfer configuration follows this list).

Benefits of this approach:

  • Fast and less risky
  • Reports and dashboards can be validated before GCP adoption
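
As a sketch of step 2, a Cloud Storage transfer configuration can be created from the CLI with bq mk; the dataset, table, bucket, and display name below are placeholders, and the walkthrough later in this post uses a plain bq load instead:

bq mk --transfer_config \
  --data_source=google_cloud_storage \
  --target_dataset=bq_dataset \
  --display_name="snowflake_gcs_to_bq" \
  --params='{"data_path_template":"gs://<bucket>/<path>/*.parquet","destination_table_name_template":"mytable","file_format":"PARQUET"}'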

Technical Steps:

  • Create a Cloud Storage integration in Snowflake
//Create storage integration
CREATE STORAGE INTEGRATION <integration_name>
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = GCS
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('gcs://<bucket>/<path>/');

integration_name: name of the new integration

bucket: name of the Google Cloud Storage bucket that will store the files/data

path: optional path that gives granular control over the objects in the bucket

  • Retrieve the Cloud Storage service account that Snowflake created for the integration
//Describe the integration to retrieve the service account ID
DESC STORAGE INTEGRATION <integration_name>;

The STORAGE_GCP_SERVICE_ACCOUNT property in the output is the Cloud Storage service account created by Snowflake for your account.

  • Create a custom IAM role with the permissions below so the service account can access objects in the GCS bucket, then assign the custom role to the Cloud Storage service account (a gcloud sketch follows this list).
storage.buckets.get
storage.objects.get
storage.objects.list
storage.objects.create
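
A hedged sketch of how the custom role could be created and bound to the bucket with gcloud and gsutil; the project ID, role ID, and service account email are placeholders, and the actual service account value comes from the DESC STORAGE INTEGRATION output above:

//Hypothetical project, role and service account - substitute your own values
gcloud iam roles create snowflake_unload \
  --project=<project_id> \
  --title="Snowflake Unload" \
  --permissions=storage.buckets.get,storage.objects.get,storage.objects.list,storage.objects.create

//Bind the custom role to the Snowflake service account on the bucket
gsutil iam ch \
  "serviceAccount:<storage_gcp_service_account>:projects/<project_id>/roles/snowflake_unload" \
  gs://<bucket>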
  • Set up the required Snowflake environment by creating a warehouse where the data will be loaded, a database, and a schema to hold the tables
create warehouse if not exists warehouse_name;
create database if not exists database_name;
use database database_name;
create schema if not exists schema_name;
  • Use the ACCOUNTADMIN role to set up the database and the schema, as it can manage all aspects of the account
use role accountadmin;
use warehouse warehouse_name;
use database database_name;
use schema schema_name;
  • Create a table in Snowflake and populate it with the data that needs to be pushed to GCP
//Creating table in Snowflake
CREATE OR REPLACE TABLE mytable (
id number, holder_first_name string, holder_last_name string, holder_DOB timestamp,
holder_address string, holder_city string, holder_state string, holder_zip number,
approx_latitude decimal, approx_longitude decimal, holder_gender string, trafic_source string,
registration_date datetime, opened_date datetime, is_credit_card boolean, jsonrec variant,
created_at datetime
);
  • Now that the initial setup is ready, create a named stage, upload the data from the local machine to it using the PUT command, and then load the staged file into the existing table using COPY INTO <table>
//Creating a named stage and pushing data into it
create stage if not exists my_stage;
put file://c:/Users/Dell/Desktop/data.csv @my_stage;
//Copying data from the stage into the Snowflake table
copy into mytable from @my_stage/data.csv.gz
FILE_FORMAT = ( TYPE = CSV
COMPRESSION = 'AUTO' FIELD_DELIMITER = ','
RECORD_DELIMITER = '\n'
SKIP_HEADER = 1
FIELD_OPTIONALLY_ENCLOSED_BY = '"'
TRIM_SPACE = TRUE
ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
ESCAPE = 'NONE'
ESCAPE_UNENCLOSED_FIELD = 'NONE'
DATE_FORMAT = 'AUTO'
TIMESTAMP_FORMAT = 'AUTO');

Use the COPY INTO <location> command to unload data from the table directly into the GCS bucket.

//Copying data into GCS bucket
copy into 'gcs://<bucket>/<path>/'
from mytable
storage_integration = <integration_name>
FILE_FORMAT = (TYPE = PARQUET);

The copied data can now be viewed in the Google Cloud Storage bucket.
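
To verify the unload from the command line, list the objects Snowflake wrote to the bucket (bucket and path are the same placeholders used above):

gsutil ls -l gs://<bucket>/<path>/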

  • Use the bq load command to load the data from the GCS bucket into the BigQuery table
bq load --autodetect --source_format=PARQUET  bq_dataset.mytable gs://<bucket_name>/<path>/data_snowflake_file.parquet

Now the historical data has been migrated from Snowflake to BigQuery.
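
As an optional sanity check, the row count in BigQuery can be compared against a select count(*) on the source table in Snowflake; for example:

bq query --use_legacy_sql=false 'SELECT COUNT(*) AS row_count FROM bq_dataset.mytable'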

Incremental Data Integration:

After the historical data migration, incremental pipelines are required to sync the delta data into the target data warehouse, BigQuery. ETL tools such as Informatica, Talend, Hevo, and other partner tools also support integrating data from Snowflake into BigQuery.

In this prototype, the approach taken is “Extract and Load (EL)”. The data pipelines are created and orchestrated using Cloud Composer, the managed Airflow service in Google Cloud Platform. Alternatively, Cloud Dataflow jobs can be created using the JDBC to BigQuery template for a scalable, fully managed option.

Pipeline to incrementally extract data from Snowflake (continuous):

1. Extract data from Snowflake into Google Cloud Storage using Cloud Composer.

2. Use the bq load command to load the data into BigQuery.

Why Airflow / Cloud Composer:

  • Open-source Apache Airflow framework
  • Easy to implement using Python
  • Intuitive interface for monitoring and managing dependencies

Prerequisites 7–10 listed above are required for setting up the incremental pipelines. Now start by creating an Airflow DAG for the Snowflake to BigQuery integration.

Step 1. Initialize Apache Airflow

We created a VM on Google Cloud Platform (GCP) and installed Apache Airflow on it. The Airflow UI looks as shown below.

Fig. Apache Airflow UI

All the DAGs that have been created can be seen here. By default, Airflow ships with a few example DAGs to help you get started, especially if you are new to Airflow; they are useful as references.

Step 2. Establish a connection with Snowflake

To establish a connection with Snowflake, parameters such as account, username, password, warehouse, database, and schema need to be provided.

Once we have these, a simple Python script can be written to test the connection to Snowflake.

import snowflake.connector
import pandas as pd
from sqlalchemy import create_engine
from snowflake.sqlalchemy import URL

# Build the Snowflake connection URL
url = URL(
    account = 'account_name',
    user = 'user_name',
    password = 'password',
    database = 'database_name',
    schema = 'schema_name',
    warehouse = 'warehouse_name'
)

# Open a connection and run a test query
engine = create_engine(url)
connection = engine.connect()
query = '''
select * from database_name.schema_name.mytable;
'''
df = pd.read_sql(query, connection)
df

Using the above code, check whether you can fetch data from the table on the Snowflake side into a dataframe. If the fetch succeeds, the connection has been established successfully.

Step 3. Create a DAG

In this prototype, the DAG consists of five tasks that run in sequence to maintain the flow of data from Snowflake to BigQuery.

(The tasks are created using various operators. These DAG concepts are covered in the Airflow documentation.)
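
The snippets that follow are fragments of a single DAG file. A minimal sketch of the imports and DAG definition they assume is shown below; it uses Airflow 1.10-style imports (the snippets rely on provide_context and bigquery_conn_id), and the DAG id, schedule, and start date are placeholders:

from datetime import datetime

import pandas as pd
from airflow import DAG
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine

# DAG id, schedule and start date are placeholders - adjust to your environment
default_args = {'owner': 'airflow', 'start_date': datetime(2022, 1, 1)}
dag = DAG(
    'snowflake_to_bq_incremental',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False
)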

Task 1. A function is defined to fetch the maximum timestamp of the data loaded in Stage 1 (the historical load) from a metadata table in BigQuery. This is the latest record timestamp already loaded, i.e. the point from which the incremental load needs to be triggered; the value is pushed to XCom. The list of tables to be migrated is kept in a config file such as ‘/root/airflow/dags/sf_source_tables.txt’.

def get_max_ts(source_sf_table_name, **kwargs):
    # Read the last loaded timestamp for this table from the metadata table in BigQuery
    bq_hook = BigQueryHook(bigquery_conn_id='bigquery_default', location='US', use_legacy_sql=False)
    bqmaxts = bq_hook.get_records(sql="select max(max_value) from `project_id.bq_dataset.snowflake_metadata` where tablename='{}'".format(source_sf_table_name))
    result_bqmaxts = bqmaxts[0][0]
    print("Max timestamp obtained from metadata in BQ: ", result_bqmaxts)
    # Push the timestamp to XCom so downstream tasks can use it
    ti = kwargs['ti']
    ti.xcom_push(key='max_ts', value=result_bqmaxts)
    print(result_bqmaxts)
    return result_bqmaxts
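
The snowflake_metadata table referenced in this function is assumed to already exist in BigQuery with a tablename and a max_value column. A minimal sketch of creating and seeding it (the column types are assumptions, and the seed value comes from the historical load shown later in this post):

bq query --use_legacy_sql=false 'CREATE TABLE IF NOT EXISTS bq_dataset.snowflake_metadata (tablename STRING, max_value STRING)'
bq query --use_legacy_sql=false "INSERT INTO bq_dataset.snowflake_metadata (tablename, max_value) VALUES ('mytable', '2022-01-14 00:00:00')"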

Task 2. The next function pulls the maximum timestamp from XCom and executes a SELECT query on Snowflake to extract the records created after that timestamp. These records are pushed into Google Cloud Storage (GCS) in Parquet format. The Snowflake credentials are stored as an Airflow Variable in JSON format.

def get_data_from_snowflake(source_sf_table_name, **kwargs):
    # Pull the last loaded timestamp pushed to XCom by the previous task
    ti = kwargs['ti']
    latest_timestamp = ti.xcom_pull(task_ids="get_maxts_{}".format(source_sf_table_name), key="max_ts")
    # Snowflake credentials are stored as an Airflow Variable in JSON format
    credentials = Variable.get("Snowflake_Connection_params", deserialize_json=True)
    url = URL(
        account = credentials["account"],
        user = credentials["user"],
        password = credentials["password"],
        database = credentials["database"],
        schema = credentials["schema"],
        warehouse = credentials["warehouse"]
    )
    engine = create_engine(url)
    connection = engine.connect()
    # Extract only the records created after the last loaded timestamp
    query = '''
    select * from database_name.schema_name.{}
    where created_at > '{}'; '''.format(source_sf_table_name, latest_timestamp)
    df = pd.read_sql(query, connection)
    # Write the delta records to GCS in Parquet format
    df.to_parquet('gs://<bucket_name>/<path>/{}.parquet'.format(source_sf_table_name))
    print("File has been uploaded to GCS Bucket!")
    connection.close()

Task 3. The next task uses a simple BashOperator to load the data from GCS into BigQuery (see the bq_data_load task in the DAG code below).

Task 4. Once the data has been loaded to BigQuery, fetch the maximum timestamp of the newly loaded data and update it in the metadata table; this updated value is what subsequent loads from Snowflake to BigQuery will start from. Task 5 then deletes the Parquet file from the GCS bucket.

def metatable_update(source_sf_table_name, **kwargs):
    # Column used to track the incremental watermark
    max_identifier = "created_at"
    # Fetch the new maximum timestamp from the freshly loaded BigQuery table
    bq_hook = BigQueryHook(bigquery_conn_id='bigquery_default', location='US', use_legacy_sql=False)
    update_max_ts = bq_hook.get_records(sql="select max({}) from `project_id.bq_dataset.{}`".format(max_identifier, source_sf_table_name))
    update_max_ts = update_max_ts[0][0]
    d2 = datetime.fromtimestamp(float(update_max_ts))
    print(d2)
    # Update the metadata table so the next run starts from this timestamp
    bq_hook.run(sql="update `project_id.bq_dataset.snowflake_metadata` set max_value='{}' where tablename = '{}'".format(d2, source_sf_table_name))
    return 'Executed the update query'

def GCS_Delete_via_Bash(dag, source_sf_table_name, **kwargs):
    # Path of the Parquet file written by the extract task
    parquet_folder_path_gcs = 'gs://<bucket_name>/<path>/{}.parquet'.format(source_sf_table_name)
    print(parquet_folder_path_gcs)
    command_parquet = """ gsutil rm -r {} """.format(parquet_folder_path_gcs)
    print(command_parquet)
    # Run the gsutil delete as a BashOperator inside this Python callable
    bucket_empty_parquet_data = BashOperator(
        task_id='bucket_empty_parquet_data_' + str(source_sf_table_name),
        bash_command=command_parquet,
        dag=dag
    )
    bucket_empty_parquet_data.execute(dict())
    print("Parquet files are removed")

Finally, the tasks in the DAG are wired up in sequence for every table listed in the config file using a loop.
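
Given the split(',') in the code below, the config file is assumed to contain a single comma-separated line of table names; a hypothetical /root/airflow/dags/sf_source_tables.txt could look like:

mytable,customers,orders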

with open('/root/airflow/dags/sf_source_tables.txt', 'r') as tables:
    table_name = tables.read()
    source_sf_table_name = table_name.strip('\n').split(',')

for x in source_sf_table_name:

    max_ts = PythonOperator(
        task_id="get_maxts_{}".format(x),
        python_callable=get_max_ts,
        op_kwargs={'source_sf_table_name': x, 'dag': dag},
        provide_context=True,
        dag=dag
    )

    Fetch_data_from_snowflake = PythonOperator(
        task_id="fetch_data_from_snowflake_{}".format(x),
        provide_context=True,
        op_kwargs={'source_sf_table_name': x, 'dag': dag},
        python_callable=get_data_from_snowflake,
        dag=dag,
    )

    bq_data_load = BashOperator(
        task_id="bq_load_{}".format(x),
        bash_command=""" bq load --autodetect --source_format=PARQUET bq_dataset.{0} 'gs://<bucket_name>/<path>/{0}.parquet' """.format(x),
        dag=dag
    )

    update_meta = PythonOperator(
        task_id="update_tablemeta_{}".format(x),
        python_callable=metatable_update,
        op_kwargs={'source_sf_table_name': x, 'dag': dag},
        provide_context=True,
        dag=dag
    )

    delete_gcs_files = PythonOperator(
        task_id="delete_gcs_files_{}".format(x),
        python_callable=GCS_Delete_via_Bash,
        op_kwargs={'source_sf_table_name': x, 'dag': dag},
        provide_context=True,
        dag=dag
    )

    max_ts >> Fetch_data_from_snowflake >> bq_data_load >> update_meta >> delete_gcs_files
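
Once the DAG file is in place under the dags folder, it can be triggered manually for a first test run; the DAG id below matches the placeholder used in the sketch above, and the command uses the Airflow 1.10 CLI syntax:

airflow trigger_dag snowflake_to_bq_incremental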

The screenshot below shows the flow of tasks in the DAG created for this incremental pipeline.

Fig. Graphical View of the DAG

Working of the DAG:

After the historical load, there are 4 records in the BigQuery table, with the maximum timestamp in the created_at column being 2022-01-14.

Fig. BigQuery table with Historical Data

The screenshot below shows that the maximum timestamp from the historical load is saved in the metadata table in BigQuery. This timestamp is used as the starting point for loading the delta (incremental) data.

Fig. Metadata table showing the maximum timestamp

On successful execution of the DAG, the delta records are loaded into the BigQuery table. The maximum value in the created_at column after the incremental load is 2022-02-08.

Fig. BigQuery table with Delta Data

After the data is successfully loaded to BigQuery, the screenshot below shows the metadata table updated with the latest record timestamp, which will be used in the next run of the DAG.

Fig. Updated timestamp in BigQuery metadata table

So, there you have it: an end-to-end data pipeline from Snowflake to BigQuery.

All credit to the key contributors, Palak Miharia and Brian Soares, who really got this working end to end. A lot of research and testing went into this prototype. Special mention to Piyush Bajaj, Shreya Goel and Neethika Singh for their timely support. Great work, team!
