
Running OLake on Kubernetes with Your Existing Airflow: Sync Your Data Effortlessly

Schitiz Sharma · OLake Maintainer · 7 min read

olake-airflow-cover

At OLake, we're building tools to make data integration seamless. Today, we're excited to show you how to leverage your existing Apache Airflow setup to automate OLake data synchronization tasks directly on your Kubernetes cluster!

OLake is designed to efficiently sync data from various sources to your chosen destinations. This guide provides an Airflow DAG (Directed Acyclic Graph) that orchestrates the OLake sync command, handling dependencies like persistent storage and configuration management within Kubernetes.

This post assumes you already have:

  1. A running Apache Airflow instance.

  2. Airflow Kubernetes Provider: the necessary provider must be installed in your Airflow environment. If not already installed, run:

# Activate your Airflow virtual environment first
pip install apache-airflow-providers-cncf-kubernetes

# Restart the Airflow scheduler and webserver after installation

  3. Access to a Kubernetes cluster.

  4. Basic familiarity with Airflow concepts (UI, DAGs, Connections) and Kubernetes concepts (Namespaces, ConfigMaps, StorageClasses, PersistentVolumeClaims).

Let's dive in!

Setting the Stage

Before deploying the DAG, ensure the following are in place:

Airflow with LocalExecutor:

info

Crucially, your Airflow instance must be configured to use the LocalExecutor or SequentialExecutor.

Our DAG utilizes Airflow's PythonOperator to dynamically check for and create Kubernetes resources (like PersistentVolumeClaims) before running the main OLake task.

This Python code runs directly on the machine where the Airflow scheduler is running, which is how LocalExecutor and SequentialExecutor operate.

It will not work correctly out-of-the-box with the CeleryExecutor or KubernetesExecutor without modification.

You can verify this by checking the executor setting under the [core] section in your $AIRFLOW_HOME/airflow.cfg file. It should be set to either executor = LocalExecutor or executor = SequentialExecutor.
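For example, a quick way to check from the machine running the scheduler (assuming airflow.cfg sits in the default location under $AIRFLOW_HOME):

# Print the executor line from airflow.cfg (path is an assumption)
grep -E "^executor" "$AIRFLOW_HOME/airflow.cfg"

# Or ask Airflow for the effective value directly
airflow config get-value core executor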

Kubernetes Connection in Airflow

You need an Airflow Connection configured to communicate with your Kubernetes cluster.

  1. Go to Admin -> Connections in the Airflow UI.

  2. Add or identify a connection of type Kubernetes.

  3. Configure it using a kubeconfig file (pasted JSON is recommended when Airflow runs outside the cluster). To get the kubeconfig from your machine in JSON format, run:

kubectl config view --raw --flatten --output='json'
tip

Note down the Connection Id - you'll need it for the DAG configuration.

olake-airflow-1

Kubernetes Namespace

Choose or create a namespace in your Kubernetes cluster where OLake pods and related resources (like the PVC) will reside. You'll need the name of this namespace.
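If you still need to create it, a minimal sketch (using "olake" as an example name, matching the DAG's default TARGET_NAMESPACE) looks like this:

# Create a dedicated namespace for the OLake pod, PVC, and ConfigMaps ("olake" is an example)
kubectl create namespace olake

# Confirm it exists
kubectl get namespace olake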

Kubernetes ConfigMaps

The OLake DAG relies on Kubernetes ConfigMaps to inject configuration. You must create three ConfigMaps in the target namespace before running the DAG:

  1. olake-source-config: containing your source configuration in a key named source.json.

    Download cm_olake-source-config.yaml

curl -Lo cm_olake-source-config.yaml https://raw.githubusercontent.com/datazip-inc/olake-docs/refs/heads/master/kubernetes/cm_olake-source-config.yaml

  2. olake-destination-config: containing your destination configuration in a key named destination.json.

    Download cm_olake-destination-config.yaml

curl -Lo cm_olake-destination-config.yaml https://raw.githubusercontent.com/datazip-inc/olake-docs/refs/heads/master/kubernetes/cm_olake-destination-config.yaml

  3. olake-streams-config: containing your OLake streams configuration in a key named streams.json.

    Download cm_olake-streams-config.yaml

curl -Lo cm_olake-streams-config.yaml https://raw.githubusercontent.com/datazip-inc/olake-docs/refs/heads/master/kubernetes/cm_olake-streams-config.yaml
info

This setup requires a streams.json generated beforehand using the OLake discover command against your source database.

  • See the Streams Generation guides in the OLake documentation for running discover against your source.
  • The content of this file will be placed within the cm_olake-streams-config.yaml ConfigMap.
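Once you have edited the downloaded YAML files with your actual source, destination, and streams JSON, applying them could look like this (the "olake" namespace is an example; use your target namespace):

# Apply the three ConfigMaps into the target namespace; names must match the DAG's defaults
kubectl apply -n olake -f cm_olake-source-config.yaml
kubectl apply -n olake -f cm_olake-destination-config.yaml
kubectl apply -n olake -f cm_olake-streams-config.yaml

# Verify they were created
kubectl get configmaps -n olake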

Kubernetes StorageClass

OLake uses a state file to keep track of sync progress. To persist this state between runs, the DAG creates a PersistentVolumeClaim (PVC). You need a StorageClass in your Kubernetes cluster that supports ReadWriteMany (RWX) access mode.

note

At the start of a sync, the state file just contains {}. OLake will update this file as it processes data.

Examples: azurefile on AKS, Google Cloud Filestore provisioner on GKE, AWS EFS provisioner on EKS, or an NFS provisioner.

Identify the name of a suitable StorageClass. You can often list available StorageClasses using kubectl get storageclass. Consult your Kubernetes administrator if unsure.
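For example (output varies by cluster, and the class name below is only an illustration; confirming RWX support usually means checking the provisioner's documentation):

# List available StorageClasses and their provisioners
kubectl get storageclass

# Inspect a candidate class in more detail ("azurefile" is an example name)
kubectl describe storageclass azurefile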

Getting the DAG File

You'll need the Python DAG file that defines the workflow. Download it here.

Configuring the DAG

Now, open the downloaded olake_sync_from_source.py file in a text editor. You need to customize a few variables at the top of the file to match your specific environment. Do not skip this step!

# --- Configuration ---
# !!! IMPORTANT: Set this to the ID of your configured Kubernetes connection in Airflow !!!
# This connection tells Airflow how to authenticate with your K8s cluster.
KUBERNETES_CONN_ID = "kubernetes_default" # <-- EDIT THIS LINE

# !!! IMPORTANT: Set this to the Kubernetes namespace where OLake pods should run !!!
# Ensure ConfigMaps and the PVC exist or will be created in this namespace.
TARGET_NAMESPACE = "olake" # <-- EDIT THIS LINE

# !!! IMPORTANT: Set this to the correct OLake image for your source database !!!
# Find images at: https://hub.docker.com/u/olakego
# Examples: "olakego/source-mongodb:latest", "olakego/source-mysql:latest", "olakego/source-postgres:latest"
OLAKE_IMAGE = "olakego/source-db:latest" # <-- EDIT THIS LINE

# !!! IMPORTANT: Set this to the name of a StorageClass in your K8s cluster !!!
# This StorageClass MUST support ReadWriteMany (RWX) access mode.
# Examples: "azurefile" (AKS), "standard-rwx" (GKE Filestore), an EFS provisioner (EKS)
STORAGE_CLASS_NAME = "default" # <-- EDIT THIS LINE

# --- Optional Node Affinity ---
# Set NODE_AFFINITY_LABEL_KEY and NODE_AFFINITY_LABEL_VALUE to target specific nodes.
# Set NODE_AFFINITY_REQUIRED to True for hard requirement, False for preference.
NODE_AFFINITY_REQUIRED = False # <-- EDIT THIS LINE (True/False)
NODE_AFFINITY_LABEL_KEY = "nodegroup_name" # <-- EDIT THIS LINE (e.g., "topology.kubernetes.io/zone")
NODE_AFFINITY_LABEL_VALUE = "olake" # <-- EDIT THIS LINE (e.g., "us-central1-a")

# --- Names of ConfigMaps ---
# Ensure ConfigMaps with these names exist in the TARGET_NAMESPACE
# containing your source, destination, and streams JSON configurations respectively.
SOURCE_CONFIG_MAP_NAME = "olake-source-config"
DESTINATION_CONFIG_MAP_NAME = "olake-destination-config"
STREAMS_CONFIG_MAP_NAME = "olake-streams-config"

Recap of values to change:

  • KUBERNETES_CONN_ID: Set this to the exact "Connection Id" of your Kubernetes connection in Airflow.

  • TARGET_NAMESPACE: Set this to the Kubernetes namespace where you created the ConfigMaps and where the OLake pod/PVC should run.

  • OLAKE_IMAGE: Crucially, change this to the specific OLake Docker image corresponding to your data source (e.g., olakego/source-postgres:latest, olakego/source-mongodb:latest, etc). Check the OLake Docker Hub page for available images and tags.

  • STORAGE_CLASS_NAME: Set this to the name of an available ReadWriteMany (RWX) StorageClass in your cluster.

  • NODE_AFFINITY_* (Optional): Adjust these if you need to control pod placement onto specific Kubernetes nodes based on labels (see the node-labeling sketch after this list). Leave NODE_AFFINITY_REQUIRED = False if you don't need strict node requirements.

  • *_CONFIG_MAP_NAME (Optional): Only change these if you named your prerequisite ConfigMaps differently than the defaults.
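If you do enable node affinity, the label referenced by NODE_AFFINITY_LABEL_KEY and NODE_AFFINITY_LABEL_VALUE must exist on at least one node. A minimal sketch, assuming the default key/value from the configuration above and a hypothetical node named worker-1:

# Label a node so the affinity rule (nodegroup_name=olake) can match it
kubectl label node worker-1 nodegroup_name=olake

# List the nodes carrying that label
kubectl get nodes -l nodegroup_name=olake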

Deploying the DAG to Airflow

  1. Save your modified olake_sync_from_source.py file.

  2. Place the file into the dags folder recognized by your Airflow instance. The location of this folder depends on your Airflow setup (a typical copy command is sketched after this list).

  3. Airflow automatically scans this folder. Wait a minute or two, and the DAG named olake_sync_from_source should appear in the Airflow UI. You might need to unpause it (toggle button on the left) if it loads in a paused state.
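For step 2, on a standalone installation the copy typically looks like the following; the path is an assumption based on the default dags_folder setting, so adjust it to your environment:

# Copy the DAG into Airflow's dags folder (assumes the default $AIRFLOW_HOME/dags)
cp olake_sync_from_source.py "$AIRFLOW_HOME/dags/"

# Optional: confirm Airflow parsed it without import errors
airflow dags list-import-errors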

Running Your OLake Sync

  1. Navigate to the Airflow UI.

  2. Find the olake_sync_from_source DAG.

  3. Click the "Play" button (▶️) on the right side to trigger a manual run.

olake-airflow-2

  4. Click on the running DAG instance to view the Graph or Gantt chart.

  5. The first task (create_pvc) will run, checking for and creating the olake-shared-data PVC if needed.

  6. Once that succeeds, the sync_data task will start. This task launches the actual OLake pod on your Kubernetes cluster.

  7. You can click on the sync_data task instance and view its logs to see the output from the OLake process itself.
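You can also watch the pod directly on the cluster while sync_data is running. A quick sketch, assuming the example "olake" namespace (the pod name is generated at runtime, so look it up first):

# Find the OLake pod launched by the sync_data task
kubectl get pods -n olake

# Stream its logs; replace the placeholder with the pod name from the previous command
kubectl logs -f <olake-pod-name> -n olake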

Setting Up an OLake Cron Schedule

Modify line #52 of the olake_sync_from_source.py DAG file, replacing schedule=None with the frequency you wish to set up:

with DAG(
    dag_id="olake_sync_from_source",
    start_date=dag_start_date,
    schedule="@daily",  # SAMPLE VALUE
    catchup=False,
    ...

For more information on how to add a schedule, refer to the Cron & Time Intervals docs.

That's it! You've successfully configured and run an OLake sync task orchestrated by your existing Airflow instance, leveraging the power of Kubernetes for execution. This setup provides a robust and automated way to manage your data synchronization pipelines.

Happy Syncing!

OLake

Achieve 5x faster data replication to Lakehouse formats with OLake, our open-source platform for efficient, quick, and scalable big data ingestion for real-time analytics.

Contact us at hello@olake.io