PySpark for Data Engineering Beginners: An Extensive Guide

Pawan Kumar Ganjhu
10 min readMay 10, 2023

Image from — futurism.com

Introduction

Data engineering plays a crucial role in processing and transforming large volumes of data. Apache Spark, an open-source big data processing framework, provides a powerful tool called PySpark that allows data engineers to work with big data using Python. PySpark combines the simplicity of Python with the scalability and speed of Spark, making it an excellent choice for data engineering tasks. In this guide, we will explore PySpark for data engineering beginners, covering installation, requirements, and providing code examples to help you get started.

Table of Contents

  1. Installation and Setup
  2. Basics of PySpark
  3. Data Manipulation with PySpark
  4. Data Sources and Formats
  5. Working with Structured Data
  6. Handling Missing Data
  7. Aggregating and Summarizing Data
  8. Joining and Combining Data
  9. Working with Streaming Data
  10. PySpark and Cloud Computing
  11. Conclusion

1. Installation and Setup

Before we dive into PySpark, we need to set up our environment. Follow these steps to install PySpark:

Step 1: Install Apache Spark

  1. Go to the Apache Spark website (https://spark.apache.org/downloads.html).
  2. Download the latest stable version of Apache Spark.
  3. Extract the downloaded file to your desired location.

Step 2: Install Java Development Kit (JDK)

  1. PySpark requires Java to run. Install the Java Development Kit (JDK) if you don’t have it installed already.
  2. Download and install the JDK from the Oracle website (https://www.oracle.com/java/technologies/javase-jdk11-downloads.html).

Step 3: Set Environment Variables

  1. Open your terminal or command prompt.
  2. Set the SPARK_HOME environment variable to the location where you extracted Apache Spark.
  3. Append the bin directory of Apache Spark to your PATH environment variable.
  4. Set the JAVA_HOME environment variable to the location where you installed the JDK.

Step 4: Install PySpark

  1. Open your terminal or command prompt.
  2. Run the following command to install PySpark using pip:
pip install pyspark

Once you have completed these steps, you have successfully installed PySpark and are ready to start using it for data engineering tasks.

2. Basics of PySpark

PySpark provides a Python API for interacting with Spark. It allows you to create Spark applications using Python, leveraging the distributed computing capabilities of Spark. Here’s a simple code example to demonstrate the basics of PySpark:

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("PySparkExample").getOrCreate()

# Create a DataFrame
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35)]
df = spark.createDataFrame(data, ["Name", "Age"])

# Print the DataFrame
df.show()

# Perform a simple transformation
df_filtered = df.filter(df["Age"] > 30)

# Print the filtered DataFrame
df_filtered.show()

# Stop the SparkSession
spark.stop()

In this example, we first import the SparkSession class from the pyspark.sql module. We then create a SparkSession object, which serves as the entry point for interacting with Spark.

Next, we create a DataFrame by providing a list of tuples containing our data and the column names. We use the createDataFrame method of the SparkSession object to create the DataFrame.

We then print the DataFrame using the show method, which displays the content of the DataFrame a tabular format.

Next, we perform a simple transformation on the DataFrame by filtering out rows where the “Age” column is greater than 30. We use the filter method on the DataFrame and provide a condition using column operations.

We then print the filtered DataFrame to see the results.

Finally, we stop the SparkSession using the stop method, which releases the resources occupied by Spark.

This basic example demonstrates how to create a SparkSession, create a DataFrame, perform transformations, and display the results. It forms the foundation for more complex data engineering tasks with PySpark.

3. Data Manipulation with PySpark

One of the key tasks in data engineering is data manipulation. PySpark provides a rich set of functions and APIs for manipulating data. Let’s explore some common data manipulation tasks using PySpark:

Selecting Columns

To select specific columns from a DataFrame, you can use the select method. Here's an example:

# Selecting specific columns
df.select("Name", "Age").show()

Adding Columns

You can add new columns to a DataFrame using the withColumn method. Here's an example:

# Adding a new column
df_with_gender = df.withColumn("Gender", "Female")
df_with_gender.show()

Filtering Rows

To filter rows based on a condition, you can use the filter or where methods. Here's an example:

# Filtering rows based on a condition
df_filtered = df.filter(df["Age"] > 30)
df_filtered.show()

Sorting Data

To sort the DataFrame based on one or more columns, you can use the orderBy method. Here's an example:

# Sorting the DataFrame
df_sorted = df.orderBy(df["Age"])
df_sorted.show()

Grouping and Aggregating Data

To perform aggregation operations like sum, count, average, etc., you can use the groupBy and agg methods. Here's an example:

# Grouping and aggregating data
df_grouped = df.groupBy("Gender").agg({"Age": "avg"})
df_grouped.show()

These are just a few examples of data manipulation tasks you can perform with PySpark. The framework provides a wide range of functions and operations to handle complex data manipulation requirements efficiently.

4. Data Sources and Formats

PySpark supports various data sources and formats, allowing you to read and write data from different storage systems and file formats. Some commonly used data sources and formats in PySpark are:

CSV

To read and write CSV files, you can use the csv data source. Here's an example:

# Reading a CSV file
df_csv = spark.read.csv("data.csv", header=True, inferSchema=True)

# Writing a DataFrame to a CSV file
df_csv.write.csv("output.csv", header=True)

Parquet

Parquet is a columnar storage file format optimized for big data processing. PySpark provides built-in support for reading and writing Parquet files. Here’s an example:

# Reading a Parquet file
df_parquet = spark.read.parquet("data.parquet")

# Writing a DataFrame to a Parquet file
df_parquet.write.parquet("output.parquet")

JSON

To read and write JSON files, you can use the json data source. Here's an example:

# Reading a JSON file
df_json = spark.read.json("data.json")

# Writing a DataFrame to a JSON file
df_json.write.json("output.json")

JDBC

PySpark allows you to interact with databases using the JDBC (Java Database Connectivity) API. This enables you to read data from and write data to various database systems. Here’s an example:

# Reading data from a database table
df_jdbc = spark.read.format("jdbc").option("url", "jdbc:postgresql://localhost:5432/mydatabase") \
.option("dbtable", "mytable").option("user", "myuser").option("password", "mypassword").load()

# Writing data to a database table
df_jdbc.write.format("jdbc").option("url", "jdbc:postgresql://localhost:5432/mydatabase") \
.option("dbtable", "mytable").option("user", "myuser").option("password", "mypassword").mode("append").save()

PySpark also supports various other data sources and formats such as Avro, ORC, Delta Lake, and more. You can explore the official PySpark documentation for detailed information on working with different data sources and formats.

5. Working with Structured Data

PySpark provides a powerful API for working with structured data, allowing you to define and manipulate data using structured schemas. This helps in enforcing data consistency and performing efficient data processing. Let’s explore some features related to structured data in PySpark:

Defining a Schema

You can define a schema for your DataFrame explicitly. Here’s an example:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define the schema
schema = StructType([
StructField("Name", StringType(), nullable=False),
StructField("Age", IntegerType(), nullable=True),
])

# Create a DataFrame with the defined schema
df_with_schema = spark.createDataFrame(data, schema)

Renaming Columns

To rename columns in a DataFrame, you can use the withColumnRenamed method. Here's an example:

# Renaming a column
df_renamed = df.withColumnRenamed("Name", "Full Name")
df_renamed.show()

Dropping Columns

If you want to remove columns from a DataFrame, you can use the drop method. Here's an example:

# Dropping a column
df_dropped = df.drop("Age")
df_dropped.show()

Working with Null Values

PySpark provides functions to handle null values in DataFrame columns. Here are a few examples:

# Checking for null values
df_null = df.filter(df["Age"].isNull())

# Filling null values with a default value
df_filled = df.fillna({"Age": 0})

# Dropping rows with null values
df_no_null = df.dropna()

These features help you structure and manipulate your data effectively, making it easier to process and analyze.

6. Handling Missing Data

Dealing with missing data is a common challenge in data engineering. PySpark offers several techniques to handle missing data efficiently. Let’s explore some methods:

Dropping Rows with Missing Data

You can drop rows that contain missing data using the dropna method. Here's an example:

# Dropping rows with missing data
df_no_missing = df.dropna()

Filling Missing Data

To fill missing data with a default value or specific strategy, you can use the fillna method. Here's an example:

# Filling missing data with a default value
df_filled = df.fillna(0)

# Filling missing data with the mean value of a column
mean_age = df.select("Age").agg({"Age": "mean"}).first()[0]
df_filled = df.fillna

Handling Missing Data with Imputation

Another approach to handling missing data is imputation, where missing values are replaced with estimated or predicted values. PySpark provides a library called Imputer to perform imputation. Here's an example:

from pyspark.ml.feature import Imputer

# Create an Imputer object
imputer = Imputer(inputCols=["Age"], outputCols=["Age_imputed"])

# Fit the imputer model on the DataFrame
imputer_model = imputer.fit(df)

# Apply the imputation on the DataFrame
df_imputed = imputer_model.transform(df)

By using techniques like dropping rows, filling missing data, or imputation, you can handle missing data effectively in your data engineering pipelines.

7. Aggregating and Summarizing Data

PySpark provides powerful capabilities for aggregating and summarizing data. These operations allow you to calculate statistics, perform calculations, and summarize your data. Let’s explore some common aggregation and summarization techniques in PySpark:

Grouping and Aggregating Data

To group data based on one or more columns and perform aggregation operations, you can use the groupBy and agg methods. Here's an example:

# Grouping and aggregating data
df_grouped = df.groupBy("Gender").agg({"Age": "avg", "Salary": "sum"})
df_grouped.show()

Calculating Descriptive Statistics

PySpark provides functions to calculate descriptive statistics such as mean, standard deviation, minimum, maximum, etc. Here’s an example:

# Calculating descriptive statistics
df_stats = df.describe(["Age", "Salary"])
df_stats.show()

Pivot Tables

To create a pivot table from your DataFrame, you can use the pivot method. Here's an example:

# Creating a pivot table
df_pivot = df.groupBy("Gender").pivot("City").agg({"Salary": "sum"})
df_pivot.show()

These aggregation and summarization techniques help you gain insights from your data and perform calculations at scale.

8. Joining and Combining Data

In data engineering, combining and joining data from multiple sources is a common task. PySpark provides various methods to join and combine DataFrames. Let’s explore some techniques:

Inner Join

To perform an inner join between two DataFrames based on a common column, you can use the join method with the how parameter set to "inner". Here's an example:

# Performing an inner join
df_joined = df1.join(df2, df1["ID"] == df2["ID"], "inner")
df_joined.show()

Outer Join

To perform an outer join (including all rows from both DataFrames) based on a common column, you can use the join method with the how parameter set to "outer". Here's an example:

# Performing an outer join
df_joined = df1.join(df2, df1["ID"] == df2["ID"], "outer")
df_joined.show()

Union

To combine two DataFrames vertically (i.e., appending rows), you can use the union method. Here's an example:

# Combining DataFrames using union
df_combined = df1.union(df2)
df_combined.show()

These techniques enable you to merge and combine data from multiple sources efficiently.

9. Working with Streaming Data

PySpark provides support for processing streaming data using its Structured Streaming API. This allows you to perform real-time data processing and analytics on continuous data streams. Here’s a high level overview of working with streaming data in PySpark:

Creating a Streaming DataFrame

To work with streaming data, you first need to create a Streaming DataFrame by defining a source and a schema. Here’s an example of reading data from a Kafka topic:

from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StringType, IntegerType

# Define the schema for the streaming data
schema = StructType().add("name", StringType()).add("age", IntegerType())

# Create a Streaming DataFrame from a Kafka topic
streaming_df = spark.readStream.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "topic-name") \
.load() \
.select(from_json(col("value").cast("string"), schema).alias("data"))

Applying Transformations

Once you have a Streaming DataFrame, you can apply transformations and computations to the streaming data. For example, you can filter, aggregate, or join the streaming data with other DataFrames.

Writing Streaming Output

You can write the transformed streaming data to various sinks such as console, file systems, databases, or other streaming systems. Here’s an example of writing the data to the console:

# Write the transformed streaming data to the console
query = streaming_df.writeStream.outputMode("append").format("console").start()
query.awaitTermination()

Running the Streaming Job

To start processing the streaming data, you need to start the streaming query. Use the start() method to initiate the query execution. You can then use awaitTermination() to keep the streaming job running until explicitly stopped.

Working with streaming data requires additional considerations such as checkpointing, managing watermarks for event time processing, and handling late data. The Structured Streaming API provides comprehensive documentation on these topics to help you build robust and scalable streaming data pipelines.

10. PySpark and Cloud Computing

PySpark seamlessly integrates with cloud computing platforms, enabling you to scale your data engineering tasks and leverage cloud-based services. Here are a few examples:

Amazon Web Services (AWS) Integration

PySpark can interact with various AWS services such as Amazon S3 for data storage, Amazon Redshift for data warehousing, and Amazon EMR for running Spark clusters.

Google Cloud Platform (GCP) Integration

PySpark integrates with GCP services like Google Cloud Storage, BigQuery, and Dataproc. It allows you to read and write data from these services and leverage cloud-based infrastructure for scalable processing.

Microsoft Azure Integration

PySpark supports integration with Azure services like Azure Blob Storage, Azure Data Lake Storage, Azure SQL Database, and Azure Databricks. This enables you to work with data stored in Azure and leverage Azure’s data processing and analytics capabilities.

Each cloud platform provides specific libraries, connectors, and configuration options to facilitate seamless integration with PySpark. You can refer to the respective cloud provider’s documentation for detailed instructions on using PySpark in the cloud environment.

11. Conclusion

In this extensive guide, we explored PySpark for data engineering beginners. We covered the installation and setup process, discussed the basics of PySpark, and provided code examples for data manipulation, working with different data sources and formats, handling missing data, aggregating and summarizing data, joining and combining data, working with streaming data, and integrating PySpark with cloud computing platforms.

PySpark provides a powerful and scalable framework for data engineering tasks, allowing you to process and transform large volumes of data efficiently. By mastering PySpark, you can become proficient in handling big data and build robust data engineering pipelines.

Remember to consult the official PySpark documentation for detailed information, explore additional features and advanced techniques, and stay updated with the latest developments in the PySpark ecosystem.

As you continue your journey in PySpark data engineering, don’t hesitate to explore more complex scenarios, such as advanced data transformations, machine learning with PySpark’s MLlib library, and optimizing performance for large-scale data processing.

Additionally, consider joining online communities, forums, and attending data engineering conferences to connect with fellow practitioners, learn from their experiences, and stay up to date with industry trends.

With PySpark’s extensive capabilities, you are equipped to tackle a wide range of data engineering challenges and unlock the potential of big data. So, dive in, experiment with real-world datasets, and unleash the power of PySpark for your data engineering endeavors.

Happy coding and happy data engineering!

http://redbird-coding.com/

Sign up to discover human stories that deepen your understanding of the world.

Free

Distraction-free reading. No ads.

Organize your knowledge with lists and highlights.

Tell your story. Find your audience.

Membership

Read member-only stories

Support writers you read most

Earn money for your writing

Listen to audio narrations

Read offline with the Medium app

Pawan Kumar Ganjhu
Pawan Kumar Ganjhu

Written by Pawan Kumar Ganjhu

Data Engineer | Data & AI | R&D | Data Science | Data Analytics | Cloud

Responses (1)

Write a response

crucial

very important