Apache Airflow is an open-source tool to programmatically author, schedule, and monitor workflows. It is one of the most robust platforms used by data engineers for orchestrating workflows or pipelines. You can easily visualize your data pipelines' dependencies, progress, logs, code, and task success status, and trigger tasks directly from the UI.
With the help of Apache Airflow and Python, we can easily create workflows without having to manage the underlying infrastructure for scalability and security ourselves.
What is a Workflow?
In simple terms, a workflow is a sequence of tasks. A workflow is started on a schedule or triggered by an event, and workflows are commonly used to handle big data processing pipelines.
In Airflow-based workflows, the output of one task becomes the input of another. Looping back is not possible, because each step's output only feeds forward into the following step.
Typical Example of Workflow
In Airflow, these workflows are represented as Directed Acyclic Graphs (DAGs).
What is a DAG?
A DAG is defined in a Python script, which represents the DAG's structure (tasks and their dependencies) as code. A DAG is nothing but the collection of tasks that you want to run one after another, and each node in the graph is a task. In other words, when we design a workflow, we need to think about how the work should be divided into small tasks.
The graph is required to be acyclic so that there are no circular dependencies that could cause infinite execution loops.
Because a DAG has no cycles, it maps directly to a data pipeline in Apache Airflow. So whenever you read "DAG", you can think "data pipeline".
Valid DAG Workflow
Invalid DAG Workflow
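As a quick sketch of what a DAG file looks like (the DAG id, task ids, and the use of DummyOperator here are purely illustrative; operators are explained in the next section):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

# extract -> transform -> load: each task feeds the next,
# and no task ever points back to an earlier one (no cycles).
with DAG(
    "example_pipeline",          # illustrative DAG id
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    extract = DummyOperator(task_id="extract")
    transform = DummyOperator(task_id="transform")
    load = DummyOperator(task_id="load")

    extract >> transform >> load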
What is a Node?
A node is nothing but an operator. An operator encapsulates the logic of what we want a task in the DAG to achieve. For example, if we want to run Python code, we use the PythonOperator; similarly, if we want to execute a bash script, we use the BashOperator.
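As a rough sketch of those two operators (assuming a dag object has already been defined, as in the full example later in this post; the task ids and commands are placeholders):

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def say_hello():
    print("Hello from Python")


# Runs a Python callable as a task in the DAG.
python_task = PythonOperator(task_id="python_task", python_callable=say_hello, dag=dag)

# Runs a shell command (or script) as a task in the DAG.
bash_task = BashOperator(task_id="bash_task", bash_command="echo 'Hello from Bash'", dag=dag)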
What is the Need for Apache Airflow in Trending Technologies?
- Workflow Monitoring: Monitoring plays a crucial role in data management, and Apache Airflow lets us monitor our workflows to ensure that systems and processes are performing as expected. Monitoring capabilities are especially important for companies that use Airflow to orchestrate and schedule long-running tasks.
- Scheduler: Scheduling is essential because data engineers cannot keep an eye on every workflow around the clock. With the scheduler, we can schedule a workflow and trigger its task instances once their dependencies are complete. Behind the scenes, the scheduler spins up a subprocess that monitors and stays in sync with all DAGs in the specified DAG directory. Once per minute, by default, the scheduler collects DAG parsing results and checks whether any active tasks can be triggered.
- Monitoring UI: Airflow's native UI lets you visualize your DAG and task statuses. In addition, you can monitor a few native metrics from this UI, which helps with light monitoring and troubleshooting of your DAGs, although there is still plenty of room for improvement.
- REST API: Airflow's REST API allows you to create and trigger workflows from external systems and build data products on top of them. It lets you apply the same paradigm you use to build pipelines to create asynchronous workflows, such as custom machine learning training operations.
- Alerting Systems: Airflow provides a default alerting system for failed tasks. Email is the default, but alerting through Slack can be set up using a callback and the Slack operator (see the sketch after this list).
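As a rough sketch of those alerting hooks (the email address, DAG id, and callback body below are placeholders; a Slack notification would typically be sent from inside the callback using the Slack provider package):

from datetime import datetime

from airflow import DAG


def notify_failure(context):
    # Airflow calls this when a task fails; `context` carries task/DAG metadata.
    task_id = context["task_instance"].task_id
    print(f"Task {task_id} failed")  # replace with a Slack/webhook call


default_args = {
    "email": ["alerts@example.com"],   # placeholder address
    "email_on_failure": True,          # Airflow's built-in email alerting
    "on_failure_callback": notify_failure,
}

dag = DAG(
    "monitored_dag",
    default_args=default_args,
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
)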
Amazon Web Services (AWS) provides a managed service for Airflow called Amazon Managed Workflows for Apache Airflow (MWAA). So, let's start with MWAA and see how we can monitor workflows.
Prerequisites:
- S3 Bucket
- VPC Network
- Workflow Environment
You will need to…
- Create an S3 bucket to which you can upload your Python (DAG) code
- Create a VPC and subnets
- Create an Airflow environment in MWAA
The Apache Airflow dashboard should look like what is shown below.
Next, create the Python code for the workflow you want to monitor. You will upload the following sample Python code to the S3 bucket.
from datetime import datetime

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.python import PythonOperator
from airflow.operators.dummy import DummyOperator

# from tasks.get_configs import get_configs
# from tasks.get_targets import get_targets
# from tasks.push_targets import push_targets


def print_hello():
    return "Hello World"


dag = DAG(
    "hello_world",
    description="testing",
    schedule_interval="0 12 * * *",
    start_date=datetime(2021, 1, 1),
    catchup=False,
)

dummy_operator = DummyOperator(task_id="dummy_task", retries=3, dag=dag)
hello_operator = PythonOperator(task_id="hello_task", python_callable=print_hello, dag=dag)

dummy_operator >> hello_operator
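To make the DAG visible to MWAA, the file has to land in the DAGs folder configured for the environment. Here is a minimal sketch using boto3 (the bucket name and the dags/ prefix are placeholders; use whatever DAGs folder your MWAA environment points to):

import boto3

# Upload the DAG file to the S3 bucket configured as the MWAA DAGs folder.
s3 = boto3.client("s3")
s3.upload_file("hello_world.py", "your-mwaa-bucket", "dags/hello_world.py")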
Once done, we can open the Airflow UI, where we get the following monitoring dashboard. Here we can schedule a workflow, monitor it, and manage alerting.
How Can Perficient Help You?
Perficient is a certified Amazon Web Services partner with more than 10 years of experience delivering enterprise-level applications and expertise in cloud platform solutions, contact center, application modernization, migrations, data analytics, mobile, developer and management tools, IoT, serverless, security, and more. Paired with our industry-leading strategy and team, Perficient is equipped to help enterprises tackle the toughest challenges and get the most out of their implementations and integrations.
Learn more about our AWS practice and get in touch with our team here!