-
Notifications
You must be signed in to change notification settings - Fork 307
Initial commit of gcs modular file system plugin (vnvo2409) #1203
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 8 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
a55bf7c
Initial commit of gcs modular file system plugin (vnvo2409)
yongtang b05939a
Namespace changes to avoid collision & conflict
yongtang a1ff97e
Add wrapper to GCSFileSystem so that it will only lazy-loaded.
yongtang 8b5ac40
Disable scheme check
yongtang 19fc707
Migrate GetTempFileName
yongtang c89f52b
Add placeholder APIs
yongtang 73622f9
Disable for now.
yongtang 2667885
Empty commit to trigger GitHub Actions
yongtang 6c19988
Rename the guards for header inclusion based on review feedback.
yongtang File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,34 @@ | ||
| licenses(["notice"]) # Apache 2.0 | ||
|
|
||
| package(default_visibility = ["//visibility:public"]) | ||
|
|
||
| load( | ||
| "//:tools/build/tensorflow_io.bzl", | ||
| "tf_io_copts", | ||
| ) | ||
|
|
||
| cc_library( | ||
| name = "gs", | ||
| srcs = [ | ||
| "cleanup.h", | ||
| "expiring_lru_cache.h", | ||
| "gcs_env.cc", | ||
| "gcs_env.h", | ||
| "gcs_filesystem.cc", | ||
| "gcs_helper.cc", | ||
| "gcs_helper.h", | ||
| "ram_file_block_cache.cc", | ||
| "ram_file_block_cache.h", | ||
| ], | ||
| copts = tf_io_copts(), | ||
| linkstatic = True, | ||
| deps = [ | ||
| "//tensorflow_io/core/plugins:plugins_header", | ||
| "@com_github_googleapis_google_cloud_cpp//:storage_client", | ||
| "@com_google_absl//absl/base:core_headers", | ||
| "@com_google_absl//absl/strings", | ||
| "@com_google_absl//absl/synchronization", | ||
| "@com_google_absl//absl/types:variant", | ||
| ], | ||
| alwayslink = 1, | ||
| ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,117 @@ | ||
| /* Copyright 2020 The TensorFlow Authors. All Rights Reserved. | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| ==============================================================================*/ | ||
|
|
||
| // MakeCleanup(f) returns an RAII cleanup object that calls 'f' in its | ||
| // destructor. The easiest way to use MakeCleanup is with a lambda argument, | ||
| // capturing the return value in an 'auto' local variable. Most users will not | ||
| // need more sophisticated syntax than that. | ||
| // | ||
| // Example: | ||
| // void func() { | ||
| // FILE* fp = fopen("data.txt", "r"); | ||
| // if (fp == nullptr) return; | ||
| // auto fp_cleaner = gtl::MakeCleanup([fp] { fclose(fp); }); | ||
| // // No matter what, fclose(fp) will happen. | ||
| // DataObject d; | ||
| // while (ReadDataObject(fp, &d)) { | ||
| // if (d.IsBad()) { | ||
| // LOG(ERROR) << "Bad Data"; | ||
| // return; | ||
| // } | ||
| // PushGoodData(d); | ||
| // } | ||
| // } | ||
| // | ||
| // You can use Cleanup<F> directly, instead of using MakeCleanup and auto, | ||
| // but there's rarely a reason to do that. | ||
| // | ||
| // You can call 'release()' on a Cleanup object to cancel the cleanup. | ||
|
|
||
| #ifndef TENSORFLOW_C_EXPERIMENTAL_FILESYSTEM_PLUGINS_GCS_CLEANUP_H_ | ||
| #define TENSORFLOW_C_EXPERIMENTAL_FILESYSTEM_PLUGINS_GCS_CLEANUP_H_ | ||
|
|
||
| #include <type_traits> | ||
| #include <utility> | ||
|
|
||
| namespace tensorflow { | ||
| namespace io { | ||
| namespace gs { | ||
|
|
||
| namespace tf_gcs_filesystem { | ||
|
|
||
| // A move-only RAII object that calls a stored cleanup functor when | ||
| // destroyed. Cleanup<F> is the return type of gtl::MakeCleanup(F). | ||
| template <typename F> | ||
| class Cleanup { | ||
| public: | ||
| Cleanup() : released_(true), f_() {} | ||
|
|
||
| template <typename G> | ||
| explicit Cleanup(G&& f) // NOLINT | ||
| : f_(std::forward<G>(f)) {} // NOLINT(build/c++11) | ||
|
|
||
| Cleanup(Cleanup&& src) // NOLINT | ||
| : released_(src.is_released()), f_(src.release()) {} | ||
|
|
||
| // Implicitly move-constructible from any compatible Cleanup<G>. | ||
| // The source will be released as if src.release() were called. | ||
| // A moved-from Cleanup can be safely destroyed or reassigned. | ||
| template <typename G> | ||
| Cleanup(Cleanup<G>&& src) // NOLINT | ||
| : released_(src.is_released()), f_(src.release()) {} | ||
|
|
||
| // Assignment to a Cleanup object behaves like destroying it | ||
| // and making a new one in its place, analogous to unique_ptr | ||
| // semantics. | ||
| Cleanup& operator=(Cleanup&& src) { // NOLINT | ||
| if (!released_) f_(); | ||
| released_ = src.released_; | ||
| f_ = src.release(); | ||
| return *this; | ||
| } | ||
|
|
||
| ~Cleanup() { | ||
| if (!released_) f_(); | ||
| } | ||
|
|
||
| // Releases the cleanup function instead of running it. | ||
| // Hint: use c.release()() to run early. | ||
| F release() { | ||
| released_ = true; | ||
| return std::move(f_); | ||
| } | ||
|
|
||
| bool is_released() const { return released_; } | ||
|
|
||
| private: | ||
| static_assert(!std::is_reference<F>::value, "F must not be a reference"); | ||
|
|
||
| bool released_ = false; | ||
| F f_; | ||
| }; | ||
|
|
||
| template <int&... ExplicitParameterBarrier, typename F, | ||
| typename DecayF = typename std::decay<F>::type> | ||
| Cleanup<DecayF> MakeCleanup(F&& f) { | ||
| return Cleanup<DecayF>(std::forward<F>(f)); | ||
| } | ||
|
|
||
| } // namespace tf_gcs_filesystem | ||
|
|
||
| } // namespace gs | ||
| } // namespace io | ||
| } // namespace tensorflow | ||
|
|
||
| #endif // TENSORFLOW_C_EXPERIMENTAL_FILESYSTEM_PLUGINS_GCS_CLEANUP_H_ | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,199 @@ | ||
| /* Copyright 2020 The TensorFlow Authors. All Rights Reserved. | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| ==============================================================================*/ | ||
|
|
||
| #ifndef TENSORFLOW_C_EXPERIMENTAL_FILESYSTEM_PLUGINS_GCS_EXPIRING_LRU_CACHE_H_ | ||
| #define TENSORFLOW_C_EXPERIMENTAL_FILESYSTEM_PLUGINS_GCS_EXPIRING_LRU_CACHE_H_ | ||
|
|
||
| #include <functional> | ||
| #include <list> | ||
| #include <map> | ||
| #include <memory> | ||
| #include <string> | ||
|
|
||
| #include "absl/base/thread_annotations.h" | ||
| #include "absl/synchronization/mutex.h" | ||
| #include "tensorflow/c/tf_status.h" | ||
| #include "tensorflow_io/core/plugins/gs/gcs_env.h" | ||
|
|
||
| namespace tensorflow { | ||
| namespace io { | ||
| namespace gs { | ||
|
|
||
| namespace tf_gcs_filesystem { | ||
|
|
||
| /// \brief An LRU cache of string keys and arbitrary values, with configurable | ||
| /// max item age (in seconds) and max entries. | ||
| /// | ||
| /// This class is thread safe. | ||
| template <typename T> | ||
| class ExpiringLRUCache { | ||
| public: | ||
| /// A `max_age` of 0 means that nothing is cached. A `max_entries` of 0 means | ||
| /// that there is no limit on the number of entries in the cache (however, if | ||
| /// `max_age` is also 0, the cache will not be populated). | ||
| ExpiringLRUCache(uint64_t max_age, size_t max_entries, | ||
| std::function<uint64_t()> timer_seconds = GCSNowSeconds) | ||
| : max_age_(max_age), | ||
| max_entries_(max_entries), | ||
| timer_seconds_(timer_seconds) {} | ||
|
|
||
| /// Insert `value` with key `key`. This will replace any previous entry with | ||
| /// the same key. | ||
| void Insert(const std::string& key, const T& value) { | ||
| if (max_age_ == 0) { | ||
| return; | ||
| } | ||
| absl::MutexLock lock(&mu_); | ||
| InsertLocked(key, value); | ||
| } | ||
|
|
||
| // Delete the entry with key `key`. Return true if the entry was found for | ||
| // `key`, false if the entry was not found. In both cases, there is no entry | ||
| // with key `key` existed after the call. | ||
| bool Delete(const std::string& key) { | ||
| absl::MutexLock lock(&mu_); | ||
| return DeleteLocked(key); | ||
| } | ||
|
|
||
| /// Look up the entry with key `key` and copy it to `value` if found. Returns | ||
| /// true if an entry was found for `key`, and its timestamp is not more than | ||
| /// max_age_ seconds in the past. | ||
| bool Lookup(const std::string& key, T* value) { | ||
| if (max_age_ == 0) { | ||
| return false; | ||
| } | ||
| absl::MutexLock lock(&mu_); | ||
| return LookupLocked(key, value); | ||
| } | ||
|
|
||
| typedef std::function<void(const std::string&, T*, TF_Status*)> ComputeFunc; | ||
|
|
||
| /// Look up the entry with key `key` and copy it to `value` if found. If not | ||
| /// found, call `compute_func`. If `compute_func` set `status` to `TF_OK`, | ||
| /// store a copy of the output parameter in the cache, and another copy in | ||
| /// `value`. | ||
| void LookupOrCompute(const std::string& key, T* value, | ||
| const ComputeFunc& compute_func, TF_Status* status) { | ||
| if (max_age_ == 0) { | ||
| return compute_func(key, value, status); | ||
| } | ||
|
|
||
| // Note: we hold onto mu_ for the rest of this function. In practice, this | ||
| // is okay, as stat requests are typically fast, and concurrent requests are | ||
| // often for the same file. Future work can split this up into one lock per | ||
| // key if this proves to be a significant performance bottleneck. | ||
| absl::MutexLock lock(&mu_); | ||
| if (LookupLocked(key, value)) { | ||
| return TF_SetStatus(status, TF_OK, ""); | ||
| } | ||
| compute_func(key, value, status); | ||
| if (TF_GetCode(status) == TF_OK) { | ||
| InsertLocked(key, *value); | ||
| } | ||
| } | ||
|
|
||
| /// Clear the cache. | ||
| void Clear() { | ||
| absl::MutexLock lock(&mu_); | ||
| cache_.clear(); | ||
| lru_list_.clear(); | ||
| } | ||
|
|
||
| /// Accessors for cache parameters. | ||
| uint64_t max_age() const { return max_age_; } | ||
| size_t max_entries() const { return max_entries_; } | ||
|
|
||
| private: | ||
| struct Entry { | ||
| /// The timestamp (seconds) at which the entry was added to the cache. | ||
| uint64_t timestamp; | ||
|
|
||
| /// The entry's value. | ||
| T value; | ||
|
|
||
| /// A list iterator pointing to the entry's position in the LRU list. | ||
| std::list<std::string>::iterator lru_iterator; | ||
| }; | ||
|
|
||
| bool LookupLocked(const std::string& key, T* value) | ||
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { | ||
| auto it = cache_.find(key); | ||
| if (it == cache_.end()) { | ||
| return false; | ||
| } | ||
| lru_list_.erase(it->second.lru_iterator); | ||
| if (timer_seconds_() - it->second.timestamp > max_age_) { | ||
| cache_.erase(it); | ||
| return false; | ||
| } | ||
| *value = it->second.value; | ||
| lru_list_.push_front(it->first); | ||
| it->second.lru_iterator = lru_list_.begin(); | ||
| return true; | ||
| } | ||
|
|
||
| void InsertLocked(const std::string& key, const T& value) | ||
| ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { | ||
| lru_list_.push_front(key); | ||
| Entry entry{timer_seconds_(), value, lru_list_.begin()}; | ||
| auto insert = cache_.insert(std::make_pair(key, entry)); | ||
| if (!insert.second) { | ||
| lru_list_.erase(insert.first->second.lru_iterator); | ||
| insert.first->second = entry; | ||
| } else if (max_entries_ > 0 && cache_.size() > max_entries_) { | ||
| cache_.erase(lru_list_.back()); | ||
| lru_list_.pop_back(); | ||
| } | ||
| } | ||
|
|
||
| bool DeleteLocked(const std::string& key) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { | ||
| auto it = cache_.find(key); | ||
| if (it == cache_.end()) { | ||
| return false; | ||
| } | ||
| lru_list_.erase(it->second.lru_iterator); | ||
| cache_.erase(it); | ||
| return true; | ||
| } | ||
|
|
||
| /// The maximum age of entries in the cache, in seconds. A value of 0 means | ||
| /// that no entry is ever placed in the cache. | ||
| const uint64_t max_age_; | ||
|
|
||
| /// The maximum number of entries in the cache. A value of 0 means there is no | ||
| /// limit on entry count. | ||
| const size_t max_entries_; | ||
|
|
||
| /// The callback to read timestamps. | ||
| std::function<uint64_t()> timer_seconds_; | ||
|
|
||
| /// Guards access to the cache and the LRU list. | ||
| absl::Mutex mu_; | ||
|
|
||
| /// The cache (a map from string key to Entry). | ||
| std::map<std::string, Entry> cache_ ABSL_GUARDED_BY(mu_); | ||
|
|
||
| /// The LRU list of entries. The front of the list identifies the most | ||
| /// recently accessed entry. | ||
| std::list<std::string> lru_list_ ABSL_GUARDED_BY(mu_); | ||
| }; | ||
|
|
||
| } // namespace tf_gcs_filesystem | ||
|
|
||
| } // namespace gs | ||
| } // namespace io | ||
| } // namespace tensorflow | ||
|
|
||
| #endif // TENSORFLOW_C_EXPERIMENTAL_FILESYSTEM_PLUGINS_GCS_EXPIRING_LRU_CACHE_H_ |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.