MongoDB Driver

The MongoDB Driver enables data synchronization from MongoDB to your desired destination. It supports both Full Refresh and CDC (Change Data Capture) modes.

Supported Modes

  1. Full Refresh
    Fetches the complete dataset from MongoDB.

  2. CDC (Change Data Capture)
    Tracks and syncs incremental changes from MongoDB in real time.

Setup and Configuration

To run the MongoDB Driver, configure the following files with your specific credentials and settings:

  • config.json: MongoDB connection details.
  • catalog.json: List of collections and fields to sync (generated using the Discover command).
  • write.json: Configuration for the destination where the data will be written.

Place these files in your project directory before running the commands.

Config File

Add your MongoDB credentials to the config.json file in the following format:

{
  "hosts": [
    "host1:27017",
    "host2:27017",
    "host3:27017"
  ],
  "username": "test",
  "password": "test",
  "authdb": "admin",
  "replica-set": "rs0",
  "read-preference": "secondaryPreferred",
  "srv": true,
  "server-ram": 16,
  "database": "database",
  "max_threads": 50,
  "default_mode": "cdc"
}
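
Before running any command, it can help to check that config.json contains the keys shown in the sample above. The following is a minimal sketch, not part of the driver: the helper name check_config is hypothetical, and treating every sample key as required, as well as restricting default_mode to "full_refresh" or "cdc", are assumptions based only on the examples on this page.

```python
import json

# Keys copied from the sample config.json above. Treating all of them as
# required is an assumption made for this sketch, not a documented rule.
EXPECTED_KEYS = {
    "hosts", "username", "password", "authdb", "replica-set",
    "read-preference", "srv", "server-ram", "database",
    "max_threads", "default_mode",
}


def check_config(config: dict) -> list:
    """Return a list of human-readable problems; an empty list means none found."""
    problems = ["missing key: " + key for key in sorted(EXPECTED_KEYS - set(config))]
    hosts = config.get("hosts")
    if not isinstance(hosts, list) or not hosts:
        problems.append("hosts must be a non-empty list")
    # Assumption: default_mode accepts the same values as supported_sync_modes.
    if config.get("default_mode") not in ("full_refresh", "cdc"):
        problems.append("default_mode should be 'full_refresh' or 'cdc'")
    return problems


def check_config_file(path: str) -> list:
    """Load a config.json file from disk and run the same checks."""
    with open(path, encoding="utf-8") as handle:
        return check_config(json.load(handle))
```

Calling check_config_file("config.json") before the Discover or Sync command gives an early warning about a missing key or a typo in default_mode.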

Commands

Discover Command

The Discover command generates the JSON content for the catalog.json file, which defines the schema of the collections to be synced.

Usage

To run the Discover command, use the following syntax:

./build.sh driver-mongodb discover --config /mongodb/examples/config.json 

Example Response (Formatted)

After executing the Discover command, a formatted response will look like this:

{
  "type": "CATALOG",
  "catalog": {
    "streams": [
      {
        "stream": {
          "name": "tweets",
          "namespace": "twitter_data",
          "json_schema": {
            "Properties": {},
            "properties": {
              "_id": { "type": ["array"] },
              "user": { "type": ["object"] },
              "withheld_in_countries": { "type": ["array"] }
            }
          },
          "supported_sync_modes": ["full_refresh", "cdc"],
          "source_defined_primary_key": ["_id"],
          "available_cursor_fields": [],
          "sync_mode": "cdc"
        }
      }
    ]
  }
}
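
To get a quick overview of what Discover found, the response can be reduced to one line per stream. This is a small sketch that assumes the response has exactly the shape shown above ("catalog" containing "streams", each with a "stream" object); the function name summarize_streams is hypothetical.

```python
def summarize_streams(discover_output: dict) -> list:
    """Return (namespace, name, supported_sync_modes) for each discovered stream."""
    summary = []
    for entry in discover_output["catalog"]["streams"]:
        stream = entry["stream"]
        summary.append(
            (stream["namespace"], stream["name"], stream["supported_sync_modes"])
        )
    return summary


if __name__ == "__main__":
    # Trimmed copy of the example response above.
    example = {
        "type": "CATALOG",
        "catalog": {
            "streams": [
                {
                    "stream": {
                        "name": "tweets",
                        "namespace": "twitter_data",
                        "supported_sync_modes": ["full_refresh", "cdc"],
                    }
                }
            ]
        },
    }
    for namespace, name, modes in summarize_streams(example):
        print(namespace + "." + name, "supports", ", ".join(modes))
```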

Configure Catalog

Before running the Sync command, the generated catalog.json file must be configured. Follow these steps:

  • Remove Unnecessary Streams: Delete any streams you do not want to sync.

  • Modify Each Stream: For each stream you want to sync:

    • Add the following property:
      "sync_mode": "cdc"
    • Specify the cursor field (only for incremental syncs):
      "cursor_field": "<cursor field from available_cursor_fields>"
  • Final Catalog Example

    {
      "streams": [
        {
          "stream": {
            "name": "incr2",
            "namespace": "incr",
            "type_schema": {
              "properties": {
                "_id": { "type": ["string"] },
                "address": { "type": ["string"] },
                "age": { "type": ["integer"] },
                "height": { "type": ["number"] },
                "name": { "type": ["string"] }
              }
            },
            "supported_sync_modes": ["full_refresh", "cdc"],
            "source_defined_primary_key": ["_id"],
            "available_cursor_fields": [],
            "sync_mode": "cdc"
          }
        }
      ]
    }
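
The catalog steps above (drop unwanted streams, then set sync_mode on the rest) can also be scripted. The sketch below assumes the catalog has the top-level "streams" layout of the final example; the function name configure_catalog and the choice to reject a mode not listed in supported_sync_modes are illustrative assumptions, not driver behavior.

```python
import copy


def configure_catalog(catalog: dict, keep: set, sync_mode: str = "cdc") -> dict:
    """Return a copy of the catalog with only the named streams, each set to sync_mode."""
    kept = []
    for entry in catalog["streams"]:
        if entry["stream"]["name"] not in keep:
            continue  # Remove streams that should not be synced.
        entry = copy.deepcopy(entry)  # Leave the input catalog untouched.
        stream = entry["stream"]
        if sync_mode not in stream.get("supported_sync_modes", []):
            raise ValueError(
                "stream " + stream["name"] + " does not support " + sync_mode
            )
        stream["sync_mode"] = sync_mode
        kept.append(entry)
    result = dict(catalog)
    result["streams"] = kept
    return result
```

Loading catalog.json with json.load, passing it through configure_catalog, and writing the result back with json.dump keeps the edit repeatable when Discover is rerun.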

Writer File

The Writer file defines the configuration for the destination where the data will be written.

The normalization flag determines whether Level 1 flattening is applied to the written data.

Example (For Local):

{
  "type": "PARQUET",
  "writer": {
    "normalization": true,
    "local_path": "./examples/reader"
  }
}

Example (For S3):

{
  "type": "PARQUET",
  "writer": {
    "normalization": false,
    "s3_bucket": "olake",
    "s3_region": "",
    "s3_access_key": "",
    "s3_secret_key": "",
    "s3_path": ""
  }
}
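
This page does not spell out what Level 1 flattening does to a record. The sketch below illustrates one plausible reading, stated as an assumption: each top-level field becomes its own column, while nested objects and arrays are kept as serialized JSON strings rather than being expanded further. The function name flatten_level_1 is hypothetical and this is not the driver's implementation.

```python
import json


def flatten_level_1(record: dict) -> dict:
    """Promote top-level fields to columns; keep nested values as JSON strings.

    Assumption: "Level 1" means only the first level of the document is
    expanded. Deeper structure is serialized instead of flattened.
    """
    flat = {}
    for key, value in record.items():
        if isinstance(value, (dict, list)):
            flat[key] = json.dumps(value, sort_keys=True)
        else:
            flat[key] = value
    return flat


if __name__ == "__main__":
    document = {"_id": "1", "user": {"name": "a"}, "withheld_in_countries": []}
    print(flatten_level_1(document))
```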

Sync Command

The Sync command fetches data from MongoDB and ingests it into the destination.

./build.sh driver-mongodb sync --config /mongodb/examples/config.json --catalog /mongodb/examples/catalog.json --destination /mongodb/examples/write.json

To run a sync that resumes from a saved state, pass the state file with --state:

./build.sh driver-mongodb sync --config /mongodb/examples/config.json --catalog /mongodb/examples/catalog.json --destination /mongodb/examples/write.json --state /mongodb/examples/state.json
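
When the sync is launched from a script, the two command forms above differ only in the optional --state flag. The following sketch builds the argument list from the flags shown on this page; the helper names build_sync_command and run_sync are hypothetical, and it assumes ./build.sh is run from the project directory.

```python
import subprocess
from typing import List, Optional


def build_sync_command(
    config: str, catalog: str, destination: str, state: Optional[str] = None
) -> List[str]:
    """Build the sync command line, adding --state only when a state file is given."""
    command = [
        "./build.sh", "driver-mongodb", "sync",
        "--config", config,
        "--catalog", catalog,
        "--destination", destination,
    ]
    if state is not None:
        command += ["--state", state]
    return command


def run_sync(config: str, catalog: str, destination: str,
             state: Optional[str] = None) -> None:
    """Run the sync and raise CalledProcessError if it exits with a non-zero status."""
    subprocess.run(build_sync_command(config, catalog, destination, state), check=True)
```

Passing the arguments as a list avoids shell quoting problems when a path contains spaces.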

State File

The State file is generated by the CLI command at the completion of a batch or the end of a sync. This file can be used to save the sync progress and later resume from a specific checkpoint.

State File Format

You can save the state in a state.json file using the following format:

{
  "type": "STREAM",
  "streams": [
    {
      "stream": "stream_8",
      "namespace": "otter_db",
      "sync_mode": "cdc",
      "state": {
        "resume_token": { "_data": "82673F82FE000000022B0429296E1404" }
      }
    },
    {
      "stream": "stream_0",
      "namespace": "otter_db",
      "sync_mode": "cdc",
      "state": {
        "resume_token": { "_data": "82673F82FE000000022B0429296E1404" }
      }
    }
  ]
}
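
To see which checkpoint each stream would resume from, the state file can be read back and reduced to its resume tokens. This sketch assumes the state layout shown above; the function name resume_tokens is hypothetical.

```python
import json


def resume_tokens(state: dict) -> dict:
    """Map (namespace, stream) to the saved resume token, skipping streams without one."""
    tokens = {}
    for entry in state.get("streams", []):
        token = entry.get("state", {}).get("resume_token", {}).get("_data")
        if token is not None:
            tokens[(entry["namespace"], entry["stream"])] = token
    return tokens


def resume_tokens_from_file(path: str) -> dict:
    """Load state.json from disk and extract its resume tokens."""
    with open(path, encoding="utf-8") as handle:
        return resume_tokens(json.load(handle))
```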

Need Assistance?

If you have any questions or uncertainties about setting up OLake, contributing to the project, or troubleshooting any issues, we’re here to help. You can:

  • Email Support: Reach out to our team at hello@olake.io for prompt assistance.
  • Join our Slack Community: We discuss future roadmaps and bugs, help people debug issues they are facing, and more.
  • Schedule a Call: If you prefer a one-on-one conversation, schedule a call with our CTO and team.

Your success with OLake is our priority. Don’t hesitate to contact us if you need any help or further clarification!