State Controller

Common State Structure

OLake connectors follow a common state structure. The structure supports:

  • Running connectors at horizontal scale

  • Running connectors with a resumable full load

{
  "level": "global", // global or stream
  "version": "v1", // if required
  "streams": [
    {
      "sync_type": "full-load",
      "stream_name": "stream_6",
      "stream_namespace": "otter_db",
      "stream_state": {
        "cursor_field": "66ffcf79d9249bef96de3611",
        "chunks": [
          {
            "status": "succeed",
            "min": "66ffcf79d9249bef96de3611",
            "max": "66ffcf79d9249bef96de3611"
          },
          {
            "status": "failed",
            "min": "66ffcf79d9249bef96de3611",
            "max": "66ffcf79d9249bef96de3611"
          }
        ]
      }
    }
  ]
}

Some state terms

  • level: Defines whether the state covers a single stream or all streams. It is either stream or global.

  • version: Used to manage versioning and backward compatibility.

  • streams: An array with information about each stream. (If level is stream, the array has only one object.)

  • streams :: sync_type: The type of sync the stream runs: incremental, full-load, or cdc. (Chunks are available only in a full load.)

  • streams :: stream_name: The name of the stream to be synced.

  • streams :: stream_namespace: The namespace of the stream. (It can also serve as a unique identifier when two streams have the same name.)

  • streams :: stream_state: The most important part, where the cursor for the stream is stored. It contains the cursor field and the chunks of data.

  • streams :: stream_state :: cursor_field: The cursor ID up to which the previous sync completed.

  • streams :: stream_state :: chunks: The data batches to be synced in a full load, each with its sync status.
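The fields above can be modeled as typed records. The following is a minimal Python sketch, not OLake's implementation: the class names (`Chunk`, `StreamState`, `Stream`, `State`) and the `parse_state` helper are hypothetical, and it assumes the state arrives as plain JSON without the `//` comments used in the examples on this page.

```python
import json
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Chunk:
    status: str  # "succeed" or "failed" in the examples on this page
    min: str
    max: str


@dataclass
class StreamState:
    cursor_field: Optional[str] = None
    chunks: List[Chunk] = field(default_factory=list)  # only used in a full load


@dataclass
class Stream:
    sync_type: str  # "incremental", "full-load", or "cdc"
    stream_name: str
    stream_namespace: str
    stream_state: StreamState


@dataclass
class State:
    level: str  # "global" or "stream"
    version: Optional[str]
    streams: List[Stream]


def parse_state(raw: str) -> State:
    """Parse a JSON state document into the hypothetical types above."""
    doc = json.loads(raw)
    if doc["level"] not in ("global", "stream"):
        raise ValueError(f"unknown level: {doc['level']!r}")
    streams = []
    for s in doc.get("streams", []):
        ss = s.get("stream_state", {})
        chunks = [Chunk(**c) for c in ss.get("chunks", [])]
        streams.append(
            Stream(
                sync_type=s["sync_type"],
                stream_name=s["stream_name"],
                stream_namespace=s["stream_namespace"],
                stream_state=StreamState(ss.get("cursor_field"), chunks),
            )
        )
    # A stream-level state carries exactly one stream object.
    if doc["level"] == "stream" and len(streams) != 1:
        raise ValueError("stream-level state must contain exactly one stream")
    return State(doc["level"], doc.get("version"), streams)
```

Validating the level and the one-stream rule at parse time means a malformed state fails early instead of surfacing in the middle of a sync.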

Some State Examples

Example 1

Two incremental streams and one full-load stream with a failed chunk (parallel run)

  • The connector receives the following state:
{
  "level": "global", // global or stream
  "version": "v1", // if required
  "streams": [
    {
      "sync_type": "incremental",
      "stream_name": "stream_1",
      "stream_namespace": "otter_db",
      "stream_state": {
        "cursor_field": "66ffcf79d9249bef96de3611"
      }
    },
    {
      "sync_type": "incremental",
      "stream_name": "stream_2",
      "stream_namespace": "otter_db",
      "stream_state": {
        "cursor_field": "67ffcf79d9249bef96de3611"
      }
    }
  ]
}

Each thread picks up one stream, and each stream picks up its own state (parallel mode).
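One way to picture this split is a helper that turns a global state into one stream-level state per stream. This is only a sketch under assumptions: `split_global_state` is a hypothetical name, and the state is handled as plain dictionaries.

```python
import copy
from typing import Any, Dict, List


def split_global_state(global_state: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return one stream-level state per stream found in a global state."""
    stream_states = []
    for stream in global_state.get("streams", []):
        stream_level: Dict[str, Any] = {"level": "stream"}
        if "version" in global_state:
            stream_level["version"] = global_state["version"]
        # Deep copy so that threads never share mutable state objects.
        stream_level["streams"] = [copy.deepcopy(stream)]
        stream_states.append(stream_level)
    return stream_states
```

Each returned dictionary can then be handed to its own thread, which reads and updates only its own copy.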

```json
// example stream_1
{
  "level": "stream",
  "version": "v1", // if required
  "streams": [
    {
      "sync_type": "incremental",
      "stream_name": "stream_1",
      "stream_namespace": "otter_db",
      "stream_state": {
        "cursor_field": "66ffcf79d9249bef96de3611"
      }
    }
  ]
}
```

The full-load stream receives no state, so it initializes its state while creating chunks, as shown here, and updates the state as chunks are synced.

// example stream_3 (full load in progress)
{
  "level": "global", // global or stream
  "version": "v1", // if required
  "streams": [
    {
      "sync_type": "full-load",
      "stream_name": "stream_3",
      "stream_namespace": "otter_db",
      "stream_state": {
        "cursor_field": "67ffcf79d9249bef96de3611", // max cursor id or resume token
        "chunks": [
          {
            "status": "succeed",
            "min": "66ffcf79d9249bef96de3611",
            "max": "66ffcf79d9249bef96de3611"
          },
          {
            "status": "failed",
            "min": "66ffcf79d9249bef96de3611",
            "max": "66ffcf79d9249bef96de3611"
          }
        ]
      }
    }
  ]
}

Once all the chunks succeed, delete the chunks from the state and emit it. If any chunk has not succeeded, emit the state as it is.
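The rule for chunks can be sketched as two small helpers. This is an illustration under assumptions, not OLake's code: `pending_chunks` and `finalize_full_load` are hypothetical names, a stream is a plain dictionary shaped like the examples on this page, and any chunk whose status is not `succeed` is treated as still needing a sync.

```python
from typing import Any, Dict, List


def pending_chunks(stream: Dict[str, Any]) -> List[Dict[str, str]]:
    """Chunks that still need to be synced when a full load resumes."""
    chunks = stream.get("stream_state", {}).get("chunks", [])
    return [c for c in chunks if c["status"] != "succeed"]


def finalize_full_load(stream: Dict[str, Any]) -> Dict[str, Any]:
    """Drop `chunks` once every chunk has succeeded; otherwise keep the state as is."""
    # Copy the outer dictionaries so the caller's state is not modified.
    result = {**stream, "stream_state": dict(stream.get("stream_state", {}))}
    if not pending_chunks(stream):
        result["stream_state"].pop("chunks", None)
    return result
```

On a resumed run, only the chunks returned by `pending_chunks` would need to be synced again, which is what makes the full load resumable.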

In CDC syncs, a resumable full refresh is possible only if the cursor ID is still available in the WAL or the oplog.

// example stream_3 (after all chunks succeed)
{
  "level": "stream", // global or stream
  "version": "v1", // if required
  "streams": [
    {
      "sync_type": "incremental",
      "stream_name": "stream_3",
      "stream_namespace": "otter_db",
      "stream_state": {
        // max cursor id in chunks, or the resume token checkpointed at the start of the full load
        "cursor_field": "66ffcf79d9249bef96de3611"
      }
    }
  ]
}

Once each thread sends back its state, the main connector appends it to the global-level state.

The connector base then collects all stream responses and saves them as the global state:

{
  "level": "global", // global or stream
  "version": "v1", // if required
  "streams": [
    {
      "sync_type": "incremental",
      "stream_name": "stream_1",
      "stream_namespace": "otter_db",
      "stream_state": {
        "cursor_field": "66ffcf79d9249bef96de3611"
      }
    },
    {
      "sync_type": "incremental",
      "stream_name": "stream_2",
      "stream_namespace": "otter_db",
      "stream_state": {
        "cursor_field": "67ffcf79d9249bef96de3611"
      }
    },
    {
      "sync_type": "incremental",
      "stream_name": "stream_3",
      "stream_namespace": "otter_db",
      "stream_state": {
        "cursor_field": "67ffcf79d9249bef96de3611"
      }
    }
  ]
}
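
Collecting the per-stream responses into one global state could look like the sketch below. The function name `merge_stream_states` is hypothetical, and identifying a stream by its namespace plus name is an assumption based on the term definitions earlier on this page.

```python
from typing import Any, Dict, List, Optional, Tuple


def merge_stream_states(
    stream_states: List[Dict[str, Any]], version: Optional[str] = "v1"
) -> Dict[str, Any]:
    """Combine stream-level states into a single global-level state."""
    merged: Dict[Tuple[str, str], Dict[str, Any]] = {}
    for state in stream_states:
        for stream in state.get("streams", []):
            # Namespace plus name identifies a stream; a later state replaces an earlier one.
            key = (stream["stream_namespace"], stream["stream_name"])
            merged[key] = stream
    global_state: Dict[str, Any] = {"level": "global"}
    if version is not None:
        global_state["version"] = version
    global_state["streams"] = list(merged.values())
    return global_state
```

Keying on namespace and name means that a thread reporting twice for the same stream updates its entry in place instead of appending a duplicate.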

Info: State is handled the same way in a parallel run and in a horizontally scaled run. The only difference is that management of the global-level state moves from the connector level to the SDK level.

Example 2

Two CDC streams and one full-load stream

Step 1: Detect the full-load stream and split the state to the stream level.

Each CDC stream has its own cursor field, so it can start CDC from there.

The full-load stream starts its full load by creating chunks and saving them in the state, along with the current resume token or WAL cursor.
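
As a rough sketch of that initialization, the helper below builds the first state for a full-load stream. Several things here are assumptions rather than documented behavior: the name `init_full_load_state`, the idea that chunk boundaries arrive as `(min, max)` pairs, and the initial status value `pending` (the examples on this page only show `succeed` and `failed`).

```python
from typing import Any, Dict, List, Tuple


def init_full_load_state(
    stream_name: str,
    stream_namespace: str,
    boundaries: List[Tuple[str, str]],
    resume_token: str,
) -> Dict[str, Any]:
    """Build the initial stream entry for a full load that is about to start."""
    return {
        "sync_type": "full-load",
        "stream_name": stream_name,
        "stream_namespace": stream_namespace,
        "stream_state": {
            # Checkpoint taken before the full load so CDC can continue from it later.
            "cursor_field": resume_token,
            # "pending" is a hypothetical status for chunks that have not run yet.
            "chunks": [
                {"status": "pending", "min": lo, "max": hi} for lo, hi in boundaries
            ],
        },
    }
```

Saving the resume token before any chunk is read is what lets the stream switch to CDC afterwards without missing changes made during the full load.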

Example 3

All CDC

All streams pick up their states and use the cursor field to start the CDC sync. Some connectors may use synchronous CDC. In that case, the resume tokens of all streams must be equal, or the connector must start from the minimum cursor across all streams.
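
Choosing the starting point for synchronous CDC can be sketched as follows. The function name `synchronous_start_cursor` is hypothetical, and the comparison assumes cursors are hexadecimal strings like the ones in the examples on this page. Real resume tokens or WAL positions may need a source-specific comparison instead.

```python
from typing import Any, Dict


def synchronous_start_cursor(global_state: Dict[str, Any]) -> str:
    """Return the smallest cursor across all streams in a global state."""
    cursors = [s["stream_state"]["cursor_field"] for s in global_state["streams"]]
    if not cursors:
        raise ValueError("state has no streams")
    # Assumption: cursors are hex strings, so they are compared by numeric value.
    return min(cursors, key=lambda c: int(c, 16))
```

When every stream already holds the same cursor, the function returns that shared value, so the same code path covers both cases described above.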

