Benchmarks
Use the tabs below to view detailed benchmarks per connector. Each tab has a unique URL you can copy/share.
- Postgres
- MongoDB
- MySQL
- Oracle
- Kafka
PostgreSQL → Apache Iceberg Connector Benchmark
Benchmark Environment
- This benchmark uses the standard NYC Taxi Data `trips` and `fhv_trips` tables.
- The original repo ingests data into a local PostgreSQL instance, so we modified it to ingest into a remote cloud PostgreSQL database (Azure Flexible DB).
- Total rows: 4,008,587,913 across both tables.
- The average row size is 144 bytes for `trips` and 121 bytes for `fhv_trips`.
- OLake & Debezium were run on an Azure Standard D64ls v5 VM (64 vCPUs, 128 GiB memory); the other platforms (Fivetran, Estuary, Airbyte) were used as cloud offerings.
- Database instance: Azure Standard_D32ads_v5 (32 vCores, 128 GiB memory, 51,200 max IOPS)
(OLake vs. Popular Data-Movement Tools)
1. Speed Comparison – Full-Load Performance
| Tool | Rows Synced | Throughput (rows / sec) | Relative to OLake |
|---|---|---|---|
| OLake (as of 30th Jan 2026) | 4.01 B | 580,113 RPS | – |
| Fivetran (as of 30th Apr 2025) | 4.01 B | 46,395 RPS | 12.5 × slower |
| Debezium (memiiso) (as of 30th Apr 2025) | 1.28 B | 14,839 RPS | 39.1 × slower |
| Estuary (as of 30th Apr 2025) | 0.34 B | 3,982 RPS | 146 × slower |
| Airbyte Cloud (as of 30th Apr 2025) | 12.7 M | 457 RPS | 1270 × slower |
¹ Estuary ran the same 24-hour window but processed a ~10× smaller dataset, so its throughput looks even lower when normalized.
Memory usage (OLake) - Standard D64ls v5 (64 vCPUs, 128 GiB memory)
| Memory Stats | Usage (GB) |
|---|---|
| Min | 2.95 |
| Max | 74.95 |
| Mean | 60.01 |
OLake maintains high throughput while keeping memory usage efficient.
- Each tool was given a 24-hour window, but only OLake and Fivetran processed the entire dataset in that time. Airbyte's sync failed after 7.5 hours, so its throughput reflects only the portion completed before the failure.
Key takeaway: OLake now delivers up to 12.5× faster bulk-load performance than Fivetran, while outpacing every other open-source alternative by 35× to over 1,000×.
2. Speed Comparison – Change-Data-Capture (CDC)
| Tool | CDC Window | Throughput (rows / sec) | Relative to OLake |
|---|---|---|---|
| OLake (as of 30th Jan 2026) | 15 min | 55,555 RPS | – |
| Fivetran (as of 30th Apr 2025) | 31 min | 26,910 RPS | 2 × slower |
| Debezium (memiiso) (as of 30th Apr 2025) | 60 min | 13,808 RPS | 4 × slower |
| Estuary (as of 30th Apr 2025) | 4.5 h | 3,085 RPS | 18 × slower |
| Airbyte Cloud (as of 30th Apr 2025) | 23 h | 585 RPS | 95 × slower |
All tools synced the same 50 million changes in the CDC test; OLake processed them in 15 minutes, while the other tools required the longer CDC windows shown above.
Key takeaway: For incremental workloads OLake leads the pack, moving 50 million PostgreSQL changes into Iceberg 106% faster than Fivetran and 10-95× faster than other OSS connectors.
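CDC from PostgreSQL relies on logical replication on the source database. As a minimal sketch (standard PostgreSQL commands, not specific to OLake's setup), the prerequisites can be confirmed before an incremental run:

SHOW wal_level;                  -- must return 'logical' for CDC to work
SELECT slot_name, plugin, active
FROM pg_replication_slots;       -- the replication slot(s) the CDC reader consumes from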
3. Cost Comparison (Vendor List Prices)
| Tool | Scenario | Spend (USD) | Rows Synced |
|---|---|---|---|
| OLake | Full Load / CDC | Cost of a Standard D64ls v5 (64 vCPUs, 128 GiB memory) running for 1.91 hours < $ 6 | 4.01 B / 50 M |
| Fivetran | Full Load | $ 0 (free full sync) | 4.01 B |
| Estuary | Full Load | $ 1,668 | 0.34 B |
| Airbyte Cloud | Full Load | $ 5,560 | 12.7 M |
| Fivetran | CDC | $ 2,375.80 | 50 M |
| Estuary | CDC | $ 17.63 | 50 M |
| Airbyte Cloud | CDC | $ 148.95 | 50 M |
- OLake is open-source and can be deployed on your own Kubernetes cluster or cloud VMs; you pay only for the compute and storage you provision.
Dataset and Table Schemas
Please refer to this GitHub repository for the dataset we used to conduct these benchmarks.
We first performed a full-load sync of empty dummy tables. Afterwards, we inserted the top 25 million records from both trips and fhv_trips into these tables and ran a CDC sync.
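As a rough illustration of that preparation step (the dummy-table names below are hypothetical; the exact statements used in the benchmark may differ), the 25 million records per table can be copied with plain SQL:

-- Copy the first 25 million rows of each source table into the CDC target tables
INSERT INTO trips_dummy
SELECT * FROM trips ORDER BY id LIMIT 25000000;

INSERT INTO fhv_trips_dummy
SELECT * FROM fhv_trips ORDER BY id LIMIT 25000000;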
trips table
CREATE TABLE trips (
id bigserial NOT NULL,
cab_type_id int4 NULL,
vendor_id int4 NULL,
pickup_datetime timestamp NULL,
dropoff_datetime timestamp NULL,
store_and_fwd_flag bool NULL,
rate_code_id int4 NULL,
pickup_longitude numeric NULL,
pickup_latitude numeric NULL,
dropoff_longitude numeric NULL,
dropoff_latitude numeric NULL,
passenger_count int4 NULL,
trip_distance numeric NULL,
fare_amount numeric NULL,
extra numeric NULL,
mta_tax numeric NULL,
tip_amount numeric NULL,
tolls_amount numeric NULL,
ehail_fee numeric NULL,
improvement_surcharge numeric NULL,
congestion_surcharge numeric NULL,
airport_fee numeric NULL,
total_amount numeric NULL,
payment_type int4 NULL,
trip_type int4 NULL,
pickup_nyct2010_gid int4 NULL,
dropoff_nyct2010_gid int4 NULL,
pickup_location_id int4 NULL,
dropoff_location_id int4 NULL,
CONSTRAINT trips_pkey PRIMARY KEY (id)
);
fhv_trips table
CREATE TABLE fhv_trips (
id bigserial NOT NULL,
hvfhs_license_num text NULL,
dispatching_base_num text NULL,
originating_base_num text NULL,
request_datetime timestamp NULL,
on_scene_datetime timestamp NULL,
pickup_datetime timestamp NULL,
dropoff_datetime timestamp NULL,
pickup_location_id int4 NULL,
dropoff_location_id int4 NULL,
trip_miles numeric NULL,
trip_time numeric NULL,
base_passenger_fare numeric NULL,
tolls numeric NULL,
black_car_fund numeric NULL,
sales_tax numeric NULL,
congestion_surcharge numeric NULL,
airport_fee numeric NULL,
tips numeric NULL,
driver_pay numeric NULL,
shared_request bool NULL,
shared_match bool NULL,
access_a_ride bool NULL,
wav_request bool NULL,
wav_match bool NULL,
legacy_shared_ride int4 NULL,
affiliated_base_num text NULL,
CONSTRAINT fhv_trips_pkey PRIMARY KEY (id)
);
We used AWS Glue as the Iceberg catalog and AWS S3 as the storage layer on the destination side for these benchmarks.
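Once the sync completes, the landed Iceberg tables can be sanity-checked straight from the Glue catalog, for example with count queries in Spark SQL or Athena (the catalog and namespace names below are illustrative, not the exact ones used):

SELECT COUNT(*) FROM glue_catalog.nyc_taxi.trips;
SELECT COUNT(*) FROM glue_catalog.nyc_taxi.fhv_trips;
-- together these should add up to the 4,008,587,913 rows in the source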
Bottom line: If you need to land terabytes of PostgreSQL data into Apache Iceberg quickly—and keep it continually up-to-date—OLake delivers enterprise-grade speed without the enterprise-grade bill.
Oracle → Apache Iceberg Connector Benchmark
Oracle data powers your business—so migrations should be fast and seamless. OLake helps you move massive Oracle datasets to Apache Iceberg at high speed, with predictable performance and no vendor lock-in.
Benchmark Environment
- This benchmark uses the standard NYC Taxi Data `trips` and `fhv_trips` tables.
- Since the original repo supports only PostgreSQL, we first ingested the NYC Taxi Data into a cloud PostgreSQL database (Azure Flexible DB), and then transferred the tables from there to our Oracle database.
- Total rows: 4,008,587,913 across both tables.
- The average row size is 144 bytes for `trips` and 121 bytes for `fhv_trips`.
- OLake was run on an Azure Standard D64ls v5 VM (64 vCPUs, 128 GiB memory)
- Database instance: AWS RDS db.r6i.4xlarge (8 vCPUs, 32 GiB Memory)
(OLake's Performance with Oracle Database)
1. Speed Test – Full-Load Performance
| Tool | Rows Synced | Throughput (rows / sec) |
|---|---|---|
| OLake (as of 30th Jan 2026) | 4.01 B | 526,337 RPS |
Memory usage (OLake) - Standard D64ls v5 (64 vCPUs, 128 GiB Memory)
| Memory Stats | Usage (GB) |
|---|---|
| Min | 19.26 |
| Max | 93.16 |
| Mean | 73.18 |
OLake maintains high throughput while keeping memory usage efficient.
2. Cost at a Glance
| Tool | Scenario | Spend (USD) | Rows Synced |
|---|---|---|---|
| OLake | Full Load | Cost of a Standard D64ls v5 (64 vCPUs, 128 GiB memory) running for 2.11 hours < $ 6 | 4.01 B |
Dataset and Table Schemas
Please refer to this GitHub repository for the dataset we used to conduct these benchmarks.
For Oracle, we performed a full-load sync of the trips and fhv_trips tables into Apache Iceberg.
trips table
CREATE TABLE trips (
id bigserial NOT NULL,
cab_type_id int4 NULL,
vendor_id int4 NULL,
pickup_datetime timestamp NULL,
dropoff_datetime timestamp NULL,
store_and_fwd_flag bool NULL,
rate_code_id int4 NULL,
pickup_longitude numeric NULL,
pickup_latitude numeric NULL,
dropoff_longitude numeric NULL,
dropoff_latitude numeric NULL,
passenger_count int4 NULL,
trip_distance numeric NULL,
fare_amount numeric NULL,
extra numeric NULL,
mta_tax numeric NULL,
tip_amount numeric NULL,
tolls_amount numeric NULL,
ehail_fee numeric NULL,
improvement_surcharge numeric NULL,
congestion_surcharge numeric NULL,
airport_fee numeric NULL,
total_amount numeric NULL,
payment_type int4 NULL,
trip_type int4 NULL,
pickup_nyct2010_gid int4 NULL,
dropoff_nyct2010_gid int4 NULL,
pickup_location_id int4 NULL,
dropoff_location_id int4 NULL,
CONSTRAINT trips_pkey PRIMARY KEY (id)
);
fhv_trips table
CREATE TABLE fhv_trips (
id bigserial NOT NULL,
hvfhs_license_num text NULL,
dispatching_base_num text NULL,
originating_base_num text NULL,
request_datetime timestamp NULL,
on_scene_datetime timestamp NULL,
pickup_datetime timestamp NULL,
dropoff_datetime timestamp NULL,
pickup_location_id int4 NULL,
dropoff_location_id int4 NULL,
trip_miles numeric NULL,
trip_time numeric NULL,
base_passenger_fare numeric NULL,
tolls numeric NULL,
black_car_fund numeric NULL,
sales_tax numeric NULL,
congestion_surcharge numeric NULL,
airport_fee numeric NULL,
tips numeric NULL,
driver_pay numeric NULL,
shared_request bool NULL,
shared_match bool NULL,
access_a_ride bool NULL,
wav_request bool NULL,
wav_match bool NULL,
legacy_shared_ride int4 NULL,
affiliated_base_num text NULL,
CONSTRAINT fhv_trips_pkey PRIMARY KEY (id)
);
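The DDL above is the PostgreSQL definition the tables were copied from. A roughly equivalent Oracle definition of the `trips` table is sketched below; the type mappings are our assumption and may differ from the exact DDL used in the benchmark:

CREATE TABLE trips (
    id NUMBER(19) GENERATED BY DEFAULT AS IDENTITY NOT NULL,  -- bigserial equivalent
    cab_type_id NUMBER(10),
    vendor_id NUMBER(10),
    pickup_datetime TIMESTAMP,
    dropoff_datetime TIMESTAMP,
    store_and_fwd_flag NUMBER(1),  -- no native BOOLEAN column type before Oracle 23ai
    rate_code_id NUMBER(10),
    pickup_longitude NUMBER,
    pickup_latitude NUMBER,
    dropoff_longitude NUMBER,
    dropoff_latitude NUMBER,
    passenger_count NUMBER(10),
    trip_distance NUMBER,
    fare_amount NUMBER,
    extra NUMBER,
    mta_tax NUMBER,
    tip_amount NUMBER,
    tolls_amount NUMBER,
    ehail_fee NUMBER,
    improvement_surcharge NUMBER,
    congestion_surcharge NUMBER,
    airport_fee NUMBER,
    total_amount NUMBER,
    payment_type NUMBER(10),
    trip_type NUMBER(10),
    pickup_nyct2010_gid NUMBER(10),
    dropoff_nyct2010_gid NUMBER(10),
    pickup_location_id NUMBER(10),
    dropoff_location_id NUMBER(10),
    CONSTRAINT trips_pkey PRIMARY KEY (id)
);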
We used AWS Glue as the Iceberg catalog and AWS S3 as the storage layer on the destination side for these benchmarks.
Bottom line: If you need to land terabytes of Oracle data into Apache Iceberg quickly—OLake delivers enterprise-grade speed without the enterprise-grade bill.
MongoDB → Apache Iceberg Connector Benchmark
Benchmark Environment
- This benchmark uses the standard Twitter data `tweets` table.
- Total rows: 233,955,436 in the `tweets` table.
- The average row size for the `tweets` table is 3,655 bytes.
- OLake was run on an AWS EC2 c6i.16xlarge (64 vCPUs, 128 GiB memory)
- Database instance: 3 × Standard D16as v5 (16 vCPUs, 64 GiB memory)
(OLake's Performance with MongoDB Database)
1. Speed Test – Full-Load Performance
| Tool | Rows Synced | Throughput (rows / sec) |
|---|---|---|
| OLake (as of 5th Feb 2026) | 233 M | 37,879 RPS |
Memory usage (OLake) - Standard D64ls v5 (64 vCPUs, 128 GiB Memory)
| Memory Stats | Usage (GB) |
|---|---|
| Min | 2.89 |
| Max | 112.22 |
| Mean | 71.18 |
OLake maintains high throughput while keeping memory usage efficient.
2. Speed Comparison – Change-Data-Capture (CDC)
| Tool | CDC Window | Throughput (rows / sec) | Relative to OLake |
|---|---|---|---|
| OLake | 38.96 mins | 10,692 RPS | – |
3. Cost at a Glance
| Tool | Scenario | Spend (USD) | Rows Synced |
|---|---|---|---|
| OLake | Full Load | Cost of a Standard D64ls v5 (64 vCPUs, 128 GiB memory) running for 2.11 hours < $ 5 | 233 M |
Dataset and Table Schemas
For MongoDB, we performed a full-load and CDC sync of the tweets table into Apache Iceberg.
tweets table
CREATE TABLE tweets (
"_id": ObjectId,
"delete": Object,
"created_at": String,
"id": Int64,
"id_str": String,
"text": String,
"source": String,
"truncated": Boolean,
"in_reply_to_status_id": Mixed,
"in_reply_to_status_id_str": Mixed,
"in_reply_to_user_id": Mixed,
"in_reply_to_user_id_str": Mixed,
"in_reply_to_screen_name": Mixed,
"user": Object,
"geo": Null,
"coordinates": Null,
"place": Null,
"contributors": Null,
"is_quote_status": Boolean,
"quote_count": Int32,
"reply_count": Int32,
"retweet_count": Int32,
"favorite_count": Int32,
"entities": Object,
"favorited": Boolean,
"retweeted": Boolean,
"filter_level": String,
"lang": String,
"timestamp_ms": String,
"display_text_range": Array,
"extended_entities": Object,
"possibly_sensitive": Boolean,
"retweeted_status": Object,
"extended_tweet": Object,
"quoted_status_id": Int64,
"quoted_status_id_str": String,
"quoted_status": Object
);
We used AWS Glue as the Iceberg catalog and AWS S3 as the storage layer on the destination side for these benchmarks.
MySQL → Apache Iceberg Connector Benchmark
Benchmark Environment
- This benchmark uses the standard NYC Taxi Data `trips` and `fhv_trips` tables.
- Since the original repo supports only PostgreSQL, we first ingested the NYC Taxi Data into a cloud PostgreSQL database (Azure Flexible DB), and then transferred the tables from there to our MySQL database.
- Total rows: 4,001,991,536 across both tables.
- The average row size is 144 bytes for `trips` and 121 bytes for `fhv_trips`.
- OLake & Debezium were run on an AWS EC2 c6i.16xlarge (64 vCPUs, 128 GiB memory)
- Database instance: Azure Standard D32as v6 (32 vCPUs, 128 GiB Memory)
(OLake vs. Popular Data-Movement Tool)
1. Speed Comparison – Full-Load Performance
| Tool | Rows Synced | Throughput (rows / sec) | Relative to OLake |
|---|---|---|---|
| OLake (as of 14th Nov 2025) | 4.0 B | 338,005 RPS | – |
| Fivetran (as of 14th Nov 2025) | 4.0 B | 119,106 RPS | 2.83 × slower |
Memory usage (OLake) - c6i.16xlarge (64 vCPUs, 128 GiB memory)
| Memory Stats | Usage (GB) |
|---|---|
| Min | 3.24 |
| Max | 75.1 |
| Mean | 48.95 |
OLake maintains high throughput while keeping memory usage efficient.
2. Speed Comparison – Change-Data-Capture (CDC)
| Tool | CDC Window | Throughput (rows / sec) | Relative to OLake |
|---|---|---|---|
| OLake | 16.06 min | 51,867 RPS | – |
| Fivetran | 29.86 min | 27,901 RPS | 1.85 × slower |
Key takeaway: For incremental workloads OLake leads the pack, moving 50 million MySQL changes into Iceberg 85.9% faster than Fivetran.
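MySQL CDC is driven by the binary log, so the source instance needs row-based logging enabled. A minimal pre-flight check (standard MySQL commands, not specific to OLake's configuration) looks like this:

SHOW VARIABLES LIKE 'log_bin';          -- must be ON
SHOW VARIABLES LIKE 'binlog_format';    -- ROW is required for change data capture
SHOW VARIABLES LIKE 'binlog_row_image'; -- FULL captures complete before/after row images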
3. Cost Comparison (Vendor List Prices)
| Tool | Scenario | Spend (USD) | Rows Synced |
|---|---|---|---|
| OLake | Full Load / CDC | Cost of a c6i.16xlarge (64 vCPUs, 128 GiB memory) running for 3.3 hours < $ 11 | 4.0 B / 50 M |
| Fivetran | Full Load | $ 0 (free full sync) | 4.0 B |
| Fivetran | CDC | $ 2,375.80 | 50 M |
- OLake is open-source and can be deployed on your own Kubernetes cluster or cloud VMs; you pay only for the compute and storage you provision.
Dataset and Table Schemas
Please refer to this GitHub repository for the dataset we used to conduct these benchmarks.
For MySQL, we performed a full-load and CDC sync of the trips and fhv_trips tables into Apache Iceberg.
trips table
CREATE TABLE trips (
id BIGINT NOT NULL AUTO_INCREMENT,
cab_type_id INT NULL,
vendor_id INT NULL,
pickup_datetime DATETIME NULL,
dropoff_datetime DATETIME NULL,
store_and_fwd_flag TINYINT(1) NULL,
rate_code_id INT NULL,
pickup_longitude DECIMAL(10,2) NULL,
pickup_latitude DECIMAL(10,2) NULL,
dropoff_longitude DECIMAL(10,2) NULL,
dropoff_latitude DECIMAL(10,2) NULL,
passenger_count INT NULL,
trip_distance DECIMAL(10,2) NULL,
fare_amount DECIMAL(10,2) NULL,
extra DECIMAL(10,2) NULL,
mta_tax DECIMAL(10,2) NULL,
tip_amount DECIMAL(10,2) NULL,
tolls_amount DECIMAL(10,2) NULL,
ehail_fee DECIMAL(10,2) NULL,
improvement_surcharge DECIMAL(10,2) NULL,
congestion_surcharge DECIMAL(10,2) NULL,
airport_fee DECIMAL(10,2) NULL,
total_amount DECIMAL(10,2) NULL,
payment_type INT NULL,
trip_type INT NULL,
pickup_nyct2010_gid INT NULL,
dropoff_nyct2010_gid INT NULL,
pickup_location_id INT NULL,
dropoff_location_id INT NULL,
PRIMARY KEY (id)
);
fhv_trips table
CREATE TABLE fhv_trips (
id BIGINT NOT NULL AUTO_INCREMENT,
hvfhs_license_num TEXT NULL,
dispatching_base_num TEXT NULL,
originating_base_num TEXT NULL,
request_datetime DATETIME NULL,
on_scene_datetime DATETIME NULL,
pickup_datetime DATETIME NULL,
dropoff_datetime DATETIME NULL,
pickup_location_id INT NULL,
dropoff_location_id INT NULL,
trip_miles DECIMAL(10,2) NULL,
trip_time DECIMAL(10,2) NULL,
base_passenger_fare DECIMAL(10,2) NULL,
tolls DECIMAL(10,2) NULL,
black_car_fund DECIMAL(10,2) NULL,
sales_tax DECIMAL(10,2) NULL,
congestion_surcharge DECIMAL(10,2) NULL,
airport_fee DECIMAL(10,2) NULL,
tips DECIMAL(10,2) NULL,
driver_pay DECIMAL(10,2) NULL,
shared_request TINYINT(1) NULL,
shared_match TINYINT(1) NULL,
access_a_ride TINYINT(1) NULL,
wav_request TINYINT(1) NULL,
wav_match TINYINT(1) NULL,
legacy_shared_ride INT NULL,
affiliated_base_num TEXT NULL,
PRIMARY KEY (id)
);
We used AWS Glue as the Iceberg catalog and AWS S3 as the storage layer on the destination side for these benchmarks.
Bottom line: If you need to land terabytes of MySQL data into Apache Iceberg quickly—OLake delivers enterprise-grade speed without the enterprise-grade bill.
Kafka → Apache Iceberg Connector Benchmark
Benchmark Environment
- This benchmark uses the schema from the standard NYC Taxi Data `trips` table.
- Since the original repo does not support direct Kafka ingestion, we replicated the `trips` table schema and generated random data matching that structure to create Kafka messages.
- Total messages: 1,000,000,000, with a schema identical to the `trips` table.
- The average message size is 144 bytes (same as the original `trips` table row size).
- Kafka cluster: AWS MSK [3 broker nodes, each m7g.4xlarge (16 vCPUs, 64 GiB memory)]
- Topic configuration: 1 topic (`trips`) with 5 partitions
- OLake was run on an Azure Standard D64ls v6 VM (64 vCPUs, 128 GiB memory)
(OLake vs. Popular Data-Movement Tools)
1. Speed Comparison – Batch-Load Performance
| Tool | Messages Synced | Throughput (messages / sec) | Relative to OLake |
|---|---|---|---|
| OLake (as of 5th Dec 2025) | 1.0 B | 154,320 MPS | – |
| Flink (as of 5th Dec 2025) | 1.0 B | 85,470 MPS | 1.8 × slower |
Memory usage (OLake vs Flink) - Standard D64ls v6 (64 vCPUs, 128 GiB Memory)
| Memory Stats | OLake Usage (GB) | Flink Usage (GB) |
|---|---|---|
| Min | 43.14 | 16.34 |
| Max | 65.91 | 40.92 |
| Mean | 63.15 | 39.89 |
2. Cost Comparison (Vendor List Prices)
| Tool | Scenario | Compute (USD) | Brokers (USD) | Total (USD) | Rows Synced |
|---|---|---|---|---|---|
| OLake | Batch Load | $6.51 (D64ls v6 running for 1.8 hours) | $7.57 (3× m7g.4xlarge running for 1.8 hours) | $14.08 | 1.0 B |
| Flink | Batch Load | $11.75 (D64ls v6 running for 3.25 hours) | $14.38 (3× m7g.4xlarge running for 3.25 hours) | $26.13 | 1.0 B |
Both OLake and Flink are open-source and can be deployed on your own Kubernetes cluster or cloud VMs; you pay only for the compute and storage you provision.
Dataset and Table Schemas
Please refer to this GitHub repository for the dataset we used to conduct these benchmarks.
trips table
CREATE TABLE trips (
id bigserial NOT NULL,
cab_type_id int4 NULL,
vendor_id int4 NULL,
pickup_datetime timestamp NULL,
dropoff_datetime timestamp NULL,
store_and_fwd_flag bool NULL,
rate_code_id int4 NULL,
pickup_longitude numeric NULL,
pickup_latitude numeric NULL,
dropoff_longitude numeric NULL,
dropoff_latitude numeric NULL,
passenger_count int4 NULL,
trip_distance numeric NULL,
fare_amount numeric NULL,
extra numeric NULL,
mta_tax numeric NULL,
tip_amount numeric NULL,
tolls_amount numeric NULL,
ehail_fee numeric NULL,
improvement_surcharge numeric NULL,
congestion_surcharge numeric NULL,
airport_fee numeric NULL,
total_amount numeric NULL,
payment_type int4 NULL,
trip_type int4 NULL,
pickup_nyct2010_gid int4 NULL,
dropoff_nyct2010_gid int4 NULL,
pickup_location_id int4 NULL,
dropoff_location_id int4 NULL,
CONSTRAINT trips_pkey PRIMARY KEY (id)
);
We used AWS Glue as the Iceberg catalog and AWS S3 as the storage layer on the destination side for these benchmarks.
Flink setup for this benchmark
This section covers the complete Flink setup process used for the Kafka → Iceberg benchmark, from installation to configuration.
Prerequisites
- Java 11+ must be installed on your system. Verify your Java version:
java -version
Installation
- Download the Flink tar release from the Apache Flink downloads page.
- Extract the archive:
$ tar -xzf flink-*.tgz
- Navigate to the extracted Flink directory:
$ cd flink-*
Flink Cluster Configuration
This is the exact Flink cluster configuration we used for the Kafka → Iceberg benchmark, including JVM module opens, JobManager/TaskManager sizing, and S3 endpoint settings.
env:
  java:
    opts:
      all: >
        --add-exports=java.base/sun.net.util=ALL-UNNAMED
        --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED
        --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED
        --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED
        --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED
        --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED
        --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED
        --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED
        --add-opens=java.base/java.lang=ALL-UNNAMED
        --add-opens=java.base/java.net=ALL-UNNAMED
        --add-opens=java.base/java.io=ALL-UNNAMED
        --add-opens=java.base/java.nio=ALL-UNNAMED
        --add-opens=java.base/sun.nio.ch=ALL-UNNAMED
        --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
        --add-opens=java.base/java.text=ALL-UNNAMED
        --add-opens=java.base/java.time=ALL-UNNAMED
        --add-opens=java.base/java.util=ALL-UNNAMED
        --add-opens=java.base/java.util.concurrent=ALL-UNNAMED
        --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED
        --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
jobmanager:
  bind-host: localhost
  rpc:
    address: localhost
    port: 6123
  memory:
    process:
      size: 8192m
  execution:
    failover-strategy: region
taskmanager:
  bind-host: localhost
  host: localhost
  numberOfTaskSlots: 5
  memory:
    process:
      size: 81920m
parallelism:
  default: 5
execution:
  checkpointing:
    interval: 60000
    mode: EXACTLY_ONCE
    timeout: 600000
    max-concurrent-checkpoints: 1
state:
  backend:
    type: rocksdb
  checkpoints:
    dir: s3://<checkpoint-bucket>/<path>/
  savepoints:
    dir: s3://<savepoint-bucket>/<path>/
s3:
  endpoint: s3.ap-south-1.amazonaws.com
  path:
    style:
      access: false
rest:
  address: 0.0.0.0
  port: 8081
Running a Flink Job
Before running a Flink job, make sure you have all of these jar files in the lib directory. These jar files are required as dependencies for transferring data from Kafka to Iceberg using Glue as catalog:
- aws-java-sdk-bundle-1.12.262.jar
- bundle-2.20.160.jar
- commons-configuration2-2.8.0.jar
- flink-cep-2.0.1.jar
- flink-connector-files-2.0.1.jar
- flink-csv-2.0.1.jar
- flink-dist-2.0.1.jar
- flink-json-2.0.1.jar
- flink-scala_2.12-2.0.1.jar
- flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
- flink-shaded-hadoop-3-uber-3.3.6-20.0.jar
- flink-sql-connector-kafka-4.0.0-2.0.jar
- flink-table-api-java-uber-2.0.1.jar
- flink-table-planner-loader-2.0.1.jar
- flink-table-runtime-2.0.1.jar
- guava-27.0.1-jre.jar
- hadoop-aws-3.3.6.jar
- iceberg-aws-bundle-1.10.0.jar
- iceberg-flink-runtime-2.0-1.10.0.jar
- log4j-1.2-api-2.24.3.jar
- log4j-api-2.24.3.jar
- log4j-core-2.24.3.jar
- log4j-slf4j-impl-2.24.3.jar
- schema-registry-flink-serde-1.1.18.jar
- stax2-api-4.2.1.jar
- woodstox-core-5.3.0.jar
Below are the complete commands for the transfer process:
Starting the Flink Cluster
Start the Flink cluster:
$ ./bin/start-cluster.sh
Verify that Flink is running properly by accessing the Flink Web UI at http://localhost:8081 in your browser.
Setting up Flink SQL Client
- Start the Flink SQL client:
./bin/sql-client.sh
- Check available catalogs:
Flink SQL> show catalogs;
- Use the default catalog:
Flink SQL> USE CATALOG default_catalog;
Configuring Kafka Connection and initiating the transfer
- Drop any existing table to avoid conflicts:
Flink SQL> DROP TABLE IF EXISTS nyc_trips_flink;
- Set the execution runtime mode to batch:
Flink SQL> SET 'execution.runtime-mode' = 'batch';
- Create the Kafka table with the following schema:
Flink SQL> CREATE TABLE nyc_trips_flink (
id INT,
cab_type_id INT,
vendor_id INT,
pickup_datetime STRING,
dropoff_datetime STRING,
store_and_fwd_flag BOOLEAN,
rate_code_id INT,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
passenger_count INT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
congestion_surcharge DOUBLE,
airport_fee DOUBLE,
total_amount DOUBLE,
payment_type INT,
trip_type INT,
pickup_nyct2010_gid INT,
dropoff_nyct2010_gid INT,
pickup_location_id INT,
dropoff_location_id INT
) WITH (
'connector' = 'kafka',
'topic' = '<YOUR_TOPIC_NAME>',
'properties.bootstrap.servers' = '<LIST_OF_BOOTSTRAP_SERVERS>',
'properties.security.protocol' = '<SECURITY_PROTOCOL_OF_YOUR_KAFKA_CLUSTER>',
'properties.group.id' = '<UNIQUE_GROUP_ID>',
'scan.startup.mode' = 'earliest-offset',
'scan.bounded.mode' = 'latest-offset',
'format' = 'json',
'json.ignore-parse-errors' = 'true'
);
- Create the AWS Glue catalog:
Flink SQL> CREATE CATALOG glue_catalog WITH (
'type' = 'iceberg',
'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',
'warehouse' = 's3://<BUCKET_NAME>/',
'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
'client.region' = '<REGION_NAME>'
);
- Use the AWS Glue catalog:
USE CATALOG glue_catalog;
- Use the database from the Glue catalog where you want the Iceberg table to be created:
USE <DATABASE_NAME>;
- Create the table in Iceberg with the same schema as the Kafka topic:
Flink SQL> CREATE TABLE IF NOT EXISTS nyc_trips_mtasks (
id INT,
cab_type_id INT,
vendor_id INT,
pickup_datetime STRING,
dropoff_datetime STRING,
store_and_fwd_flag BOOLEAN,
rate_code_id INT,
pickup_longitude DOUBLE,
pickup_latitude DOUBLE,
dropoff_longitude DOUBLE,
dropoff_latitude DOUBLE,
passenger_count INT,
trip_distance DOUBLE,
fare_amount DOUBLE,
extra DOUBLE,
mta_tax DOUBLE,
tip_amount DOUBLE,
tolls_amount DOUBLE,
ehail_fee DOUBLE,
improvement_surcharge DOUBLE,
congestion_surcharge DOUBLE,
airport_fee DOUBLE,
total_amount DOUBLE,
payment_type INT,
trip_type INT,
pickup_nyct2010_gid INT,
dropoff_nyct2010_gid INT,
pickup_location_id INT,
dropoff_location_id INT
);
- Insert the data from the Kafka topic into the Iceberg table:
Flink SQL> INSERT INTO nyc_trips_mtasks
SELECT
id,
cab_type_id,
vendor_id,
pickup_datetime,
dropoff_datetime,
store_and_fwd_flag,
rate_code_id,
pickup_longitude,
pickup_latitude,
dropoff_longitude,
dropoff_latitude,
passenger_count,
trip_distance,
fare_amount,
extra,
mta_tax,
tip_amount,
tolls_amount,
ehail_fee,
improvement_surcharge,
congestion_surcharge,
airport_fee,
total_amount,
payment_type,
trip_type,
pickup_nyct2010_gid,
dropoff_nyct2010_gid,
pickup_location_id,
dropoff_location_id
FROM default_catalog.default_database.nyc_trips_flink;
Check the Flink Web UI at http://localhost:8081 to verify that the job has been created and is running.
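Once the job finishes, a quick count against the destination table from the same SQL client (table name as created above) confirms that all messages landed:

Flink SQL> SELECT COUNT(*) FROM nyc_trips_mtasks;
-- expected result: 1,000,000,000 rows for a complete run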