AWS Big Data Blog

Use AWS Glue ETL to perform merge, partition evolution, and schema evolution on Apache Iceberg

As enterprises collect increasing amounts of data from various sources, the structure and organization of that data often need to change over time to meet evolving analytical needs. However, altering schema and table partitions in traditional data lakes can be a disruptive and time-consuming task, requiring renaming or recreating entire tables and reprocessing large datasets. This hampers agility and time to insight.

Schema evolution enables adding, deleting, renaming, or modifying columns without needing to rewrite existing data. This is critical for fast-moving enterprises to augment data structures to support new use cases. For example, an ecommerce company may add new customer demographic attributes or order status flags to enrich analytics. Apache Iceberg manages these schema changes in a backward-compatible way through its innovative metadata table evolution architecture.

Similarly, partition evolution allows seamless adding, dropping, or splitting partitions. For instance, an ecommerce marketplace may initially partition order data by day. As orders accumulate, and querying by day becomes inefficient, they may split to day and customer ID partitions. Table partitioning organizes big datasets most efficiently for query performance. Iceberg gives enterprises the flexibility to incrementally adjust partitions rather than requiring tedious rebuild procedures. New partitions can be added in a fully compatible way without downtime or having to rewrite existing data files.

This post demonstrates how you can harness Iceberg, Amazon Simple Storage Service (Amazon S3), AWS Glue, AWS Lake Formation, and AWS Identity and Access Management (IAM) to implement a transactional data lake supporting seamless evolution. By allowing for painless schema and partition adjustments as data insights evolve, you can benefit from the future-proof flexibility needed for business success.

Overview of solution

For our example use case, a fictional large ecommerce company processes thousands of orders each day. When orders are received, updated, cancelled, shipped, delivered, or returned, the changes are made in their on-premises system, and those changes need to be replicated to an S3 data lake so that data analysts can run queries through Amazon Athena. The changes can contain schema updates as well. Due to the security requirements of different organizations, they need to manage fine-grained access control for the analysts through Lake Formation.

The following diagram illustrates the solution architecture.

The solution workflow includes the following key steps:

  1. Ingest data from on premises into a Dropzone location using a data ingestion pipeline.
  2. Merge the data from the Dropzone location into Iceberg using AWS Glue.
  3. Query the data using Athena.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Set up the infrastructure with AWS CloudFormation

To create your infrastructure with an AWS CloudFormation template, complete the following steps:

  1. Log in as an administrator to your AWS account.
  2. Open the AWS CloudFormation console.
  3. Choose Launch Stack:
  4. For Stack name, enter a name (for this post, icebergdemo1).
  5. Choose Next.
  6. Provide information for the following parameters:
    1. DatalakeUserName
    2. DatalakeUserPassword
    3. DatabaseName
    4. TableName
    5. DatabaseLFTagKey
    6. DatabaseLFTagValue
    7. TableLFTagKey
    8. TableLFTagValue
  7. Choose Next.
  8. Choose Next again.
  9. In the Review section, review the values you entered.
  10. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names and choose Submit.

In a few minutes, the stack status will change to CREATE_COMPLETE.

You can go to the Outputs tab of the stack to see all the resources it has provisioned. The resources are prefixed with the stack name you provided (for this post, icebergdemo1).

Create an Iceberg table using Lambda and grant access using Lake Formation

To create an Iceberg table and grant access on it, complete the following steps:

  1. Navigate to the Resources tab of the CloudFormation stack icebergdemo1 and search for the logical ID LambdaFunctionIceberg.
  2. Choose the hyperlink of the associated physical ID.

You’re redirected to the Lambda function icebergdemo1-Lambda-Create-Iceberg-and-Grant-access.

  3. On the Configuration tab, choose Environment variables in the left pane.
  4. On the Code tab, you can inspect the function code.

The function uses the AWS SDK for Python (Boto3) APIs to provision the resources. It assumes the provisioned data lake admin role to perform the following tasks:

  • Grant DATA_LOCATION_ACCESS access to the data lake admin role on the registered data lake location
  • Create Lake Formation Tags (LF-Tags)
  • Create a database in the AWS Glue Data Catalog using the AWS Glue create_database API
  • Assign LF-Tags to the database
  • Grant DESCRIBE access on the database using LF-Tags to the data lake IAM user and AWS Glue ETL IAM role
  • Create an Iceberg table using the AWS Glue create_table API:
import boto3

glue_client = boto3.client('glue')

# Create a version 2 Iceberg table in the Data Catalog
response_create_table = glue_client.create_table(
    DatabaseName='icebergdb1',
    OpenTableFormatInput={
        'IcebergInput': {
            'MetadataOperation': 'CREATE',
            'Version': '2'
        }
    },
    TableInput={
        'Name': 'ecomorders',
        'StorageDescriptor': {
            'Columns': [
                {'Name': 'ordernum', 'Type': 'int'},
                {'Name': 'sku', 'Type': 'string'},
                {'Name': 'quantity', 'Type': 'int'},
                {'Name': 'category', 'Type': 'string'},
                {'Name': 'status', 'Type': 'string'},
                {'Name': 'shipping_id', 'Type': 'string'}
            ],
            'Location': 's3://icebergdemo1-s3bucketiceberg-vthvwwblrwe8/iceberg/'
        },
        'TableType': 'EXTERNAL_TABLE'
    }
)
  • Assign LF-Tags to the table
  • Grant DESCRIBE and SELECT on the Iceberg table LF-Tags for the data lake IAM user
  • Grant ALL, DESCRIBE, SELECT, INSERT, DELETE, and ALTER access on the Iceberg table LF-Tags to the AWS Glue ETL IAM role
  5. On the Test tab, choose Test to run the function.

When the function is complete, you will see the message “Executing function: succeeded.”
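
The LF-Tag grants listed above map onto the Lake Formation grant_permissions API. The following is a minimal sketch that only builds the request arguments and doesn't call AWS. The helper name build_lf_tag_grant, the account ID, and the tag key and value are hypothetical placeholders, not values taken from the stack.

```python
def build_lf_tag_grant(principal_arn, resource_type, tag_key, tag_values, permissions):
    """Build keyword arguments for a Lake Formation grant_permissions request."""
    if resource_type not in ("DATABASE", "TABLE"):
        raise ValueError("resource_type must be 'DATABASE' or 'TABLE'")
    return {
        "Principal": {"DataLakePrincipalIdentifier": principal_arn},
        "Resource": {
            "LFTagPolicy": {
                "ResourceType": resource_type,
                "Expression": [{"TagKey": tag_key, "TagValues": list(tag_values)}],
            }
        },
        "Permissions": list(permissions),
    }


# Hypothetical placeholder values for illustration only
grant_request = build_lf_tag_grant(
    principal_arn="arn:aws:iam::111122223333:user/iceberguser1",
    resource_type="TABLE",
    tag_key="example-table-tag-key",
    tag_values=["example-table-tag-value"],
    permissions=["DESCRIBE", "SELECT"],
)
print(grant_request)
```

With Boto3, a request like this could be passed as boto3.client('lakeformation').grant_permissions(**grant_request), assuming the caller is allowed to grant on that LF-Tag expression.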

Lake Formation helps you centrally manage, secure, and globally share data for analytics and machine learning. With Lake Formation, you can manage fine-grained access control for your data lake data on Amazon S3 and its metadata in the Data Catalog.

To add an Amazon S3 location as Iceberg storage in your data lake, register the location with Lake Formation. You can then use Lake Formation permissions for fine-grained access control to the Data Catalog objects that point to this location, and to the underlying data in the location.

The CloudFormation stack registered the data lake location.

Data location permissions in Lake Formation enable principals to create and alter Data Catalog resources that point to the designated registered Amazon S3 locations. Data location permissions work in addition to Lake Formation data permissions to secure information in your data lake.

Lake Formation tag-based access control (LF-TBAC) is an authorization strategy that defines permissions based on attributes. In Lake Formation, these attributes are called LF-Tags. You can attach LF-Tags to Data Catalog resources, Lake Formation principals, and table columns. You can assign and revoke permissions on Lake Formation resources using these LF-Tags. Lake Formation allows operations on those resources when the principal’s tag matches the resource tag.
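
To make the matching rule concrete, the following sketch models it in plain Python. It is a simplified illustration and not Lake Formation's actual evaluation logic: it assumes a grant lists allowed values per tag key, and that a resource matches when it carries an allowed value for every key in the grant. The function name and tag values are hypothetical.

```python
def tags_match(granted_expression, resource_tags):
    """Return True when the resource has an allowed value for every granted tag key.

    granted_expression: dict of tag key -> list of allowed values (from the grant)
    resource_tags: dict of tag key -> value (attached to the database or table)
    """
    return all(
        resource_tags.get(key) in allowed_values
        for key, allowed_values in granted_expression.items()
    )


# Hypothetical tags for illustration
grant = {"domain": ["ecommerce", "retail"]}
orders_table_tags = {"domain": "ecommerce", "sensitivity": "internal"}
hr_table_tags = {"domain": "hr"}

print(tags_match(grant, orders_table_tags))
print(tags_match(grant, hr_table_tags))
```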

Verify the Iceberg table from the Lake Formation console

To verify the Iceberg table, complete the following steps:

  1. On the Lake Formation console, choose Databases in the navigation pane.
  2. Open the details page for icebergdb1.

You can see the associated database LF-Tags.

  3. Choose Tables in the navigation pane.
  4. Open the details page for ecomorders.

In the Table details section, you can observe the following:

  • Table format shows as Apache Iceberg
  • Table management shows as Managed by Data Catalog
  • Location lists the data lake location of the Iceberg table

In the LF-Tags section, you can see the associated table LF-Tags.

In the Table details section, expand Advanced table properties to view the following:

  • metadata_location points to the location of the Iceberg table’s metadata file
  • table_type shows as ICEBERG

On the Schema tab, you can view the columns defined on the Iceberg table.

Integrate Iceberg with the AWS Glue Data Catalog and Amazon S3

Iceberg tracks individual data files in a table instead of directories. When there is an explicit commit on the table, Iceberg creates data files and adds them to the table. Iceberg maintains the table state in metadata files. Any change in table state creates a new metadata file that atomically replaces the older metadata. Metadata files track the table schema, partitioning configuration, and other properties.

Iceberg requires only a small set of file system operations, and these requirements are compatible with object stores like Amazon S3.

Iceberg creates snapshots for the table contents. Each snapshot is a complete set of data files in the table at a point in time. Data files in snapshots are stored in one or more manifest files that contain a row for each data file in the table, its partition data, and its metrics.

The following diagram illustrates this hierarchy.

When you create an Iceberg table, it creates the metadata folder first and a metadata file in the metadata folder. The data folder is created when you load data into the Iceberg table.

Contents of the Iceberg metadata file

The Iceberg metadata file contains a lot of information, including the following:

  • format-version – Version of the Iceberg table format
  • location – Amazon S3 location of the table
  • schemas – Name and data type of all columns on the table
  • partition-specs – Partitioned columns
  • sort-orders – Sort order of columns
  • properties – Table properties
  • current-snapshot-id – Current snapshot
  • refs – Table references
  • snapshots – List of snapshots, each containing the following information:
    • sequence-number – Sequence number of snapshots in chronological order (the highest number represents the current snapshot, 1 for the first snapshot)
    • snapshot-id – Snapshot ID
    • timestamp-ms – Timestamp when the snapshot was committed
    • summary – Summary of changes committed
    • manifest-list – List of manifests; this file name starts with snap-< snapshot-id >
  • schema-id – Sequence number of the schema in chronological order (the highest number represents the current schema)
  • snapshot-log – List of snapshots in chronological order
  • metadata-log – List of metadata files in chronological order

The metadata file records all the historical changes to the table’s data and schema. Reviewing the contents of the metadata file directly can be a time-consuming task. Fortunately, you can query the Iceberg metadata using Athena.
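
If you do want to inspect a metadata file by hand, the fields described above are plain JSON. The following sketch locates the current snapshot and its manifest list. The sample document is invented for illustration (the bucket name, IDs, and timestamps are hypothetical) and contains only a few of the fields a real metadata file has.

```python
import json

# Hypothetical, heavily trimmed metadata document
SAMPLE_METADATA = """
{
  "format-version": 2,
  "location": "s3://example-bucket/iceberg",
  "current-snapshot-id": 200,
  "snapshots": [
    {"sequence-number": 1, "snapshot-id": 100, "timestamp-ms": 1700000000000,
     "summary": {"operation": "append"},
     "manifest-list": "s3://example-bucket/iceberg/metadata/snap-100-example.avro"},
    {"sequence-number": 2, "snapshot-id": 200, "timestamp-ms": 1700086400000,
     "summary": {"operation": "overwrite"},
     "manifest-list": "s3://example-bucket/iceberg/metadata/snap-200-example.avro"}
  ]
}
"""


def current_snapshot(metadata):
    """Return the snapshot entry referenced by current-snapshot-id, or None."""
    current_id = metadata.get("current-snapshot-id")
    for snapshot in metadata.get("snapshots", []):
        if snapshot["snapshot-id"] == current_id:
            return snapshot
    return None


metadata = json.loads(SAMPLE_METADATA)
snapshot = current_snapshot(metadata)
print(snapshot["sequence-number"], snapshot["summary"]["operation"], snapshot["manifest-list"])
```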

Iceberg framework in AWS Glue

AWS Glue 4.0 supports Iceberg tables registered with Lake Formation. In the AWS Glue ETL jobs, you need the following code to enable the Iceberg framework:

import sys

import boto3
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.conf import SparkConf

# Look up the account ID, which is used as the Data Catalog ID
aws_account_id = boto3.client('sts').get_caller_identity().get('Account')

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'warehouse_path'])

# Set up configuration for AWS Glue to work with Apache Iceberg
conf = SparkConf()
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.glue_catalog.warehouse", args['warehouse_path'])
conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
conf.set("spark.sql.catalog.glue_catalog.glue.lakeformation-enabled", "true")
conf.set("spark.sql.catalog.glue_catalog.glue.id", aws_account_id)

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session

For read/write access to the underlying data, in addition to Lake Formation permissions, the IAM role that runs the AWS Glue ETL jobs was granted the lakeformation:GetDataAccess IAM permission. With this permission, Lake Formation grants the request for temporary credentials to access the data.
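
As an illustration of that IAM permission, the following sketch builds a minimal policy document that allows lakeformation:GetDataAccess. It is an assumption about the general shape of such a policy, not a copy of the policy the CloudFormation stack creates, and the function name is hypothetical.

```python
import json


def get_data_access_policy():
    """Return a minimal IAM policy document allowing lakeformation:GetDataAccess."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "lakeformation:GetDataAccess",
                "Resource": "*",
            }
        ],
    }


policy_json = json.dumps(get_data_access_policy(), indent=2)
print(policy_json)
```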

The CloudFormation stack provisioned four AWS Glue ETL jobs for you. The name of each job starts with your stack name (icebergdemo1). Complete the following steps to view the jobs:

  1. Log in as an administrator to your AWS account.
  2. On the AWS Glue console, choose ETL jobs in the navigation pane.
  3. Search for jobs with icebergdemo1 in the name.

Merge data from Dropzone into the Iceberg table

For our use case, the company ingests their ecommerce orders data daily from their on-premises location into an Amazon S3 Dropzone location. The CloudFormation stack loaded three files with sample orders for 3 days, as shown in the following figures. You can see the data in the Dropzone location s3://icebergdemo1-s3bucketdropzone-kunftrcblhsk/data.

The AWS Glue ETL job icebergdemo1-GlueETL1-merge will run daily to merge the data into the Iceberg table. It has the following logic to add or update the data on Iceberg:

  • Create a Spark DataFrame from input data:
from pyspark.sql.types import IntegerType

# Read the Dropzone files and cast the numeric columns to match the table schema
df = spark.read.format(dropzone_dataformat).option("header", True).load(dropzone_path)
df = df.withColumn("ordernum", df["ordernum"].cast(IntegerType())) \
    .withColumn("quantity", df["quantity"].cast(IntegerType()))
df.createOrReplaceTempView("input_table")
  • For a new order, add it to the table
  • If the table has a matching order, update the status and shipping_id:
stmt_merge = f"""
    MERGE INTO glue_catalog.{database_name}.{table_name} AS t
    USING input_table AS s 
    ON t.ordernum= s.ordernum
    WHEN MATCHED 
            THEN UPDATE SET 
                t.status = s.status,
                t.shipping_id = s.shipping_id
    WHEN NOT MATCHED THEN INSERT *
    """
spark.sql(stmt_merge)
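
To make the effect of the MERGE statement easier to follow, here is a plain Python sketch of the same matched and not-matched logic applied to in-memory rows. It only models the semantics. It doesn't model Spark or Iceberg behavior such as how duplicate keys in the source are handled, and the sample orders are hypothetical rather than the contents of the Dropzone files.

```python
def merge_orders(table, incoming):
    """Apply MERGE-like semantics keyed on ordernum.

    table: dict of ordernum -> row dict (the current table contents)
    incoming: list of row dicts (the input_table rows)
    """
    merged = {ordernum: dict(row) for ordernum, row in table.items()}
    for row in incoming:
        existing = merged.get(row["ordernum"])
        if existing is not None:
            # WHEN MATCHED: update only status and shipping_id
            existing["status"] = row["status"]
            existing["shipping_id"] = row["shipping_id"]
        else:
            # WHEN NOT MATCHED: insert the whole row
            merged[row["ordernum"]] = dict(row)
    return merged


# Hypothetical sample rows
table = {
    1001: {"ordernum": 1001, "sku": "sku-1", "quantity": 2, "status": "received", "shipping_id": ""},
}
incoming = [
    {"ordernum": 1001, "sku": "sku-1", "quantity": 99, "status": "shipped", "shipping_id": "abc123"},
    {"ordernum": 2001, "sku": "sku-2", "quantity": 1, "status": "received", "shipping_id": ""},
]
merged = merge_orders(table, incoming)
print(merged)
```

Note that the matched order keeps its original quantity, because the UPDATE clause sets only status and shipping_id.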

Complete the following steps to run the AWS Glue merge job:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Select the ETL job icebergdemo1-GlueETL1-merge.
  3. On the Actions dropdown menu, choose Run with parameters.
  4. On the Run parameters page, go to Job parameters.
  5. For the --dropzone_path parameter, provide the S3 location of the input data (icebergdemo1-s3bucketdropzone-kunftrcblhsk/data/merge1).
  6. Run the job to add all the orders: 1001, 1002, 1003, and 1004.
  7. For the --dropzone_path parameter, change the S3 location to icebergdemo1-s3bucketdropzone-kunftrcblhsk/data/merge2.
  8. Run the job again to add orders 2001 and 2002, and update orders 1001, 1002, and 1003.
  9. For the --dropzone_path parameter, change the S3 location to icebergdemo1-s3bucketdropzone-kunftrcblhsk/data/merge3.
  10. Run the job again to add order 3001 and update orders 1001, 1003, 2001, and 2002.

Go to the data folder of the table to see the data files that Iceberg wrote when you merged the data using the AWS Glue ETL job icebergdemo1-GlueETL1-merge.

Query Iceberg using Athena

The CloudFormation stack created the IAM user iceberguser1, which has read access on the Iceberg table using LF-Tags. To query Iceberg using Athena via this user, complete the following steps:

  1. Log in as iceberguser1 to the AWS Management Console.
  2. On the Athena console, choose Workgroups in the navigation pane.
  3. Locate the workgroup that CloudFormation provisioned (icebergdemo1-workgroup)
  4. Verify that the workgroup uses Athena engine version 3.

Athena engine version 3 supports Iceberg file formats, including Parquet, ORC, and Avro.

  5. Go to the Athena query editor.
  6. Choose the workgroup icebergdemo1-workgroup on the dropdown menu.
  7. For Database, choose icebergdb1. You will see the table ecomorders.
  8. Run the following query to see the data in the Iceberg table:
    SELECT * FROM "icebergdb1"."ecomorders" ORDER BY ordernum ;

  9. Run the following query to see the table’s current partitions:
    DESCRIBE icebergdb1.ecomorders ;

Partition-spec describes how the table is partitioned. In this example, there are no partitioned fields because you didn’t define any partitions on the table.

Iceberg partition evolution

You may need to change your partition structure, for example, when the common query patterns in downstream analytics change. For traditional tables, changing the partition structure is a significant operation that requires copying the entire dataset.

Iceberg makes this straightforward. When you change the partition structure on Iceberg, it doesn’t require you to rewrite the data files. The old data written with earlier partitions remains unchanged. New data is written using the new specifications in a new layout. Metadata for each of the partition versions is kept separately.

Let’s add the partition field category to the Iceberg table using the AWS Glue ETL job icebergdemo1-GlueETL2-partition-evolution:

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    ADD PARTITION FIELD category ;

On the AWS Glue console, run the ETL job icebergdemo1-GlueETL2-partition-evolution. When the job is complete, you can query partitions using Athena.

DESCRIBE icebergdb1.ecomorders ;

SELECT * FROM "icebergdb1"."ecomorders$partitions";

You can see the partition field category, but the partition values are null. There are no new data files in the data folder, because partition evolution is a metadata operation and doesn’t rewrite data files. When you add or update data, you will see the corresponding partition values populated.
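
The behavior described above can be pictured with a toy model in which every data file remembers the partition spec it was written with. This is a simplified sketch for intuition only, not how Iceberg is implemented. The class name, field layout, and sample rows are hypothetical, and the partitions listing is per file rather than aggregated as in Athena.

```python
class TableModel:
    """Toy model of a table whose data files remember their partition spec."""

    def __init__(self):
        self.specs = {0: []}  # spec ID -> partition fields (spec 0 is unpartitioned)
        self.current_spec_id = 0
        self.data_files = []

    def add_partition_field(self, field):
        # Metadata-only change: register a new spec and touch no data files.
        new_id = max(self.specs) + 1
        self.specs[new_id] = self.specs[self.current_spec_id] + [field]
        self.current_spec_id = new_id

    def write(self, rows):
        # New rows are grouped by the current spec's fields, one file per group.
        fields = self.specs[self.current_spec_id]
        groups = {}
        for row in rows:
            key = tuple(row[f] for f in fields)
            groups.setdefault(key, []).append(row)
        for key, group in groups.items():
            self.data_files.append({
                "spec_id": self.current_spec_id,
                "partition": dict(zip(fields, key)),
                "record_count": len(group),
            })

    def partitions(self):
        # Report each file's value for the current partition fields (None if absent).
        fields = self.specs[self.current_spec_id]
        return [
            ({f: data_file["partition"].get(f) for f in fields}, data_file["record_count"])
            for data_file in self.data_files
        ]


table = TableModel()
table.write([{"ordernum": 1001, "category": "books"}, {"ordernum": 1002, "category": "toys"}])
files_before = len(table.data_files)

table.add_partition_field("category")
after_evolution = table.partitions()

table.write([{"ordernum": 3001, "category": "books"}])
after_write = table.partitions()
print(after_evolution)
print(after_write)
```

In this model, the file written before the change keeps reporting a null category, and only files written afterward carry a partition value.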

Iceberg schema evolution

Iceberg supports in-place table evolution. You can evolve a table schema just like SQL. Iceberg schema updates are metadata changes, so no data files need to be rewritten to perform the schema evolution.

To explore the Iceberg schema evolution, run the ETL job icebergdemo1-GlueETL3-schema-evolution via the AWS Glue console. The job runs the following SparkSQL statements:

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    ADD COLUMNS (shipping_carrier string) ;

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    RENAME COLUMN shipping_id TO tracking_number ;

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    ALTER COLUMN ordernum TYPE bigint ;

In the Athena query editor, run the following query:

SELECT * FROM "icebergdb1"."ecomorders" ORDER BY ordernum asc ;

You can verify the schema changes to the Iceberg table:

  • A new column has been added called shipping_carrier
  • The column shipping_id has been renamed to tracking_number
  • The data type of the column ordernum has changed from int to bigint
    DESCRIBE icebergdb1.ecomorders;
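
One way to see why these changes need no data rewrite is that Iceberg tracks columns by a unique field ID rather than by name. The following toy model sketches that idea in plain Python. It is an illustration under simplifying assumptions and not Iceberg's implementation: the helper names, field IDs, and sample row are hypothetical, and new IDs are derived from the highest existing ID, which is only safe here because no columns are dropped.

```python
# Type changes this sketch accepts (widening promotions only)
ALLOWED_PROMOTIONS = {("int", "bigint"), ("float", "double")}


def add_column(schema, name, col_type):
    """Return a new schema with an extra column under a fresh field ID."""
    new_id = max(schema) + 1
    return {**schema, new_id: (name, col_type)}


def rename_column(schema, old_name, new_name):
    """Return a new schema where the column keeps its field ID but changes name."""
    return {
        field_id: (new_name if name == old_name else name, col_type)
        for field_id, (name, col_type) in schema.items()
    }


def change_type(schema, name, new_type):
    """Return a new schema with a widened column type."""
    evolved = {}
    for field_id, (col_name, col_type) in schema.items():
        if col_name == name:
            if (col_type, new_type) not in ALLOWED_PROMOTIONS:
                raise ValueError(f"cannot change {col_type} to {new_type}")
            col_type = new_type
        evolved[field_id] = (col_name, col_type)
    return evolved


def read(rows, schema):
    """Project stored rows (keyed by field ID) through the current schema."""
    return [
        {name: row.get(field_id) for field_id, (name, _) in schema.items()}
        for row in rows
    ]


# Rows as stored in an existing data file, keyed by field ID (hypothetical values)
stored_rows = [{1: 1001, 2: "UPS123"}]
schema_v0 = {1: ("ordernum", "int"), 2: ("shipping_id", "string")}

schema_v1 = add_column(schema_v0, "shipping_carrier", "string")
schema_v2 = rename_column(schema_v1, "shipping_id", "tracking_number")
schema_v3 = change_type(schema_v2, "ordernum", "bigint")

rows_now = read(stored_rows, schema_v3)
print(rows_now)
```

The stored rows are never modified. The renamed column resolves through its unchanged field ID, and the added column reads as null for rows written before it existed.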

Positional update

The data in tracking_number contains the shipping carrier concatenated with the tracking number. Let’s assume that we want to split this data in order to keep the shipping carrier in the shipping_carrier field and the tracking number in the tracking_number field.

On the AWS Glue console, run the ETL job icebergdemo1-GlueETL4-update-table. The job runs the following SparkSQL statement to update the table:

UPDATE glue_catalog.icebergdb1.ecomorders
SET shipping_carrier = substring(tracking_number,1,3),
    tracking_number = substring(tracking_number,4,50)
WHERE tracking_number != '' ;
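
The split performed by the two substring calls can be checked with a small Python equivalent. SQL substring positions are 1-based, so substring(x, 1, 3) takes the first three characters and substring(x, 4, 50) takes up to 50 characters starting at the fourth. Both SET expressions read the original value of tracking_number. The helper names and the sample value are hypothetical, and the sketch assumes a three-character carrier prefix, as the statement does.

```python
def sql_substring(value, start, length):
    """Mimic SQL substring with a 1-based start position (start >= 1)."""
    return value[start - 1:start - 1 + length]


def split_tracking(tracking_number):
    """Return (shipping_carrier, tracking_number), or None when the row is skipped."""
    if not tracking_number:
        # Empty or null values don't satisfy WHERE tracking_number != ''
        return None
    carrier = sql_substring(tracking_number, 1, 3)
    number = sql_substring(tracking_number, 4, 50)
    return carrier, number


# Hypothetical sample value: carrier prefix followed by the tracking number
result = split_tracking("UPS1Z999AA10123456784")
print(result)
```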

Query the Iceberg table to verify the updated data on tracking_number and shipping_carrier.

SELECT * FROM "icebergdb1"."ecomorders" ORDER BY ordernum ;

Now that the data has been updated on the table, you should see the partition values populated for category:

SELECT * FROM "icebergdb1"."ecomorders$partitions"
ORDER BY partition;

Clean up

To avoid incurring future charges, clean up the resources you created:

  1. On the Lambda console, open the details page for the function icebergdemo1-Lambda-Create-Iceberg-and-Grant-access.
  2. In the Environment variables section, choose the key Task_To_Perform and update the value to CLEANUP.
  3. Run the function, which drops the database, table, and their associated LF-Tags.
  4. On the AWS CloudFormation console, delete the stack icebergdemo1.

Conclusion

In this post, you created an Iceberg table using the AWS Glue API and used Lake Formation to control access on the Iceberg table in a transactional data lake. With AWS Glue ETL jobs, you merged data into the Iceberg table, and performed schema evolution and partition evolution without rewriting or recreating the Iceberg table. With Athena, you queried the Iceberg data and metadata.

Based on the concepts and demonstrations from this post, you can now build a transactional data lake in an enterprise using Iceberg, AWS Glue, Lake Formation, and Amazon S3.


About the Author

Satya Adimula is a Senior Data Architect at AWS based in Boston. With over two decades of experience in data and analytics, Satya helps organizations derive business insights from their data at scale.