Various Kudu cleanups and fixes related to schema emulation races #11264
hashhar merged 7 commits into trinodb:master
Please don't remove PR template.
...ino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaEmulationByTableNameConvention.java
When is the table actually created? When we try to create a new table?
It is created when trying to listSchemas: https://github.com/trinodb/trino/blob/1773ce393abd4eecf0de26e0909df78f6f34621c/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/schema/SchemaEmulationByTableNameConvention.java#L112-L118
I think the purpose of the table is that listing every table from every tablet is expensive, so instead we do that listing once, cache the results in a Kudu table, and then update that table whenever a schema is created or deleted.
> I think the purpose of the table is that listing every table from every tablet is expensive, so instead we do that listing once, cache the results in a Kudu table, and then update that table whenever a schema is created or deleted.
I don't think we use it for caching. Kudu has no concept of a namespace (i.e. a schema), so we create a table to store the schemas and add each schema name as a prefix to the tables under that virtual schema.
https://trino.io/docs/current/connector/kudu.html#behavior-with-schema-emulation
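To make the naming convention concrete, here is a minimal sketch of how schema names could be derived from raw Kudu table names. The class `SchemaNameSketch`, the `presto::` prefix, and the `.` separator are illustrative assumptions, not the connector's actual implementation; the linked documentation describes the real rules.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper written for illustration; not part of trino-kudu.
final class SchemaNameSketch
{
    // Assumed prefix and separator, chosen only for this example.
    private static final String PREFIX = "presto::";
    private static final char SEPARATOR = '.';

    private SchemaNameSketch() {}

    // Returns the emulated schema for a raw Kudu table name, or empty if the
    // name does not follow the assumed convention.
    static Optional<String> schemaOf(String rawTableName)
    {
        if (!rawTableName.startsWith(PREFIX)) {
            return Optional.empty();
        }
        String rest = rawTableName.substring(PREFIX.length());
        int separatorIndex = rest.indexOf(SEPARATOR);
        if (separatorIndex <= 0) {
            return Optional.empty();
        }
        return Optional.of(rest.substring(0, separatorIndex));
    }

    // Derives the distinct, sorted schema names from a full table listing.
    static Set<String> schemasOf(List<String> rawTableNames)
    {
        Set<String> schemas = new TreeSet<>();
        for (String name : rawTableNames) {
            schemaOf(name).ifPresent(schemas::add);
        }
        return schemas;
    }
}
```

For example, under these assumptions `schemasOf(List.of("presto::dev.orders", "presto::prod.orders"))` yields the schemas `dev` and `prod`.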
So does this flakiness also cause issues in the real world?
Yes, if a Kudu master has multiple resolvable IP addresses for a DNS record, then this read-your-own-writes flakiness will occur (for example, when creating and then immediately opening a table). This is unconfirmed, but it appears to be an issue with the Kudu client thinking multiple masters exist even though there is a single master, which results in this problem manifesting: https://issues.apache.org/jira/browse/KUDU-3266
I reached out to the Kudu maintainers on Slack and they were surprised about this behavior:
https://getkudu.slack.com/archives/C0CPXJ3CH/p1646330492501219
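One way to check whether a host name is affected is to look at how many addresses it resolves to. The sketch below uses only `java.net.InetAddress`; the class name `MasterAddressCheck` is hypothetical, and the result for `localhost` depends on the machine's resolver configuration.

```java
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic written for illustration; not part of the PR.
final class MasterAddressCheck
{
    private MasterAddressCheck() {}

    // Returns every address the local resolver reports for a host name or IP literal.
    static List<String> resolve(String host)
    {
        try {
            List<String> addresses = new ArrayList<>();
            for (InetAddress address : InetAddress.getAllByName(host)) {
                addresses.add(address.getHostAddress());
            }
            return addresses;
        }
        catch (UnknownHostException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args)
    {
        // A host name may resolve to several addresses (for example IPv4 and IPv6 loopback).
        System.out.println("localhost -> " + resolve("localhost"));
        // An IPv4 literal is not looked up and maps to exactly one address.
        System.out.println("127.0.0.1 -> " + resolve("127.0.0.1"));
    }
}
```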
Can we configure the kuduSession to have manual_flush as its default flush mode?
It is tricky to do without introducing extra abstraction in ForwardingKuduClient:
https://github.com/trinodb/trino/blob/master/plugin/trino-kudu/src/main/java/io/trino/plugin/kudu/ForwardingKuduClient.java#L98-L101
And we guard against the case of incorrectly using the wrong flush mode here. I'm inclined to leave the configured default flush mode.
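For readers unfamiliar with the trade-off, the sketch below shows the general shape of batching writes while still failing fast on errors. It does not use the Kudu client: `BufferingSession` is a hypothetical in-memory stand-in for a session that buffers operations until flushed, and `BatchingApplier` is an illustrative name, not the PR's `KuduOperationApplier`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a write session that buffers operations until flushed.
final class BufferingSession
{
    private final List<String> buffered = new ArrayList<>();
    int flushCount;

    void apply(String operation)
    {
        buffered.add(operation);
    }

    // Returns one result per buffered operation: empty on success, a message on failure.
    // Operations whose name starts with "bad" are made to fail for demonstration.
    List<Optional<String>> flush()
    {
        flushCount++;
        List<Optional<String>> results = new ArrayList<>();
        for (String operation : buffered) {
            results.add(operation.startsWith("bad")
                    ? Optional.of("row error for " + operation)
                    : Optional.empty());
        }
        buffered.clear();
        return results;
    }
}

// Buffers operations, flushes them in batches, and throws on the first reported error.
final class BatchingApplier
        implements AutoCloseable
{
    private final BufferingSession session;
    private final int batchSize;
    private int pending;

    BatchingApplier(BufferingSession session, int batchSize)
    {
        this.session = session;
        this.batchSize = batchSize;
    }

    void apply(String operation)
    {
        session.apply(operation);
        pending++;
        if (pending >= batchSize) {
            flushAndVerify();
        }
    }

    @Override
    public void close()
    {
        // Flush whatever is left so that no write is silently dropped.
        flushAndVerify();
    }

    private void flushAndVerify()
    {
        pending = 0;
        for (Optional<String> error : session.flush()) {
            if (error.isPresent()) {
                throw new IllegalStateException(error.get());
            }
        }
    }
}
```

Using the applier in a try-with-resources block ensures the final partial batch is flushed and checked as well.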
The Kudu schema emulation layer used a check-then-act pattern: check whether the schemas table exists, then create and populate it if it does not. This races when two queries run concurrently. Instead, always try to create the Kudu schemas table and upsert data into it idempotently.
Depending on test ordering, the Kudu schemas table may or may not exist.
Include the KuduException as the throwable cause.
BaseConnectorTest creates tables in the default schema. This races with the listing of the default schema.
This commit (in Kudu client v1.12.0): apache/kudu@4bf7e45 causes multiple IP addresses to be resolved for each Kudu master. This interacts poorly with Kudu's multi-master support, which does not provide read-your-own-writes, leading to flaky tests when creating and then immediately opening a table. The workaround is to explicitly connect to the Kudu master at the IPv4 loopback address (127.0.0.1) instead of localhost.
This helps the performance of loading data into Kudu. However, still verify that operations are successful and fail fast when errors occur.
They are now reliably passing without flakes after this PR: trinodb#11264
PartialRow row = upsert.getRow();
int start = 0;
if (generateUUID) {
    String id = format("%s-%08x", uuid, nextSubId++);
Q: Should nextSubId be marked volatile (it's a member being updated in a CompletableFuture)?
The CompletableFuture NOT_BLOCKED is just a placeholder to signal that the page sink is not blocked. NOT_BLOCKED does not hold a reference to nextSubId, so I do not think there are any issues here.
Just to be clear, non-volatile means that other threads may not see the new value after nextSubId++ (without volatile, there is no guarantee that a separate thread which has already read the value will ever see the update).
If that's not important, then feel free to ignore! (But as an aside, I think it's generally better to be explicit about these scenarios, and I would mark it volatile to be correct - rather than have code which could be "incorrect" under certain circumstances, and just depending on those situations being "impossible" - someone may always re-use this code and fall foul of the invisible update!).
Concrete example:
executor.submit(() -> kuduPageSink.appendPage(page));
executor.submit(() -> kuduPageSink.appendPage(anotherPage));
If that were a multi-threaded Executor, this line could yield the same value in both threads:
String id = format("%s-%08x", uuid, nextSubId++);
(Both threads read kuduPageSink.nextSubId, both increment it, and neither sees the value of the other, so it's both a race and a lost update. Even if only one thread did updates, because the field is not volatile there's no guarantee that anyone holding a reference to the kuduPageSink instance will see the updated nextSubId, hence the problem.)
On reflection, I'd use AtomicInteger.getAndIncrement() for simplicity's sake (and to avoid the race I mentioned, which volatile wouldn't protect against).
But anyways - just wanted to throw that out there for your consideration!
Correct me if I'm wrong, but page sinks are not expected to be thread safe.
Nowhere in the javadoc for ConnectorPageSink does it list thread safety as a requirement. The closest thing I could find was:
/**
* Returns a future that will be completed when the page sink can accept
* more pages. If the page sink can accept more pages immediately,
* this method should return {@code NOT_BLOCKED}.
*/
CompletableFuture<?> appendPage(Page page);
Here we only return NOT_BLOCKED after the critical section, so there is no risk of concurrent access to nextSubId
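For reference, the reviewer's `AtomicInteger` suggestion would look roughly like the sketch below. `RowIdGenerator` is a hypothetical class written for illustration; as the reply notes, the page sink is not expected to be called concurrently, so this thread does not establish that the PR needed the change.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of the suggestion; not the PR's page sink code.
final class RowIdGenerator
{
    private final String uuid;
    private final AtomicInteger nextSubId = new AtomicInteger();

    RowIdGenerator(String uuid)
    {
        this.uuid = uuid;
    }

    // getAndIncrement is a single atomic read-modify-write, so two threads
    // calling nextId() concurrently can never receive the same sub id.
    String nextId()
    {
        return String.format("%s-%08x", uuid, nextSubId.getAndIncrement());
    }
}
```

The id format mirrors the `"%s-%08x"` pattern from the diff above: a fixed prefix followed by an eight-digit hexadecimal counter.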
catch (KuduException e) {
    if (e.getStatus().isAlreadyPresent()) {
        // Table already exists
        return client.openTable(name);
Q: (For complete correctness) should we do a comparison on the existing table's options to be sure they're effectively "the same"?
(I know this is fixing a race in test code, but in Prod code if someone attempted to create a table w/incompatible options at the same time as someone else, they may (unexpectedly) get back a KuduTable which doesn't match the create options they were trying to apply?)
(Looking a little more closely, it seems the only usage is from createAndFillSchemasTable, which means the case I suggested can't manifest - so ignore!).
The whole approach of emulating schemas in Kudu using a Kudu table is not safe, given Kudu's consistency semantics and the fact that Trino does not propagate client timestamps in Kudu.
I talked with some Kudu maintainers: Impala uses the Hive metastore to emulate schemas in Kudu. That introduces a big dependency, however.
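The difference between a check-then-create pattern and the create-and-handle-already-present pattern in the snippet above can be sketched without a Kudu cluster. `InMemoryCatalog` and both method names are hypothetical stand-ins; `putIfAbsent` plays the role of a create call that reports an already-present status.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a table catalog; not the Kudu client API.
final class InMemoryCatalog
{
    private final Map<String, String> tables = new ConcurrentHashMap<>();

    // Racy: another caller can create the table between the check and the put,
    // in which case this caller overwrites the table the other one just created.
    String checkThenCreate(String name, String options)
    {
        if (!tables.containsKey(name)) {
            tables.put(name, options);
        }
        return tables.get(name);
    }

    // Idempotent: always attempt the create; if the table is already present,
    // return the existing one instead of failing or overwriting it.
    String createOrOpen(String name, String options)
    {
        String existing = tables.putIfAbsent(name, options);
        return existing != null ? existing : options;
    }
}
```

As the review comment points out, returning the existing table is only safe when every caller asks for the same options.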
assertThat(rows).containsAll(
        REQUIRED_TPCH_TABLES.stream()
                .map(TpchTable::getTableName)
                .collect(Collectors.toList()));
nit: Any specific reason for not using toUnmodifiableList here as well?
Doesn't seem like it matters given the list clearly cannot be modified.
@Praveen2112 WRT #11264 (comment): Yes, but you could imagine that instead of creating a $schemas table to emulate schemas, you could list all Kudu tables and use the naming convention to emulate schemas. The $schemas table is purely for performance reasons.
Yeah! I agree schema operations would be a lot faster with the $schemas table.
Thanks a lot for digging to the bottom of this flakiness, @grantatspothero. It wasn't easy for sure. And the performance improvement for writes is also nice.
Description
Fixes lots of different problems with flaky kudu tests. Let me know if I should split some of these commits into separate PRs
[...] $schemas table [...] $schemas table [...] AUTO_FLUSH_BACKGROUND to AUTO_FLUSH_SYNC to improve correctness, but unfortunately it caused a performance degradation due to flushing each operation one at a time. This commit keeps correctness while getting better write performance.
Related issues, pull requests, and links
#11203
Documentation
(x) No documentation is needed.
( ) Sufficient documentation is included in this PR.
( ) Documentation PR is available with #prnumber.
( ) Documentation issue #issuenumber is filed, and can be handled later.
Release notes
( ) No release notes entries required.
(x) Release notes entries required with the following suggested text: