8 changes: 7 additions & 1 deletion docs/features/opensearch/pull-based-ingestion.md
@@ -183,7 +183,7 @@ GET /<index>/ingestion/_state
| `total_consumer_error_count` | Consumer-level errors |
| `total_poller_message_failure_count` | Poller message failures |
| `total_poller_message_dropped_count` | Messages dropped by poller |
| `total_duplicate_message_skipped_count` | Duplicate messages skipped |
| `total_duplicate_message_skipped_count` | Duplicate messages skipped (deprecated in v3.4.0) |
| `lag_in_millis` | Ingestion lag in milliseconds |

## Limitations
@@ -193,11 +193,14 @@ GET /<index>/ingestion/_state
- Index shards must be >= stream partitions
- Traditional REST API ingestion disabled for pull-based indexes
- Consumer reset only works when ingestion is paused
- Provides at-least-once processing guarantees (use versioning for exactly-once semantics)

## Related PRs

| Version | PR | Description |
|---------|-----|-------------|
| v3.4.0 | [#19607](https://github.com/opensearch-project/OpenSearch/pull/19607) | Fix out-of-bounds offset scenarios and remove persisted offsets |
| v3.4.0 | [#19757](https://github.com/opensearch-project/OpenSearch/pull/19757) | Fix file-based ingestion consumer start point handling |
| v3.3.0 | [#19316](https://github.com/opensearch-project/OpenSearch/pull/19316) | Support all-active mode in pull-based ingestion |
| v3.3.0 | [#19320](https://github.com/opensearch-project/OpenSearch/pull/19320) | Fix ingestion state XContent serialization and fail fast on parsing errors |
| v3.3.0 | [#19393](https://github.com/opensearch-project/OpenSearch/pull/19393) | Fix lag metric when streaming source is empty |
@@ -213,6 +216,8 @@ GET /<index>/ingestion/_state

## References

- [Issue #19591](https://github.com/opensearch-project/OpenSearch/issues/19591): Duplicate/old message skipping bug
- [Issue #19723](https://github.com/opensearch-project/OpenSearch/issues/19723): File-based ingestion flaky test
- [Issue #19287](https://github.com/opensearch-project/OpenSearch/issues/19287): All-active mode feature request
- [Issue #19286](https://github.com/opensearch-project/OpenSearch/issues/19286): XContent serialization bug
- [Issue #17077](https://github.com/opensearch-project/OpenSearch/issues/17077): Metrics for pull-based ingestion
@@ -224,6 +229,7 @@ GET /<index>/ingestion/_state

## Change History

- **v3.4.0**: Fixed out-of-bounds offset handling by setting Kafka `auto.offset.reset` to `none` by default. Removed persisted pointers concept to fix correctness issues during consumer rewind. Pull-based ingestion now provides at-least-once processing guarantees. Deprecated `totalDuplicateMessageSkippedCount` metric. Fixed file-based ingestion consumer to track line numbers when start point exceeds file length.
- **v3.3.0**: Added all-active ingestion mode enabling replica shards to independently ingest from streaming sources. Fixed ingestion state XContent serialization for remote cluster state compatibility. Fixed lag metric calculation when streaming source is empty. Fixed pause state initialization during replica promotion. Added fail-fast behavior for mapper/parsing errors.
- **v3.2.0**: Added `ingestion-fs` plugin for file-based ingestion, enabling local testing without Kafka/Kinesis setup. Files follow `${base_directory}/${stream}/${shard_id}.ndjson` convention.
- **v3.1.0**: Added lag metrics, error metrics, configurable queue size, transient failure retries, create mode, cluster write block support, consumer reset in Resume API. Breaking change: renamed `REWIND_BY_OFFSET`/`REWIND_BY_TIMESTAMP` to `RESET_BY_OFFSET`/`RESET_BY_TIMESTAMP`.
@@ -0,0 +1,104 @@
# Pull-based Ingestion Bugfixes

## Summary

This release fixes critical bugs in pull-based ingestion related to out-of-bounds offset handling and removes the persisted pointers concept. These changes improve reliability when dealing with Kafka retention expiration and consumer rewind scenarios, providing at-least-once processing guarantees.

## Details

### What's New in v3.4.0

Two key bugfixes improve pull-based ingestion reliability:

1. **Out-of-bounds offset handling**: Fixed scenarios where Kafka offsets become invalid (e.g., after retention period expiration)
2. **File-based ingestion flaky test fix**: Fixed the file-based consumer to properly track line numbers when start point exceeds file length

### Technical Changes

#### Removed Persisted Pointers

The persisted pointers concept has been removed to fix correctness issues during consumer rewind when versioning is not used:

| Before v3.4.0 | After v3.4.0 |
|---------------|--------------|
| Persisted pointers tracked processed offsets | No persisted pointer tracking |
| Duplicate detection based on stored offsets | At-least-once processing guarantee |
| Could skip latest messages on rewind | Consistent behavior on rewind |

#### Kafka Consumer Configuration Changes

| Setting | Before | After | Description |
|---------|--------|-------|-------------|
| `auto.offset.reset` | User-defined | `none` (default) | Throws error on out-of-bounds offsets |
| `max.poll.records` | Not used | Uses `poll.max_batch_size` | Controls batch size at consumer level |
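
The `auto.offset.reset` row above can be read as a simple defaulting rule. The following is a minimal sketch of that rule, not the plugin's actual code: the class `KafkaConsumerDefaults` and the method `effectiveConfig` are hypothetical names, and the sketch assumes only that a user-supplied `auto.offset.reset` takes precedence over the new `none` default.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not part of OpenSearch: illustrates the defaulting rule only. */
final class KafkaConsumerDefaults {

    private KafkaConsumerDefaults() {}

    /**
     * Builds the effective consumer config under the assumed rule:
     * start from auto.offset.reset=none, then let user params override it.
     */
    static Map<String, Object> effectiveConfig(Map<String, Object> userParams) {
        Map<String, Object> config = new HashMap<>();
        // Default described in the table: fail on out-of-bounds offsets
        // instead of silently jumping to another position.
        config.put("auto.offset.reset", "none");
        // User-supplied values win, so an explicit "earliest" or "latest" is kept.
        config.putAll(userParams);
        return config;
    }
}
```

Calling `effectiveConfig` with an empty map yields `none`; passing `"auto.offset.reset": "earliest"` keeps the user's value.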

#### API Changes

The `IngestionConsumerFactory.initialize()` method signature changed:

```java
// Before
void initialize(Map<String, Object> params);

// After
void initialize(IngestionSource ingestionSource);
```

This allows consumer factories to access additional configuration like `maxPollSize` from the ingestion source.
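
To illustrate why the wider parameter helps, here is a hedged sketch of a factory that forwards the poll batch size to Kafka's `max.poll.records`. `SourceView` is a stand-in interface invented for this example and does not mirror the real `IngestionSource` class; `SketchConsumerFactory` and both accessor names are likewise assumptions, as is the choice to let the batch size override any user-supplied `max.poll.records`.

```java
import java.util.HashMap;
import java.util.Map;

/** Stand-in for the real ingestion source; only the two values used below are modelled. */
interface SourceView {
    Map<String, Object> params();

    int maxPollSize();
}

/** Hypothetical factory showing what the new-style initialize(...) makes possible. */
final class SketchConsumerFactory {
    private final Map<String, Object> consumerConfig = new HashMap<>();

    void initialize(SourceView source) {
        consumerConfig.clear();
        // Same user parameters that the old Map-based signature received.
        consumerConfig.putAll(source.params());
        // The batch size is not part of the params map, so it is only
        // reachable through the source object (assumed precedence: it wins).
        consumerConfig.put("max.poll.records", source.maxPollSize());
    }

    /** Returns a defensive copy of the config built by initialize(...). */
    Map<String, Object> consumerConfig() {
        return new HashMap<>(consumerConfig);
    }
}
```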

#### Deprecated Metrics

| Metric | Status |
|--------|--------|
| `totalDuplicateMessageSkippedCount` | Deprecated (always 0) |

### Usage Example

By default, the consumer now throws an error when Kafka offsets become out-of-bounds (e.g., after retention expiration). The following index definition overrides that default:

```json
PUT /my-index
{
  "settings": {
    "ingestion_source": {
      "type": "kafka",
      "pointer.init.reset": "earliest",
      "param": {
        "topic": "my-topic",
        "bootstrap_servers": "localhost:9092",
        "auto.offset.reset": "earliest"
      }
    }
  }
}
```

Users can override `auto.offset.reset` to `earliest` or `latest` to handle out-of-bounds offsets automatically.

### Migration Notes

1. **Versioning recommended**: Use document versioning (`_version` field) to ensure consistent document views on consumer rewind/replay
2. **Monitor consumer errors**: With `auto.offset.reset=none`, out-of-bounds offsets will cause errors that should be monitored
3. **Duplicate handling**: Without persisted pointers, duplicates may occur on rewind - use versioning for exactly-once semantics
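
The versioning advice in the notes above can be illustrated with a small in-memory model. This is a sketch under stated assumptions, not OpenSearch code: `VersionedStore` is a hypothetical class, and the rule it applies (accept a message only when its version is strictly greater than the stored one) is an assumed simplification of version-checked writes. It shows why replayed messages become harmless under at-least-once delivery.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of version-checked writes. */
final class VersionedStore {
    private final Map<String, Long> versions = new HashMap<>();
    private final Map<String, String> docs = new HashMap<>();

    /**
     * Applies the message only if its version is newer than the stored one.
     * Returns true if the document was written, false if it was ignored.
     */
    boolean apply(String id, long version, String body) {
        Long current = versions.get(id);
        if (current != null && current >= version) {
            // Replayed or stale message: dropping it keeps the latest state intact.
            return false;
        }
        versions.put(id, version);
        docs.put(id, body);
        return true;
    }

    /** Returns the current body for the id, or null if none was written. */
    String get(String id) {
        return docs.get(id);
    }
}
```

Replaying an older message after a rewind leaves the newer document in place, so processing the same stream twice converges to the same state.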

## Limitations

- Pull-based ingestion provides at-least-once processing guarantees (not exactly-once without versioning)
- `totalDuplicateMessageSkippedCount` metric is deprecated and will be removed in a future version

## Related PRs

| PR | Description |
|----|-------------|
| [#19607](https://github.com/opensearch-project/OpenSearch/pull/19607) | Fix pull-based ingestion out-of-bounds offset scenarios and remove persisted offsets |
| [#19757](https://github.com/opensearch-project/OpenSearch/pull/19757) | Fix file-based ingestion consumer to handle start point beyond max line number |

## References

- [Issue #19591](https://github.com/opensearch-project/OpenSearch/issues/19591): Bug report for duplicate/old message skipping logic
- [Issue #19723](https://github.com/opensearch-project/OpenSearch/issues/19723): Flaky test report for FileBasedIngestionSingleNodeTests
- [Documentation](https://docs.opensearch.org/3.0/api-reference/document-apis/pull-based-ingestion/): Pull-based ingestion

## Related Feature Report

- [Full feature documentation](../../../../features/opensearch/pull-based-ingestion.md)
1 change: 1 addition & 0 deletions docs/releases/v3.4.0/index.md
@@ -35,6 +35,7 @@
- [Cluster State & Allocation Bugfixes](features/opensearch/cluster-state-allocation-bugfixes.md) - Fix concurrent modification in allocation filters and version compatibility in remote state
- [Data Stream & Index Template Bugfixes](features/opensearch/data-stream-index-template-bugfixes.md) - Fix deletion of unused index templates matching data streams with lower priority
- [GRPC Transport Bugfixes](features/opensearch/grpc-transport-bugfixes.md) - Fix ClassCastException for large requests, Bulk API fixes, and node bootstrap with streaming transport
- [Pull-based Ingestion Bugfixes](features/opensearch/pull-based-ingestion-bugfixes.md) - Fix out-of-bounds offset handling and remove persisted pointers for at-least-once guarantees
- [Reactor Netty Transport](features/opensearch/reactor-netty-transport.md) - Fix HTTP channel tracking and release during node shutdown
- [Snapshot & Restore Bugfixes](features/opensearch/snapshot-restore-bugfixes.md) - Fix NullPointerException when restoring remote snapshot with missing shard size information
