
Overview

The OLake Kafka Source connector syncs messages from Kafka topics directly to the destination. It supports only one synchronization mode and offers features like parallel partition processing and checkpointing. This connector can be used within the OLake UI or run locally via Docker for open-source workflows.

Important: Kafka Retention Period

Kafka topics have a configured retention period. Once messages exceed this retention period, they are automatically deleted from the topic. OLake cannot access or sync deleted messages. Ensure your sync schedule runs frequently enough to capture data before it expires.
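To check a sync schedule against a topic's retention, the comparison can be sketched as follows. This is an illustration, not part of OLake. The function name sync_interval_is_safe and the 0.5 safety factor are hypothetical, and you would look up the topic's retention.ms value separately with your Kafka admin tooling. The sketch only covers time-based retention. A size limit (retention.bytes) can delete messages sooner.

```python
from datetime import timedelta

# Hypothetical safety margin: 0.5 means a sync must run at least twice
# per retention window, so a single missed run does not lose data.
DEFAULT_SAFETY_FACTOR = 0.5


def sync_interval_is_safe(
    sync_interval: timedelta,
    retention_ms: int,
    safety_factor: float = DEFAULT_SAFETY_FACTOR,
) -> bool:
    """Return True if the sync interval fits inside the topic's retention window.

    retention_ms is the topic's retention.ms setting as reported by Kafka.
    """
    if retention_ms < 0:
        # retention.ms = -1 disables time-based deletion, so any interval is safe.
        return True
    retention = timedelta(milliseconds=retention_ms)
    return sync_interval <= retention * safety_factor


# Example: 7-day retention (604800000 ms, Kafka's default) with a daily sync.
print(sync_interval_is_safe(timedelta(days=1), 604_800_000))  # True
print(sync_interval_is_safe(timedelta(days=6), 604_800_000))  # False
```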

Note: Kafka Connector Requirements

  • The Kafka source connector is available in OLake v0.3.0 and later; both the source and destination components must run v0.3.0 or above.
  • The OLake Kafka connector has been tested specifically with Amazon MSK (Amazon Managed Streaming for Apache Kafka), AWS's fully managed Apache Kafka service. The connector uses standard Kafka protocols and should work with other Kafka distributions.

Authentication

OLake supports three security protocols for connecting to Kafka brokers:

1. PLAINTEXT

No authentication or encryption. Suitable only for development or testing environments where security is not a concern.

  • Security Protocol: PLAINTEXT
  • Configuration: No additional authentication parameters required

2. SASL_PLAINTEXT

Authentication without encryption. Credentials are verified, but data is transmitted in plaintext.

  • Security Protocol: SASL_PLAINTEXT
  • Supported SASL Mechanisms:
    • PLAIN: Simple username/password authentication
    • SCRAM-SHA-512: Salted Challenge Response Authentication Mechanism with SHA-512 hashing

Example Configuration:

"protocol": {
  "security_protocol": "SASL_PLAINTEXT",
  "sasl_mechanism": "SCRAM-SHA-512",
  "sasl_jaas_config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"YOUR_USERNAME\" password=\"YOUR_PASSWORD\";"
}
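The sasl_jaas_config value is easy to get wrong by hand, because the login module must match the SASL mechanism and the inner quotes must be escaped inside JSON. The sketch below builds the string from its parts. The mechanism-to-module mapping mirrors the examples on this page. The helper names build_jaas_config and protocol_block are hypothetical, not OLake APIs. The sketch rejects credentials containing quotes or backslashes instead of guessing at escaping rules.

```python
import json

# Login module class for each SASL mechanism the connector supports.
LOGIN_MODULES = {
    "PLAIN": "org.apache.kafka.common.security.plain.PlainLoginModule",
    "SCRAM-SHA-512": "org.apache.kafka.common.security.scram.ScramLoginModule",
}


def build_jaas_config(mechanism: str, username: str, password: str) -> str:
    """Build a sasl_jaas_config string for the given mechanism."""
    if mechanism not in LOGIN_MODULES:
        raise ValueError(f"unsupported SASL mechanism: {mechanism!r}")
    for value in (username, password):
        # Keep the sketch simple: refuse characters that would need escaping
        # inside a quoted JAAS value.
        if '"' in value or "\\" in value:
            raise ValueError("credentials containing quotes or backslashes are not handled here")
    return f'{LOGIN_MODULES[mechanism]} required username="{username}" password="{password}";'


def protocol_block(security_protocol: str, mechanism: str, username: str, password: str) -> dict:
    """Assemble the "protocol" object shown in the example configuration."""
    return {
        "security_protocol": security_protocol,
        "sasl_mechanism": mechanism,
        "sasl_jaas_config": build_jaas_config(mechanism, username, password),
    }


# json.dumps escapes the inner double quotes as \" automatically.
block = protocol_block("SASL_PLAINTEXT", "SCRAM-SHA-512", "YOUR_USERNAME", "YOUR_PASSWORD")
print(json.dumps({"protocol": block}, indent=2))
```

Serializing with json.dumps, instead of hand-editing the string, is what produces the \" escapes seen in the example above.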

3. SASL_SSL

Authentication with SSL/TLS encryption. This is the most secure of the three options, providing both authentication and encrypted communication.

  • Security Protocol: SASL_SSL
  • Supported SASL Mechanisms:
    • PLAIN: Simple username/password authentication over SSL
    • SCRAM-SHA-512: Salted Challenge Response Authentication Mechanism with SHA-512 hashing over SSL

Example Configuration:

"protocol": {
  "security_protocol": "SASL_SSL",
  "sasl_mechanism": "SCRAM-SHA-512",
  "sasl_jaas_config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"YOUR_USERNAME\" password=\"YOUR_PASSWORD\";"
}
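Before saving a source, it can help to sanity-check the protocol object against the rules described in this section. The validator below is a hypothetical sketch derived only from the options listed on this page. OLake's own validation may check more or different things. The checks are: security_protocol must be one of the three supported values, SASL protocols need a supported mechanism, and the JAAS login module must match that mechanism.

```python
from __future__ import annotations

ALLOWED_PROTOCOLS = {"PLAINTEXT", "SASL_PLAINTEXT", "SASL_SSL"}

# Login module expected in sasl_jaas_config for each supported mechanism.
EXPECTED_MODULE = {
    "PLAIN": "org.apache.kafka.common.security.plain.PlainLoginModule",
    "SCRAM-SHA-512": "org.apache.kafka.common.security.scram.ScramLoginModule",
}


def validate_protocol(protocol: dict) -> list[str]:
    """Return a list of problems found in a "protocol" object (empty if none)."""
    errors: list[str] = []
    security_protocol = protocol.get("security_protocol")
    if security_protocol not in ALLOWED_PROTOCOLS:
        errors.append(f"unsupported security_protocol: {security_protocol!r}")
        return errors
    if security_protocol == "PLAINTEXT":
        # PLAINTEXT needs no SASL settings.
        return errors
    mechanism = protocol.get("sasl_mechanism")
    if mechanism not in EXPECTED_MODULE:
        errors.append(f"{security_protocol} needs sasl_mechanism PLAIN or SCRAM-SHA-512, got {mechanism!r}")
        return errors
    jaas = protocol.get("sasl_jaas_config") or ""
    if not jaas.startswith(EXPECTED_MODULE[mechanism] + " required"):
        errors.append(f"sasl_jaas_config should start with {EXPECTED_MODULE[mechanism]} required")
    if not jaas.rstrip().endswith(";"):
        errors.append("sasl_jaas_config must end with a semicolon")
    return errors
```

A common mistake this catches is switching sasl_mechanism to PLAIN while leaving the ScramLoginModule in the JAAS string.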

Prerequisites

Version Prerequisites

Kafka version 0.10.1.0 or higher.
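Kafka version strings must be compared numerically, not as text. Compared as text, "0.9.0.1" sorts after "0.10.1.0". A minimal sketch of a correct check follows. The helper meets_min_version is hypothetical. It assumes purely numeric dotted versions, so suffixed names such as some managed-service variants would need extra parsing.

```python
# Minimum broker version from the prerequisites above.
MIN_KAFKA_VERSION = (0, 10, 1, 0)


def meets_min_version(version: str, minimum: tuple = MIN_KAFKA_VERSION) -> bool:
    """Compare dotted Kafka versions numerically, e.g. "3.5.1" vs "0.10.1.0"."""
    parts = tuple(int(piece) for piece in version.split("."))
    width = max(len(parts), len(minimum))
    # Pad with zeros so "3.5" compares like "3.5.0.0".
    parts += (0,) * (width - len(parts))
    padded_min = tuple(minimum) + (0,) * (width - len(minimum))
    return parts >= padded_min


print(meets_min_version("3.5.1"))    # True
print(meets_min_version("0.9.0.1"))  # False
print("0.9.0.1" >= "0.10.1.0")       # True: plain string comparison gets this wrong
```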

Connection Prerequisites

  • OLake must be able to reach the Kafka broker servers over the network (for example, by running both within the same network).

Once these prerequisites are met, you can configure the Kafka source.
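A quick way to confirm network reachability is to open a plain TCP connection to each bootstrap server from the machine or container where OLake runs. The sketch below uses only the Python standard library. The helper names are hypothetical, and this is not an OLake command. It only tests the bootstrap addresses. Kafka clients then connect to the addresses each broker advertises (advertised.listeners), and those must be reachable too.

```python
from __future__ import annotations

import socket


def parse_bootstrap_servers(value: str) -> list[tuple[str, int]]:
    """Split a "host:port, host:port" string into (host, port) pairs."""
    servers = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        host, sep, port = entry.rpartition(":")
        if not sep or not host:
            raise ValueError(f"expected host:port, got {entry!r}")
        # Strip brackets from IPv6 literals such as [::1]:9092.
        servers.append((host.strip("[]"), int(port)))
    return servers


def check_reachable(value: str, timeout: float = 3.0) -> dict[str, bool]:
    """Try a plain TCP connection to each bootstrap server."""
    results = {}
    for host, port in parse_bootstrap_servers(value):
        try:
            with socket.create_connection((host, port), timeout=timeout):
                results[f"{host}:{port}"] = True
        except OSError:
            # Covers DNS failures, refused connections, and timeouts.
            results[f"{host}:{port}"] = False
    return results
```

For example, check_reachable("broker:9092, broker:9093") returns a mapping of each address to True or False. Any False entry points to a network, DNS, or firewall issue to resolve before configuring the source.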


Configuration

1. Navigate to the Source Configuration Page

  1. Complete the OLake UI Setup Guide.
  2. After logging in to the OLake UI, select the Sources tab from the left sidebar.
  3. Click Create Source in the top-right corner.
  4. Select Kafka from the connector dropdown.
  5. Provide a name for this source.

2. Provide Configuration Details

  • Enter the Kafka connection details described in the table below.

    (Figure: OLake Kafka source setup with endpoint and SSH options)
| Field | Description | Example Value |
| --- | --- | --- |
| Kafka Bootstrap Servers (required) | Comma-separated list of Kafka broker addresses (host:port) used to establish the initial connection to the Kafka cluster. | "broker:9092, broker:9093" |
| Consumer Group ID | Unique identifier for the consumer group, used to track consumed messages and coordinate consumer members. If not provided, OLake generates one automatically. | example-consumer-group |
| Protocol (required) | Configuration object containing the Kafka security and authentication settings. Sub-parameters: security_protocol, sasl_mechanism, sasl_jaas_config. | {"security_protocol": "SASL_SSL", "sasl_mechanism": "SCRAM-SHA-512", "sasl_jaas_config": "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"YOUR_KAFKA_USERNAME\" password=\"YOUR_KAFKA_PASSWORD\";"} |
| Security Protocol (required) | Protocol used to communicate with Kafka. Supported options: PLAINTEXT, SASL_PLAINTEXT, SASL_SSL. | SASL_SSL |
| SASL Mechanism | SASL authentication mechanism used to verify the client's identity when connecting to Kafka. Supported options: PLAIN, SCRAM-SHA-512. | "SCRAM-SHA-512" |
| SASL JAAS Config | JAAS configuration string containing the login module and the credentials (username and password) used for SASL authentication. | For PLAIN: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password"; For SCRAM-SHA-512: org.apache.kafka.common.security.scram.ScramLoginModule required username="username" password="password"; |
| Threads Equal Total Partitions | When true, the number of consumer threads equals the total number of partitions, for optimal parallel processing. When false, the number of consumer threads is set by the max_threads value. | false |
| Max Threads | Maximum number of parallel threads used to process and sync data. | 3 |
| Backoff Retry Count | Number of retry attempts, with exponential backoff, when establishing the sync. | 3 |
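The interaction between Threads Equal Total Partitions, Max Threads, and Backoff Retry Count can be sketched as follows. This models the behavior described in the table and is not OLake's code. The function names and the 1-second base delay are hypothetical, because the actual backoff timing is not documented here. The idle-thread helper assumes each thread acts as a separate consumer in the group. Kafka assigns each partition to at most one consumer per group, so threads beyond the partition count have nothing to read.

```python
def consumer_thread_count(total_partitions: int, threads_equal_total_partitions: bool, max_threads: int) -> int:
    """Thread count implied by the two settings in the table."""
    if threads_equal_total_partitions:
        return total_partitions
    return max_threads


def idle_threads(total_partitions: int, threads: int) -> int:
    """Threads left without a partition, assuming one consumer per thread."""
    return max(0, threads - total_partitions)


def backoff_delays(retry_count: int, base_seconds: float = 1.0) -> list:
    """Exponential backoff schedule; base_seconds is a hypothetical value."""
    return [base_seconds * 2 ** attempt for attempt in range(retry_count)]


print(consumer_thread_count(12, False, 3))  # 3
print(consumer_thread_count(12, True, 3))   # 12
print(backoff_delays(3))                    # [1.0, 2.0, 4.0]
```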

3. Test Connection

  • Once the connection is validated, the Kafka source is created. Jobs can then be configured using this source.

  • In case of connection failure, refer to the Troubleshooting section.


Kafka General Information

When OLake syncs data from Kafka topics to Iceberg, it transfers the complete message structure along with Kafka metadata. This ensures full traceability and context for each message.

Columns Transferred

The following columns are automatically included when syncing Kafka messages to the destination:

| Column | Description |
| --- | --- |
| offset | The sequential position of the message within its partition. Offsets preserve message order within a partition. |
| partition | The Kafka partition number where the message resides. Used for parallel processing and data distribution. |
| kafka_timestamp | The timestamp when the message was produced or appended to the Kafka topic. |
| messages | The message payload, stored as stringified JSON. |
JSON Format Requirement

The messages field must contain valid JSON data. If a message value is empty, null, or contains non-JSON data, OLake will read the message but skip writing it to the destination. Only messages with valid JSON values are transferred to Iceberg tables.
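The skip rule and the column mapping above can be illustrated together. The sketch below is a hypothetical model, not OLake's implementation. KafkaRecord and to_destination_row are invented names. Treating a literal JSON null like an empty value is an assumption. The function returns None for a skipped message, and otherwise returns a row with the four documented columns.

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class KafkaRecord:
    """Hypothetical minimal view of a consumed Kafka message."""
    partition: int
    offset: int
    timestamp_ms: int  # Kafka record timestamps are milliseconds since the epoch
    value: Optional[bytes]


def to_destination_row(record: KafkaRecord) -> Optional[dict]:
    """Return the row written to the destination, or None if the message is skipped."""
    if not record.value:
        # Empty or missing (tombstone) values are skipped.
        return None
    try:
        payload = json.loads(record.value)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both subclass ValueError.
        return None
    if payload is None:
        # Assumption: a literal JSON null is treated like a null value.
        return None
    return {
        "offset": record.offset,
        "partition": record.partition,
        "kafka_timestamp": datetime.fromtimestamp(record.timestamp_ms / 1000, tz=timezone.utc),
        "messages": json.dumps(payload),
    }
```

Because skipped messages still advance the consumer's position, a producer that writes non-JSON values will not block the sync. Those messages simply never reach the Iceberg table.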

Normalization

OLake provides flexibility in how JSON message values are stored:

  • Normalization = true: The JSON payload in the value column is flattened at level 0: each top-level field becomes a separate column in the destination table. This makes querying individual fields easier.

  • Normalization = false: The JSON payload is stored as-is in the value column without flattening. The entire JSON object remains in a single column.
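The difference between the two settings can be sketched as a small function that maps one payload to destination columns. This is an illustration under stated assumptions, not OLake's code. to_columns and normalize_level0 are hypothetical names. Storing nested objects and arrays as JSON strings under normalization is an assumption consistent with level-0 flattening. The raw column name "messages" is taken from the Columns Transferred table, although the text above calls it the value column.

```python
from __future__ import annotations

import json
from typing import Any

# Assumed raw-payload column name, taken from the "Columns Transferred" table.
RAW_COLUMN = "messages"


def normalize_level0(message: str) -> dict[str, Any]:
    """Flatten only the top level of a JSON object into columns."""
    payload = json.loads(message)
    if not isinstance(payload, dict):
        raise ValueError("level-0 flattening expects a JSON object")
    columns: dict[str, Any] = {}
    for key, value in payload.items():
        if isinstance(value, (dict, list)):
            # Assumption: nested values stay in one column as a JSON string.
            columns[key] = json.dumps(value)
        else:
            columns[key] = value
    return columns


def to_columns(message: str, normalization: bool) -> dict[str, Any]:
    """Return the destination columns for one message payload."""
    if normalization:
        return normalize_level0(message)
    # Without normalization the payload is kept as-is in a single column.
    return {RAW_COLUMN: message}


sample = '{"id": 7, "status": "paid", "customer": {"name": "Ada"}}'
print(to_columns(sample, normalization=True))
print(to_columns(sample, normalization=False))
```

With normalization enabled, id and status become queryable columns while customer remains a single JSON-valued column. With it disabled, the whole payload stays in one column.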


Troubleshooting

1. Consumer Group Corruption

If a consumer group becomes corrupted, you'll need to use a different consumer group to continue syncing data. The resolution depends on how you're running OLake.

Using OLake UI

Consumer group names cannot be modified once set in the OLake UI. To resolve a corrupted consumer group, create a new job with a different consumer group ID.

Using OLake CLI

When using the CLI, you can modify the consumer group in your configuration file:

  1. Open your source.json file.
  2. Update the consumer_group_id field with a new consumer group name.
  3. Save the file and run the sync command again.

The sync will continue with the new consumer group.
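If you switch consumer groups often, or from a script, the steps above can be automated. The helper below is a hypothetical sketch, not part of the OLake CLI. It assumes consumer_group_id is a top-level key in source.json and rewrites the file with every other setting unchanged.

```python
import json
from pathlib import Path
from typing import Optional


def set_consumer_group(config_path: str, new_group_id: str) -> Optional[str]:
    """Point source.json at a new consumer group and return the previous ID.

    Assumes consumer_group_id is a top-level key in the JSON file.
    """
    path = Path(config_path)
    config = json.loads(path.read_text())
    previous = config.get("consumer_group_id")
    if previous == new_group_id:
        raise ValueError("choose a consumer group ID different from the corrupted one")
    config["consumer_group_id"] = new_group_id
    # Rewrite the file, leaving every other setting untouched.
    path.write_text(json.dumps(config, indent=2) + "\n")
    return previous
```

For example, set_consumer_group("source.json", "olake-kafka-group-v2"), where the group name is only a placeholder, followed by re-running the sync command.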

If the issue is not listed here, post the query on Slack to get it resolved within a few hours.




💡 Join the OLake Community!

Got questions, ideas, or just want to connect with other data engineers?
👉 Join our Slack Community to get real-time support, share feedback, and shape the future of OLake together. 🚀

Your success with OLake is our priority. Don’t hesitate to contact us if you need any help or further clarification!