Using Python and MySQL in the ETL Process: Using Python and SQLAlchemy

by Emil Drkušić, Database designer and developer, financial analyst

Posted: June 26, 2019


In the previous two articles of this series, we discussed how to use Python and SQL commands in textual format to perform the ETL process. Today we’ll do the same, but this time using Python and SQLAlchemy without SQL commands in textual format. This will enable us to use SQLAlchemy regardless of the database engine we’re connected to. So, let’s start.

Today we’ll discuss how to perform the ETL process using Python and SQLAlchemy. We’ll create a script to extract daily data from our operational database, transform it, and then load it into our data warehouse.

This is the third article in the series. If you haven’t read the first two articles (Using Python and MySQL in the ETL Process and SQLAlchemy), I strongly encourage you to do so before continuing.

This entire series is a continuation of our data warehouse series.

Okay, now let’s get started on today’s topic. First, let’s look at the data models.

The Data Models

Operational (live) database data model

DWH data model

These are the two data models we’ll be using. For more info about data warehouses (DWHs), check out our previous data warehouse articles.

Why SQLAlchemy?

The whole idea behind SQLAlchemy is that once we’ve imported our database tables as objects, we don’t need SQL code that’s specific to the related database engine. Instead, we can work with those objects using SQLAlchemy syntax. That allows us to use the same language no matter which database engine we’re connected to. The main advantage is that a developer doesn’t need to handle the differences between database engines; an SQLAlchemy program will work the same way (with minor changes) if you migrate to a different database engine.
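As a quick illustration of that portability – a minimal sketch, with placeholder connection strings – the same reflection and select work unchanged; only the engine URL differs:

from sqlalchemy import create_engine, select, MetaData, Table

# the dialect prefix in the URL is the only engine-specific part
engine = create_engine('mysql+pymysql://user:password@localhost:3306/subscription_live')
# engine = create_engine('postgresql+psycopg2://user:password@localhost:5432/subscription_live')

metadata = MetaData()
# reflect an existing table; no hand-written, engine-specific SQL
table_country = Table('country', metadata, autoload = True, autoload_with = engine)

# the same statement compiles to the correct SQL dialect automatically
stmt = select([table_country])
connection = engine.connect()
rows = connection.execute(stmt).fetchall()
connection.close()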

I’ve decided to use only SQLAlchemy commands and Python lists to communicate with temporary storage and between different databases. The key reasons behind this decision are that 1) Python lists are well-known, and 2) the code would be readable even for those without deep Python skills.

This isn’t to say that SQLAlchemy is perfect. It has certain limitations, which we’ll discuss later. For now, let’s just take a look at the code below:

Running the script and the result

The script is called from the command line like any other Python script. It checks the data in the operational database, compares the values with the DWH, and imports the new values. In this example, we’re updating values in two dimension tables and one fact table; the script returns the appropriate output. The whole script is written so that you can run it multiple times a day. It will delete “old” data for that day and replace it with new data.
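Assuming the script is saved as etl_dwh.py (a file name used here purely for illustration), the call is:

python etl_dwh.py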

Let's analyze the whole script, starting from the top.

Importing SQLAlchemy

The first thing we need to do is to import the modules we’ll use in the script. Usually, you’ll import your modules as you’re writing the script. In most cases, you won’t know exactly which modules you’ll need at the outset.

from datetime import date

# import SQLAlchemy
from sqlalchemy import create_engine, select, MetaData, Table, and_, func, case

We’ve imported the date class from Python’s datetime module, which supplies classes for working with dates.

Next, we have the sqlalchemy module. We won’t import the whole module, just the things we need – some specific to SQLAlchemy (create_engine, MetaData, Table), some SQL statement parts (select, and_, case), and func, which enables us to use functions like count() and sum().

Connecting to the Databases

We’ll need to connect to two databases on our server. We could connect to more databases (MySQL, SQL Server, or any other) from different servers if needed. In this case, both databases are MySQL databases and are stored on my local machine.

# connect to databases (user and password are omitted here – use your own credentials)
engine_live = create_engine('mysql+pymysql://:@localhost:3306/subscription_live')
connection_live = engine_live.connect()
engine_dwh = create_engine('mysql+pymysql://:@localhost:3306/subscription_dwh')
connection_dwh = engine_dwh.connect()

metadata = MetaData(bind=None)

We’ve created two engines and two connections. I won’t go into details here because we’ve already explained this bit in the previous article.
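One housekeeping detail worth noting (the script as shown doesn’t do it): connections can be closed explicitly once the work is done:

# at the end of the script, release both connections
connection_live.close()
connection_dwh.close()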

Updating the dim_time Dimension

Goal: Insert yesterday’s date if it’s not already in the table.

In our script, we’ll update two dimension tables with new values. The rest of them follow the same pattern, so we’ll only go over this once; we don’t need to write down nearly identical code a few more times.

The idea is very simple. We’ll always run the script to insert new data for yesterday. Therefore, we need to check if that date was inserted in the dimension table. If it is already there, we won’t do anything; if it isn’t, we’ll add it. Let’s take a look at the code to update the dim_time table.

First, we’ll check if the date exists; if it doesn’t, we’ll add it. We start by storing yesterday’s date in a variable. In Python, you can do it this way:

yesterday = date.fromordinal(date.today().toordinal()-1)
yesterday_str = str(yesterday)

The first line takes the current date, converts it to a numerical value, subtracts 1 from that value, and converts that numerical value back to a date (yesterday = today – 1). The second line stores the date in a textual format.
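If you prefer, Python’s timedelta gives the same result in a slightly more direct way (just an alternative; the script uses the ordinal approach above):

from datetime import date, timedelta

# yesterday = today - 1 day
yesterday = date.today() - timedelta(days=1)
yesterday_str = str(yesterday)  # e.g. '2019-06-25'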

Next, we’ll test if the date is already in the database:

table_dim_time = Table('dim_time', metadata, autoload = True, autoload_with = engine_dwh)
stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday_str)
result = connection_dwh.execute(stmt).fetchall()
date_exists = len(result)

After loading the table, we’ll run a query that should return all rows from the dimension table where the time/date value equals yesterday. The result will have either 0 rows (no such date in the table) or 1 row (the date is already in the table).
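If you only need the count and not the rows themselves, an equivalent check – a sketch using the same legacy select() syntax – can push the counting to the database side:

# count matching rows on the database side instead of fetching them
stmt = select([func.count()])\
	.select_from(table_dim_time)\
	.where(table_dim_time.columns.time_date == yesterday_str)
date_exists = connection_dwh.execute(stmt).scalar()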

If the date is not already in the table, we’ll use the insert() command to add it:

if date_exists == 0:
  print("New value added.")
  stmt = table_dim_time.insert().values(time_date=yesterday, time_year=yesterday.year, time_month=yesterday.month, time_week=yesterday.isocalendar()[1], time_weekday=yesterday.weekday())
  connection_dwh.execute(stmt)
else:
  print("No new values.")

One new thing here I’d like to point out is the usage of .year, .month, .isocalendar()[1], and .weekday() to get the date parts.
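To make those date parts concrete, here’s what they return for 2019-06-25 (a Tuesday):

from datetime import date

d = date(2019, 6, 25)
print(d.year)              # 2019
print(d.month)             # 6
print(d.isocalendar()[1])  # 26 -- the ISO week number
print(d.weekday())         # 1  -- Monday is 0, so Tuesday is 1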

Updating the dim_city Dimension

Goal: Insert new cities if there are any (i.e. compare the list of cities in the live database to the list of cities in the DWH and add missing ones).

Updating the dim_time dimension was pretty simple. We simply tested if a date was in the table and inserted it if it wasn’t already there. To test a value in the DWH database, we used a Python variable (yesterday). We’ll use that process again, but this time with lists.

Since there isn’t an easy way to combine tables from different databases in a single SQLAlchemy query, we can’t use the approach outlined in Part 1 of this series. Therefore, we’ll need an object to store the values needed to communicate between these two databases. I’ve decided to use lists, because they are common and they do the job.

First, we’ll load the country and city tables from the live database into the relevant objects.

# dim_city
print("\nUpdating... dim_city")
table_city = Table('city', metadata, autoload = True, autoload_with = engine_live)
table_country = Table('country', metadata, autoload = True, autoload_with = engine_live)
table_dim_city = Table('dim_city', metadata, autoload = True, autoload_with = engine_dwh)

Next, we’ll load the dim_city table from the DWH into a list:

# load the whole DWH table into a list
stmt = select([table_dim_city])
table_dim_city_list = connection_dwh.execute(stmt).fetchall()
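Each element of this list behaves like a tuple. Judging by the comparisons later in the script, the dim_city columns come back in the order (id, city_name, postal_code, country_name), which is why numeric indexes are used below – a small illustration, assuming the table isn’t empty:

# rows returned by fetchall() are tuple-like
first_row = table_dim_city_list[0]
print(first_row[0], first_row[1])  # dim_city id and city name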

Then we’ll do the same for the values from the live database. We’ll join the tables country and city so we have all the data needed in this list:

# load all live values in the list
stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name])\
	.select_from(table_city\
	.join(table_country))
table_city_list = connection_live.execute(stmt).fetchall()

Now we’ll loop through the list containing the data from the live database. For each record, we’ll compare values (city_name, postal_code, and country_name). If we don’t find such values, we’ll add a new record into the dim_city table.

# loop through live_db table
# for each record test if it is missing in the dwh table
new_values_added = 0
for city in table_city_list:
	id = -1
	for dim_city in table_dim_city_list:
		if city[0] == dim_city[1] and city[1] == dim_city[2] and city[2] == dim_city[3]:
			id = dim_city[0]
	if id == -1:
		stmt = table_dim_city.insert().values(city_name=city[0], postal_code=city[1], country_name=city[2])
		connection_dwh.execute(stmt)
		new_values_added = 1
if new_values_added == 0:
	print("No new values.")
else:
	print("New value(s) added.")

To determine if the value is already in the DWH, we tested a combination of attributes that should be unique. (The primary key from the live database doesn’t help us much here.) We can use similar code to update the other dimension tables. It’s not the nicest solution, but it’s still a pretty elegant one, and it will do exactly what we need.
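A side note on performance: the nested loops scan the whole DWH list once per live row. For larger dimensions, one option – not used in the article’s script, just a sketch – is to build a dictionary keyed by the natural-key tuple up front, which turns the inner loop into a constant-time lookup:

# map (city_name, postal_code, country_name) -> dim_city id
dim_city_keys = {(row[1], row[2], row[3]): row[0] for row in table_dim_city_list}

new_values_added = 0
for city in table_city_list:
	# constant-time membership test instead of an inner loop
	if (city[0], city[1], city[2]) not in dim_city_keys:
		stmt = table_dim_city.insert().values(city_name=city[0], postal_code=city[1], country_name=city[2])
		connection_dwh.execute(stmt)
		new_values_added = 1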

Updating the fact_customer_subscribed Table

Goal: If we have old data for yesterday's date, delete it first. Then add yesterday’s data into the DWH – regardless of whether we deleted something in the previous step or not.

After updating all the dimension tables, we should update the fact tables. In our script, we’ll update only one fact table. The reasoning is the same as in the previous section: updating other tables would follow the same pattern, so we would mostly repeat the code.

Before inserting values in the fact table, we need to know the values of the related keys from the dimension tables. To do that, we’ll again load dimensions into lists and compare them with values from the live database.

The first thing we’ll do is load the customer and fact_customer_subscribed tables into objects:

# fact_customer_subscribed
print("\nUpdating... fact_customer_subscribed")

table_customer = Table('customer', metadata, autoload = True, autoload_with = engine_live)
table_fact_customer_subscribed = Table('fact_customer_subscribed', metadata, autoload = True, autoload_with = engine_dwh)

Now, we’ll need to find the key for the related time dimension. Since we’re always inserting data for yesterday, we’ll search for that date in the dim_time table and use its ID. The query returns 1 row, and the ID is in the first position (the index starts from 0, so that’s result[0][0]):

# find key for the dim_time dimension
stmt = select([table_dim_time]).where(table_dim_time.columns.time_date == yesterday)
result = connection_dwh.execute(stmt).fetchall()
dim_time_id = result[0][0]

For that time ID, we’ll delete any existing records from the fact table:

# delete any existing data in the fact table for that time dimension value
stmt = table_fact_customer_subscribed.delete().where(table_fact_customer_subscribed.columns.dim_time_id == dim_time_id)
connection_dwh.execute(stmt)

Okay, now we have the ID of the time dimension stored in the dim_time_id variable. This was easy because we can have only one time dimension value. The story will be different for the city dimension. First, we’ll load all the values we need – values which uniquely describe the city (not the ID), and aggregated values:

# prepare data for insert
stmt = select([table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name, func.sum(case([(table_customer.columns.active == 1, 1)], else_=0)).label('total_active'), func.sum(case([(table_customer.columns.active == 0, 1)], else_=0)).label('total_inactive'), func.sum(case([(and_(table_customer.columns.active == 1, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_new'), func.sum(case([(and_(table_customer.columns.active == 0, func.date(table_customer.columns.time_updated) == yesterday), 1)], else_=0)).label('daily_canceled')])\
	.select_from(table_customer\
	.join(table_city)\
	.join(table_country))\
	.group_by(table_city.columns.city_name, table_city.columns.postal_code, table_country.columns.country_name)

There are a few things I would like to emphasize about the query above:

  • func.sum(...) is SUM(...) from “standard SQL”.
  • In the case(...) syntax, and_() is a function placed before the conditions it combines – not an AND operator written between them.
  • .label(...) functions like a SQL AS alias.
  • We’re using \ to move to the next line and increase the readability of the query. (Trust me, it’s pretty much unreadable without the slash – I’ve tried it :) )
  • .group_by(...) plays the role of SQL’s GROUP BY.
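By the way, if you want to see exactly what SQL SQLAlchemy generates from this statement, you can simply print it – str() compiles it, and .compile() can target a specific engine’s dialect:

# inspect the SQL that will be sent to the database
print(str(stmt))
print(stmt.compile(engine_live))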

Next, we’ll loop through every record returned using the previous query. For each record, we’ll compare values that uniquely define a city (city_name, postal_code, country_name) with the values stored in the list created out of the DWH dim_city table. If all three values match, we’ll store the ID from the list and use it when inserting new data. This way, for every record, we’ll have IDs for both dimensions:

# loop through all new records
# use time dimension
# for each record find key for city dimension
# insert row
new_values = connection_live.execute(stmt).fetchall()
for new_value in new_values:
	dim_city_id = -1
	for dim_city in table_dim_city_list:
		if new_value[0] == dim_city[1] and new_value[1] == dim_city[2] and new_value[2] == dim_city[3]:
			dim_city_id = dim_city[0]
	if dim_city_id > 0:
		stmt_insert = table_fact_customer_subscribed.insert().values(dim_city_id=dim_city_id, dim_time_id=dim_time_id, total_active=new_value[3], total_inactive=new_value[4], daily_new=new_value[5], daily_canceled=new_value[6])
		connection_dwh.execute(stmt_insert)
		dim_city_id = -1
print("Completed.")

And that’s it. We’ve updated our DWH. The script would be much longer if we updated all the dimension and fact tables. The complexity would also be greater when a fact table is related to more dimension tables. In that case, we’d need a for loop for each dimension table.
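If we did extend the script to more dimensions, the repeated inner loops could be factored into a small helper – a hypothetical sketch, not part of the original script:

def find_dim_id(values, dim_list):
	# return the surrogate key (position 0) of the dimension row whose
	# natural-key columns (positions 1..len(values)) match values; -1 if none
	for dim_row in dim_list:
		if all(dim_row[i + 1] == value for i, value in enumerate(values)):
			return dim_row[0]
	return -1

# usage, mirroring the city lookup above
dim_city_id = find_dim_id((new_value[0], new_value[1], new_value[2]), table_dim_city_list)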

This Doesn’t Work!

I was very disappointed when I wrote this script and then found out that something like this won’t work:

stmt = select([table_city.columns.city_name])\
	.select_from(table_city\
	.outerjoin(table_dim_city, table_city.columns.city_name == table_dim_city.columns.city_name))\
	.where(table_dim_city.columns.id.is_(None))

In this example, I’m trying to use tables from two different databases. If we establish two separate connections, the first connection won’t “see” tables from the other connection. And if we connect directly to the server rather than to a specific database, we won’t be able to load tables at all.

Until this changes (hopefully soon), you’ll need to use some kind of structure (e.g. what we did today) to communicate between the two databases. This complicates the code, because you need to replace a single query with two lists and nested for loops.
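For what it’s worth, the anti-join itself can be emulated compactly with Python sets – a sketch equivalent to the failed query above, matching on city_name only:

# rows in the live city table whose city_name is not yet in the DWH
dwh_city_names = {row[1] for row in table_dim_city_list}
missing = [row for row in table_city_list if row[0] not in dwh_city_names]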

Share Your Thoughts About SQLAlchemy and Python

This was the last article in this series. But who knows? Maybe we’ll try another approach in upcoming articles, so stay tuned. In the meantime, please share your thoughts about SQLAlchemy and Python in combination with databases. What do you think we lack in this article? What would you add? Tell us in the comments below.

You can download the complete script that we used in this article here.

And special thanks goes to Dirk J Bosman (@dirkjobosman), who recommended this article series.
