AWS Glue, S3 to PostgreSQL (Upsert)

Krl · 10 min read · Feb 8, 2021

How do you pull data from a data source, deduplicate it, and upsert it into the target database? I became interested in how AWS solves this; their answer is a service called AWS Glue.

AWS Glue is a fully managed, serverless ETL service. Using Glue, we minimize the work required to prepare data for our databases, lakes, or warehouses, and the process of moving data among various data stores becomes pretty simple.

In this tutorial, we are going to create an ETL job for CSV reports stored in an S3 bucket. We are going to extract them, transform them, and perform an upsert into an existing database.

Our reports are stored in an S3 bucket. There are 3 folders, named after the report types, containing files in CSV format.

Performance Reports (performance for each campaign)

["Campaign ID", "Name", "Clicks", "Cost", "Day", "Impressions", "Views"]

Age Reports (performance for each age range with its ad group, device, and campaign)

["Campaign ID", "Campaign", "Clicks", "Cost", "Day", "Impressions", "Views", "Ad group", "Ad group ID", "Device", "Age Range"]

Gender Reports (performance for each gender with its ad group, device, and campaign)

["Campaign ID", "Campaign", "Clicks", "Cost", "Day", "Impressions", "Views", "Ad group", "Ad group ID", "Device", "Gender"]

In our case, we need to crawl this data periodically (new files will keep being appended to these folders) and upsert it into a PostgreSQL database.

Let’s define a simple database schema: we would like to extract campaigns and ad groups, and consolidate the reports in a single table.

We’ll need to :

  • Configure a Classifier for .csv files
  • Configure a Crawler for our S3 bucket
  • Save report metadata to the DataCatalog
  • Configure a Dev Endpoint (to access data from our local Apache Zeppelin notebook)
  • Create an AWS Glue database Connection (in our case we might not strictly need it, but we do need a proper IAM role that can access the database)
  • Configure a Job (the PySpark script which will “glue” everything together :) )
  • Transform data from the DataCatalog
  • Deal with duplicates; in my example, we cannot ensure data uniqueness inside the S3 bucket
  • Save staging tables to a JDBC (PostgreSQL) target
  • Import a Python 2.7 library for our Job
  • Execute SQL queries: UPSERT, DROP

DataCatalog

A metastore for our AdWords report data sources

The DataCatalog is a managed store that lets us store and share metadata the same way you would in an Apache Hive metastore. There is only one DataCatalog, which we treat as our repository to query and transform data; it centralizes all information about the data in one collection.

To populate our DataCatalog with AdWords reports metadata we need a source, Classifier, and Crawler.

Classifier

A Classifier reads the data in a data store. If it recognizes the format of the data, it generates a schema.

Create a classifier for .csv files under Crawlers -> Classifiers. Hit Create and that's it: our classifier is ready to be used by the Crawler.

Crawler

The Crawler goes through your data and inspects portions of it to determine the schema.

Let’s create our crawler

Choose a name and tags, and don't forget to add the classifier we created.

Specify crawler source type

Pick Data stores and click Next.

Now pick the folder inside the S3 bucket that we want to crawl.

Add another data store?

Yes, and pick the other folders analogously for the Age and Gender report folders.

Choose an existing IAM role, or update/create a new one.

We can schedule this crawler or have it run on demand.
This is the DataCatalog database name; pick an existing one or create a new one. Metadata about the discovered tables will be stored in this database.

Review summary and click Finish

Our Crawler is ready to work.

Click Run Crawler (this might take a moment)

When the Crawler finishes running, we can see the created tables.

The DataCatalog now contains metadata about those 3 report types.
Metadata for one of the report types

If you use a VPC and access to S3 doesn't work, you might need to create a VPC S3 endpoint, otherwise you'll see an error like:

Endpoint for s3 failed. VPC S3 endpoint validation failed for SubnetId: subnet-11111. VPC: vpc-aaaaaaa.
Reason: Could not find S3 endpoint or NAT gateway for subnetId: subnet-222222 in Vpc vpc-bbbbbb.

We have created our Classifier and Crawler; now it's time to start working with the data.

Dev Endpoint

AWS Glue can expose a Dev endpoint for us, which we can use for local access to the data stored in our data sources.

Make sure you work with AWS Glue in the same region that the S3 bucket lives in.

Advice: DELETE your endpoint when you finish your work. It generates cost as long as it keeps the Spark cluster running, and I strongly suggest having restricted and controlled access to AWS Glue.

Create an endpoint under ETL -> Dev endpoints.
Name the endpoint and select the IAM role.
Skip the networking configuration, as we only need to access the S3 bucket.
Provide the public key you would like to use to access the endpoint.

In this example, we’ll use Apache Zeppelin’s notebook.

  1. Download Apache Zeppelin (the version with all interpreters) from the Zeppelin download page onto your local machine.
  2. Open Zeppelin in your browser by navigating to http://localhost:8080.
  3. In Zeppelin in the browser, open the drop-down menu at anonymous in the upper-right corner of the page, and choose Interpreter. On the interpreter’s page, search for spark, and choose edit on the right. Make the following changes:

Select the Connect to existing process checkbox, and then set Host to localhost and Port to 9007 (or whatever other port you are using for port forwarding).

In Properties, set master to yarn-client.

If there is a spark.executor.memory property, delete it by choosing the x in the action column.

If there is a spark.driver.memory property, delete it by choosing the x in the action column.

4. Choose Save at the bottom of the page, and then choose OK to confirm that you want to update the interpreter and restart it. Use the browser back button to return to the Zeppelin start page.

5. Next, use SSH local port forwarding to forward a local port (here, 9007) to the remote destination defined by AWS Glue (169.254.76.1:9007).

PUBLIC_ENDPOINT_DNS can be found in Endpoint details

ssh -i KEY_USED_WHEN_CREATED_DEV_ENDPOINT -NTL 9007:169.254.76.1:9007 glue@PUBLIC_ENDPOINT_DNS

6. Running a Sample Script

In the local Zeppelin notebook start page, choose to create a new note. Name the new note, and confirm spark as the interpreter.
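
A minimal sketch of such a note is shown below; the database name (adwords_reports) and table name (performance_reports) are placeholders for whatever your crawler created in the Data Catalog.

```python
# Sketch of a first Zeppelin note; database/table names are placeholders
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate())

# Load one of the crawled report tables as a DynamicFrame
performance = glueContext.create_dynamic_frame.from_catalog(
    database="adwords_reports",
    table_name="performance_reports")

print("Records Count: %d" % performance.count())
performance.printSchema()
```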

Records Count: 36323
root
|-- campaign id: long
|-- campaign: string
|-- clicks: long
|-- cost: long
|-- day: string
|-- impressions: long
|-- views: long

TIP: Your schema may contain a field with the type choice (this happens when Glue detects inconsistent types in the source data). To solve this we need to apply a ResolveChoice transform. For example, if we have fields in our schema like:

|-- field1: choice
|    |-- long
|    |-- string
|-- field2: choice
|    |-- double
|    |-- string

We can transform it inside our script:

ResolveChoice.apply(frame = datasource, specs = [('field1', 'cast:long'), ('field2', 'cast:double')])

Now you can interact with our data source.

If you would like to write to RDS from your local notebook, the DataFrame writer will need a configured database connection.

Configure a connection to the database

You’ll need a PostgreSQL database inside RDS. Once you have that, let's add a connection to it.

Go to Databases -> Connections -> Add Connection

Name your connection and pick Amazon RDS with the PostgreSQL engine.
Pick the database instance, then configure the database name and credentials.

Review summary, click Finish and test your connection.

Self-referencing inbound rule issue:
Unable to find a suitable security group. Change connection type to JDBC and retry adding your connection.
You need to set up a self-referencing inbound rule on the security group.

Transform: prepare data and schema

Job

A job is your business logic required to perform extract, transform and load (ETL) work. Job runs are initiated by triggers that can be scheduled or driven by events.

You define sources, destinations, and transformations; the job lets you work with data from the source to the target destination using an Apache Spark API (PySpark) script.

Most ETL jobs are triggered on schedule or when the data arrives.

Bookmarks are a mechanism to prevent processing the same file a second time. Glue uses the transformation context (transformation_ctx) to index processed files.

DynamicFrame

Aside from the SparkSQL DataFrame, AWS Glue has a handy DynamicFrame.

A DynamicFrame is similar to a DataFrame, except that each record is self-describing, so no schema is required initially. Instead, AWS Glue computes a schema on-the-fly when required, and explicitly encodes schema inconsistencies using a choice (or union) type. You can resolve these inconsistencies to make your datasets compatible with data stores that require a fixed schema.
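
For example, converting between the two is a one-liner in each direction (a sketch, assuming the glueContext and a reports DynamicFrame loaded from the Data Catalog as above):

```python
from awsglue.dynamicframe import DynamicFrame

# DynamicFrame -> Spark DataFrame, e.g. to use dropDuplicates() or write()
reports_df = reports.toDF().dropDuplicates()

# Spark DataFrame -> DynamicFrame, to keep using Glue transforms afterwards
reports_dyf = DynamicFrame.fromDF(reports_df, glueContext, "reports_dyf")
```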

Example: the generated getting-started script

Name your job, pick the IAM role, and enable Job bookmark if you want to avoid parsing the same files more than once.
Pick the data source.
Choose `Create tables in your data target` and pick the defined database connection.
Change the mapping of fields.

Click `Save` and here we go, our script is generated.

AWS Glue bootstrapped a basic script for us with a simple flow (a sketch of it follows the list):

  1. Get Reports table (simple data catalog from S3)
  2. Transform: ApplyMapping
  3. Transform: ResolveChoice
  4. Transform: DropNullFields
  5. DataSink: Append to database
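
The generated code looks roughly like the sketch below; the database, table, connection, and column names are placeholders, not the exact code Glue produced:

```python
import sys
from pyspark.context import SparkContext
from awsglue.transforms import ApplyMapping, ResolveChoice, DropNullFields
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glueContext = GlueContext(SparkContext.getOrCreate())
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# 1. Get the reports table from the Data Catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="adwords_reports", table_name="performance_reports",
    transformation_ctx="datasource0")

# 2. ApplyMapping: rename columns to the target schema
applymapping1 = ApplyMapping.apply(
    frame=datasource0,
    mappings=[("campaign id", "long", "campaign_id", "long"),
              ("campaign", "string", "campaign", "string"),
              ("clicks", "long", "clicks", "long"),
              ("cost", "long", "cost", "long"),
              ("day", "string", "day", "string"),
              ("impressions", "long", "impressions", "long"),
              ("views", "long", "views", "long")],
    transformation_ctx="applymapping1")

# 3. ResolveChoice and 4. DropNullFields
resolvechoice2 = ResolveChoice.apply(frame=applymapping1, choice="make_cols",
                                     transformation_ctx="resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame=resolvechoice2,
                                       transformation_ctx="dropnullfields3")

# 5. DataSink: append into the table behind our JDBC connection
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=dropnullfields3, catalog_connection="adwords-postgres",
    connection_options={"dbtable": "reports", "database": "adwords"},
    transformation_ctx="datasink4")

job.commit()
```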

In the case of our reports, we need to create a more custom flow that handles all 3 report types.

Let’s start working on our more realistic Job example.

The flow we want to build :

  1. Extract reports (take only the ones that were not processed yet; Bookmarks are set to Enabled)
  • Get campaign performance report
  • Get age report
  • Get gender report

2. Map reports

  • For each row add type (PERFORMANCE, AGE or GENDER)
  • Generate an MD5 REPORT_ID for uniqueness, encoding each record as follows (a code sketch follows this flow outline):

Campaign Performance encoded string:

{campaign id}_{day}

Age Performance encoded string:

{campaign id}_{ad group id}_{day}_{type}_{device}_{age range}

Gender Performance encoded string:

{campaign id}_{ad group id}_{day}_{type}_{device}_{gender}

3. Save campaigns from each report (if data exists; it might not, if all .csv files were already processed)

  • Transform: SelectFields: ['campaign', 'campaign id']
  • Remove duplicated records
  • Transform: ApplyMapping (change campaign to name, campaign id to id)
  • Overwrite the staging table with the extracted campaigns

4. Save ad_groups from the Age report and Gender report (if data exists)

  • Transform: SelectFields: ['ad group', 'ad group id']
  • Remove duplicated records
  • Transform: ApplyMapping (change ad group to name, ad group id to id)
  • Overwrite the staging table with the extracted ad groups

5. Save reports for each report type

  • Transform: DropFields:
  • ['ad group', 'campaign'] from the age and gender reports
  • ['campaign'] from the performance report
  • Remove duplicated records
  • Transform: ApplyMapping
  • (change report_id to id, change day from string to date, etc.)
  • Overwrite the staging table with the extracted report

6. Populate database

  • Upsert from Staging tables to Target tables
  • Drop Staging tables
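
As referenced in step 2, here is a sketch of how the MD5 REPORT_ID can be built with Spark SQL functions; the with_report_id helper is mine, and the column names follow the crawled CSV headers:

```python
from pyspark.sql import functions as F

def with_report_id(df, report_type, key_columns):
    """Tag rows with a report type and an MD5 id built from the key columns."""
    df = df.withColumn("type", F.lit(report_type))
    return df.withColumn(
        "report_id",
        F.md5(F.concat_ws("_", *[F.col(c).cast("string") for c in key_columns])))

# e.g. for the age report (a DynamicFrame converted to a DataFrame with toDF())
age_df = with_report_id(
    age_reports.toDF(), "AGE",
    ["campaign id", "ad group id", "day", "type", "device", "age range"])
```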

We can’t ensure that new data will be free of duplicated records, so we are not going to use the DynamicFrameWriter, as it only appends to the database table. Instead, we are going to use Spark's DataFrame write() to create staging tables and then upsert to overwrite existing records with fresh data.

The UPSERT needs to be done manually, as Glue doesn't provide any solution for PostgreSQL (for Redshift you can use preactions and postactions). We will need to import a PostgreSQL driver (pg8000) to execute our queries for the upsert.

Import External Library (pg8000) for AWS Glue Job

We need to add an external library to our PySpark Job: pg8000 (the library needs to work under Python 2.7 and should be pure Python code). The current version might not work, as it has new dependencies, so:

  • please pull version 1.12.4
  • pack the pg8000 folder from within the library and upload it to an S3 bucket of your choice

In Job Edit, or when creating a Job, under Security configuration, script libraries, and job parameters (optional), you can add your libraries in the Python library path field. If you would like to add more libraries, separate them with a semicolon.

Connecting to the Database. Writing and executing queries.

Passing Job Parameters

In Job Edit, or when creating a Job, under Security configuration, script libraries, and job parameters, you can define your own job parameters as key/value pairs.

Inside the script you can access these parameters:
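
A sketch of how that looks; the parameter names (db_host, db_name, db_user, db_password) are just examples of what you might define in the job configuration:

```python
import sys
from awsglue.utils import getResolvedOptions

# Parameters defined as --db_host, --db_name, ... on the job
args = getResolvedOptions(
    sys.argv, ['JOB_NAME', 'db_host', 'db_name', 'db_user', 'db_password'])

db_host = args['db_host']
db_name = args['db_name']
db_user = args['db_user']
db_password = args['db_password']
```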

About UPSERT

It is not supported. DynamicFrameWriter can either append to or overwrite an existing table. If your application requires more complex logic you’ll have to deal with this manually (as we do in our case).

One option is to use an action (foreach, foreachPartition) with a standard JDBC connection. Another one is to write to a temporary table and handle the rest directly in the database, which is what we are going to do.

Create Staging tables

DynamicFrameWriter can only append data into tables. In our case, we require truncating and overwriting staging tables on each run with new data.

In order to do so, we need to use Spark's write() to overwrite the staging table:
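
A sketch of such a write, assuming a campaigns_df DataFrame and the connection details from the job parameters above (the staging table name is illustrative):

```python
# mode("overwrite") drops and recreates the staging table on every run
campaigns_df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://%s:5432/%s" % (db_host, db_name)) \
    .option("dbtable", "campaigns_staging") \
    .option("user", db_user) \
    .option("password", db_password) \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()
```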

Executing SQL using pg8000

In our case, the way to achieve the UPSERT is to do it manually using the Postgres driver:
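
A sketch, assuming the connection details from the job parameters and the campaigns staging table from above; the table and column names are illustrative, and ON CONFLICT (id) relies on id being the primary key:

```python
import pg8000

conn = pg8000.connect(host=db_host, port=5432, database=db_name,
                      user=db_user, password=db_password)
cursor = conn.cursor()

# Upsert fresh rows from the staging table into the target table
cursor.execute("""
    INSERT INTO campaigns (id, name)
    SELECT id, name FROM campaigns_staging
    ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name
""")

# Drop the staging table once the upsert is done
cursor.execute("DROP TABLE IF EXISTS campaigns_staging")

conn.commit()
conn.close()
```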

Final Job script

That’s it: now you have a ready script that will crawl our S3 bucket and populate our destination database. Just run it when you need it, schedule it, or configure a trigger.

I hope this how-to helped you understand how to glue everything together.
