How do you pull data from a data source, deduplicate it and upsert it into a target database? I got curious about how AWS solves this, and the answer is a service called AWS Glue.
AWS Glue is a fully managed, serverless ETL service. With Glue we minimize the work required to prepare data for our databases, lakes and warehouses, and moving data between various data stores becomes pretty simple.
In this tutorial, we are going to create an ETL job for CSV reports stored in an S3 bucket. We will extract the data, transform it and perform an upsert on an existing database.
Our reports are stored in an S3 bucket, in three folders named after the report types, each containing files in CSV format.
Performance Reports (performance for each campaign)
[“Campaign ID”, “Name”, “Clicks”, “Cost”, “Day”, “Impressions”, “Views”]
Age Reports (performance for each age range with its ad group, device, and campaign)
[“Campaign ID”, “Campaign”, “Clicks”, “Cost”, “Day”, “Impressions”, “Views”, “Ad group”, “Ad group ID”, “Device”, “Age Range”]
Gender Reports (performance for each gender with its ad group, device, and campaign)
[“Campaign ID”, “Campaign”, “Clicks”, “Cost”, “Day”, “Impressions”, “Views”, “Ad group”, “Ad group ID”, “Device”, “Gender”]
In our case, we need to crawl this data periodically (new files will be appended to the folders above) and upsert it into a PostgreSQL database.
We’ll need to:
- Configure a Classifier for .csv files
- Configure a Crawler for our S3 bucket
- Save the reports’ metadata to the DataCatalog
- Configure a Dev Endpoint (to access data from our local Apache Zeppelin notebook)
- Create an AWS Glue database Connection (in our case we might not need it, but we do need a proper IAM role that can access the database)
- Configure a Job: a PySpark script which will “glue” everything together :)
- Transform data from the DataCatalog
- Deal with duplicates; in my example, we cannot ensure data uniqueness inside the S3 bucket
- Save staging tables to a JDBC (PostgreSQL) source
- Import a Python 2.7 lib for our Job
- Execute SQL queries: UPSERT, DROP
DataCatalog
The metastore for our AdWords report data sources.
DataCatalog is a managed store that lets us store and share metadata the same way you would in an Apache Hive metastore. There is a single DataCatalog per account and region, which we treat as our repository to query and transform data. It centralizes all information about the data in one collection.
Classifier
A Classifier reads the data in a data store. If it recognizes the format of the data, it generates a schema.
Crawler
A Crawler goes through your data and inspects portions of it to determine the schema.
Let’s create our crawler:
1. Specify the crawler source type: pick Data Stores, click Next.
2. Pick the folder inside the S3 bucket we want to crawl.
3. Add another data store? YES; pick the remaining folders analogically for the Age and Gender report folders.
4. Choose an existing IAM role, update it, or create a new one.
5. Review the summary and click Finish.
Our Crawler is ready to work. Click Run Crawler (this might take a moment). When the Crawler finishes running we can see the created tables.
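The console wizard above can also be expressed in code. Here is a minimal sketch using boto3; the crawler name, IAM role, catalog database and bucket paths are all hypothetical placeholders, not values from this tutorial:

```python
# Sketch: creating the same crawler programmatically with boto3.
# All names below (role, database, bucket, folders) are hypothetical placeholders.
crawler_config = {
    "Name": "adwords-reports-crawler",
    "Role": "AWSGlueServiceRole-reports",   # IAM role with read access to the bucket
    "DatabaseName": "adwords_reports",      # DataCatalog database the tables land in
    "Targets": {
        "S3Targets": [
            {"Path": "s3://my-reports-bucket/performance"},
            {"Path": "s3://my-reports-bucket/age"},
            {"Path": "s3://my-reports-bucket/gender"},
        ]
    },
}

def create_crawler(config):
    # boto3 is imported lazily so the sketch loads without AWS credentials.
    import boto3
    boto3.client("glue").create_crawler(**config)
```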
If you use a VPC and access to S3 doesn’t work, you might need to create a VPC S3 endpoint; otherwise you may see errors like:
Endpoint endpoint for s3 failed. VPC S3 endpoint validation failed for SubnetId: subnet-11111. VPC: vpc-aaaaaaa.
Reason: Could not find S3 endpoint or NAT gateway for subnetId: subnet-222222 in Vpc vpc-bbbbbb.
We have created our Classifier and Crawler; now it’s time to start working with the data.
Dev Endpoint
AWS Glue can expose a Dev Endpoint which we can use for local access to the data stored in our data source.
Make sure you work with AWS Glue in the same region where the S3 bucket lives.
Advice: DELETE your endpoint when you finish your work. It generates cost as it keeps a Spark cluster running, so I strongly suggest restricted and controlled access to AWS Glue.
In this example, we’ll use Apache Zeppelin’s notebook.
1. Download Apache Zeppelin (the version with all interpreters) from the Zeppelin download page onto your local machine.
2. Open Zeppelin in your browser by navigating to http://localhost:8080.
3. In Zeppelin, open the drop-down menu at anonymous in the upper-right corner of the page, and choose Interpreter. On the interpreters page, search for spark, and choose edit on the right. Make the following changes:
- Select the Connect to existing process checkbox, and then set Host to localhost and Port to 9007 (or whatever other port you are using for port forwarding).
- In Properties, set master to yarn-client.
- If there is a spark.executor.memory property, delete it by choosing the x in the action column.
- If there is a spark.driver.memory property, delete it by choosing the x in the action column.
4. Choose Save at the bottom of the page, and then choose OK to confirm that you want to update the interpreter and restart it. Use the browser back button to return to the Zeppelin start page.
5. Use SSH local port forwarding to forward a local port (here, 9007) to the remote destination defined by AWS Glue (169.254.76.1:9007). PUBLIC_ENDPOINT_DNS can be found in the Endpoint details:
ssh -i KEY_USED_WHEN_CREATED_DEV_ENDPOINT -NTL 9007:169.254.76.1:9007 glue@PUBLIC_ENDPOINT_DNS
6. Run a sample script: in the local Zeppelin notebook start page, choose to create a new note. Name the new note, and confirm spark as the interpreter.
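A minimal sketch of a first paragraph to run in the note, producing output like the one below. It assumes the crawler created a table named `performance` in a database named `adwords_reports` (both names are hypothetical; use the names your crawler produced). The Glue imports only resolve on the endpoint, so they are guarded here:

```python
import sys

# Hypothetical catalog coordinates; substitute the database/table your crawler created.
DATABASE = "adwords_reports"
TABLE = "performance"

def catalog_source(database, table_name):
    """Kwargs for GlueContext.create_dynamic_frame.from_catalog."""
    return {"database": database, "table_name": table_name}

if "pyspark" in sys.modules:  # only runs inside the Spark interpreter on the endpoint
    from awsglue.context import GlueContext
    glue_context = GlueContext(sc)  # `sc` is provided by Zeppelin's Spark interpreter
    dyf = glue_context.create_dynamic_frame.from_catalog(**catalog_source(DATABASE, TABLE))
    print("Records Count: %d" % dyf.count())
    dyf.printSchema()
```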
Records Count: 36323
root
|-- campaign id: long
|-- campaign: string
|-- clicks: long
|-- cost: long
|-- day: string
|-- impressions: long
|-- views: long
TIP: your schema may contain a field with the type choice, meaning AWS Glue found records with conflicting types for the same column. To solve this we need to apply a transform. For example, if we have fields in our schema like:
|-- field1: choice
|    |-- long
|    |-- string
|-- field2: choice
|    |-- double
|    |-- string
we can resolve them inside our script:
ResolveChoice.apply(frame = datasource, specs = [('field1', 'cast:double'), ('field2', 'cast:long')])
Now you can interact with your data source. If you would like to write to RDS from your local notebook, the DataFrame writer will need a configured database connection.
Configure a connection to the database
You’ll need a PostgreSQL database inside RDS. Once you have it, let’s add a connection to it.
Go to Databases -> Connections -> Add Connection. Review the summary, click Finish and test your connection.
Self-referencing inbound rule issue:
Unable to find a suitable security group. Change connection type to JDBC and retry adding your connection.
You need to set up a self-referencing inbound rule on the database’s security group.
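A self-referencing rule means the security group accepts traffic from members of the same group, which the Glue-managed network interfaces need. A sketch with boto3; the group ID is a hypothetical placeholder, and the rule opens all TCP ports to the group itself as the Glue documentation recommends:

```python
# Sketch: a self-referencing inbound rule (all TCP ports, source = the group itself).
# SECURITY_GROUP_ID is a hypothetical placeholder.
SECURITY_GROUP_ID = "sg-0123456789abcdef0"

ingress_rule = {
    "IpProtocol": "tcp",
    "FromPort": 0,
    "ToPort": 65535,
    "UserIdGroupPairs": [{"GroupId": SECURITY_GROUP_ID}],  # the group references itself
}

def add_self_reference(group_id, rule):
    import boto3  # lazy import: the sketch loads without AWS credentials
    boto3.client("ec2").authorize_security_group_ingress(
        GroupId=group_id, IpPermissions=[rule]
    )
```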
Transform: prepare data and schema
Job
A job is your business logic required to perform extract, transform and load (ETL) work. Job runs are initiated by triggers that can be scheduled or driven by events.
You define sources, destinations and transformations, and work on the data from source to target using an Apache Spark API (PySpark) script.
Most ETL jobs are triggered on schedule or when the data arrives.
Bookmarks are a mechanism to prevent processing the same file a second time. Glue uses the transformation context (transformation_ctx) to index processed files.
DynamicFrame
AWS Glue provides a handy DynamicFrame in addition to the SparkSQL DataFrame.
A DynamicFrame is similar to a DataFrame, except that each record is self-describing, so no schema is required initially. Instead, AWS Glue computes a schema on-the-fly when required, and explicitly encodes schema inconsistencies using a choice (or union) type. You can resolve these inconsistencies to make your datasets compatible with data stores that require a fixed schema.
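In practice you hop between the two representations: DynamicFrame for catalog reads and Glue transforms, DataFrame for Spark’s writer (which we will need later for overwriting staging tables). A sketch of the two conversion calls, wrapped in helpers since `glue_context` only exists on a Glue endpoint:

```python
def to_data_frame(dyf):
    """DynamicFrame -> Spark DataFrame (so we can use df.write)."""
    return dyf.toDF()

def to_dynamic_frame(df, glue_context, name):
    """Spark DataFrame -> DynamicFrame (so we can use Glue transforms and sinks).
    Only importable on a Glue endpoint, hence the lazy import."""
    from awsglue.dynamicframe import DynamicFrame
    return DynamicFrame.fromDF(df, glue_context, name)
```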
Example: the generated getting-started script
Click `Save` and here we go, our generated script :
AWS Glue bootstrapped for us a basic script with simple flow:
- Get Reports table (simple data catalog from S3)
- Transform: ApplyMapping
- Transform: ResolveChoice
- Transform: DropNullFields
- DataSink: Append to database
In the case of our reports, we need to create a more custom flow covering all 3 report types.
Let’s start working on our more real-life Job example.
The flow we want to build:
1. Extract reports (take only those that were not yet processed; Bookmarks are set to Enabled)
- Get campaign performance report
- Get age report
- Get gender report
2. Map reports
- For each row add a type (PERFORMANCE, AGE or GENDER)
- Generate an MD5 REPORT_ID for uniqueness; the encoded string for each record is:
Campaign performance: {campaign id}_{day}
Age performance: {campaign id}_{ad group id}_{day}_{type}_{device}_{age range}
Gender performance: {campaign id}_{ad group id}_{day}_{type}_{device}_{gender}
3. Save campaigns from each report (if data exists; it might not, as all .csv files may have been processed already)
- Transform: SelectFields: ['campaign', 'campaign id']
- Remove duplicated records
- Transform: ApplyMapping (change campaign to name, campaign id to id)
- Overwrite the staging table with the extracted campaigns
4. Save ad_groups from the Age and Gender reports (if data exists)
- Transform: SelectFields: ['ad group', 'ad group id']
- Remove duplicated records
- Transform: ApplyMapping (change ad group to name, ad group id to id)
- Overwrite the staging table with the extracted ad groups
5. Save reports for each report type
- Transform: DropFields: ['ad group', 'campaign'] from age and gender reports, ['campaign'] from the performance report
- Remove duplicated records
- Transform: ApplyMapping (change report_id to id, change day from string to date, etc.)
- Overwrite the staging table with the extracted report
6. Populate the database
- Upsert from staging tables to target tables
- Drop staging tables
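The REPORT_ID from step 2 can be sketched in plain Python. The key formats are the ones listed in step 2; the row-field names mirror the report headers, and in the real job this would run inside a Map transform or a DataFrame UDF:

```python
import hashlib

def report_id(*parts):
    """MD5 hex digest of the underscore-joined key parts (the formats from step 2)."""
    key = "_".join(str(p) for p in parts)
    return hashlib.md5(key.encode("utf-8")).hexdigest()

# Per-report-type keys; `row` is assumed to be a dict keyed by the CSV headers.
def performance_id(row):
    return report_id(row["campaign id"], row["day"])

def age_id(row):
    return report_id(row["campaign id"], row["ad group id"], row["day"],
                     row["type"], row["device"], row["age range"])

def gender_id(row):
    return report_id(row["campaign id"], row["ad group id"], row["day"],
                     row["type"], row["device"], row["gender"])
```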
We can’t ensure that new data will be free of duplicated records, so we are not going to use the DynamicFrameWriter, as it only appends to the database table. Instead we’ll use Spark DataFrame write() to create staging tables, then upsert to overwrite existing records with fresh data.
The UPSERT needs to be done manually, as Glue doesn’t provide any solution for PostgreSQL (for Redshift you can use preactions and postactions). We will need to import a PostgreSQL driver (pg8000) to execute our upsert queries.
Import External Library (pg8000) for AWS Glue Job
We need to add an external library to our PySpark Job: pg8000 (the lib needs to work under Python 2.7 and must be pure Python code; the current version might not work as it has new dependencies).
- Pull version 1.12.4
- Pack the pg8000 folder from within the library and upload it to an S3 bucket of your choice
Connecting to the Database. Writing and executing queries.
Passing Job Parameters
Inside the script you can access these parameters
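Job parameters arrive as `--name value` arguments; on Glue you read them with `getResolvedOptions` from `awsglue.utils`. Since that module only exists on Glue, here is a minimal stand-in that illustrates the shape (the parameter names are hypothetical):

```python
import sys

def resolve_options(argv, names):
    """Minimal stand-in for awsglue.utils.getResolvedOptions:
    collects --name value pairs for the requested names."""
    resolved = {}
    for name in names:
        flag = "--" + name
        if flag in argv:
            resolved[name] = argv[argv.index(flag) + 1]
    return resolved

# Inside the real job you would write instead:
# from awsglue.utils import getResolvedOptions
# args = getResolvedOptions(sys.argv, ["JOB_NAME", "db_host", "db_user", "db_password"])
args = resolve_options(["job.py", "--db_host", "example.internal", "--db_user", "glue"],
                       ["db_host", "db_user"])
```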
About UPSERT
It is not supported out of the box: the DynamicFrameWriter can either append to or overwrite an existing table. If your application requires more complex logic, you’ll have to deal with it manually (as we do in our case).
One option is to use an action (foreach, foreachPartition) with a standard JDBC connection. Another is to write to a temporary (staging) table and handle the rest directly in the database, which is what we are going to do.
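The "handle it in the database" option boils down to PostgreSQL’s INSERT ... ON CONFLICT (available since PostgreSQL 9.5). A sketch of how our job builds such a statement; the table and column names are illustrative, matching the campaigns staging table from step 3:

```python
def upsert_sql(target, staging, columns, key="id"):
    """Build an INSERT ... ON CONFLICT DO UPDATE statement that copies a
    staging table into the target, overwriting rows that share the key."""
    cols = ", ".join(columns)
    updates = ", ".join("%s = EXCLUDED.%s" % (c, c) for c in columns if c != key)
    return ("INSERT INTO %s (%s) SELECT %s FROM %s "
            "ON CONFLICT (%s) DO UPDATE SET %s"
            % (target, cols, cols, staging, key, updates))

# Hypothetical table names, mirroring the campaigns step of the flow:
sql = upsert_sql("campaigns", "campaigns_staging", ["id", "name"])
```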
Create Staging tables
The DynamicFrameWriter can only append data to tables. In our case, we need to truncate and overwrite the staging tables on each run with new data.
In order to do so, we use Spark’s write() to overwrite the staging table.
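The overwrite goes through the plain Spark JDBC writer rather than a Glue sink. A sketch under assumed names; the connection values are placeholders (in the real job they would come from job parameters), and the function body only runs on the cluster:

```python
# Hypothetical JDBC settings; in the job these would come from job parameters.
jdbc_options = {
    "url": "jdbc:postgresql://your-db-host:5432/reports",
    "user": "glue_user",
    "password": "change-me",
    "dbtable": "campaigns_staging",
    "driver": "org.postgresql.Driver",
}

def overwrite_staging(dyf, options):
    """Convert the DynamicFrame to a DataFrame and overwrite the staging table."""
    (dyf.toDF()
        .write.format("jdbc")
        .options(**options)
        .mode("overwrite")
        .save())
```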
Executing SQL using pg8000
In our case, the way to achieve the UPSERT is to do it manually using the Postgres driver:
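With pg8000 on the job’s path, executing the UPSERT and DROP statements is a plain DB-API sequence. A sketch; the host, database, user and password are hypothetical placeholders, and the statement names in the trailing comment refer to whatever upsert/drop SQL your job builds:

```python
def run_statements(conn, statements):
    """Execute each statement in order on an open DB-API connection, then commit."""
    cursor = conn.cursor()
    for statement in statements:
        cursor.execute(statement)
    conn.commit()

def connect():
    """Open a pg8000 connection; all connection values are hypothetical placeholders."""
    import pg8000  # the library we packed and imported via --extra-py-files
    return pg8000.connect(host="your-db-host", database="reports",
                          user="glue_user", password="change-me")

# In the job, after writing the staging tables:
# run_statements(connect(), [upsert_campaigns_sql, "DROP TABLE campaigns_staging"])
```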
Final Job script
That’s it: now you have a ready script that will crawl our S3 bucket and populate the destination database. Run it when you need it, schedule it, or configure a trigger.
I hope this how-to helped you understand how to glue everything together.