
Overview

The OLake S3 Source connector ingests data from Amazon S3 or S3-compatible storage (MinIO, LocalStack). It supports multiple file formats with intelligent folder-based stream grouping, incremental sync, and parallel processing. This connector can be used within the OLake UI or run locally via Docker for open-source workflows.

Key Features

  • Multi-Format Support: CSV, JSON (JSONL/Array), and Parquet files
  • Automatic Compression Handling: Transparent gzip decompression (.gz files)
  • Folder-Based Streams: Groups files by top-level folder into logical streams
  • Incremental Sync: Tracks changes using S3 LastModified timestamps
  • Parallel Processing: Configurable concurrent file processing
  • S3-Compatible Services: AWS S3, MinIO, LocalStack, and other S3 APIs

Sync Modes Supported

  • Full Refresh: Syncs all files in the bucket/prefix every run
  • Incremental: Syncs only new or modified files using LastModified cursor

File Format Support

CSV Files

  • Plain and gzip compressed (.csv, .csv.gz)
  • Configurable delimiter, header detection, quote character
  • Automatic schema inference from header and data sampling

JSON Files

  • JSONL (line-delimited), JSON Array, and Single Object formats
  • Plain and gzip compressed (.json, .jsonl, .json.gz)
  • Automatic format detection and schema inference

Parquet Files

  • Native columnar format with full type support
  • Schema read directly from file metadata
  • Efficient streaming with S3 range requests

How Stream Grouping Works

Files are automatically grouped into streams based on folder structure:

Example Bucket Structure:

s3://my-bucket/data/
├── users/
│   ├── 2024-01-01/users.parquet
│   └── 2024-01-02/users.parquet
├── orders/
│   ├── 2024-01-01/orders.parquet
│   └── 2024-01-02/orders.parquet
└── products/
    └── products.csv.gz

Result: 3 streams created - users, orders, products

Folder Grouping

Stream grouping occurs at level 1 (first folder after path_prefix). All files within the same top-level folder are grouped into one stream, regardless of subfolder structure.
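
To make the grouping rule concrete, here is a minimal Python sketch (illustrative only, not OLake's implementation) that derives stream names from object keys by taking the first folder after the configured path prefix:

# Illustrative sketch (not OLake's code): group object keys into streams
# by the first folder after the configured path_prefix.
from collections import defaultdict

def group_into_streams(keys, path_prefix="data/"):
    streams = defaultdict(list)
    for key in keys:
        relative = key[len(path_prefix):] if key.startswith(path_prefix) else key
        if "/" not in relative:
            continue  # files directly under the prefix have no stream folder
        stream_name = relative.split("/", 1)[0]  # level 1 folder = stream name
        streams[stream_name].append(key)
    return dict(streams)

keys = [
    "data/users/2024-01-01/users.parquet",
    "data/users/2024-01-02/users.parquet",
    "data/orders/2024-01-01/orders.parquet",
    "data/products/products.csv.gz",
]
print(list(group_into_streams(keys)))  # ['users', 'orders', 'products']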

Prerequisites

Version Prerequisites

  • AWS S3 (any version) or S3-compatible service (MinIO 2020+, LocalStack 0.12+)
  • File formats: CSV, JSON (JSONL/Array/Object), or Parquet

Connection Prerequisites

  • Read access to S3 bucket (s3:ListBucket, s3:GetObject)
  • AWS Authentication (choose one):
    • IAM roles, environment variables, or instance profiles (recommended)
    • Static access key and secret key credentials
  • Network connectivity to S3 endpoint

For MinIO/LocalStack: A custom endpoint URL is required (e.g., http://localhost:9000)

For AWS S3: Credentials are optional. If omitted, the driver uses the AWS default credential chain (IAM roles, environment variables, instance profiles, ECS task roles, etc.)

IAM Policy for S3 Source Access:

s3-source-iam-policy.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::<YOUR_S3_BUCKET>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject"
      ],
      "Resource": [
        "arn:aws:s3:::<YOUR_S3_BUCKET>/*"
      ]
    }
  ]
}
tip

Replace <YOUR_S3_BUCKET> with your actual S3 bucket name. This policy provides read-only access required for the OLake S3 source connector.

Once the prerequisites are fulfilled, you can set up the configuration.


Configuration

1. Navigate to the Source Configuration Page

  1. Complete the OLake UI Setup Guide
  2. After logging in to the OLake UI, select the Sources tab from the left sidebar
  3. Click Create Source on the top right corner
  4. Select S3 from the connector dropdown
  5. Provide a name for this source

2. Provide Configuration Details

Field | Description | Example Value
----- | ----------- | -------------
Bucket Name (required) | S3 bucket name (without the s3:// prefix) | my-data-warehouse
Region (required) | AWS region where the bucket is hosted | us-east-1
Path Prefix | Optional path prefix to filter files | data/
Access Key ID | AWS access key for authentication (optional - see note) | <YOUR_KEY>
Secret Access Key | AWS secret key for authentication (optional - see note) | <YOUR_SECRET>
File Format (required) | Format of files to sync (CSV, JSON, or Parquet) | parquet
Max Threads | Number of concurrent file processors | 10
Retry Count | Number of retry attempts for failures | 3
AWS Authentication

Access Key ID and Secret Access Key are optional for AWS S3. If omitted, the driver uses AWS default credential chain (IAM roles, environment variables, instance profiles, ECS task roles). This is the recommended approach for production deployments. If you provide one credential, you must provide both.
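
If you plan to rely on the default credential chain, a quick pre-flight check outside OLake can save a failed connection test. The boto3 snippet below is a sketch only; the bucket, region, and prefix are placeholder values:

# Pre-flight check (not part of OLake): confirm the default AWS credential
# chain can list the bucket the source will read from.
import boto3

s3 = boto3.client("s3", region_name="us-east-1")  # no static keys: the default chain is used
resp = s3.list_objects_v2(Bucket="my-data-warehouse", Prefix="data/", MaxKeys=5)
for obj in resp.get("Contents", []):
    print(obj["Key"], obj["LastModified"])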

S3-Compatible Services (MinIO, LocalStack)

For MinIO or LocalStack, use the same field structure but add the Endpoint field:

MinIO:

  • Endpoint: http://minio:9000
  • Access Key ID: minioadmin
  • Secret Access Key: minioadmin

LocalStack:

  • Endpoint: http://localhost:4566
  • Access Key ID: test
  • Secret Access Key: test

The Endpoint field is required for non-AWS S3 services.
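
Before pointing the connector at MinIO or LocalStack, it can help to confirm that the endpoint and credentials respond at all. The boto3 snippet below is a sketch only, using the example MinIO values above (for LocalStack, swap in http://localhost:4566 and test/test):

# Connectivity check for an S3-compatible endpoint (sketch, not OLake code).
# Inside Docker, use the container name (e.g., http://minio:9000) instead of localhost.
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
    region_name="us-east-1",
)
print([b["Name"] for b in s3.list_buckets()["Buckets"]])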

3. Format-Specific Configuration

Additional fields for CSV files:

Field | Description | Default | Example
----- | ----------- | ------- | -------
Delimiter | Field separator character | , | ; or \t
Has Header | Whether first row contains column names | true | true
Skip Rows | Number of rows to skip at beginning | 0 | 2
Quote Character | Character for quoting fields | " | '

Compression: Automatically detected from file extension (.csv.gz = gzipped)
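
These options correspond to standard CSV parsing behaviour. The Python sketch below (illustrative only, not the connector's code) shows how a plain or gzipped CSV might be read with such settings; the order in which skipped rows and the header are applied is an assumption here:

# Illustration of the CSV options (delimiter, quote character, skip rows, header)
# applied to a plain or gzipped file; .csv.gz is opened transparently.
import csv, gzip, itertools

def read_csv(path, delimiter=",", quotechar='"', skip_rows=0, has_header=True):
    opener = gzip.open if path.endswith(".gz") else open
    with opener(path, mode="rt", newline="") as f:
        rows = csv.reader(f, delimiter=delimiter, quotechar=quotechar)
        rows = itertools.islice(rows, skip_rows, None)  # Skip Rows (applied before the header here)
        header = next(rows) if has_header else None     # Has Header
        for row in rows:
            yield dict(zip(header, row)) if header else row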

4. Test Connection

  • Once the connection is validated, the S3 source is created. Jobs can then be configured using this source
  • In case of connection failure, refer to the Troubleshooting section

Data Type Mapping

File Format | Source Data Type | Destination Data Type | Notes
----------- | ---------------- | --------------------- | -----
CSV | Inferred from data | string | All CSV fields initially treated as strings
CSV | Numeric patterns | int, bigint, double | Integer and floating-point numbers auto-detected
CSV | ISO 8601 dates | timestamptz | Date/datetime strings converted to timestamp
CSV | Boolean values | boolean | true/false strings converted to boolean
JSON | string | string | JSON string fields
JSON | number (integer) | bigint | JSON integer values
JSON | number (float) | double | JSON floating-point values
JSON | boolean | boolean | JSON boolean values
JSON | object, array | string | Nested objects/arrays serialized to JSON strings
JSON | null | string | Null values converted to empty strings
Parquet | STRING, BINARY | string | Parquet string types
Parquet | INT32, INT64 | int, bigint | Parquet integer types
Parquet | FLOAT, DOUBLE | float, double | Parquet floating-point types
Parquet | BOOLEAN | boolean | Parquet boolean type
Parquet | TIMESTAMP_MILLIS | timestamptz | Parquet timestamp types
Parquet | DATE | date | Parquet date type
Parquet | DECIMAL | float | Parquet decimal types converted to float64
All Formats | _last_modified_time | timestamptz | S3 LastModified metadata (added by connector)
Schema Inference

  • CSV: Uses AND logic - examines all sampled rows to determine most restrictive type
  • JSON: Auto-detects types from JSON primitives
  • Parquet: Schema read directly from file metadata (no inference needed)
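
As a toy illustration of the AND logic for CSV (not the connector's actual inference code), a column keeps a narrower type only if every sampled value fits it; otherwise it falls back to string:

# Toy sketch of AND-logic type inference over sampled CSV values:
# a narrower type is kept only if *all* sampled values fit it.
def _is_int(v):
    try:
        int(v)
        return True
    except ValueError:
        return False

def _is_float(v):
    try:
        float(v)
        return True
    except ValueError:
        return False

def infer_column_type(values):
    if all(v.lower() in ("true", "false") for v in values):
        return "boolean"
    if all(_is_int(v) for v in values):
        return "bigint"
    if all(_is_float(v) for v in values):
        return "double"
    return "string"

print(infer_column_type(["1", "2", "3"]))      # bigint
print(infer_column_type(["1", "2.5", "3"]))    # double
print(infer_column_type(["1", "2.5", "n/a"]))  # string
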
timestamptz timezone

OLake always ingests timestamp data in UTC format, independent of the source timezone.


Date and Time Handling

During transfer, values in date, time, and timestamp columns are modified to ensure valid calendar ranges and destination compatibility.

  • Case I (Year 0000):
    Source dates with year 0000 are not valid in most destinations, so we change them to the epoch start date.
    Example: 0000-05-10 → 1970-01-01
  • Case II (Year > 9999):
    Extremely large years are capped at 9999; the month and day are not affected.
    Example: 10000-03-12 → 9999-03-12
  • Case III (Invalid month/day):
    When the month or day exceeds its valid range (i.e., month > 12 or day > 31), or the combined date is invalid, the value is replaced with the epoch start date.
    Examples: 2024-13-15 → 1970-01-01, 2023-04-31 → 1970-01-01
Note

These rules apply to date, time, and timestamp columns during transfer.
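
The following Python sketch mirrors the three rules described above (illustrative only, not the connector's code):

# Sketch of the date-sanitisation rules: year 0000 -> epoch start,
# year > 9999 -> capped at 9999, invalid month/day -> epoch start.
from datetime import date

EPOCH = date(1970, 1, 1)

def sanitize(year, month, day):
    if year == 0:
        return EPOCH                   # Case I: year 0000
    year = min(year, 9999)             # Case II: cap oversized years
    try:
        return date(year, month, day)  # Case III: invalid dates raise ValueError
    except ValueError:
        return EPOCH

print(sanitize(0, 5, 10))      # 1970-01-01
print(sanitize(10000, 3, 12))  # 9999-03-12
print(sanitize(2024, 13, 15))  # 1970-01-01
print(sanitize(2023, 4, 31))   # 1970-01-01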


Incremental Sync Details

The S3 connector uses S3's LastModified timestamp as a cursor for incremental syncs:

How it works:

  1. Discovery phase adds _last_modified_time field to each stream
  2. During sync, each record is injected with the file's LastModified timestamp
  3. State file tracks the latest _last_modified_time per stream
  4. Subsequent syncs only process files with LastModified > last_synced_timestamp

State Example:

{
  "users": {
    "_last_modified_time": "2024-01-15T10:30:00Z"
  },
  "orders": {
    "_last_modified_time": "2024-01-15T11:45:00Z"
  }
}
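
Conceptually, per-stream file selection works like the boto3 sketch below (illustrative only; the bucket, prefix, and state values reuse the examples on this page):

# Sketch of incremental selection for one stream: list the stream's folder
# and keep only objects modified after the saved cursor.
from datetime import datetime
import boto3

state = {"users": {"_last_modified_time": "2024-01-15T10:30:00Z"}}
cursor = datetime.fromisoformat(state["users"]["_last_modified_time"].replace("Z", "+00:00"))

s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")
new_files = []
for page in paginator.paginate(Bucket="my-bucket", Prefix="data/users/"):
    for obj in page.get("Contents", []):
        if obj["LastModified"] > cursor:  # LastModified is a timezone-aware UTC datetime
            new_files.append(obj["Key"])

print(new_files)  # only files newer than the stored cursor are processed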

Benefits:

  • Dramatically reduces sync time for large buckets
  • Only transfers changed data
  • Per-stream cursor tracking for granular control
File Modifications

If a file's content changes and is re-uploaded to S3, it will be re-synced in incremental mode because S3 updates the LastModified timestamp.


Troubleshooting

1. Connection Failed - Access Denied

ERROR failed to list objects: AccessDenied: Access Denied

Cause: Insufficient IAM permissions or incorrect credentials

Solution:

  • If using static credentials: Verify access key and secret key are correct
  • If using IAM roles: Ensure the IAM role has proper S3 permissions attached
  • Check IAM policy includes:
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket", "s3:GetObject"],
      "Resource": [
        "arn:aws:s3:::bucket-name",
        "arn:aws:s3:::bucket-name/*"
      ]
    }
  • AWS Credential Chain Order:
    1. Static credentials in config (access_key_id, secret_access_key)
    2. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
    3. IAM role attached to EC2 instance/ECS task
    4. AWS credentials file (~/.aws/credentials)
  • For MinIO/LocalStack: Ensure credentials match server configuration

2. No Streams Discovered

Cause: Files not organized in folders or incorrect path_prefix

Solution:

  • S3 connector requires folder structure: bucket/prefix/stream_name/files
  • Check path_prefix matches your structure
  • Verify file extensions match format (.csv, .json, .parquet)
  • Example command to verify:
    aws s3 ls s3://bucket-name/prefix/ --recursive

3. Schema Inference Failed - CSV

ERROR failed to infer schema: invalid delimiter or header configuration

Cause: Incorrect CSV configuration

Solution:

  • Verify has_header matches file (check first row)
  • Check delimiter is correct (, vs ; vs \t)
  • Ensure all rows have same column count
  • Test with a small sample file first

4. JSON Format Not Detected

Cause: Mixed JSON formats in same folder or invalid JSON

Solution:

  • Keep JSONL and JSON Array formats in separate folders/streams
  • Validate JSON syntax: jq . < file.json
  • Ensure consistent field names across records
  • Check for trailing commas or syntax errors

5. Parquet File Cannot Be Read

ERROR failed to read parquet schema: not a parquet file

Cause: Corrupted file or invalid Parquet format

Solution:

  • Verify file with parquet-tools: parquet-tools schema file.parquet
  • Check file wasn't corrupted during upload
  • Ensure file extension is .parquet (not .pq or other)
  • Re-upload file from source

6. Incremental Sync Not Working

Cause: State file not persisted or incorrect sync_mode

Solution:

  • Verify state.json file location is writable
  • Check catalog has sync_mode: "incremental"
  • Ensure cursor_field: "_last_modified_time" is set
  • Confirm state file is being passed to sync command:
    --state /path/to/state.json

7. MinIO Connection Timeout

ERROR dial tcp: i/o timeout

Cause: Network connectivity or incorrect endpoint

Solution:

  • Check MinIO is running: docker ps | grep minio
  • Test endpoint: curl http://localhost:9000/minio/health/live
  • Verify endpoint format: http://hostname:9000 (include protocol)
  • For Docker: Use container name instead of localhost

8. Files Not Syncing Despite Being Present

Cause: File extension mismatch or compression not detected

Solution:

  • Ensure file extensions match format:
    • CSV: .csv or .csv.gz
    • JSON: .json, .jsonl, .json.gz, .jsonl.gz
    • Parquet: .parquet
  • Check file size is non-zero: aws s3 ls s3://bucket/prefix/ --recursive --human-readable
  • Verify files are in correct folder structure

9. Out of Memory Errors

FATAL runtime: out of memory

Cause: Too many large files processed concurrently

Solution:

  • Reduce max_threads in configuration (try 3-5)
  • Process fewer streams at once
  • Split very large files (>5GB) before upload
  • Increase container memory limits

10. Permission Denied - LocalStack

Cause: LocalStack IAM policy simulator

Solution:

  • LocalStack accepts any credentials by default (test/test)
  • Ensure endpoint is correct: http://localhost:4566
  • Check LocalStack is running: docker ps | grep localstack
  • Verify bucket exists: awslocal s3 ls

If the issue is not listed here, post your query on Slack to get it resolved within a few hours.


Changelog

Date of Release | Version | Description
--------------- | ------- | -----------
TBD | v0.4.0 | Initial S3 source connector release



💡 Join the OLake Community!

Got questions, ideas, or just want to connect with other data engineers?
👉 Join our Slack Community to get real-time support, share feedback, and shape the future of OLake together. 🚀

Your success with OLake is our priority. Don't hesitate to contact us if you need any help or further clarification!