[Pull-based Ingestion][WIP] Introduce the new pull-based ingestion engine, APIs, and Kafka plugin #16958

Open · wants to merge 25 commits into base: main

Conversation


@yupeng9 yupeng9 commented Jan 6, 2025

Description

This PR implements the basics of the pull-based ingestion described in this RFC, including:

  1. The APIs for the pull-based ingestion source
  2. A Kafka plugin that implements the ingestion source API
  3. A new IngestionEngine that pulls data from the ingestion sources

This is currently a WIP; there are a few improvements to make and test coverage to increase.

Related Issues

Resolves #16927, #16929, #16928

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions github-actions bot added the enhancement and Indexing labels on Jan 6, 2025
Contributor

github-actions bot commented Jan 6, 2025

❌ Gradle check result for 16dd9d0: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Comment on lines 126 to 140
new Translog.Snapshot() {
    @Override
    public void close() {}

    @Override
    public int totalOperations() {
        return 0;
    }

    @Override
    public Translog.Operation next() {
        return null;
    }
}
);
Collaborator

Maybe create a static EMPTY_TRANSLOG_SNAPSHOT and reuse across this and NoOpEngine
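For reference, a minimal sketch of what such a shared constant could look like, essentially lifting the anonymous class above into a static field (the name and where it would live, e.g. on Translog, are assumptions):

    // Hypothetical shared constant reusable by this engine and NoOpEngine
    public static final Translog.Snapshot EMPTY_TRANSLOG_SNAPSHOT = new Translog.Snapshot() {
        @Override
        public void close() {}

        @Override
        public int totalOperations() {
            return 0;
        }

        @Override
        public Translog.Operation next() {
            return null;
        }
    };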

Author

Good suggestion. Let me explore that

Comment on lines +147 to +151
String clientId = engineConfig.getIndexSettings().getNodeName()
+ "-"
+ engineConfig.getIndexSettings().getIndex().getName()
+ "-"
+ engineConfig.getShardId().getId();
Collaborator

Should we use ids instead of names, like index uuid, node id, etc.?

Author

This is mainly for monitoring and operations; for example, Kafka supports quotas set by client-id. As long as we can uniquely identify a streaming consumer, it's sufficient. Any suggestions?
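For illustration, an id-based variant of the snippet above might look roughly like the following; Index#getUUID() is available on the Index object already in use, while the node part is left as the node name since a node id is not obviously reachable from this snippet:

    // Hypothetical id-based client id (per the reviewer's suggestion); only Index#getUUID()
    // is assumed beyond the getters already used in the original snippet.
    String clientId = engineConfig.getIndexSettings().getNodeName()
        + "-"
        + engineConfig.getIndexSettings().getIndex().getUUID()
        + "-"
        + engineConfig.getShardId().getId();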

Collaborator

@Bukhtawar Bukhtawar left a comment

Curious how the FGAC security model would work, especially with the security plugin, which intercepts transport actions to validate whether authorized users can perform bulk actions on certain indices. Is the intent to handle permissions at a Kafka "partition level"?
Another aspect is maintaining Kafka checkpoints durably. I have yet to read that part, but it would be good to understand how we are handling failovers and recoveries.

*
* @opensearch.api
*/
public interface IngestionConsumerPlugin {
Member

Let's put the @ExperimentalApi annotation on this as well
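i.e., something like the following (a sketch of the suggestion, with the interface body elided; the annotation is assumed to be OpenSearch's existing @ExperimentalApi):

    @ExperimentalApi
    public interface IngestionConsumerPlugin {
        // ...
    }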

*/

/** Indices ingestion module package. */
package org.opensearch.indices.ingest;
Member

@andrross andrross Jan 8, 2025

The term "ingest" is definitely overloaded. _bulk is a type of ingestion, there are ingest pipelines, etc. I'd suggest using polling.ingest or pollingingest or anything else that helps disambiguate this area of the code from the other ingest related pieces.

Author

Agreed. pollingingest sounds good to me.
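With the agreed name, the package declaration in this hunk would become something like:

    /** Indices polling-based ingestion module package. */
    package org.opensearch.indices.pollingingest;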

private final TranslogManager translogManager;
private final DocumentMapperForType documentMapperForType;
private final IngestionConsumerFactory ingestionConsumerFactory;
protected StreamPoller streamPoller;
Member

It looks like streamPoller is assigned in the constructor and never accessed outside this class. Why is it not private final?

Author

I had some tests that used it, but I changed to a different way of testing. Let me make it private.

}

versions << [
'kafka': '2.8.2',
Member

This looks quite old (September 2022 according to https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients). Why not use the newest available?

Author

Good catch. I set it to test against Uber's internal Kafka, which is on 2.8.2. Let me upgrade it.

    result = blockingQueue.poll(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
    // TODO: add metric
    logger.debug("ConcurrentSiaStreamsPoller poll interruptedException", e);
Member

ConcurrentSiaStreamsPoller?

Comment on lines +178 to +179
streamPoller = new DefaultStreamPoller(startPointer, persistedPointers, ingestionShardConsumer, this, resetState);
streamPoller.start();
Member

The pattern here of sending the this pointer out of the constructor is a bit concerning to me. This allows DefaultStreamPoller to observe a partially-constructed instance of IngestionEngine. If, for example, a future refactoring added a new final field to this class and it were initialized after this line, then DefaultStreamPoller would be able to observe an uninitialized final field. Alternatively, if streamPoller.start() throws an exception then that streamPoller instance will still have a reference to this instance, which never successfully completed its constructor. Is there a way to structure these classes to avoid these problems?

Author

I mainly need it to pass the engine to MessageProcessor, because MessageProcessor can invoke the index/delete operations on the engine.

You have a valid point. Ideally we could call engine.start, which would pass the engine pointer to the poller at start time. Do you feel that's viable with the existing Engine interface design?
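For illustration, a minimal generic sketch of the two-phase shape being discussed, where construction completes before the poller ever receives the engine reference (the names here are illustrative, not the PR's actual classes):

    // Illustrative only: the owner constructs the engine first, then calls start(),
    // so `this` never escapes a partially constructed object.
    final class PollingEngine {
        private StreamPollerLike poller;   // not assigned in the constructor

        void start(StreamPollerLike poller) {
            this.poller = poller;
            poller.start(this);            // safe: the constructor has already completed
        }
    }

    interface StreamPollerLike {
        void start(PollingEngine engine);
    }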

Author

yupeng9 commented Jan 9, 2025

Curious how the FGAC security model would work, especially with the security plugin, which intercepts transport actions to validate whether authorized users can perform bulk actions on certain indices. Is the intent to handle permissions at a Kafka "partition level"? Another aspect is maintaining Kafka checkpoints durably. I have yet to read that part, but it would be good to understand how we are handling failovers and recoveries.

That's a good question. For Kafka-related access control, the policies can be passed as ingestion config params, so the created consumer handles Kafka access; this is separate from OpenSearch permission management. However, for the access control in your example (whether authorized users can perform bulk actions on certain indices), we could carry over some existing permission-handling policy and implement the control in the MessageHandler. This requires some thinking and modeling, and I can create an issue to track it.

api "org.apache.kafka:kafka-clients:${versions.kafka}"

// test
api "com.github.docker-java:docker-java-api:${versions.docker}"
Member

I think this docker dependency should also be testImplementation scope?

Author

yes

@@ -194,4 +194,15 @@ grant {
permission java.io.FilePermission "/sys/fs/cgroup/cpuacct/-", "read";
permission java.io.FilePermission "/sys/fs/cgroup/memory", "read";
permission java.io.FilePermission "/sys/fs/cgroup/memory/-", "read";

//TODO: enable these policies to plugin-security.policy in kafka policy
Member

You can safely remove these added permissions and rely on the plugin policy. Though you'll need to wrap the privileged call in AccessController.doPrivileged during consumer creation -

        return AccessController.doPrivileged((PrivilegedAction<Consumer<byte[], byte[]>>) () -> {
            return new KafkaConsumer<>(consumerProp, new ByteArrayDeserializer(), new ByteArrayDeserializer());
        });
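For completeness, the imports this sketch would rely on (assuming consumerProp is the already-built consumer properties map):

        import java.security.AccessController;
        import java.security.PrivilegedAction;
        import org.apache.kafka.clients.consumer.Consumer;
        import org.apache.kafka.clients.consumer.KafkaConsumer;
        import org.apache.kafka.common.serialization.ByteArrayDeserializer;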

Author

Awesome, that works. Thanks!

import java.util.stream.Collectors;
import java.util.stream.Stream;

public class KafkaPluginIT extends OpenSearchIntegTestCase {
Member

A nitpick: you can combine this with the other IT so the tests run against the same cluster and save a bit of time. The default test scope is SUITE, which spins up one test cluster and runs all cases.
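A sketch of what a combined IT could look like, making the default SUITE scope explicit (the class name is hypothetical; ClusterScope/Scope are the OpenSearch test framework annotations):

    // Hypothetical merged IT reusing one suite-scoped test cluster for all Kafka cases
    @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE)
    public class KafkaIngestionIT extends OpenSearchIntegTestCase {
        // test cases from KafkaPluginIT and the other IT would live together here
    }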

private EngineFactory getEngineFactory(final IndexSettings idxSettings) {
    final IndexMetadata indexMetadata = idxSettings.getIndexMetadata();
    if (indexMetadata != null && indexMetadata.getState() == IndexMetadata.State.CLOSE) {
        // NoOpEngine takes precedence as long as the index is closed
        return NoOpEngine::new;
    }

    // streaming ingestion
    if (indexMetadata != null && indexMetadata.useIngestionSource()) {
        return new IngestionEngineFactory();
Member

@mch2 mch2 Jan 9, 2025

I think we can add IngestionConsumerFactory as a 2nd ctor param to IngestionEngine and pass through here, wdyt? Then we don't need to plumb it through IndexService/Shard.

You could also build the IngestionSource config object here and avoid having to send it over the wire & bake it into IndexMetadata.
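A rough sketch of the suggestion, assuming the plugin-provided consumer factory is reachable where getEngineFactory lives and that EngineFactory can be supplied as a lambda over EngineConfig:

    // Hypothetical wiring: hand the consumer factory to the engine here,
    // instead of plumbing it through IndexService/IndexShard.
    if (indexMetadata != null && indexMetadata.useIngestionSource()) {
        return engineConfig -> new IngestionEngine(engineConfig, ingestionConsumerFactory);
    }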

Contributor

❌ Gradle check result for b8923ca: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Labels
enhancement (Enhancement or improvement to existing feature or request), Indexing (Indexing, Bulk Indexing and anything related to indexing)

Successfully merging this pull request may close these issues.

[Feature Request] Pull-based ingestion source APIs
4 participants