AWS Big Data Blog

Build a transactional data lake using Apache Iceberg, AWS Glue, and cross-account data shares using AWS Lake Formation and Amazon Athena

Building a data lake on Amazon Simple Storage Service (Amazon S3) provides numerous benefits for an organization. It allows you to access diverse data sources, build business intelligence dashboards, build AI and machine learning (ML) models to provide customized customer experiences, and accelerate the curation of new datasets for consumption by adopting a modern data architecture or data mesh architecture.

However, many use cases, such as performing change data capture (CDC) from an upstream relational database to an Amazon S3-based data lake, require handling data at the record level. Operations like inserting, updating, and deleting individual records require the processing engine to read all the objects (files), make the changes, and rewrite entire datasets as new files. Furthermore, making the data available in the data lake in near-real time often fragments the data across many small files, resulting in poor query performance and additional compaction maintenance.

In 2022, we announced that you can enforce fine-grained access control policies using AWS Lake Formation and query data stored in any supported file format through table formats such as Apache Iceberg and Apache Hudi with Amazon Athena. You get the flexibility to choose the table and file format best suited to your use case, along with centralized data governance to secure data access when using Athena.

In this post, we show you how to configure Lake Formation using Iceberg table formats. We also explain how to upsert and merge in an S3 data lake using an Iceberg framework and apply Lake Formation access control using Athena.

Iceberg is an open table format for very large analytic datasets. Iceberg manages large collections of files as tables, and it supports modern analytical data lake operations such as record-level insert, update, delete, and time travel queries. The Iceberg specification allows seamless table evolution such as schema and partition evolution, and its design is optimized for usage on Amazon S3. Iceberg also helps guarantee data correctness under concurrent write scenarios.

Solution overview

To explain this setup, we present the following architecture, which integrates Amazon S3 for the data lake (Iceberg table format), Lake Formation for access control, AWS Glue for ETL (extract, transform, and load), and Athena for querying the latest inventory data from the Iceberg tables using standard SQL.

The solution workflow consists of the following steps, including data ingestion (Steps 1–3), data governance (Step 4), and data access (Step 5):

  1. We use AWS Database Migration Service (AWS DMS) or a similar tool to connect to the data source and move incremental data (CDC) to Amazon S3 in CSV format.
  2. An AWS Glue PySpark job reads the incremental data from the S3 input bucket and performs deduplication of the records.
  3. The job then invokes Iceberg’s MERGE statements to merge the data with the target S3 bucket.
  4. We use the AWS Glue Data Catalog as a centralized catalog, which is used by AWS Glue and Athena. An AWS Glue crawler is integrated on top of S3 buckets to automatically detect the schema. Lake Formation allows you to centrally manage permissions and access control for Data Catalog resources in your S3 data lake. You can use fine-grained access control in Lake Formation to restrict access to data in query results.
  5. We use Athena integrated with Lake Formation to query data from the Iceberg table using standard SQL and validate table- and column-level access on Iceberg tables.

For this solution, we assume that the raw data files are already available in Amazon S3, and focus on processing the data using AWS Glue with Iceberg table format. We use sample item data that has the following attributes:

  • op – This represents the operation on the source record: I for insert operations, U for updates, and D for deletes. Make sure this attribute is included in your CDC incremental data before it gets written to Amazon S3, so that your ETL logic can take the appropriate action while merging.
  • product_id – This is the primary key column in the source data table.
  • category – This column represents the category of an item.
  • product_name – This is the name of the product.
  • quantity_available – This is the quantity available in the inventory. When we demonstrate the incremental data for UPSERT or MERGE, we reduce the quantity available for the product to show the functionality.
  • last_update_time – This is the time when the item record was updated at the data source.

We demonstrate implementing the solution with the following steps:

  1. Create an S3 bucket for input and output data.
  2. Create input and output tables using Athena.
  3. Insert the data into the Iceberg table from Athena.
  4. Query the Iceberg table using Athena.
  5. Upload incremental (CDC) data for further processing.
  6. Run the AWS Glue job again to process the incremental files.
  7. Query the Iceberg table again using Athena.
  8. Define Lake Formation policies.

Prerequisites

For Athena queries, we need to configure an Athena workgroup with engine version 3 to support Iceberg table format.
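
If you prefer to create the workgroup programmatically, the following is a minimal boto3 sketch; the workgroup name, Region, and query results location are illustrative, so replace them with your own values.

import boto3

athena = boto3.client("athena", region_name="us-east-1")  # Region is an assumption

# Create a workgroup pinned to Athena engine version 3, which supports Iceberg
athena.create_work_group(
    Name="iceberg-lf-wg",  # hypothetical workgroup name
    Description="Workgroup for Iceberg queries on Athena engine version 3",
    Configuration={
        "EngineVersion": {"SelectedEngineVersion": "Athena engine version 3"},
        "ResultConfiguration": {
            # Replace with a query results location in your account
            "OutputLocation": "s3://<bucket-name>/athena-query-results/"
        },
    },
)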

To validate cross-account access through Lake Formation for the Iceberg table, this post uses two accounts (primary and secondary).

Now let’s dive into the implementation steps.

Create an S3 bucket for input and output data

Before we run the AWS Glue job, we have to upload the sample CSV files to the input bucket and process them with AWS Glue PySpark code for the output.

To create an S3 bucket, complete the following steps:

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. Specify the bucket name as iceberg-blog and leave the remaining fields as default.

S3 bucket names are globally unique. While implementing the solution, you may get an error saying the bucket name already exists. Make sure to provide a unique name and use that name throughout the rest of the steps. Formatting the bucket name as <Bucket-Name>-${AWS_ACCOUNT_ID}-${AWS_REGION_CODE} might help you get a unique name.

  1. On the bucket details page, choose Create folder.
  2. Create two subfolders. For this post, we create iceberg-blog/raw-csv-input and iceberg-blog/iceberg-output.
  3. Upload the LOAD00000001.csv file into the raw-csv-input folder.
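
If you prefer to script these steps, the following boto3 sketch creates the bucket, the two prefixes, and uploads the full-load file. It assumes LOAD00000001.csv is available locally and that you substitute a globally unique bucket name and your Region.

import boto3

region = "us-east-1"  # assumption; use your Region
bucket = "iceberg-blog-<account-id>-<region>"  # placeholder; must be globally unique

s3 = boto3.client("s3", region_name=region)

# Create the bucket (us-east-1 does not accept a LocationConstraint)
if region == "us-east-1":
    s3.create_bucket(Bucket=bucket)
else:
    s3.create_bucket(
        Bucket=bucket,
        CreateBucketConfiguration={"LocationConstraint": region},
    )

# "Folders" in Amazon S3 are zero-byte objects whose keys end with a slash
for prefix in ("raw-csv-input/", "iceberg-output/"):
    s3.put_object(Bucket=bucket, Key=prefix)

# Upload the full-load CSV into the raw-csv-input prefix
s3.upload_file("LOAD00000001.csv", bucket, "raw-csv-input/LOAD00000001.csv")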

The following screenshot provides a sample of the input dataset.

Create input and output tables using Athena

To create input and output Iceberg tables in the AWS Glue Data Catalog, open the Athena query editor and run the following queries in sequence:

-- Create database for the demo
CREATE DATABASE iceberg_lf_db;

As we explain later in this post, it’s essential to record the data locations when incorporating Lake Formation access controls.

-- Create external table in input CSV files. Replace the S3 path with your bucket name
CREATE EXTERNAL TABLE iceberg_lf_db.csv_input(
op string,
product_id bigint,
category string,
product_name string,
quantity_available bigint,
last_update_time string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://iceberg-blog/raw-csv-input/'
TBLPROPERTIES (
'areColumnsQuoted'='false',
'classification'='csv',
'columnsOrdered'='true',
'compressionType'='none',
'delimiter'=',',
'typeOfData'='file');

-- Create output Iceberg table with partitioning. Replace the S3 bucket name with your bucket name
CREATE TABLE iceberg_lf_db.iceberg_table_lf (
product_id bigint,
category string,
product_name string,
quantity_available bigint,
last_update_time timestamp)
PARTITIONED BY (category, bucket(16,product_id))
LOCATION 's3://iceberg-blog/iceberg-output/'
TBLPROPERTIES (
'table_type'='ICEBERG',
'format'='parquet',
'write_target_data_file_size_bytes'='536870912'
);

-- Validate the input data
SELECT * FROM iceberg_lf_db.csv_input;

SELECT * FROM iceberg_lf_db.iceberg_table_lf;

Alternatively, you can use an AWS Glue crawler to create the table definition for the input files.
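
For example, a crawler for the raw CSV prefix could be created and started with boto3 as follows; the crawler name, IAM role ARN, and S3 path are placeholders for your own values.

import boto3

glue = boto3.client("glue")

# Hypothetical crawler that infers the schema of the raw CSV files
glue.create_crawler(
    Name="iceberg-blog-csv-crawler",                      # illustrative name
    Role="arn:aws:iam::<account-id>:role/<glue-role>",    # role with S3 and Glue access
    DatabaseName="iceberg_lf_db",
    Targets={"S3Targets": [{"Path": "s3://iceberg-blog/raw-csv-input/"}]},
)
glue.start_crawler(Name="iceberg-blog-csv-crawler")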

Insert the data into the Iceberg table from Athena

Optionally, we can insert data into the Iceberg table through Athena using the following code:

insert into iceberg_lf_db.iceberg_table_lf (product_id,category,product_name,quantity_available,last_update_time) values (200,'Mobile','Mobile brand 1',25,cast('2023-01-19 09:51:40' as timestamp));
insert into iceberg_lf_db.iceberg_table_lf (product_id,category,product_name,quantity_available,last_update_time) values (201,'Laptop','Laptop brand 1',20,cast('2023-01-19 09:51:40' as timestamp));
insert into iceberg_lf_db.iceberg_table_lf (product_id,category,product_name,quantity_available,last_update_time) values (202,'Tablet','Kindle',30,cast('2023-01-19 09:51:41' as timestamp));
insert into iceberg_lf_db.iceberg_table_lf (product_id,category,product_name,quantity_available,last_update_time) values (203,'Speaker','Alexa',10,cast('2023-01-19 09:51:42' as timestamp));
insert into iceberg_lf_db.iceberg_table_lf (product_id,category,product_name,quantity_available,last_update_time) values (204,'Speaker','Alexa',50,cast('2023-01-19 09:51:43' as timestamp));

For this post, we load the data using an AWS Glue job. Complete the following steps to create the job:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose Create job.
  3. Select Visual with a blank canvas.
  4. Choose Create.
  5. Choose Edit script.
  6. Replace the script with the following script:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.sql.functions import *
from awsglue.dynamicframe import DynamicFrame

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, max

from pyspark.conf import SparkConf

args = getResolvedOptions(sys.argv, ["JOB_NAME", "iceberg_job_catalog_warehouse"])
conf = SparkConf()

## The Iceberg warehouse location is passed as a job runtime argument.
## Make sure to pass the runtime argument iceberg_job_catalog_warehouse
## with your S3 path as the value.
conf.set("spark.sql.catalog.job_catalog.warehouse", args['iceberg_job_catalog_warehouse'])
conf.set("spark.sql.catalog.job_catalog", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.job_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set("spark.sql.catalog.job_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
conf.set("spark.sql.iceberg.handle-timestamp-without-timezone","true")

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)


## Read Input Table
## glueContext.create_data_frame.from_catalog can be more performant
## and can be used in place of create_dynamic_frame.from_catalog.

IncrementalInputDyF = glueContext.create_dynamic_frame.from_catalog(database = "iceberg_lf_db", table_name = "csv_input", transformation_ctx = "IncrementalInputDyF")
IncrementalInputDF = IncrementalInputDyF.toDF()

if not IncrementalInputDF.rdd.isEmpty():
    ## Apply deduplication logic on the input data to pick up the latest record based on timestamp and operation
    IDWindowDF = Window.partitionBy(IncrementalInputDF.product_id).orderBy(IncrementalInputDF.last_update_time).rangeBetween(-sys.maxsize, sys.maxsize)

    # Add a column that captures the latest timestamp per product_id
    inputDFWithTS = IncrementalInputDF.withColumn("max_op_date", max(IncrementalInputDF.last_update_time).over(IDWindowDF))

    # Filter out newly inserted records, select the latest record from existing records, and merge both to get the deduplicated output
    NewInsertsDF = inputDFWithTS.filter("last_update_time=max_op_date").filter("op='I'")
    UpdateDeleteDf = inputDFWithTS.filter("last_update_time=max_op_date").filter("op IN ('U','D')")
    finalInputDF = NewInsertsDF.unionAll(UpdateDeleteDf)

    # Register the deduplicated input as a temporary table to use in Iceberg Spark SQL statements
    finalInputDF.createOrReplaceTempView("incremental_input_data")
    finalInputDF.show()

    ## Merge the incremental input data into the Iceberg table with MERGE INTO.
    ## This section uses Spark SQL to showcase Iceberg's expressive SQL approach to merge operations.
    IcebergMergeOutputDF = spark.sql("""
    MERGE INTO job_catalog.iceberg_lf_db.iceberg_table_lf t
    USING (SELECT op, product_id, category, product_name, quantity_available, to_timestamp(last_update_time) as last_update_time FROM incremental_input_data) s
    ON t.product_id = s.product_id
    WHEN MATCHED AND s.op = 'D' THEN DELETE
    WHEN MATCHED THEN UPDATE SET t.quantity_available = s.quantity_available, t.last_update_time = s.last_update_time
    WHEN NOT MATCHED THEN INSERT (product_id, category, product_name, quantity_available, last_update_time) VALUES (s.product_id, s.category, s.product_name, s.quantity_available, s.last_update_time)
    """)

job.commit()
  1. On the Job details tab, specify the job name (iceberg-lf).
  2. For IAM Role, assign an AWS Identity and Access Management (IAM) role that has the required permissions to run an AWS Glue job and read and write to the S3 bucket.
  3. For Glue version, choose Glue 4.0 (Glue 3.0 is also supported).
  4. For Language, choose Python 3.
  5. Make sure Job bookmark has the default value of Enable.
  6. For Job parameters, add the following:
    1. Add the key --datalake-formats with the value iceberg.
    2. Add the key --iceberg_job_catalog_warehouse with your S3 path as the value (s3://<bucket-name>/<iceberg-warehouse-path>).
  7. Choose Save and then Run, which should write the input data to the Iceberg table with a MERGE statement.
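
You can also start the job programmatically. The following boto3 sketch passes the same job parameters described above; replace the warehouse path with your own.

import boto3

glue = boto3.client("glue")

# Start the job with the same parameters configured on the console
response = glue.start_job_run(
    JobName="iceberg-lf",
    Arguments={
        "--datalake-formats": "iceberg",
        "--iceberg_job_catalog_warehouse": "s3://<bucket-name>/<iceberg-warehouse-path>",
    },
)
print(response["JobRunId"])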

Query the Iceberg table using Athena

After you have successfully run the AWS Glue job, you can validate the output in Athena with the following SQL query:

SELECT * FROM iceberg_lf_db.iceberg_table_lf limit 10;

The output of the query should match the input, with one difference: the Iceberg output table doesn’t have the op column.
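
To run the same validation from code, a minimal boto3 sketch looks like the following; it assumes the hypothetical iceberg-lf-wg workgroup from the prerequisites.

import time
import boto3

athena = boto3.client("athena")

# Run the validation query in the engine version 3 workgroup
query = athena.start_query_execution(
    QueryString="SELECT * FROM iceberg_lf_db.iceberg_table_lf limit 10",
    WorkGroup="iceberg-lf-wg",  # assumption: workgroup created in the prerequisites
)
query_id = query["QueryExecutionId"]

# Poll until the query finishes
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

# Print the result rows (the first row contains the column headers)
results = athena.get_query_results(QueryExecutionId=query_id)
for row in results["ResultSet"]["Rows"]:
    print([col.get("VarCharValue") for col in row["Data"]])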

Upload incremental (CDC) data for further processing

After we process the initial full load file, let’s upload an incremental file.

This file includes updated records for two items.

Run the AWS Glue job again to process incremental files

Because the AWS Glue job has bookmarks enabled, the job picks up the new incremental file and performs a MERGE operation on the Iceberg table.

To run the job again, complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Select the job and choose Run.

For this post, we run the job manually, but you can configure your AWS Glue jobs to run as part of an AWS Glue workflow or via AWS Step Functions (for more information, see Manage AWS Glue Jobs with Step Functions).

Query the Iceberg table using Athena after incremental data processing

When the incremental data processing is complete, you can run the same SELECT statement again and validate that the quantity value is updated for items 200 and 201.

The following screenshot shows the output.

Define Lake Formation policies

For data governance, we use Lake Formation. Lake Formation is a fully managed service that simplifies data lake setup, supports centralized security management, and enables data sharing across accounts and organizations. There are two ways to share data resources in Lake Formation: named resource access control (NRAC) and tag-based access control (TBAC). NRAC uses AWS Resource Access Manager (AWS RAM) to share data resources across accounts using Lake Formation V3. Shared resources are consumed in the recipient account through resource links that are created from the resource shares. Lake Formation tag-based access control (LF-TBAC) is another approach to sharing data resources in Lake Formation; it defines permissions based on attributes called LF-tags.

In this example, we create databases in the primary account. Our NRAC database is shared with a data domain via AWS RAM. Access to data tables that we register in this database will be handled through NRAC.

Configure access controls in the primary account

In the primary account, complete the following steps to set up access controls using Lake Formation:

  1. On the Lake Formation console, choose Data lake locations in the navigation pane.
  2. Choose Register location.
  3. Enter the Amazon S3 location path of the Iceberg data, as shown in the following screenshot, and register the location.
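
The equivalent boto3 call is shown below; the location ARN is illustrative and should point at your Iceberg output path.

import boto3

lf = boto3.client("lakeformation")

# Register the Iceberg data location with Lake Formation
# (bucket name and prefix are placeholders for your Iceberg output location)
lf.register_resource(
    ResourceArn="arn:aws:s3:::iceberg-blog/iceberg-output",
    UseServiceLinkedRole=True,
)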

Grant access to the database to the secondary account

To grant database access to the external (secondary) account, complete the following steps:

  1. On the Lake Formation console, navigate to your database.
  2. On the Actions menu, choose Grant.
  3. Choose External accounts and enter the secondary account number.
  4. Select Named data catalog resources.
  5. Verify the database name.

The first grant should be at database level, and the second grant is at table level.

  1. For Database permissions, specify your permissions (for this post, we select Describe).
  2. Choose Grant.

Now you need to grant permissions at the table level.

  1. Select External accounts and enter the secondary account number.
  2. Select Named data catalog resources.
  3. Verify the table name.
  4. For Table permissions, specify the permissions you want to grant. For this post, we select Select and Describe.
  5. Choose Grant.
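
The same database- and table-level grants can be issued with boto3, as in the following sketch; the secondary account ID is a placeholder.

import boto3

lf = boto3.client("lakeformation")
secondary_account = "<secondary-account-id>"  # placeholder

# Database-level grant (Describe)
lf.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": secondary_account},
    Resource={"Database": {"Name": "iceberg_lf_db"}},
    Permissions=["DESCRIBE"],
)

# Table-level grant (Select and Describe)
lf.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": secondary_account},
    Resource={"Table": {"DatabaseName": "iceberg_lf_db", "Name": "iceberg_table_lf"}},
    Permissions=["SELECT", "DESCRIBE"],
)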

If you see the following error, you must revoke IAMAllowedPrincipals from the data lake permissions.

To do so, select IAMAllowedPrincipals and choose Revoke.

Choose Revoke again to confirm.

After you revoke the data permissions, the permissions should appear as shown in the following screenshot.

Add AWS Glue IAM role permissions

Because the IAMAllowedPrincipals permissions were revoked, the AWS Glue IAM role used by the AWS Glue job needs to be granted access explicitly, as shown in the following screenshot.

You need to repeat these steps for the AWS Glue IAM role at table level.

Verify the permissions granted to the AWS Glue IAM role on the Lake Formation console.
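
A boto3 sketch for the equivalent grants to the AWS Glue job role follows. The role ARN is a placeholder, and the table-level permission set shown (SELECT, INSERT, DELETE, DESCRIBE) is an assumption; adjust it to what your job actually needs.

import boto3

lf = boto3.client("lakeformation")
glue_role_arn = "arn:aws:iam::<primary-account-id>:role/<glue-job-role>"  # placeholder

# Grant the AWS Glue job role explicit access at the database level
lf.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": glue_role_arn},
    Resource={"Database": {"Name": "iceberg_lf_db"}},
    Permissions=["DESCRIBE"],
)

# Grant the AWS Glue job role explicit access at the table level
lf.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": glue_role_arn},
    Resource={"Table": {"DatabaseName": "iceberg_lf_db", "Name": "iceberg_table_lf"}},
    Permissions=["SELECT", "INSERT", "DELETE", "DESCRIBE"],
)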

Access the shared Iceberg table from the secondary account

In the secondary account, complete the following steps to access the Iceberg table shared from the primary account:

  1. On the AWS RAM console, choose Resource shares in the navigation pane.
  2. Choose the resource shares invitation sent from the primary account.
  3. Choose Accept resource share.

The resource status should now be active.
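
The invitation can also be accepted with boto3, as in this sketch. It simply accepts every pending invitation, so in a real setup you should filter on the sender account.

import boto3

ram = boto3.client("ram")

# Find pending invitations (such as the one sent from the primary account) and accept them
invitations = ram.get_resource_share_invitations()["resourceShareInvitations"]
for invitation in invitations:
    if invitation["status"] == "PENDING":
        ram.accept_resource_share_invitation(
            resourceShareInvitationArn=invitation["resourceShareInvitationArn"]
        )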

Next, you need to create a resource link for the shared Iceberg table and access it through Athena.

  1. On the Lake Formation console, choose Tables in the navigation pane.
  2. Select the Iceberg table (shared from the primary account).
  3. On the Actions menu, choose Create resource link.
  4. For Resource link name, enter a name (for this post, iceberg_table_lf_demo).
  5. For Database, choose your database and verify the shared table and database are automatically populated.
  6. Choose Create.
  7. Select your table and on the Actions menu, choose View data.

You’re redirected to the Athena console, where you can query the data.
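
Resource links can also be created through the AWS Glue API. This sketch assumes a local database already exists in the secondary account and uses placeholder account and database names.

import boto3

glue = boto3.client("glue")

# Create a resource link in the secondary account that points to the shared table
glue.create_table(
    DatabaseName="<local-database-name>",  # a database in the secondary account
    TableInput={
        "Name": "iceberg_table_lf_demo",
        "TargetTable": {
            "CatalogId": "<primary-account-id>",
            "DatabaseName": "iceberg_lf_db",
            "Name": "iceberg_table_lf",
        },
    },
)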

Grant column-based access in the primary account

For column-level restricted access, you need to grant access at the column level on the Iceberg table. Complete the following steps:

  1. On the Lake Formation console, navigate to your database.
  2. On the Actions menu, choose Grant.
  3. Select External accounts and enter the secondary account number.
  4. Select Named data catalog resources.
  5. Verify the table name.
  6. For Table permissions, choose the permissions you want to grant. For this post, we select Select.
  7. Under Data permissions, choose Column-based access.
  8. Select Include columns and choose the columns you want to share (for this post, category and quantity_available).
  9. Choose Grant.

Data with restricted columns can now be queried through the Athena console.
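
The equivalent column-restricted grant in boto3 uses the TableWithColumns resource type; the account ID is a placeholder.

import boto3

lf = boto3.client("lakeformation")

# Column-level grant: the secondary account can only select the listed columns
lf.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": "<secondary-account-id>"},
    Resource={
        "TableWithColumns": {
            "DatabaseName": "iceberg_lf_db",
            "Name": "iceberg_table_lf",
            "ColumnNames": ["category", "quantity_available"],
        }
    },
    Permissions=["SELECT"],
)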

Clean up

To avoid incurring ongoing costs, complete the following steps to clean up your resources:

  1. In your secondary account, log in to the Lake Formation console.
  2. Drop the resource link table that you created.
  3. In your primary account, log in to the Lake Formation console.
  4. Revoke the access you configured.
  5. Drop the AWS Glue tables and database.
  6. Delete the AWS Glue job.
  7. Delete the S3 buckets and any other resources that you created as part of the prerequisites for this post.

Conclusion

This post explains how you can use the Iceberg framework with AWS Glue and Lake Formation to define cross-account access controls and query data using Athena. It provides an overview of Iceberg and its features and integration approaches, and explains how you can ingest data, grant cross-account access, and query data through a step-by-step guide.

We hope this gives you a great starting point for using Iceberg to build your data lake platform along with AWS analytics services to implement your solution.


About the Authors

Vikram Sahadevan is a Senior Resident Architect on the AWS Data Lab team. He enjoys efforts that focus around providing prescriptive architectural guidance, sharing best practices, and removing technical roadblocks with joint engineering engagements between customers and AWS technical resources that accelerate data, analytics, artificial intelligence, and machine learning initiatives.

Suvendu Kumar Patra possesses 18 years of experience in infrastructure, database design, and data engineering, and he currently holds the position of Senior Resident Architect at Amazon Web Services. He is a member of the specialized focus group, AWS Data Lab, and his primary duties entail working with executive leadership teams of strategic AWS customers to develop their roadmaps for data, analytics, and AI/ML. Suvendu collaborates closely with customers to implement data engineering, data hub, data lake, data governance, and EDW solutions, as well as enterprise data strategy and data management.