Merged
6 changes: 6 additions & 0 deletions docs/changelog/118370.yaml
@@ -0,0 +1,6 @@
+pr: 118370
+summary: Fix concurrency issue with `ReinitializingSourceProvider`
+area: Mapping
+type: bug
+issues:
+ - 118238
@@ -37,6 +37,7 @@
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.parser.ParsingException;
 import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
+import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 import org.junit.Before;

 import java.io.IOException;
@@ -1673,17 +1674,32 @@ public void testScriptField() throws Exception {
 String sourceMode = randomBoolean() ? "stored" : "synthetic";
 Settings.Builder settings = indexSettings(1, 0).put(indexSettings()).put("index.mapping.source.mode", sourceMode);
 client().admin().indices().prepareCreate("test-script").setMapping(mapping).setSettings(settings).get();
-for (int i = 0; i < 10; i++) {
+int numDocs = 256;

Member Author:
These test adjustments were required to make it much more likely that the test failure reported in #118238 reproduces locally. Without the fix to ReinitializingSourceProvider this test should fail after a few iterations; with the fix the failure no longer reproduces.

+for (int i = 0; i < numDocs; i++) {
     index("test-script", Integer.toString(i), Map.of("k1", i, "k2", "b-" + i, "meter", 10000 * i));
 }
 refresh("test-script");
-try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT 10")) {
+
+var pragmas = randomPragmas();
+if (canUseQueryPragmas()) {
+    Settings.Builder pragmaSettings = Settings.builder().put(pragmas.getSettings());
+    pragmaSettings.put("task_concurrency", 10);
+    pragmaSettings.put("data_partitioning", "doc");
+    pragmas = new QueryPragmas(pragmaSettings.build());
+}
+try (EsqlQueryResponse resp = run("FROM test-script | SORT k1 | LIMIT " + numDocs, pragmas)) {
     List<Object> k1Column = Iterators.toList(resp.column(0));
-    assertThat(k1Column, contains(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L));
+    assertThat(k1Column, equalTo(LongStream.range(0L, numDocs).boxed().toList()));
     List<Object> k2Column = Iterators.toList(resp.column(1));
-    assertThat(k2Column, contains(null, null, null, null, null, null, null, null, null, null));
+    assertThat(k2Column, equalTo(Collections.nCopies(numDocs, null)));
     List<Object> meterColumn = Iterators.toList(resp.column(2));
-    assertThat(meterColumn, contains(0.0, 10000.0, 20000.0, 30000.0, 40000.0, 50000.0, 60000.0, 70000.0, 80000.0, 90000.0));
+    var expectedMeterColumn = new ArrayList<>(numDocs);
+    double val = 0.0;
+    for (int i = 0; i < numDocs; i++) {
+        expectedMeterColumn.add(val);
+        val += 10000.0;
+    }
+    assertThat(meterColumn, equalTo(expectedMeterColumn));
 }
 }
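As an aside for readers following the assertion rewrite above: the updated matchers build the expected columns programmatically instead of hard-coding ten values. A standalone restatement using only the JDK (the method names here are illustrative, not part of the test class):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.LongStream;

public class ExpectedColumnsSketch {
    // Mirrors the new k1 expectation: LongStream.range(0L, numDocs).boxed().toList()
    static List<Long> k1(int numDocs) {
        return LongStream.range(0L, numDocs).boxed().toList();
    }

    // Mirrors the new k2 expectation: a list of numDocs nulls
    static List<Object> k2(int numDocs) {
        return Collections.nCopies(numDocs, null);
    }

    // Mirrors the new meter expectation: 0.0, 10000.0, 20000.0, ...
    static List<Double> meter(int numDocs) {
        List<Double> expected = new ArrayList<>(numDocs);
        double val = 0.0;
        for (int i = 0; i < numDocs; i++) {
            expected.add(val);
            val += 10000.0;
        }
        return expected;
    }

    public static void main(String[] args) {
        // With numDocs = 10 these match the old hard-coded contains(...) lists exactly.
        System.out.println(k1(10));           // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        System.out.println(k2(10).size());    // 10
        System.out.println(meter(10).get(9)); // 90000.0
    }
}
```

With `numDocs` raised to 256, generated expectations are the only practical option, which is why the hard-coded lists were replaced.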
@@ -15,13 +15,23 @@
 import java.util.function.Supplier;

 /**
- * This is a workaround for when compute engine executes concurrently with data partitioning by docid.
+ * This class exists as a workaround for using SourceProvider in the compute engine.
+ * <p>
+ * The main issue arises when the compute engine executes concurrently with data partitioning by docid (inter-segment parallelization).
+ * A {@link SourceProvider} can only be used by a single thread, and this wrapping source provider ensures that each thread uses
+ * its own {@link SourceProvider}.
+ * <p>
+ * Additionally, this source provider protects against going backwards, which the synthetic source provider can't handle.
  */
 final class ReinitializingSourceProvider implements SourceProvider {

 private PerThreadSourceProvider perThreadProvider;
 private final Supplier<SourceProvider> sourceProviderFactory;

+// Keep track of the last seen doc; if the current doc is before the last seen doc then the source provider is reinitialized
+// (when source mode is synthetic, _source is read from doc values, and doc values don't support going backwards):
+private int lastSeenDocId;
+
 ReinitializingSourceProvider(Supplier<SourceProvider> sourceProviderFactory) {
 this.sourceProviderFactory = sourceProviderFactory;
 }
@@ -30,11 +40,12 @@ final class ReinitializingSourceProvider implements SourceProvider {
 public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
 var currentThread = Thread.currentThread();
 PerThreadSourceProvider provider = perThreadProvider;
-if (provider == null || provider.creatingThread != currentThread) {
+if (provider == null || provider.creatingThread != currentThread || doc < lastSeenDocId) {
 provider = new PerThreadSourceProvider(sourceProviderFactory.get(), currentThread);
 this.perThreadProvider = provider;
 }
-return perThreadProvider.source.getSource(ctx, doc);
@dnhatn (Member), Dec 11, 2024:
I think we should use `provider`, not `this.perThreadProvider`, here.
Member Author:
I did try this, but then I ran into this assertion failure:
java.lang.AssertionError
        at __randomizedtesting.SeedInfo.seed([89DC8DB16062966D]:0)
        at org.apache.lucene.tests.index.AssertingLeafReader$AssertingNumericDocValues.advanceExact(AssertingLeafReader.java:757)
        at org.apache.lucene.index.SingletonSortedNumericDocValues.advanceExact(SingletonSortedNumericDocValues.java:62)
        at org.elasticsearch.index.mapper.SortedNumericDocValuesSyntheticFieldLoader$ImmediateDocValuesLoader.advanceToDoc(SortedNumericDocValuesSyntheticFieldLoader.java:142)
        at org.elasticsearch.index.mapper.ObjectMapper$SyntheticSourceFieldLoader$ObjectDocValuesLoader.advanceToDoc(ObjectMapper.java:965)
        at org.elasticsearch.index.mapper.SourceLoader$Synthetic$SyntheticLeaf.write(SourceLoader.java:210)
        at org.elasticsearch.index.mapper.SourceLoader$Synthetic$SyntheticLeaf.source(SourceLoader.java:181)
        at org.elasticsearch.index.mapper.SourceLoader$Synthetic$LeafWithMetrics.source(SourceLoader.java:146)
        at org.elasticsearch.search.lookup.SyntheticSourceProvider$SyntheticSourceLeafLoader.getSource(SyntheticSourceProvider.java:58)
        at org.elasticsearch.search.lookup.SyntheticSourceProvider.getSource(SyntheticSourceProvider.java:42)
        at org.elasticsearch.xpack.esql.plugin.ReinitializingSourceProvider.getSource(ReinitializingSourceProvider.java:37)
        at org.elasticsearch.search.lookup.LeafSearchLookup.lambda$new$0(LeafSearchLookup.java:40)
        at org.elasticsearch.script.AbstractFieldScript.extractFromSource(AbstractFieldScript.java:107)
        at org.elasticsearch.script.AbstractFieldScript.emitFromSource(AbstractFieldScript.java:127)
        at org.elasticsearch.script.LongFieldScript$1$1.execute(LongFieldScript.java:29)
        at org.elasticsearch.script.AbstractFieldScript.runForDoc(AbstractFieldScript.java:159)
        at org.elasticsearch.index.fielddata.LongScriptDocValues.advanceExact(LongScriptDocValues.java:26)
        at org.elasticsearch.search.MultiValueMode$6.advanceExact(MultiValueMode.java:534)
        at org.elasticsearch.index.fielddata.FieldData$17.advanceExact(FieldData.java:620)
        at org.apache.lucene.search.comparators.LongComparator$LongLeafComparator.getValueForDoc(LongComparator.java:80)
        at org.apache.lucene.search.comparators.LongComparator$LongLeafComparator.copy(LongComparator.java:105)
        at org.apache.lucene.search.TopFieldCollector$TopFieldLeafCollector.collectAnyHit(TopFieldCollector.java:124)
        at org.apache.lucene.search.TopFieldCollector$SimpleFieldCollector$1.collect(TopFieldCollector.java:209)
        at org.apache.lucene.search.Weight$DefaultBulkScorer.scoreRange(Weight.java:305)
        at org.apache.lucene.search.Weight$DefaultBulkScorer.score(Weight.java:264)
        at org.elasticsearch.compute.lucene.LuceneOperator$LuceneScorer.scoreNextRange(LuceneOperator.java:193)
        at org.elasticsearch.compute.lucene.LuceneTopNSourceOperator.collect(LuceneTopNSourceOperator.java:168)
        at org.elasticsearch.compute.lucene.LuceneTopNSourceOperator.getCheckedOutput(LuceneTopNSourceOperator.java:148)
        at org.elasticsearch.compute.lucene.LuceneOperator.getOutput(LuceneOperator.java:118)
        at org.elasticsearch.compute.operator.Driver.runSingleLoopIteration(Driver.java:258)
        at org.elasticsearch.compute.operator.Driver.run(Driver.java:189)
        at org.elasticsearch.compute.operator.Driver$1.doRun(Driver.java:378)
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:27)
        at org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:34)
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:1023)
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:27)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1575)

So then I went with this approach.

Member Author:
But thinking more about this, just using the `provider` local variable should fix that issue, which is why I also went for it initially. Maybe there is something wrong elsewhere.

Member Author:
I pushed: 653ed40

The assertion that tripped here has a different cause. The linked test failure occurred because different threads were using the same stored field readers or doc values instances (for the latter case, I observed this locally). This assertion failure happens because we go backwards with the current docid. I added protection against that in this commit, which reinitializes the source provider if the current docid is lower than the last seen docid.
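The backwards constraint described in this comment can be illustrated with a minimal stand-in (the `ForwardOnlyCursor` class below is hypothetical, not a Lucene API): like `advanceExact` under Lucene's `AssertingLeafReader` in the stack trace above, it only accepts non-decreasing target doc ids.

```java
// Hypothetical stand-in (not a Lucene API) for a forward-only doc values cursor.
final class ForwardOnlyCursor {
    private int current = -1;

    boolean advanceExact(int target) {
        if (target < current) {
            // AssertingLeafReader trips a Java assert in this case; we throw for visibility.
            throw new IllegalStateException("cannot advance backwards: " + target + " < " + current);
        }
        current = target;
        return true;
    }
}

public class ForwardOnlyDemo {
    public static void main(String[] args) {
        ForwardOnlyCursor cursor = new ForwardOnlyCursor();
        cursor.advanceExact(3);
        cursor.advanceExact(7); // forward: fine
        try {
            cursor.advanceExact(5); // backwards: rejected
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

This is why reusing a per-thread provider after the driver restarts a range at a lower doc id is not enough; the provider itself has to be recreated.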

Member Author:
I was able to reproduce this failure by running: `./gradlew ":x-pack:plugin:esql:internalClusterTest" --tests "org.elasticsearch.xpack.esql.action.Esql*IT.testScriptField" -Dtests.iters=128`

Without the commit this fails; with the commit it executes successfully.

+lastSeenDocId = doc;
+return provider.source.getSource(ctx, doc);
 }

private record PerThreadSourceProvider(SourceProvider source, Thread creatingThread) {
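Putting the two reinitialization conditions together, the pattern this class implements can be sketched in isolation. All names below are hypothetical stand-ins, not the Elasticsearch types; the delegate is any single-threaded, forward-only resource produced by a factory:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for ReinitializingSourceProvider: recreate the delegate
// whenever it is touched from a different thread or asked to go backwards.
final class ReinitializingWrapper<D> {
    private record PerThread<D>(D delegate, Thread creatingThread) {}

    private final Supplier<D> factory;
    private PerThread<D> current;
    private int lastSeenDocId;
    final AtomicInteger reinitCount = new AtomicInteger(); // for observation only

    ReinitializingWrapper(Supplier<D> factory) {
        this.factory = factory;
    }

    D forDoc(int doc) {
        Thread t = Thread.currentThread();
        PerThread<D> p = current;
        // Reinitialize when (1) another thread created the delegate, or
        // (2) the doc id went backwards (doc values can't rewind).
        if (p == null || p.creatingThread() != t || doc < lastSeenDocId) {
            p = new PerThread<>(factory.get(), t);
            current = p;
            reinitCount.incrementAndGet();
        }
        lastSeenDocId = doc;
        // Return via the local variable, so a concurrent reassignment of
        // `current` by another thread can't hand this thread a foreign delegate.
        return p.delegate();
    }
}

public class ReinitDemo {
    public static void main(String[] args) {
        ReinitializingWrapper<Object> w = new ReinitializingWrapper<>(Object::new);
        Object a = w.forDoc(3);
        Object b = w.forDoc(7); // forward, same thread: delegate reused
        Object c = w.forDoc(5); // backwards: delegate recreated
        System.out.println(a == b); // true
        System.out.println(b == c); // false
        System.out.println(w.reinitCount.get()); // 2
    }
}
```

The `return p.delegate()` line corresponds to the review suggestion in the thread above: return through the local variable rather than re-reading the shared field.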