State Controller
Common State Structure
Olake connectors follow a common state structure. The structure is compatible with:
- Running connectors on a horizontal scale
- Running connectors with a resumable full load
{
"level": "global", // global or stream
"version":"v1", // if required
"streams": [
{
"sync_type": "full-load",
"stream_name": "stream_6",
"stream_namespace": "otter_db",
"stream_state": {
"cursor_field" : "66ffcf79d9249bef96de3611",
"chunks" : [
{
"status": "succeed",
"min": "66ffcf79d9249bef96de3611",
"max": "66ffcf79d9249bef96de3611"
},
{
"status": "failed",
"min": "66ffcf79d9249bef96de3611",
"max": "66ffcf79d9249bef96de3611"
}
]
}
}
]
}
Some state terms
- level: Defines whether the state covers a single stream or all streams. It is either stream or global.
- version: Used to manage versioning and backward compatibility.
- streams: An array holding information about the streams. If level is stream, the array contains only one object.
- streams :: sync_type: The type of sync the stream runs: incremental, full-load, or cdc. Chunks are available only for full-load.
- streams :: stream_name: The name of the stream to be synced.
- streams :: stream_namespace: The namespace of the stream. It also serves as a unique identifier when two streams have the same name.
- streams :: stream_state: The important part, where the cursor for the stream is stored. It contains the cursor field and the chunks of data.
- streams :: stream_state :: cursor_field: The cursor ID up to which the previous sync completed.
- streams :: stream_state :: chunks: The data batches to be synced in a full load, together with the sync status of each batch.
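The fields described above can be modeled roughly as follows. This is a minimal Python sketch, not Olake's implementation: the class names (`Chunk`, `StreamState`, `Stream`, `State`) and the `parse_state` helper are hypothetical, and the input is assumed to be plain JSON without the inline comments used in the examples.

```python
import json
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Chunk:
    status: str  # "succeed" or "failed", as in the examples
    min: str
    max: str


@dataclass
class StreamState:
    cursor_field: Optional[str] = None
    chunks: List[Chunk] = field(default_factory=list)  # only used by full-load


@dataclass
class Stream:
    sync_type: str  # "incremental", "full-load", or "cdc"
    stream_name: str
    stream_namespace: str
    stream_state: StreamState


@dataclass
class State:
    level: str  # "global" or "stream"
    version: Optional[str]
    streams: List[Stream]


def parse_state(raw: str) -> State:
    """Build a State from a JSON document shaped like the example above."""
    doc = json.loads(raw)
    streams = []
    for s in doc.get("streams", []):
        ss = s.get("stream_state", {})
        chunks = [Chunk(**c) for c in ss.get("chunks", [])]
        streams.append(Stream(
            sync_type=s["sync_type"],
            stream_name=s["stream_name"],
            stream_namespace=s["stream_namespace"],
            stream_state=StreamState(ss.get("cursor_field"), chunks),
        ))
    state = State(doc["level"], doc.get("version"), streams)
    # A stream-level state is described as holding only one stream object.
    if state.level == "stream" and len(state.streams) != 1:
        raise ValueError("a stream-level state must hold exactly one stream")
    return state
```

Keeping `chunks` as an empty list by default means incremental and CDC streams can share the same type without a separate code path.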
Some State Examples
Two incremental streams and one Full load failure of one stream (Parallel run)
- The connector receives the following state:
{
"level": "global", // global or stream
"version":"v1", // if required
"streams": [
{
"sync_type": "incremental",
"stream_name": "stream_1",
"stream_namespace": "otter_db",
"stream_state": {
"cursor_field": "66ffcf79d9249bef96de3611"
}
},
{
"sync_type": "incremental",
"stream_name": "stream_2",
"stream_namespace": "otter_db",
"stream_state": {
"cursor_field": "67ffcf79d9249bef96de3611"
}
}
]
}
Each thread picks up one stream, and each stream picks up its own state (parallel mode):
```json
// example stream_1
{
"level": "stream",
"version":"v1", // if required
"streams": [
{
"sync_type": "incremental",
"stream_name": "stream_1",
"stream_namespace": "otter_db",
"stream_state": {
"cursor_field": "66ffcf79d9249bef96de3611"
}
}
]
}
```
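The split from a global state into per-stream states can be sketched as below. The function name `split_global_state` is hypothetical, and the deep copy is a design assumption so that a worker thread cannot mutate the shared global state.

```python
import copy
from typing import Any, Dict, List


def split_global_state(global_state: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Return one stream-level state per entry in the global "streams" array."""
    per_stream = []
    for stream in global_state.get("streams", []):
        per_stream.append({
            "level": "stream",
            "version": global_state.get("version"),
            # Deep copy so each worker owns its own state object.
            "streams": [copy.deepcopy(stream)],
        })
    return per_stream
```

Each returned state can then be handed to the thread that syncs that stream.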
The full-load stream receives no state, so it initializes its state while creating chunks, as shown here, and updates it as the sync progresses:
// example stream_3 (full load in progress)
{
"level": "global", // global or stream
"version":"v1", // if required
"streams": [
{
"sync_type": "full-load",
"stream_name": "stream_3",
"stream_namespace": "otter_db",
"stream_state": {
"cursor_field": "67ffcf79d9249bef96de3611", // max cursor id or resume token
"chunks": [
{
"status": "succeed",
"min": "66ffcf79d9249bef96de3611",
"max": "66ffcf79d9249bef96de3611"
},
{
"status": "failed",
"min": "66ffcf79d9249bef96de3611",
"max": "66ffcf79d9249bef96de3611"
}
]
}
}
]
}
Once all the chunks succeed, delete the chunks from the state and send the state back. If any chunk has not succeeded, send the state back as it is.
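The chunk cleanup rule can be sketched like this. The helper names `pending_chunks` and `finalize_full_load` are hypothetical, and treating every status other than "succeed" as still pending is an assumption; the examples only show "succeed" and "failed".

```python
from typing import Any, Dict, List


def pending_chunks(stream_state: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Chunks that still need syncing on resume (anything not "succeed")."""
    return [c for c in stream_state.get("chunks", []) if c.get("status") != "succeed"]


def finalize_full_load(stream_state: Dict[str, Any]) -> Dict[str, Any]:
    """Drop "chunks" once every chunk succeeded; otherwise keep the state as it is."""
    result = dict(stream_state)  # shallow copy; the input dict is left untouched
    if not pending_chunks(result):
        result.pop("chunks", None)
    return result
```

On a resumed run, only the chunks returned by `pending_chunks` would need to be synced again.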
In CDC syncs, a resumable full refresh is possible only if the cursor ID is still available in the WAL logs or oplogs.
// example stream_3 (after the full load)
{
"level": "stream", // global or stream
"version":"v1", // if required
"streams": [
{
"sync_type": "incremental",
"stream_name": "stream_3",
"stream_namespace": "otter_db",
"stream_state": {
"cursor_field": "66ffcf79d9249bef96de3611" // max cursor id in chunks or resume token checkpointed on start of full load
}
}
]
}
Once each thread sends back its state, the main connector appends it to the global-level state.
The connector base then collects all stream responses and saves them as the global state:
{
"level": "global", // global or stream
"version":"v1", // if required
"streams": [
{
"sync_type": "incremental",
"stream_name": "stream_1",
"stream_namespace": "otter_db",
"stream_state": {
"cursor_field": "66ffcf79d9249bef96de3611"
}
},
{
"sync_type": "incremental",
"stream_name": "stream_2",
"stream_namespace": "otter_db",
"stream_state": {
"cursor_field": "67ffcf79d9249bef96de3611"
}
},
{
"sync_type": "incremental",
"stream_name": "stream_3",
"stream_namespace": "otter_db",
"stream_state": {
"cursor_field": "67ffcf79d9249bef96de3611"
}
}
]
}
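Merging the per-stream responses back into one global state might look like the sketch below. The name `merge_stream_states` is hypothetical. Keying streams by namespace plus name follows the note that the namespace disambiguates streams with the same name; letting the latest response for a stream replace an earlier one is an assumption.

```python
import copy
from typing import Any, Dict, Iterable, Tuple


def merge_stream_states(
    stream_states: Iterable[Dict[str, Any]], version: str = "v1"
) -> Dict[str, Any]:
    """Combine stream-level states into a single global-level state."""
    merged: Dict[Tuple[str, str], Dict[str, Any]] = {}
    for state in stream_states:
        for stream in state.get("streams", []):
            key = (stream["stream_namespace"], stream["stream_name"])
            merged[key] = copy.deepcopy(stream)  # assumption: last response wins
    return {"level": "global", "version": version, "streams": list(merged.values())}
```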
In terms of state, there is no difference between a parallel run and a horizontal-scale run. The state is handled the same way in both cases; only the management of the global-level state moves from the connector level to the SDK level.
Example 2
Two CDC streams and one full-load stream
Step 1: detect the full-load stream and split the state to the stream level.
Every CDC stream has its cursor field, so it can start CDC from there.
The full-load stream starts its full load by creating chunks and saving them in the state, along with the current resume token or WAL cursor.
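The detection and initialization steps can be sketched as follows. Everything here is illustrative: `needs_full_load` and `init_full_load_state` are hypothetical names, the "pending" chunk status does not appear in the examples and is an assumption, and the chunk boundaries are assumed to be given as a sorted list.

```python
from typing import Any, Dict, Optional, Sequence


def needs_full_load(stream: Optional[Dict[str, Any]]) -> bool:
    """True when a stream has no saved cursor or still carries unfinished chunks."""
    if stream is None:
        return True
    state = stream.get("stream_state", {})
    # Chunks are removed once a full load completes, so leftover chunks
    # mean the full load has to be resumed.
    return not state.get("cursor_field") or bool(state.get("chunks"))


def init_full_load_state(
    name: str, namespace: str, boundaries: Sequence[str], resume_token: str
) -> Dict[str, Any]:
    """Create the initial stream entry: one chunk per pair of adjacent boundaries."""
    chunks = [
        {"status": "pending", "min": lo, "max": hi}  # "pending" is an assumed status
        for lo, hi in zip(boundaries, boundaries[1:])
    ]
    return {
        "sync_type": "full-load",
        "stream_name": name,
        "stream_namespace": namespace,
        # The resume token (or WAL cursor) is checkpointed before the load starts.
        "stream_state": {"cursor_field": resume_token, "chunks": chunks},
    }
```

Saving the resume token before the first chunk is read is what lets CDC pick up afterwards without missing changes made during the full load.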
Example 3
All CDC
All streams pick up their states and use the cursor field to start the CDC sync. Some connectors may rely on synchronous CDC. In that case the resume token of every stream must be equal, or the minimum cursor across all streams must be chosen as the starting point.
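Choosing the starting point for synchronous CDC could look like the sketch below. The function name `cdc_start_cursor` is hypothetical. It assumes cursors are equal-length strings whose lexicographic order matches log order, which holds for the 24-character hex IDs in the examples but not necessarily for real resume tokens or WAL positions; a connector would substitute its own comparison.

```python
from typing import Any, Dict, Optional


def cdc_start_cursor(global_state: Dict[str, Any]) -> Optional[str]:
    """Return the smallest saved cursor across all streams, or None if none is saved."""
    cursors = [
        s["stream_state"]["cursor_field"]
        for s in global_state.get("streams", [])
        if s.get("stream_state", {}).get("cursor_field")
    ]
    # Assumption: plain string comparison orders these cursors correctly.
    return min(cursors) if cursors else None
```

Starting from the minimum means some streams replay changes they have already seen, so this approach presumes that replayed changes are safe to apply again.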