Apache Iceberg Compaction

Compact your Iceberg tables using Spark or AWS Athena

Introduction

Apache Iceberg provides a powerful compaction feature that is especially beneficial for Change Data Capture (CDC) workloads. This document outlines the key properties and commands needed for effective Iceberg table management, focusing on compaction and maintenance operations when:

  1. Interfacing with Amazon Athena's abstraction layer over Iceberg's native capabilities.

  2. Using Apache Spark.

Overview of Compaction in Apache Iceberg

Compaction in Apache Iceberg is crucial for optimizing data storage and retrieval, particularly in environments with high data mutation rates. This process involves rewriting data files to improve query performance and remove obsolete data associated with old snapshots. Effective tuning of Iceberg's properties is essential for achieving optimal results.
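As a concrete example of such tuning, the sketch below sets the native Iceberg property write.target-file-size-bytes, which controls the target size of files produced by writes and rewrites. The Spark SQL syntax and the 128 MB value are illustrative only, not a recommendation:

-- Illustrative sketch (Spark SQL, native Iceberg property name); pick a target size for your workload
ALTER TABLE my_db.my_table SET TBLPROPERTIES (
  'write.target-file-size-bytes' = '134217728'
);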

Using Amazon Athena for Iceberg Table Management

Amazon Athena provides an abstraction layer for managing Iceberg tables through two main functions: OPTIMIZE and VACUUM. These functions facilitate table compaction and the removal of outdated data files, streamlining Iceberg table management within Athena.

To compact a table and remove the data files related to old snapshots, a sample flow looks like this:

-- Compaction
OPTIMIZE my_db.my_table REWRITE DATA USING BIN_PACK;

-- Removing old snapshots and files
ALTER TABLE my_db.my_table SET TBLPROPERTIES (
  'vacuum_max_metadata_files_to_keep'='1'
);
VACUUM my_db.my_table;

-- Validating that old snapshots are removed and only one `replace` operation
-- remains in the snapshots
SELECT * FROM "my_db"."my_table$snapshots";
SELECT * FROM "my_db"."my_table$history";

Now let's investigate each part individually.

1. Compaction

To compact a table and optimize its layout, the OPTIMIZE command is used as follows:

OPTIMIZE my_db.my_table REWRITE DATA USING BIN_PACK;

This command rewrites the data in the specified table using the BIN_PACK strategy, creating a new snapshot that consolidates small data files within each partition into larger ones. Note that the old snapshots and their data files are retained; they are only removed by the VACUUM step described next.
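For large tables, Athena also lets you limit the rewrite with a predicate on partition columns. A minimal sketch, assuming a hypothetical event_date partition column:

-- Hypothetical example: only compact recent partitions (assumes an event_date partition column)
OPTIMIZE my_db.my_table REWRITE DATA USING BIN_PACK
  WHERE event_date >= DATE '2024-01-01';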

2. Removing Old Snapshots and Files

To remove outdated snapshots and their associated data files, configure the vacuum_max_metadata_files_to_keep property and use the VACUUM command:

ALTER TABLE my_db.my_table SET TBLPROPERTIES (
  'vacuum_max_metadata_files_to_keep'='1'
);
VACUUM my_db.my_table;

By setting vacuum_max_metadata_files_to_keep to 1, you instruct Athena to retain only the most recent metadata file, so the subsequent VACUUM removes the old snapshots and the data files that are no longer referenced. This step is crucial for keeping storage efficient and costs under control.

💡
When you set the vacuum_max_metadata_files_to_keep property to 1, Iceberg keeps only one recoverable snapshot. On a production workload, choose this value based on your use case so you can still roll back to older snapshots if something goes wrong.
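For example, the sketch below keeps more history around. The Athena table properties vacuum_min_snapshots_to_keep and vacuum_max_snapshot_age_seconds control how many snapshots VACUUM retains and for how long; the values shown are illustrative only:

-- Illustrative production-style retention: keep at least 5 snapshots and ~7 days of history
ALTER TABLE my_db.my_table SET TBLPROPERTIES (
  'vacuum_min_snapshots_to_keep'='5',
  'vacuum_max_snapshot_age_seconds'='604800'
);
VACUUM my_db.my_table;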

3. Validation

After compaction and vacuuming, validate the operations' success by querying the table's snapshots and history:

SELECT * FROM "my_db"."my_table$snapshots";
SELECT * FROM "my_db"."my_table$history";

These queries help ensure that old snapshots are removed as expected and only one replace operation (the recent compaction) remains recorded in the table's snapshots and history.
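If you prefer a more targeted check than SELECT *, the sketch below lists the remaining snapshots using columns exposed by the Iceberg $snapshots metadata table:

-- List remaining snapshots, newest first; after VACUUM you expect to see only the
-- recent `replace` snapshot produced by the compaction
SELECT committed_at, snapshot_id, operation
FROM "my_db"."my_table$snapshots"
ORDER BY committed_at DESC;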

Using Apache Spark

In addition to Amazon Athena, Apache Spark can also be used to manage and optimize Iceberg tables.

In our case, we use Apache Spark in production to keep our whole workflow unified and easy to track and maintain. To achieve this, we created an IcebergOptimizer Python class that we run on AWS Glue to optimize all of the tables stored in the Glue Data Catalog (a usage sketch follows the class definition):

import logging
from datetime import datetime, timedelta
from pyspark.sql import SparkSession

logger = logging.getLogger(__name__)

class IcebergOptimizer:
    def __init__(self, spark: SparkSession, catalog: str = "glue_catalog"):
        self.spark = spark
        self.catalog = catalog

    def compact_table(self, table: str, strategy: str = "BIN_PACK") -> None:
        """Compact the given table using the specified strategy."""
        query = f"CALL {self.catalog}.system.rewrite_data_files(table => '{table}', strategy => '{strategy}')"
        self._execute_query(query)

    def expire_snapshots(self, table: str, snapshot_max_age: int, min_snapshots: int, max_concurrent_deletes: int) -> None:
        """Expire old snapshots of the given table."""
        delete_before = (datetime.now() - timedelta(days=snapshot_max_age)).isoformat()
        query = f"""CALL {self.catalog}.system.expire_snapshots(
                        table => '{table}',
                        older_than => TIMESTAMP '{delete_before}',
                        retain_last => {min_snapshots},
                        max_concurrent_deletes => {max_concurrent_deletes})"""
        self._execute_query(query)

    def remove_orphan_files(self, table: str, orphan_files_max_age: int, max_concurrent_deletes: int) -> None:
        """Remove orphan files from the given table."""
        delete_before = (datetime.now() - timedelta(days=orphan_files_max_age)).isoformat()
        query = f"""CALL {self.catalog}.system.remove_orphan_files(
                        table => '{table}',
                        older_than => TIMESTAMP '{delete_before}',
                        max_concurrent_deletes => {max_concurrent_deletes},
                        dry_run => false)"""
        self._execute_query(query)

    def _execute_query(self, query: str) -> None:
        """Execute the given SQL query."""
        logger.debug("Executing query: %s", query)
        if not self.spark:
            # Without an active Spark session there is nothing to execute.
            logger.error("Spark session is None!")
        else:
            self.spark.sql(query).show(truncate=False, vertical=True)

    def run_optimization(self, tables: list[str], snapshot_max_age: int, min_snapshots: int, 
                         max_concurrent_deletes: int, orphan_files_max_age: int, compaction_strategy: str = "BIN_PACK") -> None:
        """Run the optimization process for the specified tables."""
        for table in tables:
            logger.info(f"Compacting {table} table")
            self.compact_table(table, compaction_strategy)

            logger.info(f"Expiring old snapshots of {table} table")
            self.expire_snapshots(table, snapshot_max_age, min_snapshots, max_concurrent_deletes)

            logger.info(f"Removing orphan files of {table} table")
            self.remove_orphan_files(table, orphan_files_max_age, max_concurrent_deletes)
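A sketch of how this class might be wired up in a Glue job is shown below. The Spark settings follow the standard Iceberg-on-Glue catalog configuration, while the warehouse path, table names, and retention values are placeholders for illustration only:

# Hypothetical Glue job entry point; the S3 warehouse path and table names are placeholders.
spark = (
    SparkSession.builder.appName("iceberg-optimizer")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://my-bucket/warehouse/")
    .getOrCreate()
)

optimizer = IcebergOptimizer(spark)
optimizer.run_optimization(
    tables=["my_db.my_table"],   # fully qualified names within the Glue catalog
    snapshot_max_age=7,          # expire snapshots older than 7 days
    min_snapshots=5,             # but always retain at least 5
    max_concurrent_deletes=4,
    orphan_files_max_age=3,      # remove orphan files older than 3 days
)

The retention values passed here mirror the production advice from the Athena section: keep enough snapshots to roll back if a bad write lands, and let the job expire the rest.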

Summary

Managing Apache Iceberg tables involves careful compaction and maintenance operations to optimize performance and storage efficiency. By following the strategies outlined in this document, you can effectively manage and optimize your Apache Iceberg tables, particularly when dealing with high-frequency data mutations in CDC workloads.