Skip to content

Commit 85c90b3

Browse files
authored
Initial commit of s3 modular file system plugin (credit @vnvo2409) (#1191)
* Initial commit of s3 modular file system plugin (credit vnvo2409) Signed-off-by: Yong Tang <[email protected]> * Disable use_multi_part_download for now, and several fixes Signed-off-by: Yong Tang <[email protected]>
1 parent 21ca5c1 commit 85c90b3

File tree

11 files changed

+1865
-3
lines changed

11 files changed

+1865
-3
lines changed

tensorflow_io/core/plugins/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ cc_library(
3131
deps = [
3232
"//tensorflow_io/core/plugins/az",
3333
"//tensorflow_io/core/plugins/http",
34+
"//tensorflow_io/core/plugins/s3",
3435
],
3536
alwayslink = 1,
3637
)

tensorflow_io/core/plugins/file_system_plugins.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,11 @@ limitations under the License.
1818
void TF_InitPlugin(TF_FilesystemPluginInfo* info) {
1919
info->plugin_memory_allocate = tensorflow::io::plugin_memory_allocate;
2020
info->plugin_memory_free = tensorflow::io::plugin_memory_free;
21-
info->num_schemes = 2;
21+
info->num_schemes = 3;
2222
info->ops = static_cast<TF_FilesystemPluginOps*>(
2323
tensorflow::io::plugin_memory_allocate(info->num_schemes *
2424
sizeof(info->ops[0])));
2525
tensorflow::io::az::ProvideFilesystemSupportFor(&info->ops[0], "az");
2626
tensorflow::io::http::ProvideFilesystemSupportFor(&info->ops[1], "http");
27+
tensorflow::io::s3::ProvideFilesystemSupportFor(&info->ops[2], "s3e");
2728
}

tensorflow_io/core/plugins/file_system_plugins.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, const char* uri);
3838

3939
} // namespace http
4040

41+
namespace s3 {
42+
43+
void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, const char* uri);
44+
45+
} // namespace s3
46+
4147
} // namespace io
4248
} // namespace tensorflow
4349

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
licenses(["notice"]) # Apache 2.0
2+
3+
package(default_visibility = ["//visibility:public"])
4+
5+
load(
6+
"//:tools/build/tensorflow_io.bzl",
7+
"tf_io_copts",
8+
)
9+
10+
cc_library(
11+
name = "s3",
12+
srcs = [
13+
"aws_crypto.cc",
14+
"aws_crypto.h",
15+
"aws_logging.cc",
16+
"aws_logging.h",
17+
"s3_filesystem.cc",
18+
"s3_filesystem.h",
19+
],
20+
copts = tf_io_copts(),
21+
linkstatic = True,
22+
deps = [
23+
"//tensorflow_io/core/plugins:plugins_header",
24+
"@aws-sdk-cpp",
25+
"@com_google_absl//absl/strings",
26+
"@com_google_absl//absl/synchronization",
27+
],
28+
alwayslink = 1,
29+
)
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2+
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
==============================================================================*/
15+
#include "tensorflow_io/core/plugins/s3/aws_crypto.h"
16+
17+
#include <aws/core/utils/crypto/HashResult.h>
18+
#include <aws/s3/S3Client.h>
19+
#include <openssl/hmac.h>
20+
#include <openssl/rand.h>
21+
#include <openssl/sha.h>
22+
23+
namespace tensorflow {
24+
namespace io {
25+
namespace s3 {
26+
namespace tf_s3_filesystem {
27+
28+
namespace {
29+
class AWSSha256HMACOpenSSLImpl : public Aws::Utils::Crypto::HMAC {
30+
public:
31+
AWSSha256HMACOpenSSLImpl() {}
32+
33+
virtual ~AWSSha256HMACOpenSSLImpl() = default;
34+
35+
Aws::Utils::Crypto::HashResult Calculate(
36+
const Aws::Utils::ByteBuffer& toSign,
37+
const Aws::Utils::ByteBuffer& secret) override {
38+
unsigned int length = SHA256_DIGEST_LENGTH;
39+
Aws::Utils::ByteBuffer digest(length);
40+
memset(digest.GetUnderlyingData(), 0, length);
41+
42+
HMAC_CTX ctx;
43+
HMAC_CTX_init(&ctx);
44+
45+
HMAC_Init_ex(&ctx, secret.GetUnderlyingData(),
46+
static_cast<int>(secret.GetLength()), EVP_sha256(), NULL);
47+
HMAC_Update(&ctx, toSign.GetUnderlyingData(), toSign.GetLength());
48+
HMAC_Final(&ctx, digest.GetUnderlyingData(), &length);
49+
HMAC_CTX_cleanup(&ctx);
50+
51+
return Aws::Utils::Crypto::HashResult(std::move(digest));
52+
}
53+
};
54+
55+
class AWSSha256OpenSSLImpl : public Aws::Utils::Crypto::Hash {
56+
public:
57+
AWSSha256OpenSSLImpl() {}
58+
59+
virtual ~AWSSha256OpenSSLImpl() = default;
60+
61+
Aws::Utils::Crypto::HashResult Calculate(const Aws::String& str) override {
62+
SHA256_CTX sha256;
63+
SHA256_Init(&sha256);
64+
SHA256_Update(&sha256, str.data(), str.size());
65+
66+
Aws::Utils::ByteBuffer hash(SHA256_DIGEST_LENGTH);
67+
SHA256_Final(hash.GetUnderlyingData(), &sha256);
68+
69+
return Aws::Utils::Crypto::HashResult(std::move(hash));
70+
}
71+
72+
Aws::Utils::Crypto::HashResult Calculate(Aws::IStream& stream) override {
73+
SHA256_CTX sha256;
74+
SHA256_Init(&sha256);
75+
76+
auto currentPos = stream.tellg();
77+
if (currentPos == std::streampos(std::streamoff(-1))) {
78+
currentPos = 0;
79+
stream.clear();
80+
}
81+
82+
stream.seekg(0, stream.beg);
83+
84+
char streamBuffer
85+
[Aws::Utils::Crypto::Hash::INTERNAL_HASH_STREAM_BUFFER_SIZE];
86+
while (stream.good()) {
87+
stream.read(streamBuffer,
88+
Aws::Utils::Crypto::Hash::INTERNAL_HASH_STREAM_BUFFER_SIZE);
89+
auto bytesRead = stream.gcount();
90+
91+
if (bytesRead > 0) {
92+
SHA256_Update(&sha256, streamBuffer, static_cast<size_t>(bytesRead));
93+
}
94+
}
95+
96+
stream.clear();
97+
stream.seekg(currentPos, stream.beg);
98+
99+
Aws::Utils::ByteBuffer hash(SHA256_DIGEST_LENGTH);
100+
SHA256_Final(hash.GetUnderlyingData(), &sha256);
101+
102+
return Aws::Utils::Crypto::HashResult(std::move(hash));
103+
}
104+
};
105+
106+
class AWSSecureRandomBytesImpl : public Aws::Utils::Crypto::SecureRandomBytes {
107+
public:
108+
AWSSecureRandomBytesImpl() {}
109+
virtual ~AWSSecureRandomBytesImpl() = default;
110+
void GetBytes(unsigned char* buffer, size_t bufferSize) override {
111+
assert(buffer);
112+
int success = RAND_bytes(buffer, static_cast<int>(bufferSize));
113+
if (success != 1) {
114+
m_failure = true;
115+
}
116+
}
117+
118+
private:
119+
bool m_failure;
120+
};
121+
122+
} // namespace
123+
124+
std::shared_ptr<Aws::Utils::Crypto::Hash>
125+
AWSSHA256Factory::CreateImplementation() const {
126+
return Aws::MakeShared<AWSSha256OpenSSLImpl>(AWSCryptoAllocationTag);
127+
}
128+
129+
std::shared_ptr<Aws::Utils::Crypto::HMAC>
130+
AWSSHA256HmacFactory::CreateImplementation() const {
131+
return Aws::MakeShared<AWSSha256HMACOpenSSLImpl>(AWSCryptoAllocationTag);
132+
}
133+
134+
std::shared_ptr<Aws::Utils::Crypto::SecureRandomBytes>
135+
AWSSecureRandomFactory::CreateImplementation() const {
136+
return Aws::MakeShared<AWSSecureRandomBytesImpl>(AWSCryptoAllocationTag);
137+
}
138+
139+
} // namespace tf_s3_filesystem
140+
} // namespace s3
141+
} // namespace io
142+
} // namespace tensorflow
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/* Copyright 2020 The TensorFlow Authors. All Rights Reserved.
2+
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
==============================================================================*/
15+
#ifndef TENSORFLOW_C_EXPERIMENTAL_FILESYSTEM_PLUGINS_S3_AWS_CRYPTO_H_
16+
#define TENSORFLOW_C_EXPERIMENTAL_FILESYSTEM_PLUGINS_S3_AWS_CRYPTO_H_
17+
18+
#include <aws/core/Aws.h>
19+
#include <aws/core/utils/crypto/Factories.h>
20+
#include <aws/core/utils/crypto/HMAC.h>
21+
#include <aws/core/utils/crypto/Hash.h>
22+
#include <aws/core/utils/crypto/SecureRandom.h>
23+
24+
namespace tensorflow {
25+
namespace io {
26+
namespace s3 {
27+
namespace tf_s3_filesystem {
28+
constexpr char AWSCryptoAllocationTag[] = "AWSCryptoAllocation";
29+
30+
class AWSSHA256Factory : public Aws::Utils::Crypto::HashFactory {
31+
public:
32+
std::shared_ptr<Aws::Utils::Crypto::Hash> CreateImplementation()
33+
const override;
34+
};
35+
36+
class AWSSHA256HmacFactory : public Aws::Utils::Crypto::HMACFactory {
37+
public:
38+
std::shared_ptr<Aws::Utils::Crypto::HMAC> CreateImplementation()
39+
const override;
40+
};
41+
42+
class AWSSecureRandomFactory : public Aws::Utils::Crypto::SecureRandomFactory {
43+
public:
44+
std::shared_ptr<Aws::Utils::Crypto::SecureRandomBytes> CreateImplementation()
45+
const override;
46+
};
47+
48+
} // namespace tf_s3_filesystem
49+
} // namespace s3
50+
} // namespace io
51+
} // namespace tensorflow
52+
53+
#endif // TENSORFLOW_C_EXPERIMENTAL_FILESYSTEM_PLUGINS_S3_AWS_CRYPTO_H_

0 commit comments

Comments
 (0)