MongoDB Driver
The MongoDB Driver enables data synchronization from MongoDB to your desired destination. It supports both Full Refresh and CDC (Change Data Capture) modes.
Supported Modes
- Full Refresh: Fetches the complete dataset from MongoDB.
- CDC (Change Data Capture): Tracks and syncs incremental changes from MongoDB in real time.
Setup and Configuration
To run the MongoDB Driver, configure the following files with your specific credentials and settings:
- config.json: MongoDB connection details.
- catalog.json: List of collections and fields to sync (generated using the Discover command).
- write.json: Configuration for the destination where the data will be written.
Place these files in your project directory before running the commands.
Config File
Add the MongoDB credentials to the config.json file in the following format:
{
  "hosts": [
    "host1:27017",
    "host2:27017",
    "host3:27017"
  ],
  "username": "test",
  "password": "test",
  "authdb": "admin",
  "replica-set": "rs0",
  "read-preference": "secondaryPreferred",
  "srv": true,
  "server-ram": 16,
  "database": "database",
  "max_threads": 50,
  "default_mode": "cdc"
}
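Before running any command, it can help to sanity-check config.json. The following is a minimal sketch, not part of the driver: the key names come from the example above, but treating that subset as required, and the helper name check_config, are assumptions made for illustration.

```python
import json

# Keys taken from the example config above; treating them as required
# is an assumption made for this sketch, not a documented driver rule.
EXPECTED_KEYS = {"hosts", "username", "password", "authdb", "database", "default_mode"}
# Mode names as they appear in supported_sync_modes in the Discover output.
KNOWN_MODES = {"full_refresh", "cdc"}


def check_config(config: dict) -> list:
    """Return a list of readable problems; an empty list means none were found."""
    problems = [f"missing key: {key}" for key in sorted(EXPECTED_KEYS - set(config))]
    hosts = config.get("hosts", [])
    if not isinstance(hosts, list) or not hosts:
        problems.append("hosts must be a non-empty list")
    mode = config.get("default_mode")
    if mode is not None and mode not in KNOWN_MODES:
        problems.append(f"unknown default_mode: {mode}")
    return problems


example = json.loads("""
{
  "hosts": ["host1:27017"],
  "username": "test",
  "password": "test",
  "authdb": "admin",
  "database": "database",
  "default_mode": "cdc"
}
""")
print(check_config(example))
```

In practice the dictionary would be loaded from the real config.json with json.load rather than from an inline string.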
Commands
Discover Command
The Discover command generates the JSON content for the catalog.json file, which defines the schema of the collections to be synced.
Usage
To run the Discover command, use the following syntax:
./build.sh driver-mongodb discover --config /mongodb/examples/config.json
Example Response (Formatted)
After executing the Discover command, a formatted response will look like this:
{
  "type": "CATALOG",
  "catalog": {
    "streams": [
      {
        "stream": {
          "name": "tweets",
          "namespace": "twitter_data",
          "json_schema": {
            "Properties": {},
            "properties": {
              "_id": {
                "type": [
                  "array"
                ]
              },
              "user": {
                "type": [
                  "object"
                ]
              },
              "withheld_in_countries": {
                "type": [
                  "array"
                ]
              }
            }
          },
          "supported_sync_modes": [
            "full_refresh",
            "cdc"
          ],
          "source_defined_primary_key": [
            "_id"
          ],
          "available_cursor_fields": [],
          "sync_mode": "cdc"
        }
      }
    ]
  }
}
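Comparing this response with the catalog format used by the Sync command suggests that catalog.json holds the inner "catalog" object rather than the whole message. The sketch below extracts it under that assumption; the function name catalog_from_discover is hypothetical, and how the response text is captured from the CLI is left out.

```python
import json


def catalog_from_discover(response_text: str) -> dict:
    """Return the inner catalog object from a Discover response."""
    message = json.loads(response_text)
    if message.get("type") != "CATALOG":
        raise ValueError(f"expected a CATALOG message, got {message.get('type')!r}")
    return message["catalog"]


# Trimmed-down response in the same shape as the example above.
response = (
    '{"type": "CATALOG", "catalog": {"streams": ['
    '{"stream": {"name": "tweets", "namespace": "twitter_data"}}]}}'
)
catalog = catalog_from_discover(response)
# This is the content that would be saved as catalog.json.
print(json.dumps(catalog, indent=2))
```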
Configure Catalog
Before running the Sync command, the generated catalog.json file must be configured. Follow these steps:
- Remove Unnecessary Streams: Delete any streams you do not want to sync.
- Modify Each Stream: For each stream you want to sync:
  - Add the following property:
    "sync_mode": "cdc",
  - Specify the cursor field (only for incremental syncs):
    "cursor_field": "<cursor field from available_cursor_fields>"
- Final Catalog Example:
{
  "streams": [
    {
      "stream": {
        "name": "incr2",
        "namespace": "incr",
        "type_schema": {
          "properties": {
            "_id": { "type": ["string"] },
            "address": { "type": ["string"] },
            "age": { "type": ["integer"] },
            "height": { "type": ["number"] },
            "name": { "type": ["string"] }
          }
        },
        "supported_sync_modes": ["full_refresh", "cdc"],
        "source_defined_primary_key": ["_id"],
        "available_cursor_fields": [],
        "sync_mode": "cdc"
      }
    }
  ]
}
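The catalog steps above (dropping unwanted streams and setting sync_mode on the rest) can also be scripted. This is a rough sketch rather than a tool shipped with the driver: configure_catalog and the stream name "drafts" are made up for illustration, and the check against supported_sync_modes is an extra safeguard the steps do not require.

```python
import copy


def configure_catalog(catalog: dict, keep: set, sync_mode: str = "cdc") -> dict:
    """Keep only the named streams and set sync_mode on each of them."""
    result = copy.deepcopy(catalog)  # leave the caller's catalog untouched
    kept = []
    for entry in result["streams"]:
        stream = entry["stream"]
        if stream["name"] not in keep:
            continue  # step 1: remove streams that should not be synced
        if sync_mode not in stream.get("supported_sync_modes", []):
            raise ValueError(f"{stream['name']} does not support {sync_mode}")
        stream["sync_mode"] = sync_mode  # step 2: set the sync mode
        kept.append(entry)
    result["streams"] = kept
    return result


# "drafts" is a hypothetical second stream used only for this example.
catalog = {
    "streams": [
        {"stream": {"name": "tweets", "namespace": "twitter_data",
                    "supported_sync_modes": ["full_refresh", "cdc"]}},
        {"stream": {"name": "drafts", "namespace": "twitter_data",
                    "supported_sync_modes": ["full_refresh", "cdc"]}},
    ]
}
configured = configure_catalog(catalog, keep={"tweets"})
print([entry["stream"]["name"] for entry in configured["streams"]])
```

Working on a deep copy keeps the Discover output intact, so the same catalog can be reconfigured with a different selection later.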
Writer File
The Writer file defines the configuration for the destination where the data is written. The normalization setting determines whether Level 1 flattening is applied.
Example (For Local):
{
  "type": "PARQUET",
  "writer": {
    "normalization": true,
    "local_path": "./examples/reader"
  }
}
Example (For S3):
{
  "type": "PARQUET",
  "writer": {
    "normalization": false,
    "s3_bucket": "olake",
    "s3_region": "",
    "s3_access_key": "",
    "s3_secret_key": "",
    "s3_path": ""
  }
}
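The two writer variants differ only in which destination fields appear under "writer". As an illustration, the sketch below builds either form from the field names in the examples above; the helper parquet_writer_config is hypothetical, and the rule that exactly one destination must be given is an assumption of this sketch.

```python
import json
from typing import Optional

# Field names copied from the S3 example above.
S3_KEYS = ("s3_bucket", "s3_region", "s3_access_key", "s3_secret_key", "s3_path")


def parquet_writer_config(normalization: bool,
                          local_path: Optional[str] = None,
                          s3: Optional[dict] = None) -> dict:
    """Build a write.json document for a local path or an S3 bucket."""
    if (local_path is None) == (s3 is None):
        raise ValueError("provide exactly one of local_path or s3")
    writer = {"normalization": normalization}
    if local_path is not None:
        writer["local_path"] = local_path
    else:
        for key in S3_KEYS:
            writer[key] = s3.get(key, "")  # missing fields stay empty
    return {"type": "PARQUET", "writer": writer}


local_config = parquet_writer_config(True, local_path="./examples/reader")
print(json.dumps(local_config, indent=2))
```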
Sync Command
The Sync command fetches data from MongoDB and ingests it into the destination.
./build.sh driver-mongodb sync --config /mongodb/examples/config.json --catalog /mongodb/examples/catalog.json --destination /mongodb/examples/write.json
To run the sync with a state file:
./build.sh driver-mongodb sync --config /mongodb/examples/config.json --catalog /mongodb/examples/catalog.json --destination /mongodb/examples/write.json --state /mongodb/examples/state.json
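When syncs are launched from a script, the two command forms above can be merged into one helper that adds --state only if a state file already exists. This is a sketch under that assumption: sync_command is a hypothetical name, and the flags are the ones shown in the commands above.

```python
import os


def sync_command(config: str, catalog: str, destination: str, state: str = "") -> list:
    """Build the argument list for the Sync command shown above."""
    args = ["./build.sh", "driver-mongodb", "sync",
            "--config", config,
            "--catalog", catalog,
            "--destination", destination]
    # Pass --state only when a state file from an earlier run exists.
    if state and os.path.exists(state):
        args += ["--state", state]
    return args


cmd = sync_command("/mongodb/examples/config.json",
                   "/mongodb/examples/catalog.json",
                   "/mongodb/examples/write.json",
                   "/mongodb/examples/state.json")
print(" ".join(cmd))
# To execute it from the repository root (not run in this sketch):
#   import subprocess
#   subprocess.run(cmd, check=True)
```

Building the command as a list avoids shell quoting problems if any of the paths contain spaces.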
State File
The State file is generated by the CLI command at the completion of a batch or the end of a sync. This file can be used to save the sync progress and later resume from a specific checkpoint.
State File Format
You can save the state in a state.json file using the following format:
{
  "type": "STREAM",
  "streams": [
    {
      "stream": "stream_8",
      "namespace": "otter_db",
      "sync_mode": "cdc",
      "state": {
        "resume_token": {"_data": "82673F82FE000000022B0429296E1404"}
      }
    },
    {
      "stream": "stream_0",
      "namespace": "otter_db",
      "sync_mode": "cdc",
      "state": {
        "resume_token": {"_data": "82673F82FE000000022B0429296E1404"}
      }
    }
  ]
}
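To check which checkpoint each stream would resume from, the state file can be read with a few lines of Python. The sketch below assumes the format shown above; resume_tokens is a hypothetical helper, and the state is inlined here instead of being loaded from state.json.

```python
import json


def resume_tokens(state: dict) -> dict:
    """Map (namespace, stream) to the stored resume token string, if any."""
    tokens = {}
    for entry in state.get("streams", []):
        token = entry.get("state", {}).get("resume_token", {}).get("_data")
        tokens[(entry["namespace"], entry["stream"])] = token
    return tokens


state = json.loads("""
{
  "type": "STREAM",
  "streams": [
    {
      "stream": "stream_8",
      "namespace": "otter_db",
      "sync_mode": "cdc",
      "state": {"resume_token": {"_data": "82673F82FE000000022B0429296E1404"}}
    }
  ]
}
""")
for (namespace, stream), token in resume_tokens(state).items():
    print(f"{namespace}.{stream}: {token}")
```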