Enhancing Airflow Task Monitoring with Email Notifications

Pawan Kumar Ganjhu
8 min read · Jul 6, 2023


Introduction:

Apache Airflow is a popular open-source platform used for orchestrating and scheduling workflows. It provides powerful features for monitoring and managing tasks, including the ability to send email notifications. In this article, we will explore how to leverage Airflow’s email capabilities to enhance task monitoring, specifically focusing on sending email notifications for task success and failure events.

Prerequisites:

Before we dive into the details, make sure you have a working knowledge of Apache Airflow and have it set up in your environment. Familiarity with Python programming and email protocols will also be beneficial.

Setting up Email Configuration:

To enable email notifications in Airflow, you need to configure the email settings in your Airflow installation. This typically involves specifying the SMTP server details, email sender address, and other relevant parameters. Refer to the Airflow documentation or your system administrator for the specific configuration steps.
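For reference, here is a minimal sketch of the [smtp] section in airflow.cfg. The host, port, and credentials below are placeholders; substitute the values for your own mail server. Each setting can also be supplied through an environment variable such as AIRFLOW__SMTP__SMTP_HOST, and the Airflow documentation lists the full set of options for your version.

[smtp]
# Placeholder values; replace with your mail server's details
smtp_host = smtp.example.com
smtp_starttls = True
smtp_ssl = False
smtp_user = airflow_user
smtp_password = your_password
smtp_port = 587
smtp_mail_from = airflow@example.com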

Sending Success Emails:

When a task completes successfully, it’s often useful to receive an email notification confirming its success. To achieve this in Airflow, you can define a Python function that sends the success email and set it as the on_success_callback for the respective task. This callback function should retrieve the relevant task and execution information from the provided context and use the send_email function or a custom email library to send the email. Include essential details like task ID, execution date, and log URL in the email body for comprehensive information.
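A minimal sketch of this pattern follows; the DAG id, task names, and recipient address are illustrative, not from a real pipeline:

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.email import send_email

def notify_success(context):
    # Callbacks receive the Airflow context as a single positional argument
    ti = context['task_instance']
    send_email(
        to='team@example.com',  # placeholder recipient
        subject=f'Task {ti.task_id} succeeded',
        html_content=f"Execution date: {context['execution_date']}<br>Logs: {ti.log_url}",
    )

def do_work():
    print('business logic goes here')

with DAG('success_email_sketch', start_date=datetime(2023, 1, 1),
         schedule_interval=None) as dag:
    my_task = PythonOperator(
        task_id='my_task',
        python_callable=do_work,
        on_success_callback=notify_success,
    )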

Sending Failure Emails:

In case a task fails during execution, receiving an email notification with the failure details is crucial for timely error identification and troubleshooting. Similar to success emails, you can define a separate Python function to handle failure emails and set it as the on_failure_callback for the task. The failure email function should retrieve the task and execution information, check the task status, and retrieve the error message from the task's logs if available. Include this error message in the email body along with other relevant information.
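The failure callback follows the same shape. One useful detail (a sketch, assuming a reasonably recent Airflow version): inside on_failure_callback, the exception that failed the task is available under the context's 'exception' key, which covers simple cases without parsing log files:

from airflow.utils.email import send_email

def notify_failure(context):
    ti = context['task_instance']
    error = context.get('exception')  # the exception raised by the task, if available
    send_email(
        to='team@example.com',  # placeholder recipient
        subject=f'Task {ti.task_id} FAILED',
        html_content=(
            f"Execution date: {context['execution_date']}<br>"
            f"Error: {error}<br>"
            f"Logs: {ti.log_url}"
        ),
    )

Attach it with on_failure_callback=notify_failure, just like the success callback above.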

Code Example:

To illustrate the implementation, the sample code below demonstrates the setup of success and failure email notifications using PythonOperator and the send_email function. It shows how to extract task and execution details from the context, construct the email content, and send it to the specified recipients.

Note:

Remember to adapt the code examples and configurations provided in this article to suit your specific Airflow environment and requirements. Ensure that your Airflow installation is properly configured for email functionality and that the necessary dependencies are installed.

Additional Tips:

  • Experiment with customizing the email content by adding more task-related information or styling the email body (see the sketch after this list).
  • Consider including additional recipients or adjusting the email subject to fit your team’s communication needs.
  • Explore other email-related operators or libraries supported by Airflow, such as the EmailOperator, to further enhance email notifications.
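For instance, since send_email accepts HTML content, you can style the body with ordinary HTML. A small illustrative helper (build_email_body is a hypothetical name, not an Airflow API; the fields shown are ones already used in this article):

def build_email_body(context):
    # Illustrative helper: formats task metadata as a simple HTML report
    ti = context['task_instance']
    return (
        f"<h3>Task report: {ti.task_id}</h3>"
        "<ul>"
        f"<li><b>DAG:</b> {ti.dag_id}</li>"
        f"<li><b>Execution date:</b> {context['execution_date']}</li>"
        f"<li><b>Log URL:</b> <a href='{ti.log_url}'>{ti.log_url}</a></li>"
        "</ul>"
    )

Inside a callback, you would then pass html_content=build_email_body(context) to send_email.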


Sample Code: Example 1

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.email import send_email
from datetime import datetime

def send_success_status_email(**context):
    # provide_context=True passes the Airflow context as keyword arguments
    task_instance = context['task_instance']
    # This notifier only runs when its upstream task succeeds (the default
    # all_success trigger rule), so the watched task's outcome is known
    watched_tasks = ', '.join(context['task'].upstream_task_ids)

    subject = f"Airflow Task {watched_tasks} succeeded"
    body = (
        f"The task {watched_tasks} finished with status: success.\n\n"
        f"Task execution date: {context['execution_date']}\n"
        f"Log URL: {task_instance.log_url}\n\n"  # the notifier task's own log URL
    )

    to_email = "abc@example.com"  # Specify the recipient email address

    send_email(to=to_email, subject=subject, html_content=body)

def send_failure_status_email(**context):
    task_instance = context['task_instance']
    # This notifier only runs when its upstream task fails (trigger_rule='all_failed')
    watched_tasks = ', '.join(context['task'].upstream_task_ids)

    subject = f"Airflow Task {watched_tasks} failed"
    body = (
        f"The task {watched_tasks} finished with status: failed.\n\n"
        f"Task execution date: {context['execution_date']}\n"
        f"Log URL: {task_instance.log_url}\n\n"
    )

    to_email = "abc@example.com"  # Specify the recipient email address

    send_email(to=to_email, subject=subject, html_content=body)


# Create a DAG and define your tasks
dag = DAG(
    'email_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None
)

task_to_watch = ...

success_email_task = PythonOperator(
    task_id='success_email_task',
    python_callable=send_success_status_email,
    provide_context=True,
    dag=dag
)

failure_email_task = PythonOperator(
    task_id='failure_email_task',
    python_callable=send_failure_status_email,
    provide_context=True,
    trigger_rule='all_failed',  # run only when the upstream task fails
    dag=dag
)

# Wire the notifier tasks downstream of the task being watched; their trigger
# rules determine which one actually runs
success_email_task.set_upstream(task_to_watch)
failure_email_task.set_upstream(task_to_watch)

Explanation:

  1. The code defines two separate Python functions, send_success_status_email and send_failure_status_email, to handle success and failure email notifications, respectively.
  2. Because provide_context=True is set, each function receives the task context (task instance, execution date, and so on) as keyword arguments, collected here into **context.
  3. send_success_status_email names the watched task via upstream_task_ids and constructs the email subject and body; since the notifier task only runs when its upstream task succeeds (the default all_success trigger rule), the outcome is known without querying task state.
  4. send_failure_status_email builds the analogous failure email; you can extend it to include the error message pulled from the failed task's logs.
  5. The send_email function is used to send the email, with the recipient email address specified in the to_email variable.
  6. In the DAG definition, you should replace task_to_watch with the task that you want to monitor for success and failure.
  7. Two PythonOperator tasks, success_email_task and failure_email_task, are created with their respective task IDs and the corresponding email functions as the python_callable parameter.
  8. The provide_context=True parameter (an Airflow 1.x convention; Airflow 2 passes the context automatically) gives the callables access to the task context, including the task instance and execution date.
  9. The set_upstream method wires both notifier tasks after task_to_watch; the default all_success trigger rule means success_email_task runs only when task_to_watch succeeds, while trigger_rule='all_failed' makes failure_email_task run only when it fails.
  10. Finally, the DAG is defined with a start date and schedule_interval=None, meaning it runs only when triggered manually.

Make sure to replace abc@example.com with the actual recipient email address where you want to receive the notifications.

This code demonstrates the basic setup for success and failure email notifications in Airflow. You can further customize the email content, add additional recipients, or explore other email-related operators or libraries supported by Airflow to enhance email notifications based on your specific requirements.

The code provided for sending success and failure email notifications can be used with any type of Apache Airflow operator, not just PythonOperator.

The key point is that the operator's on_success_callback and on_failure_callback parameters are set to the respective email callback functions (send_success_status_email and send_failure_status_email below). Note that Airflow invokes callbacks with the context as a single positional argument, so a callback is defined as def my_callback(context): rather than with **context.

Here’s an example of how you can apply the email notifications to different types of Airflow operators:

Sample Code: Example 2

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.email import send_email
from datetime import datetime

def send_success_status_email(context):
    # Email success notification logic (see Example 1); callbacks receive
    # the context as a single positional argument
    ...

def send_failure_status_email(context):
    # Email failure notification logic...
    ...

dag = DAG(
    'email_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None
)

task_to_watch = ...

# Using BashOperator with email notifications
bash_task = BashOperator(
    task_id='bash_task',
    bash_command='echo "Hello, Airflow!"',
    on_success_callback=send_success_status_email,
    on_failure_callback=send_failure_status_email,
    dag=dag
)

# Using EmailOperator with email notifications
email_task = EmailOperator(
    task_id='email_task',
    to='abc@example.com',
    subject='Airflow Email Test',
    html_content='This is a test email from Airflow.',
    on_success_callback=send_success_status_email,
    on_failure_callback=send_failure_status_email,
    dag=dag
)

# Set the dependencies
bash_task.set_upstream(task_to_watch)
email_task.set_upstream(task_to_watch)

In this example, the email notifications are applied to a BashOperator and an EmailOperator. The on_success_callback and on_failure_callback parameters of each operator are set to the corresponding email callback functions.

Ensure that you adjust the code according to your specific DAG structure, operators, and email configurations.

Sample Code: Example 3


'''
# Example DAG demonstrating success/failure email alerts on every task
# Importing required libraries
'''

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.email import send_email

default_args = {
    'owner': 'airflow_master',
    'depends_on_past': False,
    'start_date': datetime(2023, 7, 4),
    'email': ['abc.def@xyz.com'],
    'email_on_failure': True,  # Airflow's built-in failure email, in addition to the callbacks below
    'email_on_retry': True,
    'retries': 1,  # Number of retries
    'retry_delay': timedelta(minutes=5),  # Delay between retries
    # Note: 'email_to' and 'email_cc' are not valid BaseOperator arguments;
    # recipients for the callback emails are set inside the callbacks below
}

def send_status_email(context):
    task_instance = context['task_instance']
    task_status = task_instance.current_state()

    subject = f"Airflow Task- {task_instance.task_id}: {task_status}"
    body = (
        "<html><body><br>Hi App Support Team<br>"
        f"The task <b>{task_instance.task_id}</b> finished with status: <b>{task_status}</b><br><br>"
        f"Task execution date: {context['execution_date']}<br>"
        f"<p>Log URL: {task_instance.log_url}<br>"
        "Regards<br>Dev Team</p></body></html>"
    )

    to_email = ["abc.def@xyz.com"]  # Replace with the primary recipient email address
    cc_email = ["abc.def@xyz.com"]  # Replace with the CC recipient email address

    send_email(to=to_email, cc=cc_email, subject=subject, html_content=body)

def send_failure_status_email(context):
    task_instance = context['task_instance']
    task_status = task_instance.current_state()

    subject = f"Airflow Task- {task_instance.task_id}: {task_status}"
    body = (
        "<html><body><br>Hi App Support Team<br>"
        f"The task <b>{task_instance.task_id}</b> finished with status: <b>{task_status}</b><br><br>"
        f"Task execution date: {context['execution_date']}<br>"
        f"<p>Log URL: {task_instance.log_url}<br><br>"
        "NOTE: Kindly resolve the error so that the DOWNSTREAM tasks can run; "
        "until then they are blocked.<br><br>"
        "Regards<br>Dev Team</p></body></html>"
    )

    to_email = ["abc.def@xyz.com"]  # Replace with the primary recipient email address
    cc_email = ["abc.def@xyz.com"]  # Replace with the CC recipient email address

    send_email(to=to_email, cc=cc_email, subject=subject, html_content=body)

with DAG('email_alerting', default_args=default_args, schedule_interval=None,
         tags=['abc', 'def', 'ijk', 'xyz']) as dag:

    # Define tasks
    start = DummyOperator(task_id='start')
    start.on_success_callback = send_status_email
    start.on_failure_callback = send_failure_status_email

    step_1 = DummyOperator(task_id='step_1')
    step_1.on_success_callback = send_status_email
    step_1.on_failure_callback = send_failure_status_email

    step_2 = DummyOperator(task_id='step_2')
    step_2.on_success_callback = send_status_email
    step_2.on_failure_callback = send_failure_status_email

    # Placeholder tasks for the remaining steps referenced in the dependency
    # graph below, each with the same callbacks attached
    step_3, step_4, step_5, step_6, step_7, step_17, step_18, step_19, step_20 = (
        DummyOperator(task_id=f'step_{i}',
                      on_success_callback=send_status_email,
                      on_failure_callback=send_failure_status_email)
        for i in (3, 4, 5, 6, 7, 17, 18, 19, 20)
    )

    data_validate_file = DummyOperator(task_id='data_validate_file')
    data_validate_file.on_success_callback = send_status_email
    data_validate_file.on_failure_callback = send_failure_status_email

    step_21 = DummyOperator(task_id='step_21')
    step_21.on_success_callback = send_status_email
    step_21.on_failure_callback = send_failure_status_email

    step_22 = DummyOperator(task_id='step_22')
    step_22.on_success_callback = send_status_email
    step_22.on_failure_callback = send_failure_status_email

    step_23 = BashOperator(
        task_id='step_23',
        bash_command='exit 1',  # Command that exits with a non-zero code, forcing a failure
    )
    step_23.on_success_callback = send_status_email
    step_23.on_failure_callback = send_failure_status_email

    end = DummyOperator(task_id='end')
    end.on_success_callback = send_status_email
    end.on_failure_callback = send_failure_status_email

    # Define dependencies
    start >> step_1 >> [step_6, step_7]
    step_2 >> [step_6, step_7]
    start >> [step_1, step_2, step_3, step_4, step_5, data_validate_file] >> step_21 >> step_22 >> step_23 >> end
    step_6 >> step_21
    step_7 >> step_21
    start >> step_17 >> step_18
    step_17 >> step_19 >> end
    step_18 >> step_19 >> end
    start >> step_20 >> end

Graph View:

Success Email:

Failure Email:

Conclusion:

Enhancing task monitoring in Apache Airflow through email notifications provides valuable insights into task execution status and outcomes. By leveraging Airflow’s email capabilities, you can receive timely notifications for successful task completions and failures, enabling effective monitoring and issue resolution.

This article demonstrated the process of setting up success and failure email notifications in Airflow. It covered the configuration of email settings, the creation of Python functions to handle email notifications, and the utilization of callback functions with various Airflow operators.

Customizing the email content with task-specific details such as task ID, execution date, and log URLs allows for comprehensive information that aids in troubleshooting. Setting up email notifications for different operator types enables monitoring across a wide range of tasks in workflows.

Remember to adapt the provided code and configurations to match your Airflow environment. Thoroughly test the email functionality and ensure proper configuration of email server settings and credentials for reliable email delivery.
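One quick way to verify the SMTP configuration before wiring up callbacks is to call send_email directly from a Python shell in your Airflow environment (a minimal smoke test; the recipient address is a placeholder):

from airflow.utils.email import send_email

# Sends a one-off message using the SMTP settings from airflow.cfg
send_email(
    to='you@example.com',
    subject='Airflow SMTP smoke test',
    html_content='If you received this, the email configuration works.',
)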

With email notifications in place, you can stay informed about task execution, respond promptly to failures, and keep your data pipelines running reliably and productively.

Utilize the concepts and code presented in this article as a starting point, and explore additional email-related features and libraries supported by Airflow to further enhance task monitoring capabilities.
