-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable S3 compliant data vaults using https and http #8167
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -123,7 +123,7 @@ | |
"ts-coverage": "typescript-coverage-report", | ||
"find-cyclic-dependencies": "yarn run dpdm -T --tree false --warning false --extensions .ts,.tsx frontend/javascripts/main.tsx", | ||
"check-cyclic-dependencies": "node ./tools/check-cyclic-dependencies.js", | ||
"startf": "yarn rm-fossil-lock; yarn kill-listeners; yarn start", | ||
"startf": "yarn rm-fossil-lock; yarn kill-listeners; rm -r webknossos-jni/target; yarn start", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did you mean to commit this? Seems like some that is covered by There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, because this is always annoying me because the webknossos-jni folder causes an error after using docker compose up (e.g. e2e tests). startf is "start forcefully", and this directory in particular is sometimes leading to me not being able to start. |
||
"beautify-front": "yarn fix-frontend && yarn typecheck", | ||
"beautify": "yarn format-backend && yarn beautify-front" | ||
}, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -93,11 +93,14 @@ class DataVaultTestSuite extends PlaySpec { | |
"using S3 data vault" should { | ||
"return correct response" in { | ||
val uri = new URI("s3://janelia-cosem-datasets/jrc_hela-3/jrc_hela-3.n5/em/fibsem-uint16/") | ||
val vaultPath = new VaultPath(uri, S3DataVault.create(RemoteSourceDescriptor(uri, None))) | ||
val bytes = | ||
(vaultPath / "s0/5/5/5").readBytes(Some(range))(globalExecutionContext).get(handleFoxJustification) | ||
assert(bytes.length == range.length) | ||
assert(bytes.take(10).sameElements(Array(0, 0, 0, 3, 0, 0, 0, 64, 0, 0))) | ||
WsTestClient.withClient { ws => | ||
val vaultPath = | ||
new VaultPath(uri, S3DataVault.create(RemoteSourceDescriptor(uri, None), ws)(globalExecutionContext)) | ||
val bytes = | ||
(vaultPath / "s0/5/5/5").readBytes(Some(range))(globalExecutionContext).get(handleFoxJustification) | ||
assert(bytes.length == range.length) | ||
assert(bytes.take(10).sameElements(Array(0, 0, 0, 3, 0, 0, 0, 64, 0, 0))) | ||
} | ||
Comment on lines
+96
to
+103
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Add test coverage for HTTPS fallback mechanism and edge cases. While the basic range request test is good, consider adding test cases for:
Example test structure: "when vault supports HTTPS" should {
"fallback to HTTPS successfully" in {
WsTestClient.withClient { ws =>
val uri = new URI("http://https-supporting-bucket.example.com")
val vaultPath = new VaultPath(uri, S3DataVault.create(RemoteSourceDescriptor(uri, None), ws))
// Test HTTPS fallback
}
}
} |
||
} | ||
} | ||
} | ||
|
@@ -135,59 +138,69 @@ class DataVaultTestSuite extends PlaySpec { | |
"using s3 data vault" should { | ||
"return correctly decoded brotli-compressed data" in { | ||
val uri = new URI("s3://open-neurodata/bock11/image/4_4_40") | ||
val vaultPath = new VaultPath(uri, S3DataVault.create(RemoteSourceDescriptor(uri, None))) | ||
val bytes = | ||
(vaultPath / "33792-34304_29696-30208_3216-3232") | ||
.readBytes()(globalExecutionContext) | ||
.get(handleFoxJustification) | ||
assert(bytes.take(10).sameElements(Array(-87, -95, -85, -94, -101, 124, 115, 100, 113, 111))) | ||
WsTestClient.withClient { ws => | ||
val vaultPath = | ||
new VaultPath(uri, S3DataVault.create(RemoteSourceDescriptor(uri, None), ws)(globalExecutionContext)) | ||
val bytes = | ||
(vaultPath / "33792-34304_29696-30208_3216-3232") | ||
.readBytes()(globalExecutionContext) | ||
.get(handleFoxJustification) | ||
assert(bytes.take(10).sameElements(Array(-87, -95, -85, -94, -101, 124, 115, 100, 113, 111))) | ||
} | ||
} | ||
|
||
"return empty box" when { | ||
"requesting a non-existent bucket" in { | ||
val uri = new URI(s"s3://non-existent-bucket${UUID.randomUUID}/non-existent-object") | ||
val s3DataVault = S3DataVault.create(RemoteSourceDescriptor(uri, None)) | ||
val vaultPath = new VaultPath(uri, s3DataVault) | ||
val result = vaultPath.readBytes()(globalExecutionContext).await(handleFoxJustification) | ||
assertBoxEmpty(result) | ||
WsTestClient.withClient { ws => | ||
val s3DataVault = S3DataVault.create(RemoteSourceDescriptor(uri, None), ws)(globalExecutionContext) | ||
val vaultPath = new VaultPath(uri, s3DataVault) | ||
val result = vaultPath.readBytes()(globalExecutionContext).await(handleFoxJustification) | ||
assertBoxEmpty(result) | ||
} | ||
} | ||
} | ||
|
||
"return empty box" when { | ||
"requesting a non-existent object in existent bucket" in { | ||
val uri = new URI(s"s3://open-neurodata/non-existent-object${UUID.randomUUID}") | ||
val s3DataVault = S3DataVault.create(RemoteSourceDescriptor(uri, None)) | ||
val vaultPath = new VaultPath(uri, s3DataVault) | ||
val result = vaultPath.readBytes()(globalExecutionContext).await(handleFoxJustification) | ||
assertBoxEmpty(result) | ||
WsTestClient.withClient { ws => | ||
val s3DataVault = S3DataVault.create(RemoteSourceDescriptor(uri, None), ws)(globalExecutionContext) | ||
val vaultPath = new VaultPath(uri, s3DataVault) | ||
val result = vaultPath.readBytes()(globalExecutionContext).await(handleFoxJustification) | ||
assertBoxEmpty(result) | ||
} | ||
Comment on lines
+155
to
+172
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Expand error handling test coverage. While basic error cases are covered, consider adding tests for:
Example test: "handle network errors gracefully" in {
WsTestClient.withClient { ws =>
// Mock WS client to simulate network errors
val mockWs = // ... setup mock
val uri = new URI("s3://example-bucket")
val s3DataVault = S3DataVault.create(RemoteSourceDescriptor(uri, None), mockWs)
val vaultPath = new VaultPath(uri, s3DataVault)
val result = vaultPath.readBytes()(globalExecutionContext).await(handleFoxJustification)
assertBoxFailure(result)
}
} |
||
} | ||
} | ||
} | ||
} | ||
|
||
"using directory list requests" when { | ||
val uri = new URI("s3://janelia-cosem-datasets/jrc_hela-3/jrc_hela-3.n5/em/fibsem-uint16/") | ||
val vaultPath = new VaultPath(uri, S3DataVault.create(RemoteSourceDescriptor(uri, None))) | ||
|
||
"using s3 data vault" should { | ||
"list available directories" in { | ||
val result = vaultPath.listDirectory(maxItems = 3)(globalExecutionContext).get(handleFoxJustification) | ||
assert(result.length == 3) | ||
assert( | ||
result.exists( | ||
_.toUri == new URI("s3://janelia-cosem-datasets/jrc_hela-3/jrc_hela-3.n5/em/fibsem-uint16/s0/"))) | ||
} | ||
WsTestClient.withClient { ws => | ||
val vaultPath = | ||
new VaultPath(uri, S3DataVault.create(RemoteSourceDescriptor(uri, None), ws)(globalExecutionContext)) | ||
|
||
"using s3 data vault" should { | ||
"list available directories" in { | ||
val result = vaultPath.listDirectory(maxItems = 3)(globalExecutionContext).get(handleFoxJustification) | ||
assert(result.length == 3) | ||
assert( | ||
result.exists( | ||
_.toUri == new URI("s3://janelia-cosem-datasets/jrc_hela-3/jrc_hela-3.n5/em/fibsem-uint16/s0/"))) | ||
} | ||
|
||
"return failure" when { | ||
"requesting directory listing on non-existent bucket" in { | ||
val uri = new URI(f"s3://non-existent-bucket${UUID.randomUUID}/non-existent-object/") | ||
val s3DataVault = S3DataVault.create(RemoteSourceDescriptor(uri, None)) | ||
val vaultPath = new VaultPath(uri, s3DataVault) | ||
val result = vaultPath.listDirectory(maxItems = 5)(globalExecutionContext).await(handleFoxJustification) | ||
assertBoxFailure(result) | ||
"return failure" when { | ||
"requesting directory listing on non-existent bucket" in { | ||
val uri = new URI(f"s3://non-existent-bucket${UUID.randomUUID}/non-existent-object/") | ||
val s3DataVault = S3DataVault.create(RemoteSourceDescriptor(uri, None), ws)(globalExecutionContext) | ||
val vaultPath = new VaultPath(uri, s3DataVault) | ||
val result = vaultPath.listDirectory(maxItems = 5)(globalExecutionContext).await(handleFoxJustification) | ||
assertBoxFailure(result) | ||
} | ||
} | ||
} | ||
|
||
} | ||
} | ||
} | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
package com.scalableminds.webknossos.datastore.datavault | ||
|
||
import com.scalableminds.util.tools.Fox | ||
import com.scalableminds.util.tools.Fox.box2Fox | ||
import com.scalableminds.util.tools.Fox.{box2Fox, future2Fox} | ||
import com.scalableminds.webknossos.datastore.storage.{ | ||
LegacyDataVaultCredential, | ||
RemoteSourceDescriptor, | ||
|
@@ -10,6 +10,7 @@ import com.scalableminds.webknossos.datastore.storage.{ | |
import net.liftweb.common.Box.tryo | ||
import net.liftweb.common.{Box, Empty, Full, Failure => BoxFailure} | ||
import org.apache.commons.lang3.builder.HashCodeBuilder | ||
import play.api.libs.ws.WSClient | ||
import software.amazon.awssdk.auth.credentials.{ | ||
AnonymousCredentialsProvider, | ||
AwsBasicCredentials, | ||
|
@@ -41,14 +42,18 @@ import scala.jdk.FutureConverters._ | |
import scala.jdk.OptionConverters.RichOptional | ||
import scala.util.{Failure => TryFailure, Success => TrySuccess} | ||
|
||
class S3DataVault(s3AccessKeyCredential: Option[S3AccessKeyCredential], uri: URI) extends DataVault { | ||
class S3DataVault(s3AccessKeyCredential: Option[S3AccessKeyCredential], | ||
uri: URI, | ||
ws: WSClient, | ||
implicit val ec: ExecutionContext) | ||
extends DataVault { | ||
Comment on lines
+45
to
+49
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Potential Performance Impact During Initialization The constructor now includes a network call to determine the protocol using Consider deferring the protocol determination until it is actually needed or making the network call asynchronously to avoid blocking the main execution thread. |
||
private lazy val bucketName = S3DataVault.hostBucketFromUri(uri) match { | ||
case Some(value) => value | ||
case None => throw new Exception(s"Could not parse S3 bucket for ${uri.toString}") | ||
} | ||
|
||
private lazy val client: S3AsyncClient = | ||
S3DataVault.getAmazonS3Client(s3AccessKeyCredential, uri) | ||
private lazy val clientFox: Fox[S3AsyncClient] = | ||
S3DataVault.getAmazonS3Client(s3AccessKeyCredential, uri, ws) | ||
Comment on lines
+55
to
+56
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Error Handling for Client Initialization Failures The Ensure that failures during client initialization are properly propagated and handled. Consider eagerly initializing the client and handling errors to prevent silent failures during runtime. |
||
|
||
private def getRangeRequest(bucketName: String, key: String, range: NumericRange[Long]): GetObjectRequest = | ||
GetObjectRequest.builder().bucket(bucketName).key(key).range(s"bytes=${range.start}-${range.end - 1}").build() | ||
|
@@ -64,6 +69,7 @@ class S3DataVault(s3AccessKeyCredential: Option[S3AccessKeyCredential], uri: URI | |
val responseTransformer: AsyncResponseTransformer[GetObjectResponse, ResponseBytes[GetObjectResponse]] = | ||
AsyncResponseTransformer.toBytes | ||
for { | ||
client <- clientFox | ||
responseBytesObject: ResponseBytes[GetObjectResponse] <- notFoundToEmpty( | ||
client.getObject(request, responseTransformer).asScala) | ||
encoding = responseBytesObject.response().contentEncoding() | ||
|
@@ -122,6 +128,7 @@ class S3DataVault(s3AccessKeyCredential: Option[S3AccessKeyCredential], uri: URI | |
val listObjectsRequest = | ||
ListObjectsV2Request.builder().bucket(bucketName).prefix(keyPrefix).delimiter("/").maxKeys(maxKeys).build() | ||
for { | ||
client <- clientFox | ||
objectListing: ListObjectsV2Response <- notFoundToFailure(client.listObjectsV2(listObjectsRequest).asScala) | ||
s3SubPrefixes: List[CommonPrefix] = objectListing.commonPrefixes().asScala.take(maxItems).toList | ||
} yield s3SubPrefixes.map(_.prefix()) | ||
|
@@ -140,13 +147,14 @@ class S3DataVault(s3AccessKeyCredential: Option[S3AccessKeyCredential], uri: URI | |
} | ||
|
||
object S3DataVault { | ||
def create(remoteSourceDescriptor: RemoteSourceDescriptor): S3DataVault = { | ||
def create(remoteSourceDescriptor: RemoteSourceDescriptor, ws: WSClient)( | ||
implicit ec: ExecutionContext): S3DataVault = { | ||
val credential = remoteSourceDescriptor.credential.flatMap { | ||
case f: S3AccessKeyCredential => Some(f) | ||
case f: LegacyDataVaultCredential => Some(f.toS3AccessKey) | ||
case _ => None | ||
} | ||
new S3DataVault(credential, remoteSourceDescriptor.uri) | ||
new S3DataVault(credential, remoteSourceDescriptor.uri, ws, ec) | ||
Comment on lines
+150
to
+157
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Constructor Change May Break Existing Instantiations The Ensure all instantiations of |
||
} | ||
|
||
private def hostBucketFromUri(uri: URI): Option[String] = { | ||
|
@@ -201,16 +209,34 @@ object S3DataVault { | |
private def isNonAmazonHost(uri: URI): Boolean = | ||
(isPathStyle(uri) && !uri.getHost.endsWith(".amazonaws.com")) || uri.getHost == "localhost" | ||
|
||
private def getAmazonS3Client(credentialOpt: Option[S3AccessKeyCredential], uri: URI): S3AsyncClient = { | ||
private def determineProtocol(uri: URI, ws: WSClient)(implicit ec: ExecutionContext): Fox[String] = { | ||
// If the endpoint supports HTTPS, use it. Otherwise, use HTTP. | ||
val httpsUri = new URI("https", uri.getAuthority, "", "", "") | ||
val httpsFuture = ws.url(httpsUri.toString).get() | ||
|
||
val protocolFuture = httpsFuture.transformWith({ | ||
case TrySuccess(_) => Future.successful("https") | ||
case TryFailure(_) => Future.successful("http") | ||
}) | ||
Comment on lines
+216
to
+220
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Exception Handling May Mask Underlying Issues In the Refine the exception handling to log the specific reasons for HTTPS failures. This can aid in debugging and ensure that genuine issues are not overlooked. + import play.api.Logger
...
val protocolFuture = httpsFuture.transformWith {
case TrySuccess(_) => Future.successful("https")
case TryFailure(exception) =>
+ Logger.warn(s"HTTPS request failed: ${exception.getMessage}")
Future.successful("http")
}
|
||
for { | ||
protocol <- protocolFuture.toFox | ||
} yield protocol | ||
} | ||
Comment on lines
+212
to
+224
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Robustness of Protocol Determination Logic The Consider implementing a more reliable mechanism for protocol detection, such as:
|
||
|
||
private def getAmazonS3Client(credentialOpt: Option[S3AccessKeyCredential], uri: URI, ws: WSClient)( | ||
implicit ec: ExecutionContext): Fox[S3AsyncClient] = { | ||
val basic = | ||
S3AsyncClient.builder().credentialsProvider(getCredentialsProvider(credentialOpt)).crossRegionAccessEnabled(true) | ||
if (isNonAmazonHost(uri)) | ||
basic | ||
.forcePathStyle(true) | ||
.endpointOverride(new URI(s"http://${uri.getAuthority}")) | ||
.region(AwsHostNameUtils.parseSigningRegion(uri.getAuthority, "s3").toScala.getOrElse(Region.US_EAST_1)) | ||
.build() | ||
else basic.region(Region.US_EAST_1).build() | ||
if (isNonAmazonHost(uri)) { | ||
for { | ||
protocol <- determineProtocol(uri, ws) | ||
} yield | ||
basic | ||
.forcePathStyle(true) | ||
.endpointOverride(new URI(s"${protocol}://${uri.getAuthority}")) | ||
.region(AwsHostNameUtils.parseSigningRegion(uri.getAuthority, "s3").toScala.getOrElse(Region.US_EAST_1)) | ||
.build() | ||
} else Fox.successful(basic.region(Region.US_EAST_1).build()) | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Enhance and relocate the changelog entry.
The changelog entry should be moved to the "Added" section since it represents a new feature rather than a fix. Additionally, the description should be expanded to better explain the functionality.
Apply this diff to improve the changelog entry: