Apache Iceberg Compaction
Compact your Iceberg tables using Spark or AWS Athena
Introduction
Apache Iceberg introduces a powerful compaction feature, especially beneficial for Change Data Capture (CDC) workloads. This document outlines the key properties and commands necessary for effective Iceberg table management, focusing on compaction and maintenance operations, when:
Interfacing with Amazon Athena's abstraction layer over Iceberg's native capabilities.
Using Apache Spark.
Overview of Compaction in Apache Iceberg
Compaction in Apache Iceberg is crucial for optimizing data storage and retrieval, particularly in environments with high data mutation rates. This process involves rewriting data files to improve query performance and remove obsolete data associated with old snapshots. Effective tuning of Iceberg's properties is essential for achieving optimal results.
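As a concrete example of such tuning, the size that rewritten data files aim for is controlled by the Iceberg table property write.target-file-size-bytes (512 MB by default). A minimal sketch of setting it from Spark SQL, assuming a catalog named glue_catalog as in the Spark section below and an illustrative 256 MB target:
-- Aim for ~256 MB data files when rewriting (illustrative value)
ALTER TABLE glue_catalog.my_db.my_table
SET TBLPROPERTIES ('write.target-file-size-bytes'='268435456');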
Using Amazon Athena for Iceberg Table Management
Amazon Athena provides an abstraction layer for managing Iceberg tables through two main functions: OPTIMIZE and VACUUM. These functions facilitate table compaction and the removal of outdated data files, streamlining Iceberg table management within Athena.
To compact a table and remove all of the data files related to old snapshots, a sample flow looks like this:
-- Compaction
OPTIMIZE my_db.my_table REWRITE DATA USING BIN_PACK;

-- Removing old snapshots and files
ALTER TABLE my_db.my_table SET TBLPROPERTIES (
  'vacuum_max_metadata_files_to_keep'='1'
);
VACUUM my_db.my_table;

-- Validating that old snapshots are removed and only one `replace` operation
-- remains in the snapshots
SELECT * FROM "my_db"."my_table$snapshots";
SELECT * FROM "my_db"."my_table$history";
Now let's investigate each part individually.
1. Compaction
To compact a table and optimize its layout, use the OPTIMIZE command as follows:
OPTIMIZE my_db.my_table REWRITE DATA USING BIN_PACK;
This command rewrites the data in the specified table using the BIN_PACK strategy, creating a new snapshot that consolidates the smaller files within each partition. Note that this retains the old snapshots and data files.
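To see the effect of compaction, you can inspect the table's $files metadata table before and after running OPTIMIZE; a minimal sketch, reusing the my_db.my_table placeholder from above:
-- Number of data files and their average size (run before and after OPTIMIZE)
SELECT COUNT(*) AS data_files, AVG(file_size_in_bytes) AS avg_file_size_bytes
FROM "my_db"."my_table$files";
Fewer, larger files after the rewrite indicate that bin packing worked as intended.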
2. Removing Old Snapshots and Files
To remove outdated snapshots and their associated data files, configure the vacuum_max_metadata_files_to_keep property and run the VACUUM command:
ALTER TABLE my_db.my_table SET TBLPROPERTIES (
  'vacuum_max_metadata_files_to_keep'='1'
);
VACUUM my_db.my_table;
By setting vacuum_max_metadata_files_to_keep to 1, you instruct the system to retain only the most recent metadata file, removing unnecessary old data files from storage. This step is crucial for maintaining storage efficiency and cost-effectiveness.
Note: with vacuum_max_metadata_files_to_keep set to 1, Iceberg keeps only one recoverable snapshot. On a production workload, choose the value based on your use case so that you can still recover older snapshots if something goes wrong.
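For production tables you will typically want a less aggressive retention policy. A sketch using Athena's other vacuum-related table properties (the values are illustrative and should match your recovery requirements):
-- Keep snapshots for up to 5 days, at least 5 snapshots, and the 10 most recent metadata files
ALTER TABLE my_db.my_table SET TBLPROPERTIES (
  'vacuum_max_snapshot_age_seconds'='432000',
  'vacuum_min_snapshots_to_keep'='5',
  'vacuum_max_metadata_files_to_keep'='10'
);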
3. Validation
After compaction and vacuuming, validate the operations' success by querying the table's snapshots and history:
SELECT * FROM "my_db"."my_table$snapshots";
SELECT * FROM "my_db"."my_table$history";
These queries help ensure that old snapshots are removed as expected and only one replace operation (the recent compaction) remains recorded in the table's snapshots and history.
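For example, a quick check against the snapshots metadata table (a sketch; the operation column is part of Iceberg's $snapshots table) should return a single row whose operation is replace:
-- Expect one row with operation = 'replace' after compaction and vacuum
SELECT snapshot_id, committed_at, operation
FROM "my_db"."my_table$snapshots";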
Using Apache Spark
In addition to Amazon Athena, Apache Spark can also be used to manage and optimize Iceberg tables.
In our case, we use Apache Spark in production to keep our entire workflow unified and easy to track and maintain. To achieve this, we created an IcebergOptimizer Python class that we run on AWS Glue to optimize all of the tables stored in the Glue Catalog:
import logging
from datetime import datetime, timedelta

from pyspark.sql import SparkSession

logger = logging.getLogger(__name__)


class IcebergOptimizer:
    def __init__(self, spark: SparkSession, catalog: str = "glue_catalog"):
        self.spark = spark
        self.catalog = catalog

    def compact_table(self, table: str, strategy: str = "BIN_PACK") -> None:
        """Compact the given table using the specified strategy."""
        query = f"CALL {self.catalog}.system.rewrite_data_files(table => '{table}', strategy => '{strategy}')"
        self._execute_query(query)

    def expire_snapshots(self, table: str, snapshot_max_age: int, min_snapshots: int, max_concurrent_deletes: int) -> None:
        """Expire snapshots older than snapshot_max_age days, keeping at least min_snapshots."""
        delete_before = (datetime.now() - timedelta(days=snapshot_max_age)).isoformat()
        query = f"""CALL {self.catalog}.system.expire_snapshots(
            table => '{table}',
            older_than => TIMESTAMP '{delete_before}',
            retain_last => {min_snapshots},
            max_concurrent_deletes => {max_concurrent_deletes})"""
        self._execute_query(query)

    def remove_orphan_files(self, table: str, orphan_files_max_age: int, max_concurrent_deletes: int) -> None:
        """Remove orphan files older than orphan_files_max_age days from the given table."""
        delete_before = (datetime.now() - timedelta(days=orphan_files_max_age)).isoformat()
        query = f"""CALL {self.catalog}.system.remove_orphan_files(
            table => '{table}',
            older_than => TIMESTAMP '{delete_before}',
            max_concurrent_deletes => {max_concurrent_deletes},
            dry_run => false)"""
        self._execute_query(query)

    def _execute_query(self, query: str) -> None:
        """Execute the given SQL query against the Spark session."""
        logger.debug("Executing query: %s", query)
        if not self.spark:
            logger.error("Spark session is None!")
        else:
            self.spark.sql(query).show(truncate=False, vertical=True)

    def run_optimization(self, tables: list[str], snapshot_max_age: int, min_snapshots: int,
                         max_concurrent_deletes: int, orphan_files_max_age: int,
                         compaction_strategy: str = "BIN_PACK") -> None:
        """Run compaction, snapshot expiration, and orphan file removal for the given tables."""
        for table in tables:
            logger.info(f"Compacting {table} table")
            self.compact_table(table, compaction_strategy)

            logger.info(f"Expiring old snapshots of {table} table")
            self.expire_snapshots(table, snapshot_max_age, min_snapshots, max_concurrent_deletes)

            logger.info(f"Removing orphan files of {table} table")
            self.remove_orphan_files(table, orphan_files_max_age, max_concurrent_deletes)
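A minimal sketch of how this class might be wired up in a Glue job follows; the catalog configuration keys are standard Iceberg/Glue settings, while the warehouse path, table list, and retention values are purely illustrative:
# Illustrative Spark session configuration for an Iceberg table in the Glue Catalog
spark = (
    SparkSession.builder.appName("iceberg-optimizer")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://my-bucket/warehouse/")  # illustrative path
    .getOrCreate()
)

optimizer = IcebergOptimizer(spark)
optimizer.run_optimization(
    tables=["my_db.my_table"],   # illustrative table list
    snapshot_max_age=5,          # expire snapshots older than 5 days...
    min_snapshots=5,             # ...but always keep at least 5 snapshots
    max_concurrent_deletes=8,
    orphan_files_max_age=3,      # remove orphan files older than 3 days
)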
Summary
Managing Apache Iceberg tables involves careful compaction and maintenance operations to optimize performance and storage efficiency. By following the strategies outlined in this document, you can effectively manage and optimize your Apache Iceberg tables, particularly when dealing with high-frequency data mutations in CDC workloads.