Compaction Benchmarks
The following benchmark compares compaction of Apache Iceberg tables using Apache Spark's rewrite_data_files procedure and OLake Fusion compaction on a CDC-like TPC-H workload, covering performance, environment configuration, and operational considerations.
Benchmark Environment
- Dataset: TPC-H 300 GB data generated with TPC-H `dbgen`.
- Total rows: At TPC-H 300 GB scale, the `lineitem` table contains approximately 1.8 billion rows.
- Average row size: The average row size for `lineitem` is 120 bytes.
- Destination data size: The Iceberg dataset size on Google Cloud Storage after ingestion was approximately 85 GB.
- Source database instance: Azure `Standard_D8ads_v5` (8 vCores, 32 GiB memory).
- OLake ingestion compute: Azure `Standard_D64ls_v5` (64 vCPUs, 128 GiB memory).
- TPC-H query execution resources:
  - Master: GCP `c4a-standard-8` (8 vCPU, 32 GB RAM)
  - Workers: 2 x GCP `c4a-standard-32` (32 vCPU, 128 GB RAM each)
- Fusion and Spark compaction resources:
  - Master: GCP `c4a-standard-8` (8 vCPU, 32 GB RAM)
  - Workers: 2 x GCP `c4a-highmem-16` (16 vCPU, 128 GB RAM each)
- Compaction runtime configuration (used for both Spark and Fusion):

| Setting | Value |
|---|---|
| spark-conf.spark.executor.instances | 4 |
| spark-conf.spark.executor.cores | 7 |
| spark-conf.spark.executor.memory | 45g |
| spark-conf.spark.executor.memoryOverhead | 12g |
| spark-conf.spark.driver.memory | 18g |
| spark-conf.spark.driver.memoryOverhead | 10g |
Compaction Results
1. Speed Comparison
This comparison measures end-to-end compaction runtime for Apache Spark and OLake Fusion on the same Iceberg workload. Average TPC-H Query 6 runtime was kept comparable across both setups to ensure a fair baseline for total time and relative execution speed.
| Engine | Total Compaction Time | Relative to Fusion |
|---|---|---|
| OLake Fusion | 27 mins 2 secs | - |
| Apache Spark | 55 mins 47 secs | 2.06x slower |
Key takeaway: OLake Fusion delivered about 2.06x faster compaction than Apache Spark (1,622 s vs. 3,347 s of total runtime).
2. Cost Comparison
| Instance type | Cost per hour (USD) |
|---|---|
| c4a-standard-8 | $0.38 |
| c4a-highmem-16 | $0.99 |
Using the compaction infrastructure and total compaction durations from this benchmark:
| Engine | Resources included for compaction cost | Hourly infrastructure cost | Total time | Total compaction cost |
|---|---|---|---|---|
| Fusion | 1 x c4a-standard-8 + 2 x c4a-highmem-16 | $2.36/hour | 27 mins 02 secs | $1.06 |
| Spark | 1 x c4a-standard-8 + 2 x c4a-highmem-16 | $2.36/hour | 55 mins 47 secs | $2.19 |
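The totals above follow directly from multiplying the hourly infrastructure cost by the runtime; a minimal Python sketch of the arithmetic (the helper name is ours, the rates and durations come from the tables above):

```python
# Total compaction cost = hourly infrastructure cost x runtime in hours.
HOURLY_COST = 0.38 + 2 * 0.99  # 1 x c4a-standard-8 + 2 x c4a-highmem-16 = $2.36/hour

def compaction_cost(minutes: int, seconds: int) -> float:
    """Cost in USD for a compaction run of the given duration."""
    hours = (minutes + seconds / 60) / 60
    return round(HOURLY_COST * hours, 2)

print(compaction_cost(27, 2))   # Fusion -> 1.06
print(compaction_cost(55, 47))  # Spark  -> 2.19
```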
OLake Fusion is open-source and can be deployed on Docker or Kubernetes; you pay only for the compute and storage you provision.
Benchmark Workflow
- Data preparation: Generated TPC-H data at 300 GB scale, loaded it into an Azure PostgreSQL server, and ingested only the `lineitem` table into a GCP Iceberg bucket using OLake (full load).
- CDC-like change generation: Used a Python script to run periodic `UPDATE` statements on `lineitem`, changing `l_comment` for approximately 200,000 random rows every 2 minutes to simulate continuous CDC pressure.

View CDC simulation script
import psycopg2
import time
HOST = "<HOSTNAME>"
PORT = 5432
USER = "<USERNAME>"
PASSWORD = "<PASSWORD>"
DATABASE = "<DATABASE>"
SCHEMA = "tpch"
TABLE = "lineitem"
RUNS = 120 # total number of runs
INTERVAL_SECONDS = 120 # 2 min between scheduled run starts (see loop below)
TABLESAMPLE_PCT = 0.0112 # ~200k rows out of 1.8B
print(f"Connecting to {HOST}/{DATABASE} ...")
conn = None
cursor = None
try:
conn = psycopg2.connect(
host=HOST,
port=PORT,
user=USER,
password=PASSWORD,
dbname=DATABASE,
sslmode="require",
)
conn.autocommit = False
cursor = conn.cursor()
print("Connected successfully.")
print(f"Total runs : {RUNS}")
print(f"Interval : every {INTERVAL_SECONDS}s")
print(f"Rows per run : ~200k (TABLESAMPLE SYSTEM {TABLESAMPLE_PCT}%)")
update_sql = f"""
UPDATE {SCHEMA}.{TABLE} t
SET l_comment = substring(l_comment, 1, char_length(l_comment) - 1)
|| chr(65 + (random() * 25)::int)
FROM (
SELECT ctid
FROM {SCHEMA}.{TABLE} TABLESAMPLE SYSTEM ({TABLESAMPLE_PCT})
) s
WHERE t.ctid = s.ctid
"""
next_run_monotonic = time.monotonic()
for run_idx in range(RUNS):
# First run starts immediately; subsequent runs are anchored to the
# original start time so interval drift never accumulates.
if run_idx > 0:
sleep_seconds = next_run_monotonic - time.monotonic()
if sleep_seconds > 0:
print(f" Sleeping {round(sleep_seconds, 1)}s until next run ...")
time.sleep(sleep_seconds)
next_run_monotonic += INTERVAL_SECONDS
print(f"\n[{run_idx + 1}/{RUNS}] Running UPDATE on {SCHEMA}.{TABLE} - ~200k random rows ...")
start = time.time()
cursor.execute(update_sql)
rows_affected = cursor.rowcount
conn.commit()
elapsed = round(time.time() - start, 2)
print(f"[{run_idx + 1}/{RUNS}] Done. Rows updated: {rows_affected} | Time taken: {elapsed}s")
except Exception as e:
print(f"ERROR: {e}")
if conn:
conn.rollback()
print("Transaction rolled back.")
raise
finally:
if cursor:
cursor.close()
if conn:
conn.close()
print("\nConnection closed.") -
Analytical query execution: After the first 2 update cycles, we started running TPC-H Query 6 on the
lineitemIceberg table; the query was executed continuously without any pause between consecutive runs.View TPCH Query script
#!/usr/bin/env python3
import logging
import time
import datetime
from pyspark.sql import SparkSession
# Logging Setup
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("tpch-benchmark")
CATALOG = "benchmark"
# Iceberg namespace in Lakekeeper (REST catalog); must hold `lineitem`.
DATABASE = "spark_compact"
# Lakekeeper REST catalog warehouse id (matches spark.sql.catalog.benchmark.warehouse).
LAKEKEEPER_WAREHOUSE = "benchmarking"
LAKEKEEPER_URI = "http://10.20.0.64:30081/catalog"
# Run Config
TOTAL_RUNS = 200
BREAK_SECONDS = 0 # no pause between runs
# TPC-H Q6 - Forecasting Revenue Change
# Single table scan on lineitem only. No joins, no GROUP BY.
# Pure sequential scan - best query to measure compaction impact.
TPCH_Q6 = f"""
SELECT
SUM(l_extendedprice * l_discount) AS revenue
FROM
{CATALOG}.{DATABASE}.lineitem
WHERE
l_shipdate >= DATE '1994-01-01'
AND l_shipdate < DATE '1995-01-01'
AND l_discount BETWEEN 0.05 AND 0.07
AND l_quantity < 24
"""
# SparkSession
log.info("Initializing SparkSession...")
spark = (
SparkSession.builder
.appName("tpch-q6-benchmark")
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalog.benchmark",
"org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.benchmark.catalog-impl",
"org.apache.iceberg.rest.RESTCatalog")
.config("spark.sql.catalog.benchmark.uri",
LAKEKEEPER_URI)
.config("spark.sql.catalog.benchmark.warehouse",
LAKEKEEPER_WAREHOUSE)
.config("spark.sql.catalog.benchmark.io-impl",
"org.apache.iceberg.aws.s3.S3FileIO")
.config("spark.sql.catalog.benchmark.s3.endpoint",
"https://storage.googleapis.com/")
.config("spark.sql.catalog.benchmark.s3.path-style-access", "true")
.config("spark.sql.catalog.benchmark.client.region", "ap-south-1")
.config("spark.hadoop.fs.s3a.endpoint", "https://storage.googleapis.com")
.config("spark.hadoop.fs.s3a.path.style.access", "true")
.config("spark.hadoop.fs.s3a.endpoint.region", "ap-south-1")
.config("spark.sql.shuffle.partitions", "128")
.config("spark.sql.defaultCatalog", "benchmark")
.getOrCreate()
)
spark.conf.set("spark.sql.iceberg.vectorization.enabled", "false")
spark.sparkContext.setLogLevel("INFO")
log.info("SparkSession initialized successfully.")
log.info(f" Spark version : {spark.version}")
log.info(f" App name : {spark.sparkContext.appName}")
log.info(f" Master : {spark.sparkContext.master}")
log.info(f" Default parallelism : {spark.sparkContext.defaultParallelism}")
log.info(f" Lakekeeper URI : {LAKEKEEPER_URI}")
log.info(f" Catalog warehouse : {LAKEKEEPER_WAREHOUSE}")
log.info(f" Namespace (db) : {DATABASE}")
log.info(f" Total Q6 runs : {TOTAL_RUNS}")
log.info(f" Break between runs : {BREAK_SECONDS}s")
run_results = [] # {run, started_at, finished_at, elapsed_s, revenue, status}
# BENCHMARK LOOP - Q6 x TOTAL_RUNS, no pause between runs
for run_number in range(1, TOTAL_RUNS + 1):
log.info("")
log.info("#" * 70)
log.info(f"#{'':^68}#")
log.info(f"#{'RUN ' + str(run_number) + ' OF ' + str(TOTAL_RUNS):^68}#")
log.info(f"#{'':^68}#")
log.info("#" * 70)
log.info("")
log.info("=" * 70)
log.info(" TPC-H Q6 - Forecasting Revenue Change")
log.info(f" Table : {CATALOG}.{DATABASE}.lineitem")
log.info(" Type : Single table scan | No joins | No GROUP BY")
log.info("=" * 70)
run_start_ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log.info(f" Query started at : {run_start_ts}")
q_start = time.time()
try:
result = spark.sql(TPCH_Q6)
rows = result.collect()
q_elapsed = round(time.time() - q_start, 2)
run_end_ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
revenue_val = rows[0]["revenue"] if rows else None
log.info(f" Query finished at : {run_end_ts}")
log.info(f" Query time : {q_elapsed}s ({round(q_elapsed / 60, 2)} min)")
log.info("")
log.info(" Result:")
log.info(f" revenue = {revenue_val}")
run_results.append({
"run": run_number,
"started_at": run_start_ts,
"finished_at": run_end_ts,
"elapsed_s": q_elapsed,
"revenue": revenue_val,
"status": "OK",
})
except Exception as e:
q_elapsed = round(time.time() - q_start, 2)
run_end_ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log.error("=" * 70)
log.error(f" Q6 FAILED after {q_elapsed}s")
log.error(f" Error: {e}")
log.error("=" * 70)
run_results.append({
"run": run_number,
"started_at": run_start_ts,
"finished_at": run_end_ts,
"elapsed_s": q_elapsed,
"revenue": None,
"status": f"FAILED: {e}",
})
log.info("")
log.info("-" * 70)
log.info(f" END OF RUN {run_number} OF {TOTAL_RUNS}")
log.info(f" Elapsed : {q_elapsed}s ({round(q_elapsed / 60, 2)} min)")
remaining = TOTAL_RUNS - run_number
if remaining > 0:
log.info(f" Remaining runs : {remaining}")
else:
log.info(" All runs completed.")
log.info("-" * 70)
if remaining > 0 and BREAK_SECONDS > 0:
log.info(f" Cooling down for {BREAK_SECONDS}s before next run...")
time.sleep(BREAK_SECONDS)
# FINAL SUMMARY
log.info("")
log.info("=" * 70)
log.info(" BENCHMARK COMPLETE - ALL RUNS SUMMARY")
log.info("=" * 70)
log.info(f" {'RUN':<6} {'STARTED AT':<22} {'ELAPSED (s)':<14} {'STATUS'}")
log.info(f" {'-'*4:<6} {'-'*19:<22} {'-'*11:<14} {'-'*10}")
ok_times = []
for r in run_results:
elapsed_str = str(r["elapsed_s"]) if r["status"] == "OK" else "FAILED"
log.info(f" {r['run']:<6} {r['started_at']:<22} {elapsed_str:<14} {r['status']}")
if r["status"] == "OK":
ok_times.append(r["elapsed_s"])
log.info("")
if ok_times:
log.info(f" Successful runs : {len(ok_times)} / {TOTAL_RUNS}")
log.info(f" Min query time : {min(ok_times)}s")
log.info(f" Max query time : {max(ok_times)}s")
log.info(f" Avg query time : {round(sum(ok_times) / len(ok_times), 2)}s")
log.info(f" Total query time : {round(sum(ok_times), 2)}s "
f"({round(sum(ok_times) / 60, 2)} min)")
else:
log.error(" No successful runs to summarize.")
log.info("=" * 70)
log.info("")
log.info("All done. Stopping SparkSession.")
spark.stop()

View spark-submit command for TPCH Query execution
gcloud dataproc jobs submit pyspark gs://dz-benchmark/tpch_dataproc.py \
--cluster=dz-olake-tpch-21042026 \
--region=asia-south1 \
--properties="^#^spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.2,org.apache.iceberg:iceberg-aws-bundle:1.7.2#spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions#spark.sql.catalog.benchmark=org.apache.iceberg.spark.SparkCatalog#spark.sql.catalog.benchmark.catalog-impl=org.apache.iceberg.rest.RESTCatalog#spark.sql.catalog.benchmark.uri=http://10.20.0.64:30081/catalog#spark.sql.catalog.benchmark.warehouse=benchmarking#spark.sql.catalog.benchmark.io-impl=org.apache.iceberg.aws.s3.S3FileIO#spark.sql.catalog.benchmark.s3.endpoint=https://storage.googleapis.com/#spark.sql.catalog.benchmark.s3.path-style-access=true#spark.sql.catalog.benchmark.client.region=ap-south-1#spark.sql.defaultCatalog=benchmark#spark.hadoop.fs.s3a.endpoint=https://storage.googleapis.com#spark.hadoop.fs.s3a.path.style.access=true#spark.hadoop.fs.s3a.endpoint.region=ap-south-1#spark.dynamicAllocation.enabled=false#spark.executor.instances=12#spark.executor.cores=5#spark.executor.memory=15g#spark.executor.memoryOverhead=4g#spark.driver.memory=18g#spark.driver.memoryOverhead=10g#spark.sql.parquet.enableVectorizedReader=false#spark.sql.iceberg.vectorization.enabled=false#spark.hadoop.io.native.lib.available=false#spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:G1ReservePercent=20 -XX:UseAVX=0 -Darrow.enable_unsafe_memory_access=false -Darrow.enable_null_check_for_get=true -Dhadoop.io.native.lib.available=false#spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -Xss8m -XX:UseAVX=0 -Darrow.enable_unsafe_memory_access=false -Darrow.enable_null_check_for_get=true" -
Parallel compaction phase: After completing 5 ingestion cycles, we ran compaction jobs for a total of 2 hours while both CDC-style updates and repeated TPC-H Query 6 executions continued in parallel, specifically to measure how concurrent compaction impacts query latency and runtime stability.
Apache Spark
Spark compaction was executed using Apache Iceberg's rewrite_data_files procedure through a scheduled Spark job.
The compaction job was scheduled with each run triggered at a fixed interval of 20 minutes, allowing compaction to operate alongside ongoing ingestion activity and repeated analytical query execution.
The Spark rewrite_data_files job is configured with the following compaction parameters:
- strategy: binpack (rewrites many small files into fewer, size-balanced files to improve scan efficiency)
- target-file-size-bytes: 512 MB (preferred output file size for rewritten data files)
- max-file-size-bytes: 614.4 MB (upper size limit allowed for generated output files)
- min-file-size-bytes: 384 MB (lower size threshold used to select smaller files for rewrite)
- max-concurrent-file-group-rewrites: 28 (maximum number of file groups rewritten in parallel in one run)
- partial-progress.enabled: false (requires a full rewrite attempt instead of committing partial-progress batches)
- delete-file-threshold: 1 (triggers a rewrite when at least one delete file is associated with a file group)
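For reference, these options map one-to-one onto the `rewrite_data_files` procedure call used in this benchmark; a minimal excerpt is shown below, assuming a SparkSession named `spark` configured against the benchmark catalog as in the scripts in this section.

```python
# Minimal form of the compaction call; the full scheduled script below wraps
# this same statement with logging, snapshot inspection, and file-count metrics.
spark.sql("""
    CALL benchmark.system.rewrite_data_files(
      table => 'spark_compact.lineitem',
      strategy => 'binpack',
      options => map(
        'target-file-size-bytes', '536870912',          -- 512 MB
        'max-file-size-bytes', '644245094',             -- 614.4 MB
        'min-file-size-bytes', '402653184',             -- 384 MB
        'max-concurrent-file-group-rewrites', '28',
        'partial-progress.enabled', 'false',
        'delete-file-threshold', '1'
      )
    )
""").show()
```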
View Spark compaction script (Python)
#!/usr/bin/env python3
import logging
import time
import datetime
from pyspark.sql import SparkSession
# ─── Logging Setup ─────────────────────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger("iceberg-compaction")
CATALOG = "benchmark"
DATABASE = "spark_compact"
TABLES = [
"lineitem",
]
# ─── Scheduler Config ──────────────────────────────────────────────────────────
TOTAL_RUNS = 20
INTERVAL_MINUTES = 20
INTERVAL_SECONDS = INTERVAL_MINUTES * 60
# ─── TPC-H Q6 — Forecasting Revenue Change ─────────────────────────────────────
def build_tpch_q6(catalog, db):
return {
"Q6 - Forecasting Revenue Change": f"""
SELECT
SUM(l_extendedprice * l_discount) AS revenue
FROM
{catalog}.{db}.lineitem
WHERE
l_shipdate >= date '1994-01-01'
AND l_shipdate < date '1995-01-01'
AND l_discount BETWEEN 0.05 AND 0.07
AND l_quantity < 24
"""
}
# ─── Helper: Per-table file stats (optionally pinned to a snapshot) ─────────────
def get_table_file_stats(spark, catalog, table_full, snapshot_id=None):
"""
content = 0 -> data files
content = 2 -> equality delete files
snapshot_id -> if provided, reads file state at that exact snapshot (time travel)
if None, reads current state
"""
version_clause = f"VERSION AS OF {snapshot_id}" if snapshot_id else ""
try:
row = spark.sql(
f"SELECT COUNT(*) AS file_count, SUM(file_size_in_bytes) AS total_bytes "
f"FROM {catalog}.{table_full}.files {version_clause} WHERE content = 0"
).collect()[0]
data_count = row["file_count"]
data_mb = round(row["total_bytes"] / (1024 * 1024), 2) if row["total_bytes"] else 0
except Exception as e:
log.warning(f" Could not fetch data file stats: {e}")
data_count, data_mb = "N/A", "N/A"
try:
eq_count = spark.sql(
f"SELECT COUNT(*) AS file_count "
f"FROM {catalog}.{table_full}.files {version_clause} WHERE content = 2"
).collect()[0]["file_count"]
except Exception as e:
log.warning(f" Could not fetch equality delete file stats: {e}")
eq_count = "N/A"
return data_count, data_mb, eq_count
# ─── Helper: Find the snapshot ID that compaction created ───────────────────────
def get_compaction_snapshot(spark, catalog, table_full, compaction_started_at):
"""
After rewrite_data_files completes, find the snapshot with operation='replace'
that was committed after compaction started. This is the exact compaction snapshot.
Returns (snapshot_id, committed_at) or (None, None) if nothing was rewritten.
"""
try:
rows = spark.sql(
f"SELECT snapshot_id, committed_at "
f"FROM {catalog}.{table_full}.snapshots "
f"WHERE operation = 'replace' "
f"AND committed_at >= TIMESTAMP '{compaction_started_at}' "
f"ORDER BY committed_at ASC "
f"LIMIT 1"
).collect()
if rows:
snap_id = rows[0]["snapshot_id"]
snap_ts = str(rows[0]["committed_at"])
log.info(f" Compaction snapshot found : id={snap_id} | committed_at={snap_ts}")
return snap_id, snap_ts
else:
log.warning(f" No replace snapshot found after {compaction_started_at} — "
f"compaction may have found nothing to rewrite.")
return None, None
except Exception as e:
log.warning(f" Could not find compaction snapshot: {e}")
return None, None
# ─── Helper: Count OLake ingestion appends during compaction window ─────────────
def count_ingestions_during_compaction(spark, catalog, table_full,
pre_committed_at, compaction_committed_at):
"""
Counts append snapshots committed strictly AFTER pre_committed_at and
up to AND INCLUDING compaction_committed_at.
Each OLake ingestion commit adds exactly: 1 data file + 1 equality-delete file.
We subtract this count from the raw POST snapshot counts to isolate the
pure compaction effect (no ingestion noise).
Returns 0 safely if either timestamp is None.
"""
if not pre_committed_at or not compaction_committed_at:
log.warning(" Skipping ingestion count — missing pre or compaction timestamp.")
return 0
try:
row = spark.sql(
f"SELECT COUNT(*) AS cnt "
f"FROM {catalog}.{table_full}.snapshots "
f"WHERE operation IN ('append', 'overwrite') "
f"AND committed_at > TIMESTAMP '{pre_committed_at}' "
f"AND committed_at <= TIMESTAMP '{compaction_committed_at}'"
).collect()[0]
cnt = row["cnt"]
log.info(f" OLake ingestion commits during compaction window : {cnt} "
f"(each = +1 data file, +1 eq-delete file)")
return cnt
except Exception as e:
log.warning(f" Could not count ingestion snapshots during compaction: {e}")
return 0
# ─── Helper: Run TPC-H Q6 and return elapsed time ───────────────────────────────
def run_tpch_q6(spark, label, tpch_queries):
log.info("=" * 70)
log.info(f" TPC-H Query Benchmark [{label}]")
log.info("=" * 70)
results = {}
for qname, qsql in tpch_queries.items():
try:
q_start = time.time()
spark.sql(qsql).collect()
q_elapsed = round(time.time() - q_start, 2)
log.info(f" {qname:<40} : {q_elapsed}s ({round(q_elapsed / 60, 2)} min)")
results[qname] = q_elapsed
except Exception as e:
log.warning(f" {qname:<40} : FAILED — {e}")
results[qname] = "FAILED"
log.info("=" * 70)
return results
# ─── Core compaction logic for a single run ─────────────────────────────────────
def run_compaction(spark, run_number, tpch_queries):
overall_start_time = time.time()
overall_start_ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log.info("=" * 70)
log.info(f" OVERALL COMPACTION RUN STARTED AT : {overall_start_ts}")
log.info(f" Tables to compact : {TABLES}")
log.info("=" * 70)
for table_name in TABLES:
TABLE = f"{DATABASE}.{table_name}"
FULL_TABLE = f"{CATALOG}.{TABLE}"
log.info("")
log.info("=" * 70)
log.info(f" TABLE: {FULL_TABLE}")
log.info("=" * 70)
# Last 5 snapshots — pre
try:
snaps = spark.sql(
f"SELECT snapshot_id, committed_at, operation "
f"FROM {CATALOG}.{TABLE}.snapshots "
f"ORDER BY committed_at DESC LIMIT 5"
).collect()
log.info(" [PRE] Last 5 snapshots:")
for row in snaps:
log.info(f" snapshot_id={row['snapshot_id']} | "
f"committed_at={row['committed_at']} | "
f"operation={row['operation']}")
except Exception as e:
log.warning(f" Could not fetch pre-compaction snapshots: {e}")
# File stats — pre
data_cnt, data_mb, eq_del_cnt = get_table_file_stats(spark, CATALOG, TABLE)
log.info(f" [PRE] Data files (content=0) : {data_cnt} files | {data_mb} MB")
log.info(f" [PRE] Eq-delete files (content=2) : {eq_del_cnt} files")
# Capture PRE boundary timestamp inline — same snapshot the PRE stats just read from.
# Used as the left edge of the ingestion-counting window.
pre_committed_at = None
try:
pre_row = spark.sql(
f"SELECT committed_at FROM {CATALOG}.{TABLE}.snapshots "
f"ORDER BY committed_at DESC LIMIT 1"
).collect()
pre_committed_at = str(pre_row[0]["committed_at"]) if pre_row else None
log.info(f" [PRE] Snapshot boundary : {pre_committed_at}")
except Exception as e:
log.warning(f" Could not fetch PRE snapshot boundary: {e}")
# Disable vectorization at table level
try:
spark.sql(
f"ALTER TABLE {FULL_TABLE} "
f"SET TBLPROPERTIES ('read.parquet.vectorization.enabled' = 'false')"
)
log.info(f" Disabled vectorization for {table_name}.")
except Exception as e:
log.warning(f" Could not set vectorization property: {e}")
# Compaction config banner
log.info(" Compaction config:")
log.info(" strategy : binpack")
log.info(" target-file-size-bytes : 536870912 (512 MB)")
log.info(" max-file-size-bytes : 644245094 (614.4 MB)")
log.info(" min-file-size-bytes : 402653184 (384 MB)")
log.info(" max-concurrent-file-group-rewrites: 28")
log.info(" partial-progress.enabled : false")
log.info(" delete-file-threshold : 1")
table_start_time = time.time()
table_start_ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log.info(f" Compaction started at : {table_start_ts}")
try:
result = spark.sql(
f"""
CALL {CATALOG}.system.rewrite_data_files(
table => '{TABLE}',
strategy => 'binpack',
options => map(
'target-file-size-bytes', '536870912',
'max-file-size-bytes', '644245094',
'min-file-size-bytes', '402653184',
'max-concurrent-file-group-rewrites', '28',
'partial-progress.enabled', 'false',
'delete-file-threshold', '1'
)
)
"""
)
rows = result.collect()
table_elapsed = round(time.time() - table_start_time, 2)
table_end_ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log.info(f" Compaction finished at : {table_end_ts}")
log.info(f" Compaction time for this table: {table_elapsed}s "
f"({round(table_elapsed / 60, 2)} min)")
for row in rows:
rewritten_files = row["rewritten_data_files_count"]
added_files = row["added_data_files_count"]
rewritten_bytes = (
row["rewritten_bytes_count"]
if "rewritten_bytes_count" in row.__fields__
else "N/A"
)
rewritten_mb = (
round(rewritten_bytes / (1024 * 1024), 2)
if isinstance(rewritten_bytes, int)
else "N/A"
)
log.info(" Compaction Result:")
log.info(f" Files rewritten (input) : {rewritten_files}")
log.info(f" Files added (output) : {added_files}")
log.info(f" Total bytes rewritten : {rewritten_bytes} ({rewritten_mb} MB)")
except Exception as e:
elapsed = round(time.time() - table_start_time, 2)
log.error(f" Compaction FAILED for {table_name} after {elapsed}s")
log.error(f" Error: {e}")
continue
# ── Step 1: Find the exact compaction snapshot (operation=replace) ────────
compaction_snapshot_id, compaction_committed_at = get_compaction_snapshot(
spark, CATALOG, TABLE, table_start_ts
)
# ── Step 2: Count OLake ingestions that committed DURING compaction ───────
# Window: strictly after PRE snapshot → up to and including compaction snapshot
# Each ingestion = +1 data file, +1 eq-delete file (OLake guarantee)
ingestions_during_compaction = count_ingestions_during_compaction(
spark, CATALOG, TABLE, pre_committed_at, compaction_committed_at
)
# ── Step 3: Read POST file counts pinned to the compaction snapshot ───────
data_cnt_p, data_mb_p, eq_del_cnt_p = get_table_file_stats(
spark, CATALOG, TABLE, snapshot_id=compaction_snapshot_id
)
# ── Step 4: Subtract ingestion noise → pure compaction POST counts ────────
# The compaction snapshot inherits ingested files via Iceberg's linear chain.
# Subtracting ingestions_during_compaction isolates the compaction-only effect.
true_data_cnt_p = (
data_cnt_p - ingestions_during_compaction
if isinstance(data_cnt_p, int)
else data_cnt_p
)
true_eq_del_cnt_p = (
eq_del_cnt_p - ingestions_during_compaction
if isinstance(eq_del_cnt_p, int)
else eq_del_cnt_p
)
if compaction_snapshot_id:
log.info(f" [POST] File stats pinned to compaction snapshot {compaction_snapshot_id}:")
else:
log.info(f" [POST] File stats (no replace snapshot found — showing current state):")
log.info(f" [POST] Data files (content=0) at snapshot : {data_cnt_p} files")
log.info(f" [POST] Eq-delete (content=2) at snapshot : {eq_del_cnt_p} files")
log.info(f" [POST] Minus ingestion files during window : -{ingestions_during_compaction} (data), -{ingestions_during_compaction} (eq-delete)")
log.info(f" [POST] True post-compaction data files : {true_data_cnt_p} files | {data_mb_p} MB")
log.info(f" [POST] True post-compaction eq-delete files : {true_eq_del_cnt_p} files")
# Delta — pure compaction effect only
if isinstance(data_cnt, int) and isinstance(true_data_cnt_p, int):
log.info(f" [DELTA] Data files : {data_cnt} → {true_data_cnt_p} "
f"(change: {true_data_cnt_p - data_cnt:+d})")
if isinstance(eq_del_cnt, int) and isinstance(true_eq_del_cnt_p, int):
log.info(f" [DELTA] Eq-del files : {eq_del_cnt} → {true_eq_del_cnt_p} "
f"(change: {true_eq_del_cnt_p - eq_del_cnt:+d})")
# Last 3 snapshots — post
try:
snaps_after = spark.sql(
f"SELECT snapshot_id, committed_at, operation "
f"FROM {CATALOG}.{TABLE}.snapshots "
f"ORDER BY committed_at DESC LIMIT 3"
).collect()
log.info(" [POST] Latest snapshots:")
for row in snaps_after:
log.info(f" snapshot_id={row['snapshot_id']} | "
f"committed_at={row['committed_at']} | "
f"operation={row['operation']}")
except Exception as e:
log.warning(f" Could not fetch post-compaction snapshots: {e}")
# ─── Per-run Summary ───────────────────────────────────────────────────────
overall_elapsed = round(time.time() - overall_start_time, 2)
overall_end_ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
log.info("")
log.info("=" * 70)
log.info(f" RUN {run_number} — COMPACTION SUMMARY")
log.info("=" * 70)
log.info(f" Started at : {overall_start_ts}")
log.info(f" Ended at : {overall_end_ts}")
log.info(f" Total time : {overall_elapsed}s ({round(overall_elapsed / 60, 2)} min)")
log.info("=" * 70)
# ══════════════════════════════════════════════════════════════════════════════
# SPARK SESSION
# ══════════════════════════════════════════════════════════════════════════════
log.info("Initializing SparkSession...")
spark = (
SparkSession.builder
.appName("iceberg-compaction-tpch-spark_compact")
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalog.benchmark",
"org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.benchmark.catalog-impl",
"org.apache.iceberg.rest.RESTCatalog")
.config("spark.sql.catalog.benchmark.uri",
"http://10.20.0.64:30081/catalog")
.config("spark.sql.catalog.benchmark.warehouse",
"benchmarking")
.config("spark.sql.catalog.benchmark.io-impl",
"org.apache.iceberg.aws.s3.S3FileIO")
.config("spark.sql.catalog.benchmark.s3.endpoint",
"https://storage.googleapis.com/")
.config("spark.sql.catalog.benchmark.s3.path-style-access", "true")
.config("spark.sql.catalog.benchmark.client.region", "ap-south-1")
.config("spark.hadoop.fs.s3a.endpoint", "https://storage.googleapis.com")
.config("spark.hadoop.fs.s3a.path.style.access", "true")
.config("spark.hadoop.fs.s3a.endpoint.region", "ap-south-1")
.config("spark.sql.shuffle.partitions", "128")
.config("spark.sql.defaultCatalog", "benchmark")
.getOrCreate()
)
spark.conf.set("spark.sql.iceberg.vectorization.enabled", "false")
spark.sparkContext.setLogLevel("INFO")
log.info("SparkSession initialized successfully.")
log.info(f" Spark version : {spark.version}")
log.info(f" App name : {spark.sparkContext.appName}")
log.info(f" Master : {spark.sparkContext.master}")
log.info(f" Default parallelism : {spark.sparkContext.defaultParallelism}")
TPCH_QUERIES = build_tpch_q6(CATALOG, DATABASE)
# ══════════════════════════════════════════════════════════════════════════════
# SCHEDULED LOOP
# ══════════════════════════════════════════════════════════════════════════════
schedule_anchor = time.time()
for run_number in range(1, TOTAL_RUNS + 1):
scheduled_start = schedule_anchor + (run_number - 1) * INTERVAL_SECONDS
now = time.time()
wait_seconds = scheduled_start - now
if wait_seconds > 0:
next_run_ts = datetime.datetime.fromtimestamp(scheduled_start).strftime("%Y-%m-%d %H:%M:%S")
log.info("")
log.info("~" * 70)
log.info(f" Waiting {round(wait_seconds, 1)}s until next scheduled run at {next_run_ts} ...")
log.info("~" * 70)
time.sleep(wait_seconds)
log.info("")
log.info("#" * 70)
log.info(f"#{'':^68}#")
log.info(f"#{'RUN ' + str(run_number) + ' OF ' + str(TOTAL_RUNS):^68}#")
log.info(f"#{'':^68}#")
log.info("#" * 70)
run_compaction(spark, run_number, TPCH_QUERIES)
log.info("")
log.info("-" * 70)
log.info(f" END OF RUN {run_number} OF {TOTAL_RUNS}")
log.info("-" * 70)
remaining = TOTAL_RUNS - run_number
if remaining > 0:
log.info(f" Remaining runs : {remaining}")
for future_run in range(run_number + 1, TOTAL_RUNS + 1):
future_ts = datetime.datetime.fromtimestamp(
schedule_anchor + (future_run - 1) * INTERVAL_SECONDS
).strftime("%Y-%m-%d %H:%M:%S")
log.info(f" Run {future_run} scheduled at : {future_ts}")
else:
log.info(" All scheduled runs completed. No further runs.")
log.info("-" * 70)
# ══════════════════════════════════════════════════════════════════════════════
# ALL RUNS DONE
# ══════════════════════════════════════════════════════════════════════════════
log.info("")
log.info("=" * 70)
log.info(" ALL COMPACTION RUNS FINISHED")
log.info(f" Total runs executed : {TOTAL_RUNS}")
log.info(f" Interval : every {INTERVAL_MINUTES} minute(s)")
log.info("=" * 70)
log.info("")
log.info("All done. Stopping SparkSession.")
spark.stop()
View spark-submit command for Spark compaction
gcloud dataproc jobs submit pyspark gs://dz-benchmark/dataproc_compaction.py \
--cluster=dz-olake-compaction-21042026 \
--region=asia-south1 \
--properties="^#^spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.7.2,org.apache.iceberg:iceberg-aws-bundle:1.7.2#spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions#spark.sql.catalog.benchmark=org.apache.iceberg.spark.SparkCatalog#spark.sql.catalog.benchmark.catalog-impl=org.apache.iceberg.rest.RESTCatalog#spark.sql.catalog.benchmark.uri=http://10.20.0.64:30081/catalog#spark.sql.catalog.benchmark.warehouse=benchmarking#spark.sql.catalog.benchmark.io-impl=org.apache.iceberg.aws.s3.S3FileIO#spark.sql.catalog.benchmark.s3.endpoint=https://storage.googleapis.com/#spark.sql.catalog.benchmark.s3.path-style-access=true#spark.sql.catalog.benchmark.client.region=ap-south-1#spark.sql.defaultCatalog=benchmark#spark.hadoop.fs.s3a.endpoint=https://storage.googleapis.com#spark.hadoop.fs.s3a.path.style.access=true#spark.hadoop.fs.s3a.endpoint.region=ap-south-1#spark.dynamicAllocation.enabled=false#spark.executor.instances=4#spark.executor.cores=7#spark.executor.memory=45g#spark.executor.memoryOverhead=12g#spark.driver.memory=18g#spark.driver.memoryOverhead=10g#spark.sql.parquet.enableVectorizedReader=false#spark.sql.iceberg.vectorization.enabled=false#spark.hadoop.io.native.lib.available=false#spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:G1ReservePercent=20 -XX:UseAVX=0 -Darrow.enable_unsafe_memory_access=false -Darrow.enable_null_check_for_get=true -Dhadoop.io.native.lib.available=false#spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -Xss8m -XX:UseAVX=0 -Darrow.enable_unsafe_memory_access=false -Darrow.enable_null_check_for_get=true"
OLake Fusion
Fusion compaction was executed using Fusion's scheduled compaction flow, which defines three compaction levels (Lite, Medium, and Full).
The compaction job was configured with two trigger tiers to balance frequent incremental cleanup with periodic deeper compaction:
- Lite: every 20 minutes
- Medium: every 40 minutes
In this benchmark, the destination dataset size was below 100 GB, so running Full compaction was not necessary. It is more useful as a periodic deep-clean step for much larger datasets (for instance, terabyte-scale tables) where long-term small-file buildup is higher.
The Fusion compaction setup is configured with the following parameters:
- target-size: 512 MB (target file size after compaction)
Dataset and Table Schemas
TPC-H lineitem table
CREATE TABLE lineitem (
l_orderkey BIGINT NOT NULL,
l_partkey BIGINT NOT NULL,
l_suppkey BIGINT NOT NULL,
l_linenumber INTEGER NOT NULL,
l_quantity DECIMAL(15,2) NOT NULL,
l_extendedprice DECIMAL(15,2) NOT NULL,
l_discount DECIMAL(15,2) NOT NULL,
l_tax DECIMAL(15,2) NOT NULL,
l_returnflag CHAR(1) NOT NULL,
l_linestatus CHAR(1) NOT NULL,
l_shipdate DATE NOT NULL,
l_commitdate DATE NOT NULL,
l_receiptdate DATE NOT NULL,
l_shipinstruct CHAR(25) NOT NULL,
l_shipmode CHAR(10) NOT NULL,
l_comment VARCHAR(44) NOT NULL
);
TPC-H Query 6 used in this benchmark
SELECT
SUM(l_extendedprice * l_discount) AS revenue
FROM lineitem
WHERE l_shipdate >= DATE '1994-01-01'
AND l_shipdate < DATE '1995-01-01'
AND l_discount BETWEEN 0.05 AND 0.07
AND l_quantity < 24;
We used Lakekeeper as the Iceberg REST catalog and Google Cloud Storage as the destination storage layer for this benchmark.
Bottom line: If you need to compact Iceberg tables quickly and keep them highly queryable as data changes continuously, OLake Fusion delivered faster compaction cycles at lower infrastructure cost in this benchmark.