AWS Big Data Blog

Orchestrate Amazon EMR Serverless Spark jobs with Amazon MWAA, and data validation using Amazon Athena

As data engineering becomes increasingly complex, organizations are looking for new ways to streamline their data processing workflows. Many data engineers today use Apache Airflow to build, schedule, and monitor their data pipelines.

However, as the volume of data grows, managing and scaling these pipelines can become a daunting task. Amazon Managed Workflows for Apache Airflow (Amazon MWAA) can help simplify the process of building, running, and managing data pipelines. By providing Apache Airflow as a fully managed platform, Amazon MWAA allows data engineers to focus on building data workflows instead of worrying about infrastructure.

Today, businesses and organizations require cost-effective and efficient ways to process large amounts of data. Amazon EMR Serverless is a cost-effective and scalable solution for big data processing that can handle large volumes of data. The Amazon Provider in Apache Airflow comes with EMR Serverless operators and is already included in Amazon MWAA, making it easy for data engineers to build scalable and reliable data processing pipelines. You can use EMR Serverless to run Spark jobs on the data, and use Amazon MWAA to manage the workflows and dependencies between these jobs. This integration can also help reduce costs by automatically scaling the resources needed to process data.
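For example, a minimal sketch of a DAG task that submits a PySpark script to an existing EMR Serverless application might look like the following (the application ID, role ARN, bucket name, and script name are placeholders, not values from this post's downloadable DAG):

from datetime import datetime
from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrServerlessStartJobOperator

# Placeholder values -- substitute your own application ID, role ARN, and bucket
EMR_SERVERLESS_APPLICATION_ID = "<<emr serverless application ID here>>"
JOB_ROLE_ARN = "arn:aws:iam::<<ACCOUNT_ID>>:role/<<ROLE_NAME>>"
S3_BASE_BUCKET = "<<bucket_name_here>>"

with DAG(dag_id="emr_serverless_spark_example", start_date=datetime(2023, 1, 1),
         schedule=None, catchup=False) as dag:
    run_spark_job = EmrServerlessStartJobOperator(
        task_id="run_spark_job",
        application_id=EMR_SERVERLESS_APPLICATION_ID,
        execution_role_arn=JOB_ROLE_ARN,
        job_driver={
            "sparkSubmit": {
                # Placeholder script name -- point this at your own PySpark script in S3
                "entryPoint": f"s3://{S3_BASE_BUCKET}/scripts/your_pyspark_script.py",
                "entryPointArguments": [S3_BASE_BUCKET],
            }
        },
    )

By default, the operator waits for the job run to finish, so downstream tasks only start after the Spark job completes.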

Amazon Athena is a serverless, interactive analytics service built on open-source frameworks that supports open-table and file formats. With Athena, you can use standard SQL to query your data without having to manage any complex infrastructure.

In this post, we use Amazon MWAA, EMR Serverless, and Athena to build a complete end-to-end data processing pipeline.

Solution overview

The following diagram illustrates the solution architecture.

The workflow includes the following steps:

  1. Create an Amazon MWAA workflow that retrieves data from your input Amazon Simple Storage Service (Amazon S3) bucket.
  2. Use EMR Serverless to process the data stored in Amazon S3. EMR Serverless automatically scales up or down based on the workload, so you don’t need to worry about provisioning or managing any infrastructure.
  3. Use EMR Serverless to transform the data using PySpark code and then store the transformed data back in your S3 bucket.
  4. Use Athena to create an external table based on the S3 dataset and run queries to analyze the transformed data. Athena uses the AWS Glue Data Catalog to store the table metadata.

Prerequisites

You should have the following prerequisites:

Data preparation

To illustrate using EMR Serverless jobs with Apache Spark via Amazon MWAA and data validation using Athena, we use the publicly available NYC taxi dataset. Download the following datasets to your local machine:

  • Green taxi and Yellow taxi trip records – Trip records for yellow and green taxis, which include information such as pick-up and drop-off dates and times, locations, trip distances, and payment types. In our example, we use the latest Parquet files for 2022.
  • Dataset for Taxi zone lookup – A dataset that provides location IDs and corresponding zone details for taxis.

In later steps, we upload these datasets to Amazon S3.

Create solution resources

This section outlines the steps for setting up data processing and transformation.

Create an EMR Serverless application

You can create one or more EMR Serverless applications that use open source analytics frameworks like Apache Spark or Apache Hive. Unlike EMR on EC2, you don't need to delete or terminate an EMR Serverless application; the application is only a definition and, once created, can be reused as long as needed. This keeps the Amazon MWAA pipeline simple, because you only have to submit jobs to a pre-created EMR Serverless application.

By default, an EMR Serverless application auto-starts on job submission and auto-stops after 15 minutes of idle time to ensure cost efficiency. You can modify the idle timeout or turn the feature off.

To create an application using the EMR Serverless console, follow the instructions in "Create an EMR Serverless application". Note down the application ID, because we use it in the following steps.
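If you prefer to create the application programmatically instead of through the console, a minimal boto3 sketch looks like the following (the application name and release label are illustrative choices; any current EMR Serverless release that supports Spark works):

import boto3

emr_serverless = boto3.client("emr-serverless")

# Illustrative name and release label -- adjust to your needs
response = emr_serverless.create_application(
    name="mwaa-blog-spark-app",
    releaseLabel="emr-6.10.0",
    type="SPARK",
    autoStartConfiguration={"enabled": True},
    autoStopConfiguration={"enabled": True, "idleTimeoutMinutes": 15},
)

# Note the application ID for the following steps
print(response["applicationId"])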

Create an S3 bucket and folders

Complete the following steps to set up your S3 bucket and folders:

  1. On the Amazon S3 console, create an S3 bucket to store the dataset.
  2. Note the name of the S3 bucket to use in later steps.
  3. Create an input_data folder for storing input data.
  4. Within that folder, create three separate folders, one for each dataset: green, yellow, and zone_lookup.

You can download and work with the latest datasets available. For our testing, we use the following files:

  • The green/ folder has the file green_tripdata_2022-06.parquet
  • The yellow/ folder has the file yellow_tripdata_2022-06.parquet
  • The zone_lookup/ folder has the file taxi_zone_lookup.csv
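If you prefer to script the uploads instead of using the console, the following boto3 sketch assumes the bucket already exists and that the three files were downloaded to your working directory (the bucket name is a placeholder):

import boto3

s3 = boto3.client("s3")
bucket = "<<bucket_name_here>>"  # placeholder -- use the bucket you created above

# Map each local file to its destination prefix under input_data/
uploads = {
    "green_tripdata_2022-06.parquet": "input_data/green/green_tripdata_2022-06.parquet",
    "yellow_tripdata_2022-06.parquet": "input_data/yellow/yellow_tripdata_2022-06.parquet",
    "taxi_zone_lookup.csv": "input_data/zone_lookup/taxi_zone_lookup.csv",
}

for local_file, key in uploads.items():
    s3.upload_file(local_file, bucket, key)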

Set up the Amazon MWAA DAG scripts

Complete the following steps to set up your DAG scripts:

  1. Download the following scripts to your local machine:
    1. requirements.txt – This file lists the Python dependencies for your environment; a Python dependency is any package or distribution that is not included in the Apache Airflow base install for your Apache Airflow version on your Amazon MWAA environment. For this post, we use Boto3 version >=1.23.9.
    2. blog_dag_mwaa_emrs_ny_taxi.py – This script is part of the Amazon MWAA DAG and consists of the following tasks: yellow_taxi_zone_lookup, green_taxi_zone_lookup, and ny_taxi_summary. These tasks run Spark jobs to look up taxi zones and to generate a data summary.
    3. green_zone.py – This PySpark script reads data files for green taxi rides and zone lookup, performs a join operation to combine them, and generates an output file containing green taxi rides with zone information. It utilizes temporary views for the df_green and df_zone data frames, performs column-based joins, and aggregates data like passenger count, trip distance, and fare amount. Lastly, it creates the output_data folder in the specified S3 bucket to write the resulting data frame, df_green_zone, as Parquet files.
    4. yellow_zone.py – This PySpark script processes yellow taxi ride and zone lookup data files by joining them to generate an output file containing yellow taxi rides with zone information. The script accepts a user-provided S3 bucket name and initiates a Spark session with the application name yellow_zone. It reads the yellow taxi files and zone lookup file from the specified S3 bucket, creates temporary views, performs a join based on location ID, and calculates statistics such as passenger count, trip distance, and fare amount. Lastly, it creates the output_data folder in the specified S3 bucket to write the resulting data frame, df_yellow_zone, as Parquet files.
    5. ny_taxi_summary.py – This PySpark script processes the green_zone and yellow_zone files to aggregate statistics on taxi rides, grouping data by service zones and location IDs. It requires an S3 bucket name as a command line argument, creates a SparkSession named ny_taxi_summary, reads the files from S3, performs a join, and generates a new data frame named ny_taxi_summary. It creates an output_data folder in the specified S3 bucket to write the resulting data frame to new Parquet files.
  2. On your local machine, update the blog_dag_mwaa_emrs_ny_taxi.py script with the following information:
    • Update your S3 bucket name in the following two lines:
      S3_LOGS_BUCKET = "<<bucket_name_here>>"
      S3_BASE_BUCKET = "<<bucket_name_here>>"
    • Update your role name ARN:
      JOB_ROLE_ARN = "<<emr_serverless_execution_role ARN here>>"
      e.g. arn:aws:iam::<<ACCOUNT_ID>>:role/<<ROLE_NAME>>
    • Update EMR Serverless Application ID. Use the Application ID created earlier.
      EMR_SERVERLESS_APPLICATION_ID = "<<emr serverless application ID here>>"
  3. Upload the requirements.txt file to the S3 bucket created earlier.
  4. In the S3 bucket, create a folder named dags and upload the updated blog_dag_mwaa_emrs_ny_taxi.py file from your local machine.
  5. On the Amazon S3 console, create a new folder named scripts inside the S3 bucket and upload the scripts to this folder from your local machine.
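The three PySpark scripts described in step 1 follow the same overall pattern: read the input files from Amazon S3, create temporary views, join on location ID, and write Parquet output back to the bucket. The following condensed sketch illustrates that pattern for the green taxi data; it is not the exact downloadable green_zone.py, and the column names and output prefix are assumptions based on the public NYC TLC schema:

import sys
from pyspark.sql import SparkSession

# The S3 bucket name is passed as a command line argument by the DAG
bucket = sys.argv[1]

spark = SparkSession.builder.appName("green_zone").getOrCreate()

# Read the green taxi trip records (Parquet) and the zone lookup file (CSV)
df_green = spark.read.parquet(f"s3://{bucket}/input_data/green/")
df_zone = spark.read.option("header", "true").csv(f"s3://{bucket}/input_data/zone_lookup/")

df_green.createOrReplaceTempView("df_green")
df_zone.createOrReplaceTempView("df_zone")

# Join trip records with zone details on the pickup location ID
df_green_zone = spark.sql("""
    SELECT g.PULocationID, z.Zone, z.service_zone,
           g.passenger_count, g.trip_distance, g.fare_amount
    FROM df_green g
    JOIN df_zone z ON g.PULocationID = z.LocationID
""")

# Write the result as Parquet under output_data/ (the prefix name is an assumption)
df_green_zone.write.mode("overwrite").parquet(f"s3://{bucket}/output_data/green_zone/")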

Create an Amazon MWAA environment

To create an Airflow environment, complete the following steps:

  1. On the Amazon MWAA console, choose Create environment.
  2. For Name, enter mwaa_emrs_athena_pipeline.
  3. For Airflow version, choose the latest version (for this post, 2.5.1).
  4. For S3 Bucket, enter the path to your S3 bucket.
  5. For DAGs folder, enter the path to your dags folder.
  6. For Requirements file, enter the path to the requirements.txt file.
  7. Choose Next.
  8. For Virtual private cloud (VPC), choose a VPC that has a minimum of two private subnets.

This will populate two of the private subnets in your VPC.

  9. Under Web server access, select Public network.

This allows the Apache Airflow UI to be accessed over the internet by users granted access to the IAM policy for your environment.

  10. For Security group(s), select Create new security group.
  11. For Environment class, select mw1.small.
  12. For Execution role, choose Create a new role.
  13. For Role name, enter a name.
  14. Leave the other configurations as default and choose Next.
  15. On the next page, choose Create environment.

It may take about 20–30 minutes to create your Amazon MWAA environment.

  16. When the Amazon MWAA environment status changes to Available, navigate to the IAM console and update the Amazon MWAA execution role to add pass role privileges for emr_serverless_execution_role.
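A minimal sketch of that change with boto3 follows; the role and policy names are placeholders, and you can apply the same statement through the IAM console instead:

import json
import boto3

iam = boto3.client("iam")

# Placeholder names -- substitute your Amazon MWAA execution role name
# and the ARN of emr_serverless_execution_role
MWAA_EXECUTION_ROLE_NAME = "<<mwaa_execution_role_name>>"
EMR_SERVERLESS_ROLE_ARN = "arn:aws:iam::<<ACCOUNT_ID>>:role/emr_serverless_execution_role"

iam.put_role_policy(
    RoleName=MWAA_EXECUTION_ROLE_NAME,
    PolicyName="AllowPassEmrServerlessExecutionRole",
    PolicyDocument=json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": "iam:PassRole",
            "Resource": EMR_SERVERLESS_ROLE_ARN,
        }],
    }),
)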

Trigger the Amazon MWAA DAG

To trigger the DAG, complete the following steps:

  1. On the Amazon MWAA console, choose Environments in the navigation pane.
  2. Open your environment and choose Open Airflow UI.
  3. Select blog_dag_mwaa_emrs_ny_taxi, choose the play icon, and choose Trigger DAG.
  4. When the DAG is running, choose the DAG blog_dag_mwaa_emrs_ny_taxi and choose Graph to locate your DAG run workflow.

The DAG takes approximately 4–6 minutes to run all the scripts. When all the tasks are complete, the overall status of the DAG shows as success.

To rerun the DAG, remove s3://<<your_s3_bucket here >>/output_data/.
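One way to clear that prefix is a short boto3 snippet (the bucket name is a placeholder); deleting it through the Amazon S3 console works just as well:

import boto3

s3 = boto3.resource("s3")
bucket = s3.Bucket("<<your_s3_bucket here>>")  # placeholder bucket name

# Remove everything under output_data/ so the DAG can recreate it on the next run
bucket.objects.filter(Prefix="output_data/").delete()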

Optionally, to understand how Amazon MWAA runs these tasks, choose the task you want to inspect.

Choose Run to view the task run details.

The following screenshot shows an example of the task logs.

If you'd like to dive deeper into the execution logs, navigate to Applications on the EMR Serverless console. The Apache Spark driver logs indicate the initiation of your job along with the details of the executors, stages, and tasks that EMR Serverless created. These logs are helpful for monitoring job progress and troubleshooting failures.

By default, EMR Serverless will store application logs securely in Amazon EMR managed storage for a period of 30 days. However, you can also specify Amazon S3 or Amazon CloudWatch as your log delivery options during job submission.
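For example, the configuration_overrides argument of EmrServerlessStartJobOperator (or the configurationOverrides field of the StartJobRun API) accepts a monitoring configuration; the sketch below sends logs to an S3 prefix of your choosing (the prefix is illustrative):

# Illustrative job-level override that delivers application logs to your own bucket
configuration_overrides = {
    "monitoringConfiguration": {
        "s3MonitoringConfiguration": {
            "logUri": "s3://<<bucket_name_here>>/emr-serverless-logs/"
        }
    }
}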

Validate the final result set with Athena

Let’s validate the data loaded by the process using Athena SQL queries.

  1. On the Athena console, choose Query editor in the navigation pane.
  2. If you’re using Athena for the first time, under Settings, choose Manage and enter the S3 bucket location that you created earlier (<S3_BUCKET_NAME>/athena), then choose Save.
  3. In the query editor, enter the following query to create an external table:
CREATE EXTERNAL TABLE default.ny_taxi_summary(
  pu_service_zone string, 
  pulocationid bigint, 
  do_service_zone string, 
  dolocationid bigint, 
  passenger_count bigint, 
  trip_distance double, 
  fare_amount double, 
  extra double, 
  mta_tax double, 
  tip_amount double, 
  tolls_amount double, 
  improvement_surcharge double, 
  total_amount double, 
  congestion_surcharge double, 
  airport_fee double)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://<<YOUR-S3-BUCKET Here>>/output_data/ny_taxi_summary/' -- *** Change bucket name to your bucket***
TBLPROPERTIES (
  'classification'='parquet', 
  'compressionType'='none');


  4. Run the following query on the recently created ny_taxi_summary table to retrieve the first 10 rows to validate the data:

select * from default.ny_taxi_summary limit 10;
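If you'd rather run the validation from a script than from the console, the query can also be submitted with boto3 (the database and output location below mirror the console setup; the output location is the athena/ prefix you configured earlier):

import boto3

athena = boto3.client("athena")

# Submit the validation query; results land in the athena/ prefix configured earlier
response = athena.start_query_execution(
    QueryString="SELECT * FROM default.ny_taxi_summary LIMIT 10;",
    QueryExecutionContext={"Database": "default"},
    ResultConfiguration={"OutputLocation": "s3://<<YOUR-S3-BUCKET Here>>/athena/"},
)

# Poll get_query_execution until the state is SUCCEEDED, then fetch rows
# with get_query_results using this ID
query_execution_id = response["QueryExecutionId"]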

Clean up

To prevent future charges, complete the following steps:

  1. On the Amazon S3 console, delete the S3 bucket you created to store the Amazon MWAA DAG, scripts, and logs.
  2. On the Athena console, drop the table you created:
    drop table default.ny_taxi_summary;
  3. On the Amazon MWAA console, navigate to the environment that you created and choose Delete.
  4. On the EMR Studio console, delete the application.

To delete the application, navigate to the List applications page. Select the application that you created and choose Actions → Stop to stop the application. After the application is in the STOPPED state, select the same application and choose Actions → Delete.
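Alternatively, you can stop and delete the application programmatically; the sketch below polls until the application reaches the STOPPED state (the application ID is the one you noted earlier):

import time
import boto3

emr_serverless = boto3.client("emr-serverless")
application_id = "<<emr serverless application ID here>>"  # the ID noted earlier

# An application must be in the STOPPED state before it can be deleted
emr_serverless.stop_application(applicationId=application_id)

while emr_serverless.get_application(applicationId=application_id)["application"]["state"] != "STOPPED":
    time.sleep(10)

emr_serverless.delete_application(applicationId=application_id)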

Conclusion

Data engineering is a critical component of many organizations, and as data volumes continue to grow, it’s essential to find ways to streamline data processing workflows. The combination of Amazon MWAA, EMR Serverless, and Athena provides a powerful solution to build, run, and manage data pipelines efficiently. With this end-to-end data processing pipeline, data engineers can easily process and analyze large amounts of data quickly and cost-effectively without the need to manage complex infrastructure. The integration of these AWS services provides a robust and scalable solution for data processing, helping organizations make informed decisions based on their data insights.

Now that you’ve seen how to submit Spark jobs on EMR Serverless via Amazon MWAA, we encourage you to use Amazon MWAA to create a workflow that will run PySpark jobs via EMR Serverless.

We welcome your feedback and inquiries. Please feel free to reach out to us if you have any questions or comments.


About the authors

Rahul Sonawane is a Principal Analytics Solutions Architect at AWS with AI/ML and Analytics as his area of specialty.

Gaurav Parekh is a Solutions Architect helping AWS customers build large scale modern architecture. He specializes in data analytics and networking. Outside of work, Gaurav enjoys playing cricket, soccer and volleyball.


Audit History

December 2023: This post was reviewed for technical accuracy by Santosh Gantaram, Sr. Technical Account Manager.