AWS Big Data Blog

Introducing Amazon MWAA support for the Airflow REST API and web server auto scaling

Apache Airflow is a popular platform for enterprises looking to orchestrate complex data pipelines and workflows. Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed service that streamlines the setup and operation of secure and highly available Airflow environments in the cloud.

In this post, we’re excited to introduce two new features that address common customer challenges and unlock new possibilities for building robust, scalable, and flexible data orchestration solutions using Amazon MWAA. First, the Airflow REST API support enables programmatic interaction with Airflow resources like connections, Directed Acyclic Graphs (DAGs), DAGRuns, and Task instances. Second, the option to horizontally scale web server capacity helps you handle increased demand, whether from REST API requests, command line interface (CLI) usage, or more concurrent Airflow UI users. Both features are available for all actively supported Amazon MWAA versions, including version 2.4.3 and newer.

Airflow REST API support

A frequently requested feature from Amazon MWAA customers has been the ability to interact with their workflows programmatically using Airflow’s APIs. The introduction of REST API support in Amazon MWAA addresses this need, providing a standardized way to access and manage your Airflow environment. With the new REST API, you can now invoke DAG runs, manage datasets, or get the health status of Airflow’s metadata database, triggerer, and scheduler, all without relying on the Airflow web UI or CLI.

For example, you can build monitoring dashboards that aggregate the status of your DAGs across multiple Amazon MWAA environments, or invoke workflows in response to events from external systems, such as completed database jobs or new user signups.

This feature opens up a world of possibilities for integrating your Amazon MWAA environments with other systems and building custom solutions that use the power of your data orchestration pipelines.

To demonstrate this new capability, we use the REST API to invoke a new DAG run. Follow the process detailed in the following sections.

Authenticate with the Airflow REST API

For a user to authenticate with the REST API, they need the necessary permissions to create a web login token, similar to how it works with the Airflow UI. Refer to Creating an Apache Airflow web login token for more details. The user’s AWS Identity and Access Management (IAM) role or policy must include the CreateWebLoginToken permission to generate a token for authenticating. Furthermore, the user’s permissions for interacting with the REST API are determined by the Airflow role assigned to them within Amazon MWAA. The Airflow roles govern the user’s ability to perform various operations, such as invoking DAG runs, checking statuses, or modifying configurations, through the REST API endpoints.
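
The following is a minimal sketch of such a policy, defined as a Python dictionary and created with Boto3. The account ID, Region, environment name, and Airflow role (Admin) are placeholder values; confirm the exact resource ARN format in Creating an Apache Airflow web login token before using it:

import json
import boto3

# Minimal sketch of a policy that allows creating a web login token for one environment,
# scoped to the Airflow Admin role. All identifiers below are placeholder values.
policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "airflow:CreateWebLoginToken",
            "Resource": "arn:aws:airflow:us-east-1:111122223333:role/MyAirflowEnvironment/Admin"
        }
    ]
}

iam = boto3.client("iam")
iam.create_policy(
    PolicyName="mwaa-create-web-login-token",
    PolicyDocument=json.dumps(policy_document)
)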

The following is an example of the authentication process:

def get_session_info(region, env_name):
    """
    Retrieves the web server hostname and session cookie for an MWAA environment.
    
    Args:
        region (str): The AWS region where the MWAA environment is located.
        env_name (str): The name of the MWAA environment.

    Returns:
        tuple: A tuple containing the web server hostname and session cookie, or (None, None) on failure.
    """

    logging.basicConfig(level=logging.INFO)

    try:
        # Initialize MWAA client and request a web login token
        mwaa = boto3.client('mwaa', region_name=region)
        response = mwaa.create_web_login_token(Name=env_name)
        
        # Extract the web server hostname and login token
        web_server_host_name = response["WebServerHostname"]
        web_token = response["WebToken"]
        
        # Construct the URL needed for authentication 
        login_url = f"https://{web_server_host_name}/aws_mwaa/login"
        login_payload = {"token": web_token}

        # Make a POST request to the MWAA login url using the login payload
        response = requests.post(
            login_url,
            data=login_payload,
            timeout=10
        )

        # Check if login was successful
        if response.status_code == 200:
            # Return the hostname and the session cookie
            return (
                web_server_host_name,
                response.cookies["session"]
            )
        else:
            # Log an error and return (None, None) so callers can unpack the result safely
            logging.error("Failed to log in: HTTP %d", response.status_code)
            return None, None
    except requests.RequestException as e:
        # Log any exceptions raised during the request to the MWAA login endpoint
        logging.error("Request failed: %s", str(e))
        return None, None
    except Exception as e:
        # Log any other unexpected exceptions
        logging.error("An unexpected error occurred: %s", str(e))
        return None, None

The get_session_info function uses the AWS SDK for Python (Boto3) and the Python requests library to perform the initial authentication steps: it retrieves a web login token and exchanges it for a session cookie, which is valid for 12 hours. The hostname and cookie are used for subsequent REST API requests.
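
For example, assuming your AWS credentials are configured and you have an environment named MyAirflowEnvironment (a placeholder name) in us-east-1, you could call the function as follows:

web_server_host_name, session_cookie = get_session_info("us-east-1", "MyAirflowEnvironment")
if session_cookie:
    print(f"Authenticated against {web_server_host_name}")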

Invoke the Airflow REST API endpoint

When authentication is complete, you have the credentials to start sending requests to the API endpoints. In the following example, we use the endpoint /dags/{dag_id}/dagRuns to initiate a DAG run:

def trigger_dag(region, env_name, dag_name):
    """
    Triggers a DAG in a specified MWAA environment using the Airflow REST API.

    Args:
    region (str): AWS region where the MWAA environment is hosted.
    env_name (str): Name of the MWAA environment.
    dag_name (str): Name of the DAG to trigger.
    """

    logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}")

    # Retrieve the web server hostname and session cookie for authentication
    try:
        web_server_host_name, session_cookie = get_session_info(region, env_name)
        if not session_cookie:
            logging.error("Authentication failed, no session cookie retrieved.")
            return
    except Exception as e:
        logging.error(f"Error retrieving session info: {str(e)}")
        return

    # Prepare headers and payload for the request
    cookies = {"session": session_cookie}
    json_body = {"conf": {}}

    # Construct the URL for triggering the DAG
    url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns"

    # Send the POST request to trigger the DAG
    try:
        response = requests.post(url, cookies=cookies, json=json_body)
        # Check the response status code to determine if the DAG was triggered successfully
        if response.status_code == 200:
            logging.info("DAG triggered successfully.")
        else:
            logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}")
    except requests.RequestException as e:
        logging.error(f"Request to trigger DAG failed: {str(e)}")

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Check if the correct number of arguments is provided
    if len(sys.argv) != 4:
        logging.error("Incorrect usage. Proper format: python script_name.py <region> <env_name> <dag_name>")
        sys.exit(1)

    region = sys.argv[1]
    env_name = sys.argv[2]
    dag_name = sys.argv[3]

    # Trigger the DAG with the provided arguments
    trigger_dag(region, env_name, dag_name)

The following is the complete code of trigger_dag.py:

import sys
import boto3
import requests
import logging

def get_session_info(region, env_name):

    """
    Retrieves the web server hostname and session cookie for an MWAA environment.
    
    Args:
        region (str): The AWS region where the MWAA environment is located.
        env_name (str): The name of the MWAA environment.

    Returns:
        tuple: A tuple containing the web server hostname and session cookie, or (None, None) on failure.
    """

    logging.basicConfig(level=logging.INFO)

    try:
        # Initialize MWAA client and request a web login token
        mwaa = boto3.client('mwaa', region_name=region)
        response = mwaa.create_web_login_token(Name=env_name)
        
        # Extract the web server hostname and login token
        web_server_host_name = response["WebServerHostname"]
        web_token = response["WebToken"]
        
        # Construct the URL needed for authentication 
        login_url = f"https://{web_server_host_name}/aws_mwaa/login"
        login_payload = {"token": web_token}

        # Make a POST request to the MWAA login url using the login payload
        response = requests.post(
            login_url,
            data=login_payload,
            timeout=10
        )

        # Check if login was successful
        if response.status_code == 200:
            # Return the hostname and the session cookie
            return (
                web_server_host_name,
                response.cookies["session"]
            )
        else:
            # Log an error and return (None, None) so callers can unpack the result safely
            logging.error("Failed to log in: HTTP %d", response.status_code)
            return None, None
    except requests.RequestException as e:
        # Log any exceptions raised during the request to the MWAA login endpoint
        logging.error("Request failed: %s", str(e))
        return None, None
    except Exception as e:
        # Log any other unexpected exceptions
        logging.error("An unexpected error occurred: %s", str(e))
        return None, None

def trigger_dag(region, env_name, dag_name):
    """
    Triggers a DAG in a specified MWAA environment using the Airflow REST API.

    Args:
    region (str): AWS region where the MWAA environment is hosted.
    env_name (str): Name of the MWAA environment.
    dag_name (str): Name of the DAG to trigger.
    """

    logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}")

    # Retrieve the web server hostname and session cookie for authentication
    try:
        web_server_host_name, session_cookie = get_session_info(region, env_name)
        if not session_cookie:
            logging.error("Authentication failed, no session cookie retrieved.")
            return
    except Exception as e:
        logging.error(f"Error retrieving session info: {str(e)}")
        return

    # Prepare headers and payload for the request
    cookies = {"session": session_cookie}
    json_body = {"conf": {}}

    # Construct the URL for triggering the DAG
    url = f"https://{web_server_host_name}/api/v1/dags/{dag_name}/dagRuns"

    # Send the POST request to trigger the DAG
    try:
        response = requests.post(url, cookies=cookies, json=json_body)
        # Check the response status code to determine if the DAG was triggered successfully
        if response.status_code == 200:
            logging.info("DAG triggered successfully.")
        else:
            logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}")
    except requests.RequestException as e:
        logging.error(f"Request to trigger DAG failed: {str(e)}")

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Check if the correct number of arguments is provided
    if len(sys.argv) != 4:
        logging.error("Incorrect usage. Proper format: python script_name.py <region> <env_name> <dag_name>")
        sys.exit(1)

    region = sys.argv[1]
    env_name = sys.argv[2]
    dag_name = sys.argv[3]

    # Trigger the DAG with the provided arguments
    trigger_dag(region, env_name, dag_name)

Run the request script

Run the request script with the following command, providing your AWS Region, Amazon MWAA environment name, and DAG name:

python3 trigger_dag.py <region> <env_name> <dag_name>
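
For example, using a placeholder Region, environment name, and DAG name:

python3 trigger_dag.py us-east-1 MyAirflowEnvironment example_dag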

Validate the API result

The following screenshot shows the result in the CLI.

Check the DAG run in the Airflow UI

The following screenshot shows the DAG run status in the Airflow UI.

You can use any other endpoint in the REST API to enable programmatic control, automation, integration, and management of Airflow workflows and resources. To learn more about the Airflow REST API and its various endpoints, refer to the Airflow documentation.
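
For example, the following sketch reuses the get_session_info function from trigger_dag.py to call the GET /dags endpoint and print each DAG with its paused state (the Region and environment name are placeholders, and error handling is omitted for brevity):

import requests

def list_dags(web_server_host_name, session_cookie):
    # Call the Airflow REST API endpoint that lists the DAGs in the environment
    url = f"https://{web_server_host_name}/api/v1/dags"
    response = requests.get(url, cookies={"session": session_cookie}, timeout=10)
    response.raise_for_status()
    for dag in response.json()["dags"]:
        print(dag["dag_id"], "(paused)" if dag["is_paused"] else "(active)")

web_server_host_name, session_cookie = get_session_info("us-east-1", "MyAirflowEnvironment")
list_dags(web_server_host_name, session_cookie)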

Web server auto scaling

Another key request from Amazon MWAA customers has been the ability to dynamically scale their web servers to handle fluctuating workloads. Previously, you were constrained by two web servers provided with an Airflow environment on Amazon MWAA and had no way to horizontally scale web server capacity, which could lead to performance issues during peak loads. The new web server auto scaling feature in Amazon MWAA solves this problem. By automatically scaling the number of web servers based on CPU utilization and active connection count, Amazon MWAA makes sure your Airflow environment can seamlessly accommodate increased demand, whether from REST API requests, CLI usage, or more concurrent Airflow UI users.

Set up web server auto scaling

To set up auto scaling for your Amazon MWAA environment web servers, follow these steps:

  1. On the Amazon MWAA console, navigate to the environment you want to configure auto scaling for.
  2. Choose Edit.
  3. Choose Next.
  4. On the Configure advanced settings page, in the Environment class section, set the maximum and minimum web server count. For this example, we set the upper limit to 5 and the lower limit to 2.

These settings allow Amazon MWAA to automatically scale up the Airflow web server when demand increases and scale down conservatively when demand decreases, optimizing resource usage and cost.
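
You can also set these limits programmatically. The following is a minimal sketch using the Boto3 update_environment call with the MinWebservers and MaxWebservers parameters; the Region and environment name are placeholders, and you should confirm the parameter names against the current Amazon MWAA API reference:

import boto3

mwaa = boto3.client("mwaa", region_name="us-east-1")

# Allow the web server fleet to scale between 2 and 5 instances
mwaa.update_environment(
    Name="MyAirflowEnvironment",
    MinWebservers=2,
    MaxWebservers=5
)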

Trigger auto scaling programmatically

After you configure auto scaling, you might want to test how it behaves under simulated conditions. Using the Python code structure we discussed earlier for invoking a DAG, you can also use the Airflow REST API to simulate a load test and see how well your auto scaling setup responds. For the purpose of load testing, we have configured our Amazon MWAA environment with an mw1.small instance class. The following is an example implementation using load_test.py:

import sys
import time
import boto3
import requests
import logging
import concurrent.futures

def get_session_info(region, env_name):
    """
    Retrieves the web server hostname and session cookie for an MWAA environment.
    
    Args:
        region (str): The AWS region where the MWAA environment is located.
        env_name (str): The name of the MWAA environment.

    Returns:
        tuple: A tuple containing the web server hostname and session cookie, or (None, None) on failure.
    """
    try:
        # Create an MWAA client in the specified region
        mwaa = boto3.client('mwaa', region_name=region)
        # Request a web login token for the specified environment
        response = mwaa.create_web_login_token(Name=env_name)
        web_server_host_name = response["WebServerHostname"]
        web_token = response["WebToken"]

        # Construct the login URL and payload for authentication
        login_url = f"https://{web_server_host_name}/aws_mwaa/login"
        login_payload = {"token": web_token}

        # Authenticate and obtain the session cookie
        response = requests.post(login_url, data=login_payload, timeout=10)
        if response.status_code == 200:
            return web_server_host_name, response.cookies["session"]
        else:
            logging.error(f"Failed to log in: HTTP {response.status_code}")
            return None, None
    except requests.RequestException as e:
        logging.error(f"Request failed: {e}")
        return None, None
    except Exception as e:
        logging.error(f"An unexpected error occurred: {e}")
        return None, None
    
def call_rest_api(web_server_host_name, session_cookie):
    """
    Calls the Airflow web server API to fetch details of all DAGs and measures the time taken for the call.

    Args:
        web_server_host_name (str): The hostname of the MWAA web server.
        session_cookie (str): The session cookie for authentication.
    """
    # Define the endpoint for fetching all DAGs
    url = f"https://{web_server_host_name}/api/v1/dags"
    headers = {'Content-Type': 'application/json', 'Cookie': f'session={session_cookie}'}

    try:
        start_time = time.time()
        response = requests.get(url, headers=headers)
        elapsed_time = time.time() - start_time

        if response.status_code == 200:
            logging.info(f"API call successful, fetched {len(response.json()['dags'])} DAGs, took {elapsed_time:.2f} seconds")
        else:
            logging.error(f"API call failed with status code: {response.status_code}, took {elapsed_time:.2f} seconds")
    except requests.RequestException as e:
        logging.error(f"Error during API call: {e}")

def run_load_test(web_server_host_name, session_cookie, qps, duration):
    """
    Performs a load test by sending concurrent requests at a specified rate over a given duration.

    Args:
        web_server_host_name (str): The hostname of the MWAA web server.
        session_cookie (str): The session cookie for authentication.
        qps (int): Queries per second.
        duration (int): Duration of the test in seconds.
    """
    start_time = time.time()

    with concurrent.futures.ThreadPoolExecutor(max_workers=qps) as executor:
        while time.time() - start_time < duration:
            # Submit a batch of qps concurrent requests and wait for them to complete
            batch_start = time.time()
            futures = [executor.submit(call_rest_api, web_server_host_name, session_cookie) for _ in range(qps)]
            concurrent.futures.wait(futures)
            # Sleep for the remainder of the one-second window so the request rate stays at roughly qps
            elapsed = time.time() - batch_start
            if elapsed < 1.0:
                time.sleep(1.0 - elapsed)

    logging.info("Load test completed.")

def main(region, env_name, qps, duration):
    """
    Main function to execute the load testing script.

    Args:
        region (str): AWS region where the MWAA environment is hosted.
        env_name (str): Name of the MWAA environment.
        qps (int): Queries per second.
        duration (int): Duration in seconds.
    """
    web_server_host_name, session_cookie = get_session_info(region, env_name)
    if not web_server_host_name or not session_cookie:
        logging.error("Failed to retrieve session information")
        return

    run_load_test(web_server_host_name, session_cookie, qps, duration)

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    if len(sys.argv) != 5:
        logging.error("Incorrect usage. Proper format: python load_test.py <region> <env_name> <qps> <duration>")
        sys.exit(1)

    region = sys.argv[1]
    env_name = sys.argv[2]
    qps = int(sys.argv[3])
    duration = int(sys.argv[4])

    main(region, env_name, qps, duration)

The script uses a thread pool to send concurrent requests to the web server, simulating traffic at a target rate. By sending a specific number of requests per second for a set duration, it lets you generate enough load to trigger an auto scaling event.

You can use the following command to run the script. You have to provide the Region, the Amazon MWAA environment name, the number of queries per second you want to run against the web server, and the duration (in seconds) for which you want the load test to run.

python load_test.py <region> <env_name> <qps> <duration>

For example:

python load_test.py us-west-2 MyMWAAEnvironment 10 1080

The preceding command will run 10 queries per second for 18 minutes.

When the script is running, you will start seeing log lines that show how long (in seconds) the web server took to process each request.

This time will gradually increase. As the active connection count or CPU usage increases, Amazon MWAA dynamically scales the web servers to accommodate the load.

As new web servers come online, your environment will be able to handle increased load, and the response time will drop. Amazon MWAA provides web server container metrics in the AWS/MWAA service namespace in Amazon CloudWatch, allowing you to monitor the web server performance. The following screenshots show an example of the auto scaling event.
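
To explore which web server metrics are available for your environment, you can enumerate the AWS/MWAA namespace with the CloudWatch list_metrics API, as in the following sketch (the Region is a placeholder):

import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")
paginator = cloudwatch.get_paginator("list_metrics")

# Print every metric Amazon MWAA publishes, including web server container metrics,
# along with its dimensions
for page in paginator.paginate(Namespace="AWS/MWAA"):
    for metric in page["Metrics"]:
        print(metric["MetricName"], metric["Dimensions"])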

Recommendation

Determining the appropriate minimum and maximum web server count involves carefully considering your typical workload patterns, performance requirements, and cost constraints. To set these values, consider metrics like the required REST API throughput at peak times and the maximum number of concurrent UI users you expect to have. It’s important to note that Amazon MWAA can support up to 10 queries per second (QPS) for the Airflow REST API at full scale for any environment size, provided you follow the recommended number of DAGs.

Amazon MWAA integration with CloudWatch provides granular metrics and monitoring capabilities to help you find the optimal configuration for your specific use case. If you anticipate periods of consistently high demand or increased workloads for an extended duration, you can configure your Amazon MWAA environment to maintain a higher minimum number of web servers. By setting the minimum web server count to 2 or more, you can make sure your environment always has sufficient capacity to handle load peaks without waiting for auto scaling to provision additional resources. This comes at the cost of running more web server instances, so it is a trade-off between cost optimization and responsiveness.

Conclusion

Today, we are announcing the availability of the Airflow REST API and web server auto scaling in Amazon MWAA. The REST API provides a standardized way to programmatically interact with and manage resources in your Amazon MWAA environments. This enables seamless integration, automation, and extensibility of Amazon MWAA within your organization’s existing data and application landscape. With web server auto scaling, you can automatically increase the number of web server instances based on resource utilization, and Amazon MWAA makes sure your Airflow workflows can handle fluctuating workloads without manual intervention.

These features lay the foundation for you to build more robust, scalable, and flexible data orchestration pipelines. We encourage you to use them to streamline your data engineering operations and unlock new possibilities for your business.

To start building with Amazon MWAA, see Get started with Amazon Managed Workflows for Apache Airflow.

Stay tuned for future updates and enhancements to Amazon MWAA that will continue to enhance the developer experience and unlock new opportunities for data-driven organizations.


About the Authors

Mansi Bhutada is an ISV Solutions Architect based in the Netherlands. She helps customers design and implement well-architected solutions in AWS that address their business problems. She is passionate about data analytics and networking. Beyond work, she enjoys experimenting with food, playing pickleball, and diving into fun board games.

Kartikay Khator is a Solutions Architect within Global Life Sciences at AWS, where he dedicates his efforts to developing innovative and scalable solutions that cater to the evolving needs of customers. His expertise lies in harnessing the capabilities of AWS Analytics services. Extending beyond his professional pursuits, he finds joy and fulfillment in the world of running and hiking. Having already completed two marathons, he is currently preparing for his next marathon challenge.

Kamen Sharlandjiev is a Sr. Big Data and ETL Solutions Architect and an MWAA and AWS Glue ETL expert. He’s on a mission to make life easier for customers who are facing complex data integration and orchestration challenges. His secret weapon? Fully managed AWS services that can get the job done with minimal effort. Follow Kamen on LinkedIn to keep up to date with the latest MWAA and AWS Glue features and news!