Data Migration (ETL) from Aurora PostgreSQL to SQL Server RDS & Redshift Cluster using AWS Glue

Pawan Kumar Ganjhu
5 min read · Apr 27, 2023


Description:

Data migration is the process of transferring data from one storage system to another. In this case, we are migrating data from Aurora PostgreSQL to both SQL Server RDS and a Redshift cluster, using AWS Glue with PySpark as the scripting language.

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to move data between data stores. PySpark, on the other hand, is the Python API for Apache Spark, a powerful distributed computing system that can handle large-scale data processing.

The migration process involves extracting the data from Aurora PostgreSQL using AWS Glue, transforming it as needed, and then loading it into both SQL Server RDS and the Redshift Cluster. PySpark is used to write the code that performs the data transformations, which can include filtering, joining, and aggregating the data.

Overall, this process enables organizations to move their data between different database systems seamlessly, leveraging the power of AWS Glue and PySpark to automate the process and make it more efficient.

Part 1: Aurora Postgres Table to SQL Server RDS
Part 2: Aurora Postgres Table to Redshift Cluster

Pre-Requisites:

i. Set up the required infrastructure on AWS: Create an Aurora PostgreSQL database, a SQL Server RDS instance, a Redshift cluster, and an AWS Glue job.

ii. Configure the database endpoints: Ensure that the Aurora PostgreSQL database, SQL Server RDS instance, and Redshift cluster are properly configured and accessible from the AWS Glue job.
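
The snippets in the development steps below reference connection dictionaries such as source_conn, source_db_properties, target_conn, target_db_properties, target_conn_red, and target_db_properties_red. How these get populated depends on your setup; the following is a minimal sketch, assuming the credentials are stored in AWS Secrets Manager under hypothetical secret names and a hypothetical {"host", "db", "username", "password"} secret layout.

# Hedged sketch: populate the connection dictionaries used later from AWS Secrets Manager.
# The secret names and the secret JSON layout are assumptions.
import json
import boto3

def get_db_secret(secret_name, region="us-east-1"):
    client = boto3.client("secretsmanager", region_name=region)
    return json.loads(client.get_secret_value(SecretId=secret_name)["SecretString"])

src = get_db_secret("aurora-postgres-creds")   # hypothetical secret name
source_conn = {"url": src["host"], "db": src["db"]}
source_db_properties = {"username": src["username"], "password": src["password"]}

tgt = get_db_secret("sqlserver-rds-creds")     # hypothetical secret name
target_conn = {"url": tgt["host"], "db": tgt["db"]}
target_db_properties = {"username": tgt["username"], "password": tgt["password"]}

red = get_db_secret("redshift-creds")          # hypothetical secret name
target_conn_red = {"url": red["host"], "db": red["db"]}
target_db_properties_red = {"username": red["username"], "password": red["password"]}

Hard-coding the credentials in the job script also works for a quick test, but Secrets Manager (or a Glue connection) keeps them out of source control.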

Task Pre-Development:

i. Write the ETL code: Use PySpark to write the code that extracts the data from the Aurora PostgreSQL database, transforms it as needed, and loads it into both the SQL Server RDS instance and the Redshift cluster (a minimal job-script skeleton is sketched after this list).

ii. Create an AWS Glue job: Use the AWS Glue console to create a new job and specify the source (Aurora PostgreSQL) and destination (SQL Server RDS and Redshift) data stores.

iii. Configure the job properties: Configure the AWS Glue job properties, including the job name, IAM role, and other settings such as the maximum number of concurrent runs.
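
The development steps below also assume a Glue job script in which Spark, the GlueContext, and the job arguments (including redshift_iam_role, referenced later as args['redshift_iam_role']) are already initialized. A minimal skeleton, assuming the job is created with a --redshift_iam_role job parameter:

# Minimal Glue job-script skeleton (the --redshift_iam_role job parameter is an assumption)
import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

# Resolve job parameters; JOB_NAME is standard, redshift_iam_role is a custom parameter
args = getResolvedOptions(sys.argv, ["JOB_NAME", "redshift_iam_role"])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# ... extraction, transformation, and load steps from the sections below go here ...

job.commit()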

Task Post-Development:

i. Test and run the job: Test the AWS Glue job to ensure that it’s properly configured and can successfully migrate the data. Once testing is complete, run the job to initiate the actual data migration (a boto3 sketch for starting and monitoring the run follows this list).

ii. Monitor the job progress: Monitor the AWS Glue job progress to ensure that the data migration is proceeding as expected and troubleshoot any errors or issues that arise.

iii. Validate the data migration: Once the AWS Glue job completes, validate that the data has been successfully migrated to both the SQL Server RDS instance and Redshift cluster by querying the destination data stores.
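
Besides the Glue console, the job run can be started and monitored from a small script. A minimal boto3 sketch, assuming a hypothetical job name:

import time
import boto3

glue = boto3.client("glue")

JOB_NAME = "aurora-to-sqlserver-redshift-etl"   # hypothetical job name

# Start the job run and remember its id
run_id = glue.start_job_run(JobName=JOB_NAME)["JobRunId"]

# Poll until the run reaches a terminal state
while True:
    state = glue.get_job_run(JobName=JOB_NAME, RunId=run_id)["JobRun"]["JobRunState"]
    print("Job run state:", state)
    if state in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"):
        break
    time.sleep(30)

For validation, a simple first check is to compare row counts between the source table and each target table after the run succeeds.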

Steps for Development:

i. Establish a connection and read data from the PostgreSQL table into a PySpark DataFrame.

# Reading data from Source : Aurora Postgres
df1 = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://" + source_conn["url"] + ":5432/" + source_conn["db"]) \
    .option("dbtable", source_table_name) \
    .option("user", source_db_properties["username"]) \
    .option("password", source_db_properties["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .load()

ii. Establish a connection and read data from the SQL Server table into a PySpark DataFrame.

# Reading data from Target : SQL Server RDS
df2 = spark.read \
    .format("jdbc") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("url", "jdbc:sqlserver://;serverName=" + target_conn["url"] + ";DATABASE=" + target_conn["db"]) \
    .option("user", target_db_properties["username"]) \
    .option("password", target_db_properties["password"]) \
    .option("dbtable", target_table_name) \
    .load()

iii. Establish a connection and read data from the Redshift table into a PySpark DataFrame.

# Reading data from Target : Redshift Cluster
# (the aws_iam_role and tempdir options are only needed by the Redshift connector
#  used for writing in step viii., not by this plain JDBC read)
df3 = glueContext.read.format("jdbc") \
    .option("url", "jdbc:redshift://" + target_conn_red["url"] + ":5439/" + target_conn_red["db"]) \
    .option("user", target_db_properties_red["username"]) \
    .option("password", target_db_properties_red["password"]) \
    .option("driver", "com.amazon.redshift.jdbc.Driver") \
    .option("dbtable", target_table_name) \
    .load()

iv. Align the datatypes of the source DataFrame’s columns with the datatypes of the matching columns at the targets. (This is one example of a transformation; there could be various other transformations based on business requirements, and a small example follows the casting snippet below.)

# Get the column names and datatypes from the SQL Server table
sqlserver_columns = df2.dtypes

# Cast each source column to the SQL Server datatype when the column names match
df1_sql = df1
for col_name, col_type in df1.dtypes:
    for sqlserver_col_name, sqlserver_col_type in sqlserver_columns:
        if col_name == sqlserver_col_name:
            df1_sql = df1_sql.withColumn(col_name, df1_sql[col_name].cast(sqlserver_col_type))

# Get the column names and datatypes from the Redshift table
redshift_columns = df3.dtypes

# Cast each source column to the Redshift datatype when the column names match
df1_red = df1
for col_name, col_type in df1.dtypes:
    for redshift_col_name, redshift_col_type in redshift_columns:
        if col_name == redshift_col_name:
            df1_red = df1_red.withColumn(col_name, df1_red[col_name].cast(redshift_col_type))
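
Casting is only one kind of transformation; filters, joins, or aggregations driven by business rules can also be applied to the source DataFrame, for example right after the read in step i. A small hedged illustration with hypothetical column names:

from pyspark.sql import functions as F

# Hypothetical extra transformations on the source frame, applied before the casting step:
# drop rows without a business key and trim a text column
# (the column names "id" and "customer_name" are assumptions)
df1 = df1.filter(F.col("id").isNotNull()) \
    .withColumn("customer_name", F.trim(F.col("customer_name")))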

v. Find the new records in the PostgreSQL table with an exceptAll operation against the SQL Server table (the subtract method can be used as well, e.g. df_final_1 = df1_sql.subtract(df2)).

# Rows of df1_sql (Aurora Postgres) not present in df2 (SQL Server RDS)
df_final_1 = df1_sql.exceptAll(df2)
print("Record count after exceptAll operation for table", target_table_name, "=", df_final_1.count())

vi. Find the new records in the PostgreSQL table with an exceptAll operation against the Redshift table.

# Rows of df1_red (Aurora Postgres) not present in df3 (Redshift Cluster)
df_final_2 = df1_red.exceptAll(df3)
print("Record count after exceptAll operation for table", target_table_name, "=", df_final_2.count())

vii. Write the newly inserted records to the SQL Server table (from step v.).

# Writing data to Target : SQL Server RDS
if len(df_final_1.head(1)) > 0:
    df_final_1.write \
        .mode("append") \
        .format("jdbc") \
        .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
        .option("url", "jdbc:sqlserver://;serverName=" + target_conn["url"] + ";DATABASE=" + target_conn["db"]) \
        .option("user", target_db_properties["username"]) \
        .option("password", target_db_properties["password"]) \
        .option("dbtable", target_table_name) \
        .save()
    print("SQL Server - Writing Done")
else:
    print("No new Records at Source")

viii. Write the newly inserted records to the Redshift table (from step vi.).

# Writing data to Target : Redshift Cluster
if len(df_final_2.head(1)) > 0:
    df_final_2.write \
        .format("com.databricks.spark.redshift") \
        .option("url", "jdbc:redshift://" + target_conn_red["url"] + ":5439/" + target_conn_red["db"]) \
        .option("user", target_db_properties_red["username"]) \
        .option("password", target_db_properties_red["password"]) \
        .option("dbtable", target_table_name) \
        .option("tempdir", "s3://bucket_name/temp/") \
        .option("aws_iam_role", args['redshift_iam_role']) \
        .mode("append") \
        .save()
    print("Redshift - Writing Done")
else:
    print("No new Records at Source")

ix. Repeat steps i. to viii. for every table that needs to be migrated.
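
In practice, step ix. usually means wrapping steps i. to viii. in a function and looping over the table list. A minimal sketch, assuming a hypothetical migrate_table() helper that contains the logic above:

# Hypothetical list of tables to migrate; in practice this could come from
# a Glue job parameter or a config file
table_names = ["orders", "customers", "payments"]

for table_name in table_names:
    # migrate_table() is a hypothetical wrapper around steps i. to viii.,
    # parameterized by the source and target table names
    migrate_table(source_table_name=table_name, target_table_name=table_name)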


Hope this was a good learning experience for you :)
