Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion tensorflow_io/core/plugins/file_system_plugins.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,12 @@ void TF_InitPlugin(TF_FilesystemPluginInfo* info) {
tensorflow::io::hdfs::ProvideFilesystemSupportFor(&info->ops[3], "hdfse");
tensorflow::io::hdfs::ProvideFilesystemSupportFor(&info->ops[4], "viewfse");
tensorflow::io::hdfs::ProvideFilesystemSupportFor(&info->ops[5], "hare");
tensorflow::io::gs::ProvideFilesystemSupportFor(&info->ops[6], "gse");
} else {
tensorflow::io::s3::ProvideFilesystemSupportFor(&info->ops[2], "s3");
tensorflow::io::hdfs::ProvideFilesystemSupportFor(&info->ops[3], "hdfs");
tensorflow::io::hdfs::ProvideFilesystemSupportFor(&info->ops[4], "viewfs");
tensorflow::io::hdfs::ProvideFilesystemSupportFor(&info->ops[5], "har");
tensorflow::io::gs::ProvideFilesystemSupportFor(&info->ops[6], "gs");
}
tensorflow::io::gs::ProvideFilesystemSupportFor(&info->ops[6], "gse");
}
2 changes: 0 additions & 2 deletions tensorflow_io/core/plugins/gs/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ cc_library(
srcs = [
"cleanup.h",
"expiring_lru_cache.h",
"gcs_env.cc",
"gcs_env.h",
"gcs_filesystem.cc",
"gcs_helper.cc",
"gcs_helper.h",
Expand Down
4 changes: 2 additions & 2 deletions tensorflow_io/core/plugins/gs/expiring_lru_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ limitations under the License.

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
#include "tensorflow/c/env.h"
#include "tensorflow/c/tf_status.h"
#include "tensorflow_io/core/plugins/gs/gcs_env.h"

namespace tensorflow {
namespace io {
Expand All @@ -44,7 +44,7 @@ class ExpiringLRUCache {
/// that there is no limit on the number of entries in the cache (however, if
/// `max_age` is also 0, the cache will not be populated).
ExpiringLRUCache(uint64_t max_age, size_t max_entries,
std::function<uint64_t()> timer_seconds = GCSNowSeconds)
std::function<uint64_t()> timer_seconds = TF_NowSeconds)
: max_age_(max_age),
max_entries_(max_entries),
timer_seconds_(timer_seconds) {}
Expand Down
164 changes: 0 additions & 164 deletions tensorflow_io/core/plugins/gs/gcs_env.cc

This file was deleted.

45 changes: 0 additions & 45 deletions tensorflow_io/core/plugins/gs/gcs_env.h

This file was deleted.

11 changes: 11 additions & 0 deletions tensorflow_io/core/plugins/gs/gcs_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ limitations under the License.

#include <stdio.h>

#include <cstdlib>
#include <fstream>
#include <string>
#include <utility>

#include "tensorflow/c/env.h"

TempFile::TempFile(const std::string& temp_file_name, std::ios::openmode mode)
: std::fstream(temp_file_name, mode), name_(temp_file_name) {}

Expand All @@ -38,3 +41,11 @@ bool TempFile::truncate() {
std::fstream::open(name_, std::ios::binary | std::ios::out);
return std::fstream::is_open();
}

std::string GCSGetTempFileName(const std::string& extension) {
char* raw_temp_file_name = TF_GetTempFileName(extension.c_str());
if (!raw_temp_file_name) return "";
std::string temp_file_name(raw_temp_file_name);
std::free(raw_temp_file_name);
return temp_file_name;
}
2 changes: 2 additions & 0 deletions tensorflow_io/core/plugins/gs/gcs_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ class TempFile : public std::fstream {
const std::string name_;
};

std::string GCSGetTempFileName(const std::string& extension);

#endif // TENSORFLOW_C_EXPERIMENTAL_FILESYSTEM_PLUGINS_GCS_GCS_HELPER_H_
14 changes: 7 additions & 7 deletions tensorflow_io/core/plugins/gs/ram_file_block_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ limitations under the License.
#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
#include "absl/synchronization/notification.h"
#include "tensorflow/c/env.h"
#include "tensorflow/c/logging.h"
#include "tensorflow/c/tf_status.h"
#include "tensorflow_io/core/plugins/gs/gcs_env.h"

namespace tensorflow {
namespace io {
Expand All @@ -56,19 +56,19 @@ class RamFileBlockCache {

RamFileBlockCache(size_t block_size, size_t max_bytes, uint64_t max_staleness,
BlockFetcher block_fetcher,
std::function<uint64_t()> timer_seconds = GCSNowSeconds)
std::function<uint64_t()> timer_seconds = TF_NowSeconds)
: block_size_(block_size),
max_bytes_(max_bytes),
max_staleness_(max_staleness),
block_fetcher_(block_fetcher),
timer_seconds_(timer_seconds),
pruning_thread_(nullptr,
[](GCSThread* thread) { GCSJoinThread(thread); }) {
[](TF_Thread* thread) { TF_JoinThread(thread); }) {
if (max_staleness_ > 0) {
GCSThreadOptions thread_options;
GCSDefaultThreadOptions(&thread_options);
TF_ThreadOptions thread_options;
TF_DefaultThreadOptions(&thread_options);
pruning_thread_.reset(
GCSStartThread(&thread_options, "TF_prune_FBC", PruneThread, this));
TF_StartThread(&thread_options, "TF_prune_FBC", PruneThread, this));
}
TF_VLog(1, "GCS file block cache is %s.\n",
(IsCacheEnabled() ? "enabled" : "disabled"));
Expand Down Expand Up @@ -236,7 +236,7 @@ class RamFileBlockCache {
void RemoveBlock(BlockMap::iterator entry) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

/// The cache pruning thread that removes files with expired blocks.
std::unique_ptr<GCSThread, std::function<void(GCSThread*)>> pruning_thread_;
std::unique_ptr<TF_Thread, std::function<void(TF_Thread*)>> pruning_thread_;

/// Notification for stopping the cache pruning thread.
absl::Notification stop_pruning_thread_;
Expand Down
4 changes: 2 additions & 2 deletions tests/test_gcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def test_read_file():

# Setup the GCS bucket and key
key_name = "TEST"
bucket_name = "gse{}e".format(int(time.time()))
bucket_name = "gs{}e".format(int(time.time()))
bucket = client.create_bucket(bucket_name)

blob = bucket.blob(key_name)
Expand All @@ -56,5 +56,5 @@ def test_read_file():

os.environ["CLOUD_STORAGE_TESTBENCH_ENDPOINT"] = "http://localhost:9099"

content = tf.io.read_file("gse://{}/{}".format(bucket_name, key_name))
content = tf.io.read_file("gs://{}/{}".format(bucket_name, key_name))
assert content == body