1 change: 1 addition & 0 deletions tensorflow_io/core/plugins/BUILD
@@ -30,6 +30,7 @@ cc_library(
linkstatic = True,
deps = [
"//tensorflow_io/core/plugins/az",
"//tensorflow_io/core/plugins/gs",
"//tensorflow_io/core/plugins/hdfs",
"//tensorflow_io/core/plugins/http",
"//tensorflow_io/core/plugins/s3",
3 changes: 2 additions & 1 deletion tensorflow_io/core/plugins/file_system_plugins.cc
@@ -18,7 +18,7 @@ limitations under the License.
void TF_InitPlugin(TF_FilesystemPluginInfo* info) {
info->plugin_memory_allocate = tensorflow::io::plugin_memory_allocate;
info->plugin_memory_free = tensorflow::io::plugin_memory_free;
info->num_schemes = 6;
info->num_schemes = 7;
info->ops = static_cast<TF_FilesystemPluginOps*>(
tensorflow::io::plugin_memory_allocate(info->num_schemes *
sizeof(info->ops[0])));
@@ -28,4 +28,5 @@ void TF_InitPlugin(TF_FilesystemPluginInfo* info) {
tensorflow::io::hdfs::ProvideFilesystemSupportFor(&info->ops[3], "hdfse");
tensorflow::io::hdfs::ProvideFilesystemSupportFor(&info->ops[4], "viewfse");
tensorflow::io::hdfs::ProvideFilesystemSupportFor(&info->ops[5], "hare");
tensorflow::io::gs::ProvideFilesystemSupportFor(&info->ops[6], "gse");
}
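For context, TF_InitPlugin above hard-codes both the scheme count (now 7) and the array indices, so every new filesystem has to touch two places. The sketch below is a hypothetical table-driven variant, not part of this PR: it derives num_schemes from a registration table so the count and the indices cannot drift apart. The include path and the name TF_InitPluginSketch are assumptions, and the three entries elided by the diff are left out here as well.

// Hypothetical sketch, not part of this PR: table-driven scheme registration.
#include "tensorflow/c/experimental/filesystem/filesystem_interface.h"  // assumed path
#include "tensorflow_io/core/plugins/file_system_plugins.h"

namespace {
struct SchemeEntry {
  void (*provide)(TF_FilesystemPluginOps*, const char*);
  const char* scheme;
};
// Only the registrations visible in the hunk above are listed; the three
// entries elided by the diff would precede them.
constexpr SchemeEntry kSchemes[] = {
    {tensorflow::io::hdfs::ProvideFilesystemSupportFor, "hdfse"},
    {tensorflow::io::hdfs::ProvideFilesystemSupportFor, "viewfse"},
    {tensorflow::io::hdfs::ProvideFilesystemSupportFor, "hare"},
    {tensorflow::io::gs::ProvideFilesystemSupportFor, "gse"},
};
}  // namespace

void TF_InitPluginSketch(TF_FilesystemPluginInfo* info) {
  info->plugin_memory_allocate = tensorflow::io::plugin_memory_allocate;
  info->plugin_memory_free = tensorflow::io::plugin_memory_free;
  info->num_schemes = sizeof(kSchemes) / sizeof(kSchemes[0]);
  info->ops = static_cast<TF_FilesystemPluginOps*>(
      tensorflow::io::plugin_memory_allocate(info->num_schemes *
                                             sizeof(info->ops[0])));
  for (int i = 0; i < info->num_schemes; ++i) {
    kSchemes[i].provide(&info->ops[i], kSchemes[i].scheme);
  }
}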
6 changes: 6 additions & 0 deletions tensorflow_io/core/plugins/file_system_plugins.h
@@ -32,6 +32,12 @@ void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, const char* uri);

} // namespace az

namespace gs {

void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, const char* uri);

} // namespace gs

namespace hdfs {

void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, const char* uri);
34 changes: 34 additions & 0 deletions tensorflow_io/core/plugins/gs/BUILD
@@ -0,0 +1,34 @@
licenses(["notice"]) # Apache 2.0

package(default_visibility = ["//visibility:public"])

load(
"//:tools/build/tensorflow_io.bzl",
"tf_io_copts",
)

cc_library(
name = "gs",
srcs = [
"cleanup.h",
"expiring_lru_cache.h",
"gcs_env.cc",
"gcs_env.h",
"gcs_filesystem.cc",
"gcs_helper.cc",
"gcs_helper.h",
"ram_file_block_cache.cc",
"ram_file_block_cache.h",
],
copts = tf_io_copts(),
linkstatic = True,
deps = [
"//tensorflow_io/core/plugins:plugins_header",
"@com_github_googleapis_google_cloud_cpp//:storage_client",
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/synchronization",
"@com_google_absl//absl/types:variant",
],
alwayslink = 1,
)
117 changes: 117 additions & 0 deletions tensorflow_io/core/plugins/gs/cleanup.h
@@ -0,0 +1,117 @@
/* Copyright 2020 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

// MakeCleanup(f) returns an RAII cleanup object that calls 'f' in its
// destructor. The easiest way to use MakeCleanup is with a lambda argument,
// capturing the return value in an 'auto' local variable. Most users will not
// need more sophisticated syntax than that.
//
// Example:
// void func() {
// FILE* fp = fopen("data.txt", "r");
// if (fp == nullptr) return;
// auto fp_cleaner = gtl::MakeCleanup([fp] { fclose(fp); });
// // No matter what, fclose(fp) will happen.
// DataObject d;
// while (ReadDataObject(fp, &d)) {
// if (d.IsBad()) {
// LOG(ERROR) << "Bad Data";
// return;
// }
// PushGoodData(d);
// }
// }
//
// You can use Cleanup<F> directly, instead of using MakeCleanup and auto,
// but there's rarely a reason to do that.
//
// You can call 'release()' on a Cleanup object to cancel the cleanup.

#ifndef TENSORFLOW_IO_CORE_PLUGINS_GS_CLEANUP_H_
#define TENSORFLOW_IO_CORE_PLUGINS_GS_CLEANUP_H_

#include <type_traits>
#include <utility>

namespace tensorflow {
namespace io {
namespace gs {

namespace tf_gcs_filesystem {

// A move-only RAII object that calls a stored cleanup functor when
// destroyed. Cleanup<F> is the return type of gtl::MakeCleanup(F).
template <typename F>
class Cleanup {
public:
Cleanup() : released_(true), f_() {}

template <typename G>
explicit Cleanup(G&& f) // NOLINT
: f_(std::forward<G>(f)) {} // NOLINT(build/c++11)

Cleanup(Cleanup&& src) // NOLINT
: released_(src.is_released()), f_(src.release()) {}

// Implicitly move-constructible from any compatible Cleanup<G>.
// The source will be released as if src.release() were called.
// A moved-from Cleanup can be safely destroyed or reassigned.
template <typename G>
Cleanup(Cleanup<G>&& src) // NOLINT
: released_(src.is_released()), f_(src.release()) {}

// Assignment to a Cleanup object behaves like destroying it
// and making a new one in its place, analogous to unique_ptr
// semantics.
Cleanup& operator=(Cleanup&& src) { // NOLINT
if (!released_) f_();
released_ = src.released_;
f_ = src.release();
return *this;
}

~Cleanup() {
if (!released_) f_();
}

// Releases the cleanup function instead of running it.
// Hint: use c.release()() to run early.
F release() {
released_ = true;
return std::move(f_);
}

bool is_released() const { return released_; }

private:
static_assert(!std::is_reference<F>::value, "F must not be a reference");

bool released_ = false;
F f_;
};

template <int&... ExplicitParameterBarrier, typename F,
typename DecayF = typename std::decay<F>::type>
Cleanup<DecayF> MakeCleanup(F&& f) {
return Cleanup<DecayF>(std::forward<F>(f));
}

} // namespace tf_gcs_filesystem

} // namespace gs
} // namespace io
} // namespace tensorflow

#endif // TENSORFLOW_IO_CORE_PLUGINS_GS_CLEANUP_H_
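To make the release() semantics described in the header comment concrete, here is a minimal usage sketch, not part of this PR. It assumes the header is included via the path shown in this diff; Demo and hand_off are hypothetical names.

// Minimal usage sketch (not part of this PR) of Cleanup/MakeCleanup.
#include <cstdio>

#include "tensorflow_io/core/plugins/gs/cleanup.h"

using tensorflow::io::gs::tf_gcs_filesystem::MakeCleanup;

void Demo(bool hand_off) {
  std::FILE* fp = std::fopen("data.txt", "r");
  if (fp == nullptr) return;
  auto closer = MakeCleanup([fp] { std::fclose(fp); });
  // ... read from fp ...
  if (hand_off) {
    // Cancels the cleanup: fclose will NOT run when `closer` is destroyed.
    closer.release();
  }
  // Alternatively, closer.release()() runs the stored functor immediately.
}  // unless released above, fclose(fp) runs here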
199 changes: 199 additions & 0 deletions tensorflow_io/core/plugins/gs/expiring_lru_cache.h
@@ -0,0 +1,199 @@
/* Copyright 2020 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_IO_CORE_PLUGINS_GS_EXPIRING_LRU_CACHE_H_
#define TENSORFLOW_IO_CORE_PLUGINS_GS_EXPIRING_LRU_CACHE_H_

#include <functional>
#include <list>
#include <map>
#include <memory>
#include <string>

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
#include "tensorflow/c/tf_status.h"
#include "tensorflow_io/core/plugins/gs/gcs_env.h"

namespace tensorflow {
namespace io {
namespace gs {

namespace tf_gcs_filesystem {

/// \brief An LRU cache of string keys and arbitrary values, with configurable
/// max item age (in seconds) and max entries.
///
/// This class is thread safe.
template <typename T>
class ExpiringLRUCache {
public:
/// A `max_age` of 0 means that nothing is cached. A `max_entries` of 0 means
/// that there is no limit on the number of entries in the cache (however, if
/// `max_age` is also 0, the cache will not be populated).
ExpiringLRUCache(uint64_t max_age, size_t max_entries,
std::function<uint64_t()> timer_seconds = GCSNowSeconds)
: max_age_(max_age),
max_entries_(max_entries),
timer_seconds_(timer_seconds) {}

/// Insert `value` with key `key`. This will replace any previous entry with
/// the same key.
void Insert(const std::string& key, const T& value) {
if (max_age_ == 0) {
return;
}
absl::MutexLock lock(&mu_);
InsertLocked(key, value);
}

/// Deletes the entry with key `key`. Returns true if the entry was found,
/// false otherwise. In either case, no entry with key `key` exists after the
/// call.
bool Delete(const std::string& key) {
absl::MutexLock lock(&mu_);
return DeleteLocked(key);
}

/// Looks up the entry with key `key` and copies it to `value` if found.
/// Returns true if an entry was found for `key` and its timestamp is no more
/// than max_age_ seconds in the past.
bool Lookup(const std::string& key, T* value) {
if (max_age_ == 0) {
return false;
}
absl::MutexLock lock(&mu_);
return LookupLocked(key, value);
}

typedef std::function<void(const std::string&, T*, TF_Status*)> ComputeFunc;

/// Looks up the entry with key `key` and copies it to `value` if found. If
/// not found, calls `compute_func`. If `compute_func` sets `status` to
/// `TF_OK`, stores a copy of the output parameter in the cache and another
/// copy in `value`.
void LookupOrCompute(const std::string& key, T* value,
const ComputeFunc& compute_func, TF_Status* status) {
if (max_age_ == 0) {
return compute_func(key, value, status);
}

// Note: we hold onto mu_ for the rest of this function. In practice, this
// is okay, as stat requests are typically fast, and concurrent requests are
// often for the same file. Future work can split this up into one lock per
// key if this proves to be a significant performance bottleneck.
absl::MutexLock lock(&mu_);
if (LookupLocked(key, value)) {
return TF_SetStatus(status, TF_OK, "");
}
compute_func(key, value, status);
if (TF_GetCode(status) == TF_OK) {
InsertLocked(key, *value);
}
}

/// Clear the cache.
void Clear() {
absl::MutexLock lock(&mu_);
cache_.clear();
lru_list_.clear();
}

/// Accessors for cache parameters.
uint64_t max_age() const { return max_age_; }
size_t max_entries() const { return max_entries_; }

private:
struct Entry {
/// The timestamp (seconds) at which the entry was added to the cache.
uint64_t timestamp;

/// The entry's value.
T value;

/// A list iterator pointing to the entry's position in the LRU list.
std::list<std::string>::iterator lru_iterator;
};

bool LookupLocked(const std::string& key, T* value)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
auto it = cache_.find(key);
if (it == cache_.end()) {
return false;
}
lru_list_.erase(it->second.lru_iterator);
if (timer_seconds_() - it->second.timestamp > max_age_) {
cache_.erase(it);
return false;
}
*value = it->second.value;
lru_list_.push_front(it->first);
it->second.lru_iterator = lru_list_.begin();
return true;
}

void InsertLocked(const std::string& key, const T& value)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
lru_list_.push_front(key);
Entry entry{timer_seconds_(), value, lru_list_.begin()};
auto insert = cache_.insert(std::make_pair(key, entry));
if (!insert.second) {
lru_list_.erase(insert.first->second.lru_iterator);
insert.first->second = entry;
} else if (max_entries_ > 0 && cache_.size() > max_entries_) {
cache_.erase(lru_list_.back());
lru_list_.pop_back();
}
}

bool DeleteLocked(const std::string& key) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
auto it = cache_.find(key);
if (it == cache_.end()) {
return false;
}
lru_list_.erase(it->second.lru_iterator);
cache_.erase(it);
return true;
}

/// The maximum age of entries in the cache, in seconds. A value of 0 means
/// that no entry is ever placed in the cache.
const uint64_t max_age_;

/// The maximum number of entries in the cache. A value of 0 means there is no
/// limit on entry count.
const size_t max_entries_;

/// The callback to read timestamps.
std::function<uint64_t()> timer_seconds_;

/// Guards access to the cache and the LRU list.
absl::Mutex mu_;

/// The cache (a map from string key to Entry).
std::map<std::string, Entry> cache_ ABSL_GUARDED_BY(mu_);

/// The LRU list of entries. The front of the list identifies the most
/// recently accessed entry.
std::list<std::string> lru_list_ ABSL_GUARDED_BY(mu_);
};

} // namespace tf_gcs_filesystem

} // namespace gs
} // namespace io
} // namespace tensorflow

#endif // TENSORFLOW_IO_CORE_PLUGINS_GS_EXPIRING_LRU_CACHE_H_
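Below is a minimal usage sketch of ExpiringLRUCache, not part of this PR, with an injected fake clock so the expiry path is deterministic. Only the APIs defined in the header above plus the TF_Status C API are used; Demo is a hypothetical name, and the compute callback stands in for the kind of GCS stat call the header comment mentions.

// Minimal usage sketch (not part of this PR) of ExpiringLRUCache<T>.
#include <cassert>
#include <cstdint>
#include <string>

#include "tensorflow/c/tf_status.h"
#include "tensorflow_io/core/plugins/gs/expiring_lru_cache.h"

void Demo() {
  uint64_t now = 0;
  // 10-second max age, at most 2 entries, fake timer instead of GCSNowSeconds.
  tensorflow::io::gs::tf_gcs_filesystem::ExpiringLRUCache<int> cache(
      /*max_age=*/10, /*max_entries=*/2, [&now]() { return now; });

  cache.Insert("a", 1);
  int v = 0;
  assert(cache.Lookup("a", &v) && v == 1);

  now += 11;                       // "a" is now older than max_age
  assert(!cache.Lookup("a", &v));  // expired entries are dropped on lookup

  TF_Status* status = TF_NewStatus();
  cache.LookupOrCompute(
      "b", &v,
      [](const std::string& key, int* out, TF_Status* s) {
        *out = 42;  // stand-in for an expensive call, e.g. a GCS stat
        TF_SetStatus(s, TF_OK, "");
      },
      status);
  assert(TF_GetCode(status) == TF_OK && v == 42);  // "b" is now cached
  TF_DeleteStatus(status);
}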