From 116dc3975ae50c0f4bce2ca3b8a2c42682eaa41b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?V=C3=B5=20V=C4=83n=20Ngh=C4=A9a?=
Date: Sun, 28 Mar 2021 16:26:32 +0200
Subject: [PATCH 1/3] lazy loading for `s3` environment variables

---
 .../core/plugins/s3/s3_filesystem.cc          | 45 +++++++++----------
 tensorflow_io/core/plugins/s3/s3_filesystem.h |  7 +--
 2 files changed, 25 insertions(+), 27 deletions(-)

diff --git a/tensorflow_io/core/plugins/s3/s3_filesystem.cc b/tensorflow_io/core/plugins/s3/s3_filesystem.cc
index b1b1736f5..f8039c39b 100644
--- a/tensorflow_io/core/plugins/s3/s3_filesystem.cc
+++ b/tensorflow_io/core/plugins/s3/s3_filesystem.cc
@@ -241,6 +241,9 @@ static void GetS3Client(tf_s3_filesystem::S3File* s3_file) {
         tf_s3_filesystem::AWSLogSystem::ShutdownAWSLogging();
       }
     });
+    int temp_value;
+    if (absl::SimpleAtoi(getenv("S3_DISABLE_MULTI_PART_DOWNLOAD"), &temp_value))
+      s3_file->use_multi_part_download = (temp_value != 1);
   }
 }
 
@@ -263,15 +266,26 @@ static void GetTransferManager(
   absl::MutexLock l(&s3_file->initialization_lock);
 
-  if (s3_file->transfer_managers[direction].get() == nullptr) {
+  if (s3_file->transfer_managers.count(direction) == 0) {
+    uint64_t temp_value;
+    if (direction == Aws::Transfer::TransferDirection::UPLOAD) {
+      if (!absl::SimpleAtoi(getenv("S3_MULTI_PART_UPLOAD_CHUNK_SIZE"),
+                            &temp_value))
+        temp_value = kS3MultiPartUploadChunkSize;
+    } else if (direction == Aws::Transfer::TransferDirection::DOWNLOAD) {
+      if (!absl::SimpleAtoi(getenv("S3_MULTI_PART_DOWNLOAD_CHUNK_SIZE"),
+                            &temp_value))
+        temp_value = kS3MultiPartDownloadChunkSize;
+    }
+    s3_file->multi_part_chunk_sizes.emplace(direction, temp_value);
+
     Aws::Transfer::TransferManagerConfiguration config(s3_file->executor.get());
     config.s3Client = s3_file->s3_client;
-    config.bufferSize = s3_file->multi_part_chunk_sizes[direction];
+    config.bufferSize = temp_value;
     // must be larger than pool size * multi part chunk size
-    config.transferBufferMaxHeapSize =
-        (kExecutorPoolSize + 1) * s3_file->multi_part_chunk_sizes[direction];
-    s3_file->transfer_managers[direction] =
-        Aws::Transfer::TransferManager::Create(config);
+    config.transferBufferMaxHeapSize = (kExecutorPoolSize + 1) * temp_value;
+    s3_file->transfer_managers.emplace(
+        direction, Aws::Transfer::TransferManager::Create(config));
   }
 }
 
@@ -529,24 +543,7 @@ S3File::S3File()
       transfer_managers(),
       multi_part_chunk_sizes(),
       use_multi_part_download(false),  // TODO: change to true after fix
-      initialization_lock() {
-  uint64_t temp_value;
-  multi_part_chunk_sizes[Aws::Transfer::TransferDirection::UPLOAD] =
-      absl::SimpleAtoi(getenv("S3_MULTI_PART_UPLOAD_CHUNK_SIZE"), &temp_value)
-          ? temp_value
-          : kS3MultiPartUploadChunkSize;
-  multi_part_chunk_sizes[Aws::Transfer::TransferDirection::DOWNLOAD] =
-      absl::SimpleAtoi(getenv("S3_MULTI_PART_DOWNLOAD_CHUNK_SIZE"), &temp_value)
-          ? temp_value
-          : kS3MultiPartDownloadChunkSize;
-  use_multi_part_download =
-      absl::SimpleAtoi(getenv("S3_DISABLE_MULTI_PART_DOWNLOAD"), &temp_value)
-          ? (temp_value != 1)
-          : use_multi_part_download;
-  transfer_managers.emplace(Aws::Transfer::TransferDirection::UPLOAD, nullptr);
-  transfer_managers.emplace(Aws::Transfer::TransferDirection::DOWNLOAD,
-                            nullptr);
-}
+      initialization_lock() {}
 
 void Init(TF_Filesystem* filesystem, TF_Status* status) {
   filesystem->plugin_filesystem = new S3File();
   TF_SetStatus(status, TF_OK, "");

diff --git a/tensorflow_io/core/plugins/s3/s3_filesystem.h b/tensorflow_io/core/plugins/s3/s3_filesystem.h
index 85e71b0d6..882785854 100644
--- a/tensorflow_io/core/plugins/s3/s3_filesystem.h
+++ b/tensorflow_io/core/plugins/s3/s3_filesystem.h
@@ -60,11 +60,12 @@ typedef struct S3File {
   std::shared_ptr<Aws::S3::S3Client> s3_client;
   std::shared_ptr<Aws::Utils::Threading::PooledThreadExecutor> executor;
   // We need 2 `TransferManager`, for multipart upload/download.
-  Aws::Map<Aws::Transfer::TransferDirection,
-           std::shared_ptr<Aws::Transfer::TransferManager>>
+  Aws::UnorderedMap<Aws::Transfer::TransferDirection,
+                    std::shared_ptr<Aws::Transfer::TransferManager>>
       transfer_managers;
   // Sizes to split objects during multipart upload/download.
-  Aws::Map<Aws::Transfer::TransferDirection, uint64_t> multi_part_chunk_sizes;
+  Aws::UnorderedMap<Aws::Transfer::TransferDirection, uint64_t>
+      multi_part_chunk_sizes;
   bool use_multi_part_download;
   absl::Mutex initialization_lock;
   S3File();
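Note: with patch 1 the multipart settings are read lazily, at first creation of the S3 client and transfer managers, instead of in the `S3File` constructor. The practical consequence is that user code can set the variables any time before the first S3 access, even after importing the modules. A minimal usage sketch, assuming the chunk sizes are byte counts (they feed `config.bufferSize` above) and with placeholder bucket/key names; credentials and region are assumed to be configured elsewhere:

    import os
    import tensorflow as tf
    import tensorflow_io  # noqa: F401  (registers the s3:// scheme)

    # With lazy loading these take effect as long as they are set before the
    # first S3 access; the values are cached once the client and transfer
    # managers exist.
    os.environ["S3_MULTI_PART_UPLOAD_CHUNK_SIZE"] = str(16 * 1024 * 1024)
    os.environ["S3_MULTI_PART_DOWNLOAD_CHUNK_SIZE"] = str(32 * 1024 * 1024)
    os.environ["S3_DISABLE_MULTI_PART_DOWNLOAD"] = "1"  # "1" disables it

    content = tf.io.read_file("s3://my-bucket/my-key")  # placeholder names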
From 6e9e86d450fc1698beaa3040fc747476646cf01a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?V=C3=B5=20V=C4=83n=20Ngh=C4=A9a?=
Date: Sun, 28 Mar 2021 17:37:32 +0200
Subject: [PATCH 2/3] `S3_ENDPOINT` supports http/https

---
 tensorflow_io/core/plugins/s3/s3_filesystem.cc | 6 ++++--
 tests/test_s3.py                               | 4 +---
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/tensorflow_io/core/plugins/s3/s3_filesystem.cc b/tensorflow_io/core/plugins/s3/s3_filesystem.cc
index f8039c39b..5d77b91ad 100644
--- a/tensorflow_io/core/plugins/s3/s3_filesystem.cc
+++ b/tensorflow_io/core/plugins/s3/s3_filesystem.cc
@@ -135,8 +135,6 @@ static Aws::Client::ClientConfiguration& GetDefaultClientConfig() {
   absl::MutexLock l(&cfg_lock);
 
   if (!init) {
-    const char* endpoint = getenv("S3_ENDPOINT");
-    if (endpoint) cfg.endpointOverride = Aws::String(endpoint);
     const char* region = getenv("AWS_REGION");
     // TODO (yongtang): `S3_REGION` should be deprecated after 2.0.
     if (!region) region = getenv("S3_REGION");
@@ -241,9 +239,13 @@ static void GetS3Client(tf_s3_filesystem::S3File* s3_file) {
         tf_s3_filesystem::AWSLogSystem::ShutdownAWSLogging();
       }
     });
+
     int temp_value;
     if (absl::SimpleAtoi(getenv("S3_DISABLE_MULTI_PART_DOWNLOAD"), &temp_value))
       s3_file->use_multi_part_download = (temp_value != 1);
+
+    const char* endpoint = getenv("S3_ENDPOINT");
+    if (endpoint) s3_file->s3_client->OverrideEndpoint(endpoint);
   }
 }

diff --git a/tests/test_s3.py b/tests/test_s3.py
index 6f008f3a9..08f928380 100644
--- a/tests/test_s3.py
+++ b/tests/test_s3.py
@@ -51,9 +51,7 @@ def test_read_file():
     response = client.get_object(Bucket=bucket_name, Key=key_name)
     assert response["Body"].read() == body
 
-    os.environ["S3_ENDPOINT"] = "localhost:4566"
-    os.environ["S3_USE_HTTPS"] = "0"
-    os.environ["S3_VERIFY_SSL"] = "0"
+    os.environ["S3_ENDPOINT"] = "http://localhost:4566"
 
     content = tf.io.read_file("s3://{}/{}".format(bucket_name, key_name))
     assert content == body
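Note: after patch 2 the scheme travels inside `S3_ENDPOINT` itself, since the override now goes through `S3Client::OverrideEndpoint` rather than `cfg.endpointOverride`. A short sketch of both cases; the endpoint URLs are examples only (localstack's default port for the HTTP case, a hypothetical host for the HTTPS case):

    import os

    # Local S3 emulator over plain HTTP:
    os.environ["S3_ENDPOINT"] = "http://localhost:4566"

    # An S3-compatible service over HTTPS:
    # os.environ["S3_ENDPOINT"] = "https://storage.example.com"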
From ec3db33075e36fb4e72458ed05434beaf4973738 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?V=C3=B5=20V=C4=83n=20Ngh=C4=A9a?=
Date: Mon, 29 Mar 2021 19:14:07 +0200
Subject: [PATCH 3/3] remove `S3_USE_HTTPS` and `S3_VERIFY_SSL`

---
 tensorflow_io/core/plugins/s3/s3_filesystem.cc | 14 --------------
 1 file changed, 14 deletions(-)

diff --git a/tensorflow_io/core/plugins/s3/s3_filesystem.cc b/tensorflow_io/core/plugins/s3/s3_filesystem.cc
index 5d77b91ad..17f4ce084 100644
--- a/tensorflow_io/core/plugins/s3/s3_filesystem.cc
+++ b/tensorflow_io/core/plugins/s3/s3_filesystem.cc
@@ -166,20 +166,6 @@ static Aws::Client::ClientConfiguration& GetDefaultClientConfig() {
         cfg.region = profiles["default"].GetRegion();
       }
     }
-    const char* use_https = getenv("S3_USE_HTTPS");
-    if (use_https) {
-      if (use_https[0] == '0')
-        cfg.scheme = Aws::Http::Scheme::HTTP;
-      else
-        cfg.scheme = Aws::Http::Scheme::HTTPS;
-    }
-    const char* verify_ssl = getenv("S3_VERIFY_SSL");
-    if (verify_ssl) {
-      if (verify_ssl[0] == '0')
-        cfg.verifySSL = false;
-      else
-        cfg.verifySSL = true;
-    }
     // if these timeouts are low, you may see an error when
     // uploading/downloading large files: Unable to connect to endpoint
     int64_t timeout;
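Note: for anyone migrating, a sketch of the old and new settings side by side; after patch 3 the two removed variables are read nowhere in the plugin and so have no effect:

    import os

    # Before this series:
    #   os.environ["S3_ENDPOINT"] = "localhost:4566"
    #   os.environ["S3_USE_HTTPS"] = "0"   # removed
    #   os.environ["S3_VERIFY_SSL"] = "0"  # removed
    #
    # After: the scheme is folded into the endpoint URL itself.
    os.environ["S3_ENDPOINT"] = "http://localhost:4566"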