[Rollup] Re-factor Rollup Indexer into a generic indexer for re-usability #32743
Pinging @elastic/es-search-aggs |
public static final ConstructingObjectParser<RollupJobStats, Void> PARSER = | ||
new ConstructingObjectParser<>(NAME.getPreferredName(), | ||
args -> new RollupJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3])); | ||
public class RollupJobStats extends IndexerStats { |
This class is empty as there is nothing in there specific to rollup right now, but that might change.
Note: I changed field names! So this breaks BWC for the get rollup jobs API. Seeking advice on whether this is a problem and whether I should implement a compat layer here.
I think we should use the deprecated names feature of ParseField so we avoid a bwc break here. It's true that rollups is experimental, so we can in theory make bwc breaks without notice, but I think where it's easy for us to deprecate we should 😄
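To illustrate the deprecated-names idea, here is a minimal, self-contained sketch. It does not use Elasticsearch's ParseField: `FieldName` is a hypothetical stand-in, the new name `output_documents` is an assumption made for the example, and warnings are collected in a list rather than sent to a real deprecation logger.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DeprecatedNameDemo {

    // Hypothetical stand-in for a field name that still accepts old aliases.
    static final class FieldName {
        private final String preferred;
        private final List<String> deprecated;
        // Collected warnings; a real system would log these instead.
        private final List<String> warnings = new ArrayList<>();

        FieldName(String preferred, String... deprecated) {
            this.preferred = preferred;
            this.deprecated = Arrays.asList(deprecated);
        }

        // Accepts the preferred name silently and a deprecated alias with a warning.
        boolean match(String name) {
            if (preferred.equals(name)) {
                return true;
            }
            if (deprecated.contains(name)) {
                warnings.add("Deprecated field [" + name + "] used, expected [" + preferred + "] instead");
                return true;
            }
            return false;
        }

        String getPreferredName() {
            return preferred;
        }

        List<String> getWarnings() {
            return warnings;
        }
    }

    public static void main(String[] args) {
        // "output_documents" is an assumed new name; "rollups_indexed" is the old one.
        FieldName numOutput = new FieldName("output_documents", "rollups_indexed");
        System.out.println(numOutput.match("rollups_indexed"));
        System.out.println(numOutput.getWarnings());
    }
}
```

Old clients keep working because the alias still matches, while the recorded warning gives them a chance to migrate before the alias is dropped.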
* | ||
* @param <JobPosition> Type that defines a job position to be defined by the implementation. | ||
*/ | ||
public abstract class IterativeIndexer<JobPosition> { |
Note: The flexibility of JobPosition is a requirement for my usage of this class.
/cc @jimczi too |
to be reverted and replaced by final version of this PR
This is a temporary (squashed) commit of #32743 till ed4feb6, to be reverted and replaced by the final version of this PR
builder.field(NUM_DOCUMENTS.getPreferredName(), numDocuments); | ||
builder.field(NUM_ROLLUPS.getPreferredName(), numRollups); | ||
builder.field(NUM_INPUT_DOCUMENTS.getPreferredName(), numInputDocuments); | ||
builder.field(NUM_OUTPUT_DOCUMENTS.getPreferredName(), numOuputDocuments); |
Piggybacking on @colings86's comment, I think we might need a conditional here for 6.5 when running under Rollup context? Haven't completely convinced myself this is needed, but I think it is for display purposes.
E.g. pass in some kind of flag in Params (from GetRollupJobsAction#JobWrapper.toXContent()) so that we can use the deprecated name for rollups. There might be a different/better way to handle this, not sure... never done this sort of change before.
Luckily this isn't persisted anywhere, so we don't have to worry about that sort of bwc.
I wrote some wrapping code that brings back the old state: https://github.com/elastic/elasticsearch/pull/32743/files#diff-6e8a57a441bd0c25d6ac6534929d0ef0R132
Gave it a read through, I like how things are falling out. I was never keen on how big RollupIndexer was, so separating some of this logic out is nice. Going to let it stew in my brain over the weekend and give it another, deeper look on Monday. |
I left a few minor comments. I'm not sure at the moment if it's safe for the IterativeIndexer to hold and update the state and position independent of the persistent task, but I need to think on this a bit more.
this.numPages = numPages; | ||
this.numDocuments = numDocuments; | ||
this.numRollups = numRollups; | ||
this.numInputDocuments = numDocuments; |
nit: can we call both sides of this numInputDocuments?
* Result object to hold the result of 1 iteration of iterative indexing. | ||
* Acts as an interface between the implementation and the generic indexer. | ||
*/ | ||
public class Iteration<JobPosition> { |
minor: I wonder if we should call this IterationResult, as Iteration made me think it was the thing doing the work rather than just holding the result of an iteration?
*/ | ||
public boolean isDone() { | ||
return isDone; | ||
} |
Does this not belong in JobPosition?
addressed the review comments:
|
@@ -84,6 +84,11 @@ public long getOutputDocuments() { | |||
return numOuputDocuments; | |||
} | |||
|
|||
@Deprecated | |||
public long getNumRollups() { |
I'm not entirely positive if this is required... but I think it is. We expose the IndexerJobStats object in the GetRollupJob API, so I think we need to keep this method around because anyone using the Java client could potentially be using it.
I think :)
private static ParseField NUM_ROLLUPS = new ParseField("rollups_indexed"); | ||
private static ParseField NUM_INPUT_DOCUMENTS = new ParseField("documents_processed"); | ||
// BWC for RollupJobStats | ||
public static String ROLLUP_BWC_XCONTENT_PARAM = "rollup_6_4_compat"; |
I'm not in love with this flag name... but I can't think of something better. It will only be exposed on the GetRollupJob API, so I don't think we need "rollup" in the name.
Maybe "old_stats_format" or something? Open to suggestions.
@colings86 made a few minor tweaks, left some questions. I think this is probably mostly good to go.
I think this should be ok, since the RollupIndexer extends IterativeIndexer, and the (allocated) persistent task owns the indexer. If the allocated task is killed the indexer will go with it. There's probably some undefined behavior here if two instances of an indexer are running at the same time and holding onto the same state/position references. It's probably ok because they were designed for external threads to signal via those atomics, but yunno, not tested :) |
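The point about external threads signalling through atomics can be sketched roughly as follows. This is an illustrative assumption, not the PR's code: the `State` values and the `maybeTrigger`, `finishIteration` and `stop` methods are hypothetical names, and the sketch only shows how a compare-and-set stops two triggers from starting the same job at once.

```java
import java.util.concurrent.atomic.AtomicReference;

final class IndexerStateSketch {

    // Hypothetical lifecycle states for the illustration.
    enum State { STARTED, INDEXING, STOPPED }

    private final AtomicReference<State> state = new AtomicReference<>(State.STARTED);

    // Only the caller that wins the STARTED -> INDEXING transition runs a job.
    boolean maybeTrigger() {
        return state.compareAndSet(State.STARTED, State.INDEXING);
    }

    // Returns to STARTED unless another thread stopped the indexer meanwhile.
    void finishIteration() {
        state.compareAndSet(State.INDEXING, State.STARTED);
    }

    // External signal: request a stop regardless of the current state.
    void stop() {
        state.set(State.STOPPED);
    }

    State getState() {
        return state.get();
    }

    public static void main(String[] args) {
        IndexerStateSketch indexer = new IndexerStateSketch();
        System.out.println(indexer.maybeTrigger()); // first trigger wins
        System.out.println(indexer.maybeTrigger()); // second trigger is rejected
        indexer.finishIteration();
        System.out.println(indexer.getState());
    }
}
```

Note that this only protects a single indexer instance. Two instances holding references to the same state and position objects, which is the untested case raised above, would need the shared references themselves to be coordinated.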
Looks good to me. I left some minor comments. Regarding the stats naming deprecation I wonder if we really need to flood the log for such a minor change.
The stats api is informal so we should be allowed to change things here as long as we don't break inter-nodes communication, @colings86 @polyfractal ?
* | ||
* @param now The current time in milliseconds passed through from {@link #maybeTriggerAsyncJob(long)} | ||
*/ | ||
protected abstract void onStart(long now); |
Maybe onStartJob to disambiguate?
* | ||
* @param <JobPosition> Type that defines a job position to be defined by the implementation. | ||
*/ | ||
public abstract class IterativeIndexer<JobPosition> { |
The design is for an asynchronous execution, so what about AsyncBulkBySearchIndexer?
++ getting async in the name. Some other ideas too:
- AsyncTwoPhaseIndexer (phases being search + bulk)
- AsyncTwoCycleIndexer
- AsyncSearchDrivenIndexer
- AsyncSearchTransformingIndexer
I sorta like the two phase one, but otherwise don't really have a preference. AsyncBulkBySearchIndexer is good with me too.
I went with AsyncTwoPhaseIndexer. If anyone feels strongly about the name let me know before it gets merged :)
+1
I think it's important that we use a consistent approach to all deprecations, so I'm keen for us to log to the deprecation logger if the client is not adding the flag to use the new output format. That way any users programmatically using the stats API are warned of the upcoming breaking change and given the opportunity to migrate to the new response format early.
With IndexerJobStats now abstract, each implementation of the AsyncTwoPhaseIndexer can implement its own job stats. The base abstract class provides some stats (docs indexed, etc), but allows each specific implementation to add more stats and control how they are displayed in XContent. This removes the need for the BWC flag.
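A rough sketch of how an abstract stats class lets each implementation keep its own output names. It is an illustration under assumptions: `toFields()` is a hypothetical stand-in for toXContent(), the counters and increment methods are invented for the example, and only the class names and the two field names (`documents_processed`, `rollups_indexed`) come from this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class StatsRenderingSketch {

    // Base class: owns the shared counters but not their presentation.
    abstract static class IndexerJobStats {
        protected long numInputDocuments;
        protected long numOutputDocuments;

        void incrementNumInputDocuments(long n) {
            numInputDocuments += n;
        }

        void incrementNumOutputDocuments(long n) {
            numOutputDocuments += n;
        }

        // Hypothetical stand-in for toXContent(): each subclass picks the names.
        abstract Map<String, Long> toFields();
    }

    // Rollup keeps its existing field names, so its response format is unchanged.
    static final class RollupIndexerJobStats extends IndexerJobStats {
        @Override
        Map<String, Long> toFields() {
            Map<String, Long> fields = new LinkedHashMap<>();
            fields.put("documents_processed", numInputDocuments);
            fields.put("rollups_indexed", numOutputDocuments);
            return fields;
        }
    }

    public static void main(String[] args) {
        RollupIndexerJobStats stats = new RollupIndexerJobStats();
        stats.incrementNumInputDocuments(10);
        stats.incrementNumOutputDocuments(2);
        System.out.println(stats.toFields());
    }
}
```

Because the generic indexer only updates the shared counters, a new implementation can expose different names or extra stats without touching Rollup's output.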
@jimczi refactored IndexerJobStats to be abstract as we discussed. There's no longer a bwc break for Rollup, as each implementation of AsyncTwoPhaseIndexer implements its own stats object (along with xcontent handling). Would you mind taking a look at those changes?
Thanks @polyfractal , glad we can avoid the deprecation. I left some comments regarding the new classes.
} | ||
|
||
@Deprecated | ||
public long getNumRollups() { |
Let's remove this entirely, we changed the package and the name so it's already breaking ?
* | ||
* @param <JobPosition> Type that defines a job position to be defined by the implementation. | ||
*/ | ||
public abstract class AsyncTwoPhaseIndexer<JobPosition> { |
Should we add IndexerJobStats in the declaration to enforce type (AsyncTwoPhaseIndexer<JobPosition, JobStats extends IndexerJobStats>)? The client should receive a RollupIndexerJobStats when requesting stats for a rollup job.
++
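The suggested declaration could look roughly like the sketch below. The class names come from the thread; the fields, constructor and methods are simplified assumptions made for illustration, and the String position type is arbitrary.

```java
final class TypedStatsSketch {

    abstract static class IndexerJobStats {
        protected long numPages;

        void incrementNumPages(long n) {
            numPages += n;
        }

        long getNumPages() {
            return numPages;
        }
    }

    // Concrete stats with a rollup-specific counter (hypothetical members).
    static final class RollupIndexerJobStats extends IndexerJobStats {
        private long numRollups;

        void incrementNumRollups(long n) {
            numRollups += n;
        }

        long getNumRollups() {
            return numRollups;
        }
    }

    // The second type parameter ties each indexer to its concrete stats class.
    abstract static class AsyncTwoPhaseIndexer<JobPosition, JobStats extends IndexerJobStats> {
        private final JobStats stats;

        AsyncTwoPhaseIndexer(JobStats stats) {
            this.stats = stats;
        }

        // Returns the concrete type, so callers do not need to cast.
        JobStats getStats() {
            return stats;
        }
    }

    static final class RollupIndexer extends AsyncTwoPhaseIndexer<String, RollupIndexerJobStats> {
        RollupIndexer() {
            super(new RollupIndexerJobStats());
        }
    }

    public static void main(String[] args) {
        RollupIndexer indexer = new RollupIndexer();
        indexer.getStats().incrementNumRollups(3);
        System.out.println(indexer.getStats().getNumRollups());
    }
}
```

With the bound in place, `getStats()` on a RollupIndexer is statically typed as RollupIndexerJobStats, which is what lets call sites use the concrete class.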
/** | ||
* Get the stats of this indexer. | ||
*/ | ||
public IndexerJobStats getStats() { |
Let's return the concrete class
I left more comments.
private final RollupJobStatus status; | ||
|
||
public static final ConstructingObjectParser<JobWrapper, Void> PARSER | ||
= new ConstructingObjectParser<>(NAME, a -> new JobWrapper((RollupJobConfig) a[0], | ||
(RollupJobStats) a[1], (RollupJobStatus)a[2])); | ||
(IndexerJobStats) a[1], (RollupJobStatus)a[2])); |
We can use the concrete class now ?
PARSER.declareObject(ConstructingObjectParser.constructorArg(), RollupJobStatus.PARSER::apply, STATUS); | ||
} | ||
|
||
public JobWrapper(RollupJobConfig job, RollupJobStats stats, RollupJobStatus status) { | ||
public JobWrapper(RollupJobConfig job, IndexerJobStats stats, RollupJobStatus status) { |
Same here, RollupIndexerJobStats
@@ -46,7 +46,7 @@ | |||
* @param isUpgradedDocID `true` if this job is using the new ID scheme | |||
* @return A list of rolled documents derived from the response | |||
*/ | |||
static List<IndexRequest> processBuckets(CompositeAggregation agg, String rollupIndex, RollupJobStats stats, | |||
static List<IndexRequest> processBuckets(CompositeAggregation agg, String rollupIndex, IndexerJobStats stats, |
We can use the concrete class RollupIndexerJobStats.
Ah, thanks @jimczi. Found a few more uses of the abstract class and swapped them to concrete too. |
LGTM, thanks @polyfractal
Jenkins, run sample packaging tests |
…lity (#32743) This extracts a super class out of the rollup indexer called the AsyncTwoPhaseIndexer. The implementor of it can define the query, transformation of the response, indexing and the object to persist the position/state of the indexer. The stats object used by the indexer to record progress is also now abstract, allowing the implementation to provide custom stats beyond what the indexer provides. It also allows the implementation to decide how the stats are presented (leaves toXContent() up to the implementation). This should allow new projects to reuse the search-then-index persistent task that Rollup uses, but without the restrictions/baggage of how Rollup has to work internally to satisfy time-based rollups.
* 6.x: Mute test watcher usage stats output [Rollup] Fix FullClusterRestart test TEST: Disable soft-deletes in ParentChildTestCase TEST: Disable randomized soft-deletes settings Integrates soft-deletes into Elasticsearch (#33222) drop `index.shard.check_on_startup: fix` (#32279) Fix AwaitsFix issue number Mute SmokeTestWatcherWithSecurityIT testsi [DOCS] Moves ml folder from x-pack/docs to docs (#33248) TEST: mute more SmokeTestWatcherWithSecurityIT tests [DOCS] Move rollup APIs to docs (#31450) [DOCS] Rename X-Pack Commands section (#33005) Fixes SecurityIntegTestCase so it always adds at least one alias (#33296) TESTS: Fix Random Fail in MockTcpTransportTests (#33061) (#33307) MINOR: Remove Dead Code from PathTrie (#33280) (#33306) Fix pom for build-tools (#33300) Lazy evaluate java9home (#33301) SQL: test coverage for JdbcResultSet (#32813) Work around to be able to generate eclipse projects (#33295) Different handling for security specific errors in the CLI. Fix for #33230 (#33255) [ML] Refactor delimited file structure detection (#33233) SQL: Support multi-index format as table identifier (#33278) Enable forbiddenapis server java9 (#33245) [MUTE] SmokeTestWatcherWithSecurityIT flaky tests Add region ISO code to GeoIP Ingest plugin (#31669) (#33276) Don't be strict for 6.x Update serialization versions for custom IndexMetaData backport Replace IndexMetaData.Custom with Map-based custom metadata (#32749) Painless: Fix Bindings Bug (#33274) SQL: prevent duplicate generation for repeated aggs (#33252) TEST: Mute testMonitorClusterHealth Fix serialization of empty field capabilities response (#33263) Fix nested _source retrieval with includes/excludes (#33180) [DOCS] TLS file resources are reloadable (#33258) Watcher: Ensure TriggerEngine start replaces existing watches (#33157) Ignore module-info in jar hell checks (#33011) Fix docs build after #33241 [DOC] Repository GCS ADC not supported (#33238) Upgrade to latest Gradle 4.10 (#32801) Fix/30904 cluster formation 
part2 (#32877) Move file-based discovery to core (#33241) HLRC: add client side RefreshPolicy (#33209) [Kerberos] Add unsupported languages for tests (#33253) Watcher: Reload properly on remote shard change (#33167) Fix classpath security checks for external tests. (#33066) [Rollup] Only allow aggregating on multiples of configured interval (#32052) Added deprecation warning for rescore in scroll queries (#33070) Apply settings filter to get cluster settings API (#33247) [Rollup] Re-factor Rollup Indexer into a generic indexer for re-usability (#32743) HLRC: create base timed request class (#33216) HLRC: Use Optional in validation logic (#33104) Painless: Add Bindings (#33042)
Replace mocked indexer and use AsyncTwoPhaseIndexer (introduced in #32743) instead.
Motivation
I am working on a feature that is similar to rollup, as it will work with the same two-cycle process: first querying a source index, then indexing the results into a destination index.
@polyfractal and I discussed this and realised it would be useful if such features could use a common approach to performing these kinds of operations. This PR serves to implement that common approach.
Solution
This PR extracts a super class out of the rollup indexer called the iterative indexer. The implementor of it can define the query, transformation of the response, indexing and the object to persist the position/state of the indexer.
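The shape of that extraction can be sketched as below. This is a simplified, synchronous illustration under assumptions, not the PR's code: `Indexer`, `IterationResult`, `doIteration` and `UppercaseIndexer` are hypothetical names, the in-memory lists stand in for the source and destination indices, and the real class works asynchronously against a search and a bulk request.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

final class IterativeIndexerSketch {

    // Holds the outcome of one iteration: documents to index, next position, done flag.
    static final class IterationResult<P> {
        final List<String> toIndex;
        final P position;
        final boolean done;

        IterationResult(List<String> toIndex, P position, boolean done) {
            this.toIndex = toIndex;
            this.position = position;
            this.done = done;
        }
    }

    // Generic query-then-index loop; the position type is chosen by the subclass.
    abstract static class Indexer<P> {
        private P position;
        private final List<String> destination = new ArrayList<>();

        Indexer(P initialPosition) {
            this.position = initialPosition;
        }

        // Implementation hook: query the source from the position and transform the page.
        protected abstract IterationResult<P> doIteration(P position);

        final void run() {
            boolean done = false;
            while (!done) {
                IterationResult<P> result = doIteration(position);
                destination.addAll(result.toIndex); // stand-in for the bulk index step
                position = result.position;         // remember where to resume
                done = result.done;
            }
        }

        P getPosition() {
            return position;
        }

        List<String> getDestination() {
            return destination;
        }
    }

    // Example implementation: pages through a list two items at a time.
    static final class UppercaseIndexer extends Indexer<Integer> {
        private final List<String> source;

        UppercaseIndexer(List<String> source) {
            super(0);
            this.source = source;
        }

        @Override
        protected IterationResult<Integer> doIteration(Integer offset) {
            int end = Math.min(offset + 2, source.size());
            List<String> page = new ArrayList<>();
            for (String doc : source.subList(offset, end)) {
                page.add(doc.toUpperCase(Locale.ROOT));
            }
            return new IterationResult<>(page, end, end >= source.size());
        }
    }

    public static void main(String[] args) {
        UppercaseIndexer indexer = new UppercaseIndexer(Arrays.asList("a", "b", "c"));
        indexer.run();
        System.out.println(indexer.getDestination());
    }
}
```

The generic loop never inspects the position; it only stores and hands it back, which is what allows each implementation to define its own position type.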
To be discussed
Apart from the approach (I will annotate some open questions in code), naming: the change introduces a package org.elasticsearch.xpack.core.indexing and a class IterativeIndexer. I could not find a better name yet, so I am more than happy to hear suggestions.
Notes
This PR is not fully polished yet as I want to discuss the approach 1st before finalizing the details.