Moving Data

Working with SSO and Splitting Large Zip Files

The information available to measure marketing effectiveness and the short- and long-term impact of marketing investment has greatly increased the size of datasets used for statistical analysis and complicated the connections between platforms. As a leader in Growth Marketing Technology, RXA (RXA.IO) continues to develop solutions that deliver performance optimization, accountability, and demonstrable financial benefits through SaaS or custom, data-driven, channel-agnostic Media Mix Modeling (MMM) analysis.

Solution for uploading large datasets to Snowflake with Python

Python allows for fast processing of large datasets, and Snowflake is great for long-term data storage and access. But getting a 2.1GB compressed (41GB uncompressed) .zip file from a cloud drive or local machine into Snowflake can be tricky for more than one reason: Snowflake does not natively handle the upload of .zip files, the recommended maximum file size for the Python/Snowflake connector is 100MB when using parallel processing, and there may be memory issues when trying to open the entire file locally (or on a server) to chunk it.

The RXA Data Engineering team has developed a solution for handling the transfer of large .zip files to a Snowflake table using Single Sign-On (SSO) authentication:

Getting Started

First, make sure you are using Python version 3.6+. Then install and import all the necessary libraries. The credentials file is named snowflake_credentials.py, and the template can be found in the project Git folder.

import datetime as dt
import math
import os
import threading
import time
import zipfile as zf

import pandas as pd  # pandas==1.0.1
import snowflake.connector  # snowflake-connector-python==2.2.5

# Credential file: snowflake_credentials.py
from snowflake_credentials import creds
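
For reference, the script expects creds to be a dictionary. A minimal sketch of what snowflake_credentials.py might look like is below; the key names come from the full listing at the bottom of this walkthrough, and every value shown is a placeholder you would swap for your own:

creds = {
    'user': 'your.sso.user@company.com',   # must match the account you are signed into in your browser
    'account': 'your_account_identifier',
    'warehouse': 'your_warehouse',
    'database': 'your_database',
    'schema': 'your_schema',
    'infile': '/path/to/your_data.zip',    # .zip, .csv, or .txt
    'filename_in_zip': 'your_data.csv',    # only needed when infile is a .zip
    'snow_table': '',                      # optional; derived from the infile name if blank
    'rows_to_read_for_chunking': 1000000,
    'field_delimiter': ',',
    'record_delimiter': '\n',
    'staging_folder_for_filechunks': 'staging',
    'create_new_if_table_exists': True,
    'replace_if_table_exists': False,
    'delete_staging_folder_after_process': True,
    'remove_local_staging_filechunks': True
}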

Next, define a SnowConnector class to connect to your Snowflake instance and access tables within your database. The full code can be seen at the bottom of this walkthrough, or on GitHub.

class SnowConnector():
    def __init__(self, creds):
        # set variables from your creds file
        return

    # define variable access functions

    def snowflake_cursor(self):
        e_counter = 0
        conn = None

        while e_counter < 5:
            try:
                conn = snowflake.connector.connect(
                    user=self.user,
                    authenticator='externalbrowser',
                    account=self.account,
                    warehouse=self.warehouse,
                    database=self.database,
                    schema=self.schema
                )
                break
            except snowflake.connector.errors.DatabaseError as e:
                print(e)
                print('Connection to Snowflake refused, trying again...')
                e_counter += 1
                time.sleep(5)
        if not conn:
            print("""\n*****\n
            Connection to Snowflake refused after 5 attempts.
            Please check connection credentials and
            connection to server/internet.
            \n*****\n""")
            exit(1)
        print("Connected to Snowflake")
        return conn

    # define table checking and creation/replacing functions

# define table checking and creation/replacing functions

Note: the authenticator='externalbrowser' argument is necessary for SSO authentication and will open a tab in your default browser to confirm that you are logged in. The "user" value in the snowflake_credentials.py file must match the account that you are logged into in your browser.
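
As a quick sanity check (not part of the original script, just a minimal sketch assuming the SnowConnector class above), you can confirm that the SSO connection landed under the expected user before moving on:

conn = SnowConnector(creds).snowflake_cursor()
print(conn.cursor().execute("SELECT CURRENT_USER()").fetchone())
conn.close()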

Splitting Your File

A note before discussing the splitting of files: this can also be done through the command line (on Linux/macOS) or through third-party software. I elected to do it programmatically, as I was constrained by my available access permissions and software. If you find a faster way to chunk files from a .zip format within Python, please let me know!

Creating a class and function to split up the .zip file allows us to run multiple threads at the same time, reducing the file chunking time by 84%* over a non-threaded approach. Python's built-in threading.Thread class is subclassed with our own variables:

*based on my own testing and profiling, with a 2.1GB compressed/41GB uncompressed csv file

class ZipToGzThread(threading.Thread):
    def __init__(self, thread_id, skip, rows_to_read,
                 filename, header, dtypes,
                 staging_folder, zipname=None):
        threading.Thread.__init__(self)
        self.threadID = thread_id
        self.skip = skip
        self.rows_to_read = rows_to_read
        self.filename = filename
        self.header = header.columns
        self.dtypes = dtypes
        self.staging_folder = staging_folder
        self.zipname = zipname

    def run(self):
        chunk_file(self.skip, self.rows_to_read, self.filename,
                   self.header, self.dtypes, self.threadID,
                   self.staging_folder, self.zipname)

This program can handle .zip, .csv, and .txt files. If there is no zipname, .csv or .txt formats are assumed, and the file is read and chunked appropriately. The split files are compressed into a gzip format, as Snowflake can natively decompress these files.

def chunk_file(skip, rows_to_read, filename,
               header, dtypes, threadID,
               staging_filepath, zipname):
    if zipname:
        temp = pd.read_csv(zipname, skiprows=skip,
                           nrows=rows_to_read, dtype=dtypes,
                           names=header, header=None, engine='c')
    else:
        temp = pd.read_csv(filename, skiprows=skip,
                           nrows=rows_to_read, dtype=dtypes,
                           names=header, header=None, engine='c')
    print(f'Read {len(temp)} lines starting at row {skip}')

    temp.to_csv(f'{staging_filepath}/{filename}_{threadID}.gz',
                index=False, header=True,
                compression='gzip', chunksize=1000)
    print(f'Sent thread ID {threadID} to local staging folder as GZIP')

With this method, we take advantage of pandas' read_csv function, specifically the skiprows and nrows arguments, which allow us to read distinct row chunks from anywhere in the file while keeping the rows_to_read value constant. This will make more sense further down in the walkthrough.
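
To illustrate the idea on its own, two consecutive read_csv calls with the same nrows but increasing skiprows pull back-to-back slices of a file; data.csv and the column names here are purely hypothetical:

# rows 1 - 1,000,000 of the data (skiprows=1 skips the header line)
chunk_1 = pd.read_csv('data.csv', skiprows=1, nrows=1_000_000,
                      names=['date', 'channel', 'spend'], header=None)
# rows 1,000,001 - 2,000,000 of the data
chunk_2 = pd.read_csv('data.csv', skiprows=1_000_001, nrows=1_000_000,
                      names=['date', 'channel', 'spend'], header=None)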

Initializing Your Transfer

Once the files are chunked in a local staging folder and ready to be transferred to Snowflake staging tables, we must create a list of SQL statements to send to each individual thread. Again, we subclass threading.Thread with our own variables:

class SfExecutionThread(threading.Thread):
    def __init__(self, thread_id, sql_query):
        threading.Thread.__init__(self)
        self.threadID = thread_id
        self.sql_query = sql_query

    def run(self):
        print('Starting {0}: {1}'.format(self.threadID, self.sql_query))
        execute_in_snowflake(self.sql_query)
        print('Exiting {0}: {1}'.format(self.threadID, self.sql_query))

def execute_in_snowflake(sf_query):
    # connect to snowflake
    temp_snow = SnowConnector(creds)
    conn = temp_snow.snowflake_cursor()

    # increase timeout
    conn.cursor().execute("""ALTER SESSION SET
        STATEMENT_TIMEOUT_IN_SECONDS = 86400""")

    conn.cursor().execute(sf_query)
    conn.close()

Now that all our Snowflake connecting, file splitting, and SQL creating functions are defined, we can run the main program. The first order of business is to create an instance of the SnowConnector class:

snow = SnowConnector(creds)
snow.print_conn_info()
filetype = snow.get_filetype()
table_name = snow.get_table()

One of the biggest issues the RXA team encountered with chunking a zip file using threading was figuring out how many iterations need to be completed to read in the whole file. With a 41GB file, reading the whole dataset into memory is not very efficient. We used this trick (found here) to count the rows in a .zip file without storing anything other than the counter value:

if filetype == 'zip':
    zipname = snow.get_infile()
    filename = snow.get_file_within_zip()

    with zf.ZipFile(zipname) as folder:
        with folder.open(filename) as f:
            row_count = 0
            for _ in f:
                row_count += 1
    header = pd.read_csv(zipname, nrows=1)
else:
    with open(filename) as f:
        row_count = 0
        for _ in f:
            row_count += 1
    header = pd.read_csv(filename, nrows=1)

This can also be used for large .csv/.txt files. We also gather the header information from the zip files at this time for later use.

Threading Your Transfer

To determine the number of "read file" threads we need to run, the following calculations are made:

# calculate number of iterations needed for file chunking (recommended to use 1 mill)
rows_to_read = snow.get_rows_to_read()
rows_div_mill = row_count / rows_to_read
iterations = math.ceil(rows_div_mill)
skip = 1

# set all types to str/obj for faster read
dtypes = {k: str for k in header.columns}
file_chunk_threads = []

The skip value is set to 1 because we do not want to read in the header every time – the column names are stored in our header variable from earlier.

Now we loop through the number of iterations, passing in gradually increasing skip values until all rows are handled. For each ZipToGzThread instance created, the skip variable is increased by whatever the rows_to_read value is set to in your snowflake_credentials.py file (the default is 1 million).
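
As a worked example (the numbers are illustrative only): a file with 41,000,000 data rows and rows_to_read = 1,000,000 gives iterations = math.ceil(41,000,000 / 1,000,000) = 41, so 41 ZipToGzThread instances are created with skip values of 1; 1,000,001; 2,000,001; and so on up to 40,000,001.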

if filetype == 'zip':
    for i in range(1, iterations+1):
        file_chunk_threads.append(ZipToGzThread(i, skip, rows_to_read,
                                                filename, header,
                                                dtypes, staging_filepath,
                                                zipname=zipname))
        skip += rows_to_read
else:
    for i in range(1, iterations+1):
        file_chunk_threads.append(ZipToGzThread(i, skip, rows_to_read,
                                                filename, header,
                                                dtypes, staging_filepath,
                                                zipname=None))
        skip += rows_to_read

From here, we start each thread and join them so the program will not continue until each thread is finished running. This Stack Overflow post was helpful to me when trying to wrap my head around how the .join() method works.

for fc_thread in file_chunk_threads:
    fc_thread.start()

for fc_thread in file_chunk_threads:
    fc_thread.join()

The same list creation and thread starting process is done for all the staging files after they are in the local staging folder (previous step):

staging_files = []

for root, dirs, files in os.walk(f"{staging_filepath}/."):
    for f in files:
        staging_files.append(f)

put_statements = []

for staging_file in staging_files:
    put_statements.append(f'''
        PUT file://{staging_filepath}/{staging_file} @%{table_name}
        SOURCE_COMPRESSION = GZIP
        PARALLEL = 20
        AUTO_COMPRESS = FALSE''')

put_threads = []
put_counter = 0

# create thread list
for statement in put_statements:
    put_threads.append(SfExecutionThread(put_counter, statement))
    put_counter += 1

# execute the threads
for thread in put_threads:
    thread.start()

for thread in put_threads:
    thread.join()

Moving to Staging

After all staging files are placed into Snowflake, we perform one COPY INTO statement to move the files from their Snowflake staging table to the destination table.

copy_into_sql = f"""COPY INTO {table_name}
    FROM @%{table_name}
    FILE_FORMAT =
        (REPLACE_INVALID_CHARACTERS = TRUE
        SKIP_HEADER = 1
        FIELD_DELIMITER = '{field_delimiter}'
        RECORD_DELIMITER = '{record_delimiter}')
    ON_ERROR = CONTINUE"""
snow.snowflake_cursor().cursor().execute(copy_into_sql)

I will admit that this is one spot where the file upload script has a hole. Every so often there is a row with a special character that does not allow it to be uploaded into the destination table. I am currently working on adding more data cleaning processes, or at least an output to an error log so these dropped rows are trackable. Use this COPY INTO statement at your own discretion, and please let me know if you have already found a solution to this issue!
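
One possible way to make those dropped rows trackable, which I have not yet folded into the script, is Snowflake's VALIDATE table function, which returns the rows rejected by a previous COPY INTO. A minimal sketch, reusing the snow connector and table_name from above and writing to a hypothetical copy_into_error_log.txt:

# Sketch only: dump rows rejected by the most recent COPY INTO to a local error log
conn = snow.snowflake_cursor()
errors = conn.cursor().execute(
    f"SELECT * FROM TABLE(VALIDATE({table_name}, JOB_ID => '_last'))"
).fetchall()
if errors:
    with open('copy_into_error_log.txt', 'w') as log:
        for row in errors:
            log.write(str(row) + '\n')
conn.close()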

From here, all that is left to do is to clean up the staging folders, both locally and in Snowflake. The local folder deletion is optional and can be turned on or off in the snowflake_credentials.py file (the default is to delete all staging data).

# deleting all values from the Snowflake staging table
remove_staging_sql = f"REMOVE @%{table_name}"
snow.snowflake_cursor().cursor().execute(remove_staging_sql)

if snow.get_rm_staging_files():
    # remove files from local staging folder
    for root, dirs, files in os.walk(staging_filepath):
        for f in files:
            os.unlink(os.path.join(root, f))

if snow.get_rm_staging_folder():
    # deleting local staging folder
    os.rmdir(staging_filepath)

Your dataset should now be accessible in your selected Snowflake table. However, all data is different, and each specific data problem requires a unique solution. Feel free to use parts of this code, or the whole thing, to make your file uploads to Snowflake a bit easier, and contact me with questions or comments at jess.brown@rxa.io.

Notes:

The threading approach was based on this InterWorks blog post.

I plan to add functionality that allows a user to sign into Snowflake using a basic Username/Password instead of SSO in the near future. Please check the project Git folder for any updates: https://github.com/rxa-io/pysnow

View the full Python x Snowflake Multithreaded Connector below:

#!/usr/bin/env python
# coding: utf-8

import datetime as dt
import math
import os
import threading
import time
import zipfile as zf

import pandas as pd
import snowflake.connector

# CRED_FILE = "snowflake_credentials.py"
from snowflake_credentials import creds

class SnowConnector():
    def __init__(self, creds):
        try:
            self.user = creds['user']
            self.account = creds['account']
            self.warehouse = creds['warehouse']
            self.database = creds['database']
            self.schema = creds['schema']
            self.infile = creds['infile']
            self.filechunks_exist = False
            self.staging_folder = 'staging'
            self.rows_to_read = 1000000
            self.field_delimiter = ','
            self.record_delimiter = '\n'
            self.filename_in_zip = ''
            self.rm_staging_folder = True
            self.rm_staging_files = True

            if 'snow_table' in creds and creds['snow_table']:
                self.snow_table = creds['snow_table']
            elif 'infile' in creds and creds['infile']:
                # create table name from infile name
                remove_list = ['.txt', '.csv', '.zip']
                temp_filename = creds['infile'].split('/')[-1]
                for r in remove_list:
                    temp_filename = temp_filename.replace(r, '')

                temp_filename = temp_filename.replace(' ', '_').replace('.', '_')
                self.snow_table = ''.join(s for s in temp_filename if (s.isalnum() or s == '_'))
            else:
                now = str(dt.datetime.now())
                self.snow_table = 'python_upload_' + now.replace(':', '-').replace('.', '-').replace(' ', '-')

            if '.zip' in self.infile:
                self.filetype = 'zip'
            elif '.txt' in self.infile:
                self.filetype = 'txt'
            elif '.csv' in self.infile:
                self.filetype = 'csv'
            else:
                self.filetype = 'none'

            if 'create_new_if_table_exists' in creds and creds['create_new_if_table_exists']:
                self.if_table_exists = 'create_new'
            elif 'replace_if_table_exists' in creds and creds['replace_if_table_exists']:
                self.if_table_exists = 'replace'
            else:
                print("""No selection for how to handle pre-existing Snowflake tables with the same name.
                Default is to create a new table""")
                self.if_table_exists = 'create_new'

            if 'filechunks_exist' in creds and creds['filechunks_exist']:
                self.filechunks_exist = True

            if 'staging_folder_for_filechunks' in creds and creds['staging_folder_for_filechunks']:
                self.staging_folder = str(creds['staging_folder_for_filechunks'])

            if 'rows_to_read_for_chunking' in creds and creds['rows_to_read_for_chunking']:
                self.rows_to_read = creds['rows_to_read_for_chunking']

            if 'field_delimiter' in creds and creds['field_delimiter']:
                self.field_delimiter = str(creds['field_delimiter'])

            if 'record_delimiter' in creds and creds['record_delimiter']:
                self.record_delimiter = str(creds['record_delimiter'])

            if 'filename_in_zip' in creds and creds['filename_in_zip']:
                self.filename_in_zip = creds['filename_in_zip']

            if 'delete_staging_folder_after_process' in creds and creds['delete_staging_folder_after_process']:
                self.rm_staging_folder = creds['delete_staging_folder_after_process']

            if 'remove_local_staging_filechunks' in creds and creds['remove_local_staging_filechunks']:
                self.rm_staging_files = creds['remove_local_staging_filechunks']

        except Exception as e:
            print(e, '\nPlease check the credential file for missing information')

    def print_conn_info(self):
        print()
        print('USER:', self.user)
        print('ACCOUNT:', self.account)
        print('WAREHOUSE:', self.warehouse)
        print('DATABASE:', self.database)
        print('SCHEMA:', self.schema)
        print('DATA INPUT:', self.infile)
        print('INFILE TYPE:', self.filetype)
        print('SNOWFLAKE TABLE:', self.snow_table)
        print()

    def get_filetype(self):
        return self.filetype

    def get_infile(self):
        return self.infile

    def get_table(self):
        return self.snow_table

    def get_rows_to_read(self):
        return self.rows_to_read

    def get_field_delimiter(self):
        return self.field_delimiter

    def get_record_delimiter(self):
        return self.record_delimiter

    def get_staging_folder(self):
        return self.staging_folder

    def get_file_within_zip(self):
        return self.filename_in_zip

    def get_rm_staging_folder(self):
        return self.rm_staging_folder

    def get_rm_staging_files(self):
        return self.rm_staging_files

    # TODO: error handling when connection fails after 5 attempts
    def snowflake_cursor(self):
        e_counter = 0
        conn = None

        while e_counter < 5:
            try:
                conn = snowflake.connector.connect(
                    user=self.user,
                    authenticator='externalbrowser',
                    account=self.account,
                    warehouse=self.warehouse,
                    database=self.database,
                    schema=self.schema
                )
                break
            except snowflake.connector.errors.DatabaseError as e:
                print(e)
                print('Connection to Snowflake refused, trying again...')
                e_counter += 1
                time.sleep(5)
        if not conn:
            print("""\n*****\n
            Connection to Snowflake refused after 5 attempts.
            Please check connection credentials and
            connection to server/internet.
            \n*****\n""")
            exit(1)
        print("Connected to Snowflake")
        return conn

    def table_exists(self, conn, table=None):
        if table:
            table_name = table
        else:
            table_name = self.snow_table

        check = f"SHOW TABLES LIKE '{table_name}'"

        cur = conn.cursor()
        cur.execute(check)
        tables = cur.fetchall()

        if tables:
            return True
        return False

    def create_table(self, col_names_types):
        table_name = self.snow_table
        conn = self.snowflake_cursor()

        if self.table_exists(conn):
            if self.if_table_exists == 'create_new':
                old_table_name = table_name
                loop_count = 2

                while True:
                    table_name = f'{old_table_name}_{loop_count}'
                    exists = self.table_exists(conn, table_name)

                    if not exists:
                        break

                    loop_count += 1
                self.snow_table = table_name
                print(old_table_name, 'already exists. new table name is', table_name)
            else:
                print(table_name, 'already exists. table will be replaced')
                drop_rows = "DELETE FROM " + table_name
                conn.cursor().execute(drop_rows)
                print(table_name, 'table rows cleared and ready for upload')

        create = "CREATE OR REPLACE TABLE " + table_name + col_names_types
        conn.cursor().execute(create)
        print(table_name, 'table created or replaced')

def file_col_names_types(filename, all_strings=True):
    # set all col types to string
    if all_strings:
        infile = pd.read_csv(filename, nrows=1)
        temp_cols = infile.columns
        clean_cols = []
        unnamed_count = 1

        for col in temp_cols:
            if 'Unnamed:' in col:
                col = f'UNNAMED_COLUMN_{unnamed_count}'
                unnamed_count += 1
            clean_cols.append(col)
        infile_sql_types = "(" + ", ".join([col.replace(' ', '_').strip() + ' string' for col in clean_cols]) + ")"

    # attempt to guess col types based on first 1000 rows of infile
    else:
        infile = pd.read_csv(filename, nrows=1000)
        py_to_sql_types = {'int64': 'integer', 'int32': 'integer', 'object': 'string',
                           'float64': 'bigint', 'float32': 'bigint', 'bool': 'bit',
                           'datetime64[ns]': 'datetime'}

        infile_types = infile.dtypes.apply(lambda x: x.name).to_dict()
        infile_sql_types = "(" + ", ".join([key.replace(' ', '_').strip() + ' ' +
                                            (py_to_sql_types[value] if value in py_to_sql_types else 'string')
                                            for key, value in infile_types.items()]) + ")"

    return infile_sql_types

class ZipToGzThread(threading.Thread):
    def __init__(self, thread_id, skip, rows_to_read, filename, header, dtypes,
                 staging_folder, zipname=None):
        threading.Thread.__init__(self)
        self.threadID = thread_id
        self.skip = skip
        self.rows_to_read = rows_to_read
        self.filename = filename
        self.header = header.columns
        self.dtypes = dtypes
        self.staging_folder = staging_folder
        self.zipname = zipname

    def run(self):
        chunk_file(self.skip, self.rows_to_read, self.filename, self.header,
                   self.dtypes, self.threadID, self.staging_folder, self.zipname)

def chunk_file(skip, rows_to_read, filename, header, dtypes, threadID, staging_filepath, zipname=None):
    if zipname:
        temp = pd.read_csv(zipname, skiprows=skip, nrows=rows_to_read, dtype=dtypes,
                           names=header, header=None, engine='c')
    else:
        temp = pd.read_csv(filename, skiprows=skip, nrows=rows_to_read, dtype=dtypes,
                           names=header, header=None, engine='c')
    print(f'Read {len(temp)} lines starting at row {skip}')
    temp.to_csv(f'{staging_filepath}/{filename}_{threadID}.gz', index=False, header=True,
                compression='gzip', chunksize=1000)
    print(f'Sent thread ID {threadID} to local staging folder as GZIP')

class SfExecutionThread(threading.Thread):
    # credit: https://interworks.com/blog/2020/03/04/zero-to-snowflake-multi-threaded-bulk-loading-with-python/
    def __init__(self, thread_id, sql_query):
        threading.Thread.__init__(self)
        self.threadID = thread_id
        self.sql_query = sql_query

    def run(self):
        print('Starting {0}: {1}'.format(self.threadID, self.sql_query))
        execute_in_snowflake(self.sql_query)
        print('Exiting {0}: {1}'.format(self.threadID, self.sql_query))

def execute_in_snowflake(sf_query):
    # connect to snowflake
    temp_snow = SnowConnector(creds)
    conn = temp_snow.snowflake_cursor()

    # increase timeout
    conn.cursor().execute("""ALTER SESSION SET
        STATEMENT_TIMEOUT_IN_SECONDS = 86400""")

    conn.cursor().execute(sf_query)
    conn.close()

def main():
    start_time = dt.datetime.now()
    print('Starting main:', start_time)

    snow = SnowConnector(creds)
    snow.print_conn_info()
    filetype = snow.get_filetype()
    table_name = snow.get_table()

    if filetype in ['zip', 'csv', 'txt']:
        filename = snow.get_infile()
        col_names_types = file_col_names_types(filename, all_strings=True)

        print(dt.datetime.now(), '***', 'Creating table', table_name)
        snow.create_table(col_names_types)

        # create staging folder
        staging_folder = snow.get_staging_folder()
        path = os.path.abspath(os.getcwd())
        staging_filepath = str(path) + '/' + staging_folder
        staging_exists = os.path.isdir(staging_folder)

        print(dt.datetime.now(), '***', 'Creating or clearing staging folder...')
        if staging_exists:
            # remove contents from staging folder
            for root, dirs, files in os.walk(staging_filepath):
                for f in files:
                    os.unlink(os.path.join(root, f))
        else:
            try:
                os.mkdir(staging_filepath)
            except OSError as error:
                print(error)
        print(dt.datetime.now(), '***', 'Staging folder ready')

        print(dt.datetime.now(), '***', 'Counting rows in file and getting header info...')
        if filetype == 'zip':
            zipname = snow.get_infile()
            filename = snow.get_file_within_zip()

            with zf.ZipFile(zipname) as folder:
                with folder.open(filename) as f:
                    row_count = 0
                    for _ in f:
                        row_count += 1
            header = pd.read_csv(zipname, nrows=1)
        else:
            with open(filename) as f:
                row_count = 0
                for _ in f:
                    row_count += 1
            header = pd.read_csv(filename, nrows=1)
        print(dt.datetime.now(), '***', f'Header info gathered for your {str(row_count)} row file')

        # calculate number of iterations needed for file chunking (recommended to use 1 mill)
        rows_to_read = snow.get_rows_to_read()
        rows_div_mill = row_count / rows_to_read
        iterations = math.ceil(rows_div_mill)
        skip = 1

        # set all types to str/obj for faster read
        dtypes = {k: str for k in header.columns}
        file_chunk_threads = []

        print(dt.datetime.now(), '***', 'Creating thread list for file chunking and GZIP formatting...')
        if filetype == 'zip':
            for i in range(1, iterations+1):
                file_chunk_threads.append(ZipToGzThread(i, skip, rows_to_read, filename, header,
                                                        dtypes, staging_filepath, zipname=zipname))
                skip += rows_to_read
        else:
            for i in range(1, iterations+1):
                file_chunk_threads.append(ZipToGzThread(i, skip, rows_to_read, filename, header,
                                                        dtypes, staging_filepath, zipname=None))
                skip += rows_to_read
        print(dt.datetime.now(), '***', 'Thread list created for file chunking and formatting')

        print(dt.datetime.now(), '***', 'Starting file chunking and formatting threads...')
        for fc_thread in file_chunk_threads:
            fc_thread.start()

        for fc_thread in file_chunk_threads:
            fc_thread.join()
        print(dt.datetime.now(), '***', 'Files chunked and formatted in folder:', staging_folder)

        # starting to stage files for transfer
        staging_files = []

        for root, dirs, files in os.walk(f"{staging_filepath}/."):
            for f in files:
                staging_files.append(f)

        print(dt.datetime.now(), '***', 'Total chunked files:', len(staging_files))
        put_statements = []

        print(dt.datetime.now(), '***', 'Creating PUT statements for file chunks...')
        for staging_file in staging_files:
            put_statements.append(f'''
                PUT file://{staging_filepath}/{staging_file} @%{table_name}
                SOURCE_COMPRESSION = GZIP
                PARALLEL = 20
                AUTO_COMPRESS = FALSE
            ''')
        print(dt.datetime.now(), '***', 'PUT statements created')

        put_threads = []
        put_counter = 0

        # create thread list
        print(dt.datetime.now(), '***', 'Creating thread list for PUT statements...')
        for statement in put_statements:
            put_threads.append(SfExecutionThread(put_counter, statement))
            put_counter += 1

        # execute the threads
        print(dt.datetime.now(), '***', 'Starting PUT query threads...')
        for thread in put_threads:
            thread.start()

        for thread in put_threads:
            thread.join()
        print(dt.datetime.now(), '***', 'PUT threads complete. Data Staged')

        field_delimiter = snow.get_field_delimiter()
        record_delimiter = snow.get_record_delimiter()

        print(dt.datetime.now(), '***', 'Starting COPY INTO...')
        copy_into_sql = f"""COPY INTO {table_name}
            FROM @%{table_name}
            FILE_FORMAT = (REPLACE_INVALID_CHARACTERS = TRUE
                SKIP_HEADER = 1
                FIELD_DELIMITER = '{field_delimiter}'
                RECORD_DELIMITER = '{record_delimiter}')
            ON_ERROR = CONTINUE"""
        snow.snowflake_cursor().cursor().execute(copy_into_sql)
        print(dt.datetime.now(), '***', 'COPY INTO complete. Data in', table_name)

        # clear staging table
        print(dt.datetime.now(), '***', 'Clearing staging table...')
        remove_staging_sql = f"REMOVE @%{table_name}"
        snow.snowflake_cursor().cursor().execute(remove_staging_sql)
        print(dt.datetime.now(), '***', 'Staging table cleared')

        # clear local staging folder
        if snow.get_rm_staging_files():
            print(dt.datetime.now(), '***', 'Clearing local staging folder...')
            for root, dirs, files in os.walk(staging_filepath):
                for f in files:
                    os.unlink(os.path.join(root, f))
            print(dt.datetime.now(), '***', 'Local staging folder cleared')

        if snow.get_rm_staging_folder():
            print(dt.datetime.now(), '***', 'Deleting staging folder...')
            os.rmdir(staging_filepath)
            print(dt.datetime.now(), '***', 'Local staging folder deleted')

    elif filetype == 'sql':
        print('This connector is not equipped to handle SQL transfers at this time')
    elif filetype in ['hive', 'hadoop']:
        print('This connector is not equipped to handle Hive/Hadoop transfers at this time')

    end_time = dt.datetime.now()
    print('Total file splitting, formatting, and upload process took', end_time - start_time)

if __name__ == '__main__':
    main()

AUTHOR: Jess Brown

Jess Brown is a Data Engineer at RXA, a Growth Marketing Technology company fueled by data science and applied artificial intelligence. Through our GMI (Growth Marketing Intelligence) platform and solutions, RXA helps attract, convert, retain, and grow the value of our clients' customers by isolating specific business problems and developing actionable, data-driven AI solutions to achieve prescribed results. Our Media Mix Modeling and Multi-touch Attribution solutions are agnostic to mediums and channels, providing clients with an unbiased view of performance and optimization recommendations. You can learn more about RXA solutions by visiting us at RXA.IO or reaching out to learn@rxa.com. While RXA is agnostic to BI platforms, they have been recognized as Domo's most influential partner for 2021 and have been named Domo's Innovation Partner of the Year for the past two years.