-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: hook up last cache to query executor using DataFusion traits #25143
Conversation
Each last cache holds a ring buffer for each column in an index map, which preserves the insertion order for faster record batch production. The ring buffer uses a custom type to handle the different supported data types that we can have in the system.
LastCacheProvider is the API used to create last caches and write table batches to them. It uses a two-layer RwLock/HashMap: the first for the database, and the second layer for the table within the database. This allows for table-level locks when writing in buffered data, and only gets a database-level lock when creating a cache (and in future, when removing them as well).
Added basic APIs on the write buffer to access the last cache and then a test to the last_cache module to see that it works with a simple example
Addressed three parts of PR feedback: 1. Remove double-lock on cache map 2. Re-order the get when writing to the cache to be outside the loop 3. Move the time check into the cache itself
This refactors the last cache to use a nested caching structure, where the key columns for a given cache are used to create a hierarchy of nested maps, terminating in the actual store for the values in the cache. Access to the cache is done via a set of predicates which can optionally specify the key column values at any level in the cache hierarchy to only gather record batches from children of that node in the cache. Some todos: - Need to handle the TTL - Need to move the TableProvider impl up to the LastCache type
This re-writes the datafusion TableProvider implementation on the correct type, i.e., the LastCache, and adds conversion from the filter Expr's to the Predicate type for the cache.
Last caches will have expired entries walked when writes come in.
Changed key columns so that they do not accept null values, i.e., rows that are pushed that are missing key column values will be ignored. When producing record batches for a cache, if not all key columns are used in the predicate, then this change makes it so that the non-predicate key columns are produced as columns in the outputted record batches. A test with a few cases showing this was added.
Ensure key columns in the last cache that are not included in the predicate are emitted in the RecordBatches as a column. Cleaned up and added comments to the new test.
Added two tests, as per commit title. Also moved the eviction process to a separate function so that it was not being done on every write to the cache, which could be expensive, and this ensures that entries are evicted regardless of whether writes are coming in or not.
CacheAlreadyExists errors were only being based on the database and table names, and not including the cache names, which was not correct.
This also adds explicit support for series key columns to distinguish them from normal tags in terms of nullability A test was added to check nulls work
Support the addition of new fields to the last cache, for caches that do not have a specified set of value columns. A test was added along with the changes.
Created a new module for the DataFusion table function implementations. The TableProvider impl for LastCache was moved there, and new code that implements the TableFunctionImpl trait to make the last cache queryable was also written. The LastCacheProvider and LastCache were augmented to make this work: - The provider stores an Arc<LastCache> instead of a LastCache - The LastCache uses interior mutability via an RwLock, to make the above possible.
The server used to accept a socket address and bind it directly, returning error if the bind fails. This commit changes that so the ServerBuilder accepts a TcpListener. The behaviour is essentially the same, but this allows us to bind the address from tests when instantiating the server, so we can easily assign unused ports. Tests in the influxdb3_server were updated to exploit this in order to use port 0 auto assignment and stop flaky test failures. A new, failing, test was also added to that module for the last cache.
Committing here as the last cache is in a working state, but it is naively implemented as it just stores all key columns again (still with the hierarchy)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left comments for myself 🧹
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a few suggestions and comments but nothing to me that's blocking
@@ -300,12 +300,17 @@ pub async fn command(config: Config) -> Result<()> { | |||
config.query_log_size, | |||
)); | |||
|
|||
let listener = TcpListener::bind(*config.http_bind_address) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have some tests that use a socket addr and I think they cause spurious failures. I'm glad we bind here and pass this in
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes! That is largely what motivated me to do this 😊
ServerBuilder { | ||
common_state: self.common_state, | ||
time_provider: self.time_provider, | ||
max_request_size: self.max_request_size, | ||
write_buffer: self.write_buffer, | ||
query_executor: self.query_executor, | ||
persister: self.persister, | ||
listener: WithListener(listener), | ||
authorizer: self.authorizer, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can just be
ServerBuilder { listener: WithListener(listener), ..self }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same with the other methods like this. Should reduce some line noise
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is unfortunately part of the pain and suffering of this excessively boiler-plate-y builder pattern. Because the return type is different than the Self
type, the compiler doesn't allow this 😔:
error[E0308]: mismatched types
--> influxdb3_server/src/builder.rs:72:15
|
72 | ..self
| ^^^^ expected `ServerBuilder<WithWriteBuf<...>, ..., ..., ..., ...>`, found
`ServerBuilder<NoWriteBuf, ..., ..., ..., ...>`
|
= note: expected struct `ServerBuilder<WithWriteBuf<W>, _, _, _, _>` (return type)
found struct `ServerBuilder<NoWriteBuf, _, _, _, _>` (Self type)
For more information about this error, try `rustc --explain E0308`.
error: could not compile `influxdb3_server` (lib) due to 1 previous error
use super::{LastCache, LastCacheProvider}; | ||
|
||
#[async_trait] | ||
impl TableProvider for LastCache { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is pretty cool -- I think we could totally support something like this in distributed (the key would be, of course, to get and populate LastCacheProvider
correctly in a distributed setting, which is likely pretty compilcated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perfect use case for Redis. I actually used it for exactly this at the fintech startup I worked at in 2010 where I created my first "time series database" 😄
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Everything looks clean and well tested. The one concern I have here is that the locks in the individual caches are going to be an expensive pattern.
Writes come in as an entire batch. Once the WAL refactor is done, this will happen once per second by default. This means that all rows written to the database will get written to a log file and then loaded into the in-memory buffer and last cache all in one go. Acquiring locks on every individual last cache is going to be very costly in scenarios when you're tracking hundreds of thousands of individual caches (i.e. tags with higher cardinalities).
Ideally, we'd grab one write lock, update everything and release. Then on read, just convert the cache data into a record batch on the fly and give that back. So you have a single lock on the entire state of the buffer or last cache that gets locked for write once per second and no underlying locks to worry about.
Otherwise, individual reads to all these caches are likely to cause a bunch of contention with the write update process which will also have to acquire a bunch of individual locks. It's basically a bunch of lock thrashing that we can avoid given we know that we batch all writes up in one second intervals.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesomesauce, 🚢!!!
Closes #25095
Note: I kind of messed up the commit history on this because I was stacking PRs. The previous PRs got squash merged and I think merging
main
into this branch (instead of the stacked branch) is what messed it up. All the previous PRs commits are added to this and I haven't been able to rebase/squash them locally. I figured since this too will be squashed then it can be ignored, instead of trying to grapple with it further. The PR diff is correct.Summary
This PR implements DataFusion's
TableProvider
andTableFunctionImpl
traits on theLastCache
and hooks the cache into the query executor, so it can be accessed via the query API.I split out the
last_cache
module ininfluxdb3_write
to two files to separate the core code from the DataFusion implementation.The cache is accessed through the query API via the table function
last_cache
, like so:There is a small test added that shows the cache being accessed through the query API here.
Enabling this required I change the
LastCache
and theLastCacheProvider
in the following ways:LastCacheProvider
, which holds the whole cache, spanning all databases and tables, now holds anArc<LastCache>
instead of aLastCache
at the lowest level.LastCache
uses anRwLock
internally to allow for interior mutability.LastCache
now includes fields for the key columns, previously it just held fields for value columns.(1.) was done to make implementation of the
TableFunctionImpl
trait possible. TheTableProvider
trait is implemented on theLastCache
type, so when fetching the cache from the provider, it needed to beArc
'd in order to satisfy the borrow checker.(2.) was required to allow mutability, given (1.).
(3.) was required for the
TableProvider
to work properly, and produceRecordBatch
es correctly.Other Changes
In addition to the above, I refactored the
Server
ininfluxdb3_server
to accept aTcpListener
instead of aSocketAddr
. This means the server is started using an already bound TCP address. This doesn't change the server in any way, but allows tests using the server to more reliably assign it a port, i.e., by binding to port 0, which will assign a randomly available port from the OS (we do this in our E2E tests, but also see here).A Note on Testing
I would like to test this more with E2E tests, but will need to get the APIs setup first (see #25096 and #25097)