Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add recordio storage client #285

Merged
merged 20 commits into from
Nov 6, 2024
Merged

Conversation

Marco-Premier
Copy link
Contributor

Closing #1861 and added RecordIoStorageClient to common-jvm as per @SanjayVas suggestion.

StorageClient that writes / reads Apache Mesos record io file format and uses StreamingAead for encrypting / decrypting operations.

The StorageClient streams an encrypted file and emit each record individually.

@wfa-reviewable
Copy link

This change is Reviewable

Copy link
Contributor

@stevenwarejones stevenwarejones left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 6 files at r1, 1 of 3 files at r2, all commit messages.
Reviewable status: 2 of 6 files reviewed, 2 unresolved discussions (waiting on @dawnwang22, @kungfucraig, @Marco-Premier, and @SanjayVas)


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageclient.kt line 1 at r2 (raw file):

// Copyright 2024 The Cross-Media Measurement Authors

please capitalize the C in the file name ...torageClient.kt


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageclient.kt line 44 at r2 (raw file):

 * @param dataKey a base64-encoded symmetric data key
 */
class RecordIoStorageClient(

I was thinking you would have two classes

  1. StreamingKmsStorageClient
  2. RecordIoStorageClient

Then the RecordIoStorageClient is oblivious to the underlying encryption

Copy link
Contributor Author

@Marco-Premier Marco-Premier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 2 of 8 files reviewed, 2 unresolved discussions (waiting on @dawnwang22, @kungfucraig, @SanjayVas, and @stevenwarejones)


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageclient.kt line 1 at r2 (raw file):

Previously, stevenwarejones (Steven Ware Jones) wrote…

please capitalize the C in the file name ...torageClient.kt

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageclient.kt line 44 at r2 (raw file):

Previously, stevenwarejones (Steven Ware Jones) wrote…

I was thinking you would have two classes

  1. StreamingKmsStorageClient
  2. RecordIoStorageClient

Then the RecordIoStorageClient is oblivious to the underlying encryption

Done.

Copy link
Contributor

@stevenwarejones stevenwarejones left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 3 files at r2, 3 of 5 files at r3, all commit messages.
Reviewable status: 6 of 8 files reviewed, 5 unresolved discussions (waiting on @dawnwang22, @kungfucraig, @Marco-Premier, and @SanjayVas)


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageClient.kt line 34 at r3 (raw file):

 * @param storageClient underlying client for accessing blob/object storage
 */
class RecordIoStorageClient(private val storageClient: StorageClient) : StorageClient {

MesosRecordIoStorageClient since this isn't necessarily compatible with other RecordIo formats


src/main/kotlin/org/wfanet/measurement/storage/testing/AbstractStreamingStorageClientTest.kt line 38 at r3 (raw file):

const val TARGET_SIZE = 228
/** Abstract base class for testing implementations of [StorageClient]. */
abstract class AbstractStreamingStorageClientTest<T : StorageClient> {

this is unused. is an abstract streaming storage client test needed?


src/test/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageClientTest.kt line 32 at r3 (raw file):

@RunWith(JUnit4::class)
class RecordIoStorageClientTest {

this should extend the abstractstorageclient test


src/test/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageClientTest.kt line 43 at r3 (raw file):

  }

  @Test

can you add

  1. a test where you write the raw bytes using a normal storage client and then read it with RecordIOStorageClient successfully?
  2. a test where invalid format is previously written and then read when a verified error

src/test/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClientTest.kt line 44 at r3 (raw file):

@RunWith(JUnit4::class)
class StreamingAeadStorageClientTest {

this should extend the abstractstorageclient test

Copy link
Member

@SanjayVas SanjayVas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 3 files at r2, 1 of 5 files at r3, all commit messages.
Reviewable status: 6 of 8 files reviewed, 15 unresolved discussions (waiting on @dawnwang22, @kungfucraig, and @Marco-Premier)


MODULE.bazel.lock line 1583 at r3 (raw file):

        ]
      }
    },

There should be no changes to this file unless MODULE.bazel is also changed


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/BUILD.bazel line 9 at r3 (raw file):

    srcs = glob(["*.kt"]),
    deps = [
        "//imports/java/com/google/cloud/storage",

This should not depend directly on GCS.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageClient.kt line 15 at r3 (raw file):

// limitations under the License.

package org.wfanet.measurement.securecomputation.teesdk.cloudstorage.v1alpha

This doesn't use any Tink primitives and can therefore move under measurement.storage, with the Kotlin package updated to match the Bazel package.

Suggestion:

package org.wfanet.measurement.common.crypto.tink

src/main/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageClient.kt line 47 at r3 (raw file):

  *
  * @param blobKey The key (or name) of the blob where the content will be stored.
  * @param content A Flow<ByteString> representing the source of RecordIO rows that will be stored.

nit: the information that each item in the flow is a single record should go here


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageClient.kt line 58 at r3 (raw file):

        val rawBytes = byteString.toByteArray()
        val recordSize = rawBytes.size.toString()
        val fullRecord = recordSize + "\n" + String(rawBytes, Charsets.UTF_8)

Extract a RECORD_DELIMITER constant?

Code quote:

"\n"

src/main/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageClient.kt line 87 at r3 (raw file):

  /** A blob that will read the content in RecordIO format */
  private inner class RecordioBlob(private val blob: StorageClient.Blob, private val blobKey: String) :

nit: Just Blob is fine since this is both private and defined in the context of RecordIoStorageClient.

Suggestion:

Blob

src/main/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageClient.kt line 110 at r3 (raw file):

      val buffer = StringBuilder()
      var currentRecordSize = -1
      var recordBuffer = ByteArrayOutputStream()

Use ByteString.Output to avoid unnecessary copies. It can be reset, so this can be a val

Suggestion:

val recordBuffer = ByteString.newOutput()

src/main/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageClient.kt line 114 at r3 (raw file):

      blob.read().collect { chunk ->
        var position = 0
        val chunkString = chunk.toByteArray().toString(Charsets.UTF_8)

Suggestion:

toStringUtf8()

src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 15 at r3 (raw file):

// limitations under the License.

package org.wfanet.measurement.securecomputation.teesdk.cloudstorage.v1alpha

Suggestion:

package org.wfanet.measurement.

src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 28 at r3 (raw file):

import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

Wildcard imports are not allowed. See https://developer.android.com/kotlin/style-guide

Code quote:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 60 at r3 (raw file):

   */
  override suspend fun writeBlob(blobKey: String, content: Flow<ByteString>): StorageClient.Blob {
    val encryptedContent = flow {

We should be able to copy a lot of the read/write implementation from GcsStorageClient, as that also operates on ReadableByteChannel/WriteableByteChannel. In particular, use the extension functions in src/main/kotlin/org/wfanet/measurement/common/Bytes.kt for going between a byte channel and a Flow


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 118 at r3 (raw file):

      val chunkChannel = Channel<ByteString>(capacity = Channel.UNLIMITED)

      CoroutineScope(streamingAeadContext).launch {

We don't need async reading in a separate coroutine, meaning no need for a separate CoroutineScope/channel/launch call. The only reason for the separate CoroutineContext is that the default one may not be suited for blocking IO. All that's needed is to wrap any blocking IO calls in withContext(streamingAeadContext)

See GcsStorageClient as an example.

Copy link
Member

@kungfucraig kungfucraig left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 6 files at r1, all commit messages.
Reviewable status: 6 of 8 files reviewed, 16 unresolved discussions (waiting on @dawnwang22 and @Marco-Premier)


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageClient.kt line 72 at r3 (raw file):

    val wrappedBlob: StorageClient.Blob = storageClient.writeBlob(blobKey, processedContent)
    logger.fine { "Wrote content to storage with blobKey: $blobKey" }

Part of me would like to log the number of records written

Copy link
Member

@kungfucraig kungfucraig left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks to be shaping up well. I'm going to defer to @SanjayVas and @stevenwarejones at this point. If you need a final approval to unblock when they're done let me know.

Reviewable status: 6 of 8 files reviewed, 16 unresolved discussions (waiting on @dawnwang22 and @Marco-Premier)

Copy link
Contributor Author

@Marco-Premier Marco-Premier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 0 of 11 files reviewed, 14 unresolved discussions (waiting on @dawnwang22, @kungfucraig, @SanjayVas, and @stevenwarejones)


src/test/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClientTest.kt line 44 at r3 (raw file):

Previously, stevenwarejones (Steven Ware Jones) wrote…

this should extend the abstractstorageclient test

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/BUILD.bazel line 9 at r3 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

This should not depend directly on GCS.

Done.


src/main/kotlin/org/wfanet/measurement/storage/testing/AbstractStreamingStorageClientTest.kt line 38 at r3 (raw file):

Previously, stevenwarejones (Steven Ware Jones) wrote…

this is unused. is an abstract streaming storage client test needed?

Done.


src/test/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageClientTest.kt line 32 at r3 (raw file):

Previously, stevenwarejones (Steven Ware Jones) wrote…

this should extend the abstractstorageclient test

This is slightly different form the KmsStorageClient class, as the generated blob is not a random array of bytes, nor the test for testing the size would work as writeBlob method adds the length of each record to the resulting blob.


src/test/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageClientTest.kt line 43 at r3 (raw file):

Previously, stevenwarejones (Steven Ware Jones) wrote…

can you add

  1. a test where you write the raw bytes using a normal storage client and then read it with RecordIOStorageClient successfully?
  2. a test where invalid format is previously written and then read when a verified error

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageClient.kt line 15 at r3 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

This doesn't use any Tink primitives and can therefore move under measurement.storage, with the Kotlin package updated to match the Bazel package.

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageClient.kt line 34 at r3 (raw file):

Previously, stevenwarejones (Steven Ware Jones) wrote…

MesosRecordIoStorageClient since this isn't necessarily compatible with other RecordIo formats

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageClient.kt line 58 at r3 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Extract a RECORD_DELIMITER constant?

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageClient.kt line 72 at r3 (raw file):

Previously, kungfucraig (Craig Wright) wrote…

Part of me would like to log the number of records written

totally agree!


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageClient.kt line 110 at r3 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Use ByteString.Output to avoid unnecessary copies. It can be reset, so this can be a val

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageClient.kt line 114 at r3 (raw file):

      blob.read().collect { chunk ->
        var position = 0
        val chunkString = chunk.toByteArray().toString(Charsets.UTF_8)

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 15 at r3 (raw file):

// limitations under the License.

package org.wfanet.measurement.securecomputation.teesdk.cloudstorage.v1alpha

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 28 at r3 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Wildcard imports are not allowed. See https://developer.android.com/kotlin/style-guide

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 60 at r3 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

We should be able to copy a lot of the read/write implementation from GcsStorageClient, as that also operates on ReadableByteChannel/WriteableByteChannel. In particular, use the extension functions in src/main/kotlin/org/wfanet/measurement/common/Bytes.kt for going between a byte channel and a Flow

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 118 at r3 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

We don't need async reading in a separate coroutine, meaning no need for a separate CoroutineScope/channel/launch call. The only reason for the separate CoroutineContext is that the default one may not be suited for blocking IO. All that's needed is to wrap any blocking IO calls in withContext(streamingAeadContext)

See GcsStorageClient as an example.

thanks!

Copy link
Member

@SanjayVas SanjayVas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed all commit messages.
Reviewable status: 0 of 11 files reviewed, 13 unresolved discussions (waiting on @dawnwang22, @kungfucraig, @Marco-Premier, and @stevenwarejones)


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 63 at r4 (raw file):

  override suspend fun writeBlob(blobKey: String, content: Flow<ByteString>): StorageClient.Blob {
    val encryptedContent = flow {
      val outputStream = ByteArrayOutputStream()

The whole point of java.nio is to avoid using IO streams for bytes and instead use ByteBuffers so that all reads/writes are buffered. What this means is that you want to have your own WriteableByteChannel implementation that emits flow elements on write rather than wrapping an OutputStream.

Code quote:

val outputStream = ByteArrayOutputStream()

src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 122 at r4 (raw file):

     */
    override fun read(): Flow<ByteString> = flow {
      val chunkChannel = Channel<ByteString>(capacity = Channel.UNLIMITED)

Use Flow.produceIn to get the flow elements into a Channel.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 163 at r4 (raw file):

        buffer.flip()
        emit(ByteString.copyFrom(buffer.array(), 0, buffer.limit()))
      }

Use ReadableByteChannel.asFlow extension from org.wfanet.measurement.common

Suggestion:

      emitAll(plaintextChannel.asFlow(BUFFER_SIZE, streamingAeadContext))

Copy link
Contributor Author

@Marco-Premier Marco-Premier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 0 of 11 files reviewed, 13 unresolved discussions (waiting on @dawnwang22, @kungfucraig, @SanjayVas, and @stevenwarejones)


MODULE.bazel.lock line 1583 at r3 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

There should be no changes to this file unless MODULE.bazel is also changed

not sure why this pops up. probably a rebase I did, but I reverted it now


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 63 at r4 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

The whole point of java.nio is to avoid using IO streams for bytes and instead use ByteBuffers so that all reads/writes are buffered. What this means is that you want to have your own WriteableByteChannel implementation that emits flow elements on write rather than wrapping an OutputStream.

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 122 at r4 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Use Flow.produceIn to get the flow elements into a Channel.

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 163 at r4 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Use ReadableByteChannel.asFlow extension from org.wfanet.measurement.common

Done.

Copy link
Contributor

@stevenwarejones stevenwarejones left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 5 of 11 files at r4, 1 of 1 files at r5, all commit messages.
Reviewable status: 6 of 11 files reviewed, 12 unresolved discussions (waiting on @dawnwang22, @kungfucraig, and @SanjayVas)


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/RecordIoStorageClient.kt line 58 at r3 (raw file):

Previously, Marco-Premier (marcopremier) wrote…

Done.

this is fine to extract but the Mesos standard seems to specify the delimiter so I"m fine how it was

Copy link
Member

@SanjayVas SanjayVas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 1 of 1 files at r5.
Reviewable status: 6 of 11 files reviewed, 17 unresolved discussions (waiting on @dawnwang22, @kungfucraig, @Marco-Premier, and @stevenwarejones)


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 69 at r5 (raw file):

   * @return A Blob object representing the encrypted data that was written to storage.
   */
  //  override suspend fun writeBlob(blobKey: String, content: Flow<ByteString>): StorageClient.Blob

nit: drop commented-out code


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 104 at r5 (raw file):

  override suspend fun writeBlob(blobKey: String, content: Flow<ByteString>): StorageClient.Blob {
    val encryptedContent = channelFlow {
      val channel = Channel<ByteString>(Channel.UNLIMITED)

Don't declare another channel. Use the one from the ProducerScope.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 113 at r5 (raw file):

            src.get(bytes)
            channel.trySend(bytes.toByteString())
            return bytesWritten

I think this is doing at least one unnecessary copy. You can copy from a ByteBuffer directly into a ByteString. Finally, you only want to return the number of bytes actually "written", so you need to condition on whether trySend succeeded.

Suggestion:

            val bytes = ByteString.copyFrom(src)
            val result: ChannelResult<Unit> = channel.trySend(bytes)
            if (result.isClosed) {
              throw ClosedChannelException()
            }
            return if (result.isSuccess) { bytes.size() } else { 0 }

src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 116 at r5 (raw file):

          }

          override fun isOpen() = true

Suggestion:

channel.isClosedForSend

src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 125 at r5 (raw file):

      launch {
        for (byteString in channel) {
          send(byteString)

See above note about not needing two channels.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 136 at r5 (raw file):

      }

      ciphertextChannel.close()

Use try-with-resources pattern. In Kotlin, this is the use extension.

Note that the suggestion below doesn't handle the other issue, which is that you need to call write on the byte channel in a loop until all bytes from the buffer are consumed.

Suggestion:

      streamingAead.newEncryptingChannel(flowEmittingChannel, blobKey.encodeToByteArray()).use { ciphertextChannel ->
         content.collect { byteString ->
           byteString.asReadOnlyByteBufferList().forEach { buffer -> ciphertextChannel.write(buffer) }
         }
      }

src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 139 at r5 (raw file):

    }

    val wrappedBlob: StorageClient.Blob = storageClient.writeBlob(blobKey, encryptedContent)

Something is tickling my brain about this implementation that makes me think it shouldn't work. collect will suspend until everything is collected, but it's writing into a non-blocking WriteableByteChannel that itself emits a flow which is then consumed by the writeBlob call to the underlying StorageClient. In theory, I would expect collect to therefore suspend until writeBlob completes.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 191 at r5 (raw file):

              override fun read(buffer: ByteBuffer): Int {
                if (currentChunk == null || bufferOffset >= currentChunk!!.size()) {
                  currentChunk = runBlocking { chunkChannel.receiveCatching().getOrNull() }

The ReadableByteChannel should be non-blocking. If it couldn't read anything (i.e. couldn't receive anything from the coroutine channel), the read method should return 0.

Use ReceiveChannel.tryReceive to accomplish this.

Code quote:

currentChunk = runBlocking { chunkChannel.receiveCatching().getOrNull() }

Copy link
Member

@SanjayVas SanjayVas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 6 of 11 files reviewed, 17 unresolved discussions (waiting on @dawnwang22, @kungfucraig, @Marco-Premier, and @stevenwarejones)


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 139 at r5 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Something is tickling my brain about this implementation that makes me think it shouldn't work. collect will suspend until everything is collected, but it's writing into a non-blocking WriteableByteChannel that itself emits a flow which is then consumed by the writeBlob call to the underlying StorageClient. In theory, I would expect collect to therefore suspend until writeBlob completes.

It may be clearer to extract two classes into org.wfanet.measurement.common that you can use as primitives:

class CoroutineWritableByteChannel(delegate: SendChannel<ByteString>) : WritableByteChannel

and

class CoroutineReadableByteChannel(delegate: ReceiveChannel<ByteString>) : ReadableByteChannel

Both of these would be implemented as non-blocking channels that delegate to the coroutine channel. They would not own the coroutine channel, meaning the lifecycle would need to be handled by the caller.

Copy link
Contributor

@stevenwarejones stevenwarejones left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 5 of 11 files at r4.
Reviewable status: all files reviewed, 14 unresolved discussions (waiting on @dawnwang22, @kungfucraig, @Marco-Premier, and @SanjayVas)

Copy link
Contributor Author

@Marco-Premier Marco-Premier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kungfucraig I think the PR now is ready for final review :)

Reviewable status: 10 of 13 files reviewed, 12 unresolved discussions (waiting on @dawnwang22, @kungfucraig, @SanjayVas, and @stevenwarejones)


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 104 at r5 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Don't declare another channel. Use the one from the ProducerScope.

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 113 at r5 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

I think this is doing at least one unnecessary copy. You can copy from a ByteBuffer directly into a ByteString. Finally, you only want to return the number of bytes actually "written", so you need to condition on whether trySend succeeded.

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 116 at r5 (raw file):

          }

          override fun isOpen() = true

I think this is !channel.isClosedForSend


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 125 at r5 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

See above note about not needing two channels.

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 136 at r5 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Use try-with-resources pattern. In Kotlin, this is the use extension.

Note that the suggestion below doesn't handle the other issue, which is that you need to call write on the byte channel in a loop until all bytes from the buffer are consumed.

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 191 at r5 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

The ReadableByteChannel should be non-blocking. If it couldn't read anything (i.e. couldn't receive anything from the coroutine channel), the read method should return 0.

Use ReceiveChannel.tryReceive to accomplish this.

Done.

Copy link
Member

@SanjayVas SanjayVas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 3 of 3 files at r6, all commit messages.
Reviewable status: all files reviewed, 18 unresolved discussions (waiting on @dawnwang22, @kungfucraig, and @Marco-Premier)


src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 29 at r6 (raw file):

 * @property delegate The [ReceiveChannel] from which this [ReadableByteChannel] will read data.
 * @return The number of bytes read into the buffer, or -1 if the channel is closed and no data
 *   remains.

This is misplaced.

Code quote:

 * @return The number of bytes read into the buffer, or -1 if the channel is closed and no data
 *   remains.

src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 37 at r6 (raw file):

  override fun read(destination: ByteBuffer): Int {
    val result = delegate.tryReceive()
    val byteString = result.getOrNull() ?: return -1 // -1 indicates the end of the stream

Getting no result does not mean that there is no more data, it just means there's no data available yet. You need to handle failed and closed separately. See https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/try-receive.html and https://docs.oracle.com/javase/8/docs/api/index.html?java/nio/ByteBuffer.html


src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 38 at r6 (raw file):

    val result = delegate.tryReceive()
    val byteString = result.getOrNull() ?: return -1 // -1 indicates the end of the stream
    destination.put(byteString.toByteArray())

The buffer may have less remaining space than the size of the ByteString. You need to check whether there is enough space first to avoid a BufferOverflowException, and then only write however many bytes there is space for. Finally, you need to only return however many bytes were written.

This means you'll need to internally buffer the bytes from the delegate channel that couldn't be written to the destination buffer, and only read from the channel once that internal buffer is empty.


src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 38 at r6 (raw file):

    val result = delegate.tryReceive()
    val byteString = result.getOrNull() ?: return -1 // -1 indicates the end of the stream
    destination.put(byteString.toByteArray())

This does an additional copy to a ByteArray before copying into the destination buffer. Use 'ByteString.copyTo' instead, calling substring first if necessary.

Code quote:

toByteArray()

src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 45 at r6 (raw file):

  override fun close() {
    delegate.cancel()

Suggestion:

close()

src/main/kotlin/org/wfanet/measurement/common/CoroutineWritableByteChannel.kt line 31 at r6 (raw file):

 * @constructor Creates a writable channel that writes each [ByteBuffer] as a [ByteString] to the
 *   provided [SendChannel].
 * @throws ClosedChannelException if the channel is closed and cannot accept more data.

This is misplaced.

Code quote:

 * @throws ClosedChannelException if the channel is closed and cannot accept more data.

src/main/kotlin/org/wfanet/measurement/common/CoroutineWritableByteChannel.kt line 36 at r6 (raw file):

  WritableByteChannel {
  override fun write(source: ByteBuffer): Int {
    val byteString = ByteString.copyFrom(source)

Similar notes as above for the readable version. You need to handle the case where the channel is not ready to receive, but isn't yet closed. In this case, you need to return that 0 bytes were written and not consume any bytes from the buffer.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 69 at r6 (raw file):

  override suspend fun writeBlob(blobKey: String, content: Flow<ByteString>): StorageClient.Blob {
    val encryptedContent = channelFlow {
      val channel = this@channelFlow

I don't think this line is necessary. You're copying a variable into a scope that can already access it with the same name.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 76 at r6 (raw file):

        content.collect { byteString ->
          byteString.asReadOnlyByteBufferList().forEach { buffer ->
            ciphertextChannel.write(buffer)

You need to handle the case where the channel returns that fewer than buffer.remaining bytes were written.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 124 at r6 (raw file):

        val plaintextChannel =
          this@StreamingAeadStorageClient.streamingAead.newDecryptingChannel(
            object : ReadableByteChannel {

Use CoroutineReadableByteChannel

Copy link
Member

@SanjayVas SanjayVas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: all files reviewed, 19 unresolved discussions (waiting on @dawnwang22, @kungfucraig, and @Marco-Premier)


src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 33 at r6 (raw file):

 *   [ReceiveChannel] and writes it to the specified [ByteBuffer].
 */
class CoroutineReadableByteChannel(private val delegate: ReceiveChannel<ByteString>) :

These classes warrant their own unit tests. You can use unbuffered (rendezvous) channels to trigger some of the more interesting behavior w.r.t. not being ready for send/receive.

Copy link
Contributor Author

@Marco-Premier Marco-Premier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 10 of 16 files reviewed, 19 unresolved discussions (waiting on @dawnwang22, @kungfucraig, and @SanjayVas)


src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 29 at r6 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

This is misplaced.

Done.


src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 33 at r6 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

These classes warrant their own unit tests. You can use unbuffered (rendezvous) channels to trigger some of the more interesting behavior w.r.t. not being ready for send/receive.

Done.


src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 37 at r6 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Getting no result does not mean that there is no more data, it just means there's no data available yet. You need to handle failed and closed separately. See https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/try-receive.html and https://docs.oracle.com/javase/8/docs/api/index.html?java/nio/ByteBuffer.html

Done.


src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 38 at r6 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

The buffer may have less remaining space than the size of the ByteString. You need to check whether there is enough space first to avoid a BufferOverflowException, and then only write however many bytes there is space for. Finally, you need to only return however many bytes were written.

This means you'll need to internally buffer the bytes from the delegate channel that couldn't be written to the destination buffer, and only read from the channel once that internal buffer is empty.

Done.


src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 38 at r6 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

This does an additional copy to a ByteArray before copying into the destination buffer. Use 'ByteString.copyTo' instead, calling substring first if necessary.

Done.


src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 45 at r6 (raw file):

  override fun close() {
    delegate.cancel()

I don't see close method available?
The cancel method does close the channel: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/#-2081943076%2FFunctions%2F1975948010


src/main/kotlin/org/wfanet/measurement/common/CoroutineWritableByteChannel.kt line 31 at r6 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

This is misplaced.

Done.


src/main/kotlin/org/wfanet/measurement/common/CoroutineWritableByteChannel.kt line 36 at r6 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Similar notes as above for the readable version. You need to handle the case where the channel is not ready to receive, but isn't yet closed. In this case, you need to return that 0 bytes were written and not consume any bytes from the buffer.

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 69 at r6 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

I don't think this line is necessary. You're copying a variable into a scope that can already access it with the same name.

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 76 at r6 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

You need to handle the case where the channel returns that fewer than buffer.remaining bytes were written.

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 124 at r6 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Use CoroutineReadableByteChannel

Done.

Copy link
Member

@SanjayVas SanjayVas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 2 of 11 files at r4, 4 of 6 files at r7, all commit messages.
Reviewable status: 14 of 16 files reviewed, 15 unresolved discussions (waiting on @dawnwang22, @kungfucraig, and @Marco-Premier)


src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 45 at r6 (raw file):

Previously, Marco-Premier (marcopremier) wrote…

I don't see close method available?
The cancel method does close the channel: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/#-2081943076%2FFunctions%2F1975948010

Sorry, I was looking at SendChannel.


src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 35 at r7 (raw file):

  private var remainingBytes: ByteArray? = null
  private var remainingOffset = 0

Use a ByteBuffer, which would also mean you shouldn't need to track the offset yourself and you can possibly avoid an extra copy by using ByteString.asReadOnlyByteBuffer.

Code quote:

  private var remainingBytes: ByteArray? = null
  private var remainingOffset = 0

src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 53 at r7 (raw file):

   */
  override fun read(destination: ByteBuffer): Int {
    remainingBytes?.let {

nit: prefer standard if structure, especially if not using the result of let.

Suggestion:

if (remainingBytes != null) {

src/test/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannelTest.kt line 34 at r7 (raw file):

  private lateinit var channel: Channel<ByteString>
  private lateinit var coroutineReadableByteChannel: CoroutineReadableByteChannel

If you're just calling the constructors, you can make the properties not be lateinit and instead just call these during regular instance initialization. Recall that JUnit will initialize a new instance for each test method.

That said, you may want to initialize these within each test method to have more control or be able to use builders like produce.

Suggestion:

  private val channel: Channel<ByteString> = Channel()
  private val coroutineReadableByteChannel = CoroutineReadableByteChannel(channel)

src/test/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannelTest.kt line 52 at r7 (raw file):

    val buffer = ByteBuffer.allocate(5)
    launch { channel.send(testData) }
    delay(100)

Unit tests should neither use real delays nor rely on timing.


src/test/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannelTest.kt line 59 at r7 (raw file):

  @Test
  fun `read - handles remaining bytes correctly`() = runBlocking {

nit: be more specific than "handles correctly"

Suggestion:

read buffers channel items when destination buffer is filled

src/test/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannelTest.kt line 65 at r7 (raw file):

    delay(100)
    val bytesRead = coroutineReadableByteChannel.read(buffer)
    assertThat(5).isEqualTo(bytesRead)

These test assertions are inverted. In Truth, you wrap the Subject around the actual value. See https://truth.dev/faq#order


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 140 at r7 (raw file):

  companion object {
    internal val logger = Logger.getLogger(this::class.java.name)

Can this be private?

Code quote:

internal

src/main/kotlin/org/wfanet/measurement/storage/MesosRecordIoStorageClient.kt line 54 at r7 (raw file):

    var recordsWritten = 0
    val processedContent = flow {
      val outputStream = ByteArrayOutputStream()

Use ByteString.Output so you never need to call ByteString.copyFrom. ByteString.Output extends OutputStream.

Suggestion:

val outputStream: ByteString.Output = ByteString.newOutput()

src/main/kotlin/org/wfanet/measurement/storage/MesosRecordIoStorageClient.kt line 58 at r7 (raw file):

        val rawBytes = byteString.toByteArray()
        val recordSize = rawBytes.size.toString()
        val fullRecord = recordSize + RECORD_DELIMITER + String(rawBytes, Charsets.UTF_8)

I don't think you ever need rawBytes.

In general, you want to avoid copying ByteString into a new ByteArray whenever possible. This is what toByteArray does.

Suggestion:

        val recordSize: String = byteString.size().toString()
        val fullRecord: String = recordSize + RECORD_DELIMITER + byteString.toStringUtf8()

src/main/kotlin/org/wfanet/measurement/storage/MesosRecordIoStorageClient.kt line 66 at r7 (raw file):

      }

      val remainingBytes = outputStream.toByteArray()

How would we have remaining bytes in this stream when it's reset at the end of every collect iteration?


src/main/kotlin/org/wfanet/measurement/storage/MesosRecordIoStorageClient.kt line 120 at r7 (raw file):

          if (currentRecordSize == -1) {
            while (position < chunkString.length) {
              val char = chunkString[position++]

Rather than iterating yourself char-by-char, utilize String functions like indexOf.


src/main/kotlin/org/wfanet/measurement/storage/MesosRecordIoStorageClient.kt line 121 at r7 (raw file):

            while (position < chunkString.length) {
              val char = chunkString[position++]
              if (char == '\n') {

Use the RECORD_DELIMITER constant you already defined. You may need to change the type of that constant, as it's currently String.

Code quote:

'\n'

src/main/kotlin/org/wfanet/measurement/storage/MesosRecordIoStorageClient.kt line 155 at r7 (raw file):

  companion object {
    const val RECORD_DELIMITER = "\n"
    internal val logger = Logger.getLogger(this::class.java.name)

I believe these can both be private

Code quote:

    const val RECORD_DELIMITER = "\n"
    internal val logger = Logger.getLogger(this::class.java.name)

Copy link
Contributor Author

@Marco-Premier Marco-Premier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 11 of 16 files reviewed, 13 unresolved discussions (waiting on @dawnwang22, @kungfucraig, @SanjayVas, and @stevenwarejones)


src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 35 at r7 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Use a ByteBuffer, which would also mean you shouldn't need to track the offset yourself and you can possibly avoid an extra copy by using ByteString.asReadOnlyByteBuffer.

Done.


src/test/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannelTest.kt line 34 at r7 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

If you're just calling the constructors, you can make the properties not be lateinit and instead just call these during regular instance initialization. Recall that JUnit will initialize a new instance for each test method.

That said, you may want to initialize these within each test method to have more control or be able to use builders like produce.

Done.


src/test/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannelTest.kt line 52 at r7 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Unit tests should neither use real delays nor rely on timing.

Done.


src/test/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannelTest.kt line 65 at r7 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

These test assertions are inverted. In Truth, you wrap the Subject around the actual value. See https://truth.dev/faq#order

Done.


src/main/kotlin/org/wfanet/measurement/common/crypto/tink/StreamingAeadStorageClient.kt line 140 at r7 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Can this be private?

Done.


src/main/kotlin/org/wfanet/measurement/storage/MesosRecordIoStorageClient.kt line 54 at r7 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Use ByteString.Output so you never need to call ByteString.copyFrom. ByteString.Output extends OutputStream.

Done.


src/main/kotlin/org/wfanet/measurement/storage/MesosRecordIoStorageClient.kt line 58 at r7 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

I don't think you ever need rawBytes.

In general, you want to avoid copying ByteString into a new ByteArray whenever possible. This is what toByteArray does.

Done.


src/main/kotlin/org/wfanet/measurement/storage/MesosRecordIoStorageClient.kt line 66 at r7 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

How would we have remaining bytes in this stream when it's reset at the end of every collect iteration?

thanks for all the tips!


src/main/kotlin/org/wfanet/measurement/storage/MesosRecordIoStorageClient.kt line 120 at r7 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Rather than iterating yourself char-by-char, utilize String functions like indexOf.

Done.


src/main/kotlin/org/wfanet/measurement/storage/MesosRecordIoStorageClient.kt line 121 at r7 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Use the RECORD_DELIMITER constant you already defined. You may need to change the type of that constant, as it's currently String.

Done.


src/main/kotlin/org/wfanet/measurement/storage/MesosRecordIoStorageClient.kt line 155 at r7 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

I believe these can both be private

Done.

Copy link
Member

@SanjayVas SanjayVas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 8 of 11 files at r4, 5 of 5 files at r8, all commit messages.
Reviewable status: all files reviewed, 5 unresolved discussions (waiting on @dawnwang22, @kungfucraig, and @Marco-Premier)


src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 21 at r8 (raw file):

import java.nio.ByteBuffer
import java.nio.channels.ReadableByteChannel
import java.util.*

Wildcard imports are forbidden


src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 55 at r8 (raw file):

  override fun read(destination: ByteBuffer): Int {

    if (remainingBuffer != null) {

You can rewrite this to take advantage of smart casts by assigning remainingBuffer to a local variable first. Then you won't need to use the non-null assertion.

Alternatively, I think the better approach would be to have it so that remainingBuffer is never null by initializing it to ByteString.EMPTY.asReadOnlyByteBuffer() and always assigning it to the ByteString buffer from the coroutine channel. This way you can just check if it has any bytes remaining.


src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 75 at r8 (raw file):

  }

  private fun writeToDestination(destination: ByteBuffer, source: ByteBuffer): Int{

nit: Document what this method is doing, since it's not obvious from the name.

I know that this is essentially ReadableByteChannel.read, but with the source ByteBuffer acting as the ByteChannel itself.


src/main/kotlin/org/wfanet/measurement/storage/MesosRecordIoStorageClient.kt line 32 at r8 (raw file):

 * @param storageClient underlying client for accessing blob/object storage
 */
class MesosRecordIoStorageClient(private val storageClient: StorageClient) : StorageClient {

Sorry for not catching this sooner, but you probably want the StorageClient impl to be a private implementation detail class inside of your public MesosRecordIoStore, where the latter provides a similar interface but with the actual record type specified. The class would then abstract away the protobuf message serialization.

Since this PR is already long-lived, I don't mind if that work is done in a follow-up PR.

e.g.

class MesosRecordIoStore<T : Message>(storageClient: StorageClient) {
  private val internalStorageClient = InternalStorageClient(storageClient)

  suspend fun writeBlob(blobKey: String, records: Flow<T>}: Blob

  class Blob {
    fun read(): Flow<T>

    suspend fun delete()
  }

  private class InternalStorageClient(private val storageClient: StorageClient) : StorageClient {
    // The implementation you have now
  }
}

Code quote:

StorageClient

Copy link
Contributor Author

@Marco-Premier Marco-Premier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewable status: 14 of 16 files reviewed, 4 unresolved discussions (waiting on @dawnwang22, @kungfucraig, and @SanjayVas)


src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 21 at r8 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Wildcard imports are forbidden

Done.


src/main/kotlin/org/wfanet/measurement/common/CoroutineReadableByteChannel.kt line 55 at r8 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

You can rewrite this to take advantage of smart casts by assigning remainingBuffer to a local variable first. Then you won't need to use the non-null assertion.

Alternatively, I think the better approach would be to have it so that remainingBuffer is never null by initializing it to ByteString.EMPTY.asReadOnlyByteBuffer() and always assigning it to the ByteString buffer from the coroutine channel. This way you can just check if it has any bytes remaining.

Done.


src/main/kotlin/org/wfanet/measurement/storage/MesosRecordIoStorageClient.kt line 32 at r8 (raw file):

Previously, SanjayVas (Sanjay Vasandani) wrote…

Sorry for not catching this sooner, but you probably want the StorageClient impl to be a private implementation detail class inside of your public MesosRecordIoStore, where the latter provides a similar interface but with the actual record type specified. The class would then abstract away the protobuf message serialization.

Since this PR is already long-lived, I don't mind if that work is done in a follow-up PR.

e.g.

class MesosRecordIoStore<T : Message>(storageClient: StorageClient) {
  private val internalStorageClient = InternalStorageClient(storageClient)

  suspend fun writeBlob(blobKey: String, records: Flow<T>}: Blob

  class Blob {
    fun read(): Flow<T>

    suspend fun delete()
  }

  private class InternalStorageClient(private val storageClient: StorageClient) : StorageClient {
    // The implementation you have now
  }
}

Yes please as I'd like to merge this PR and mark some progress.
I added a TODO for it.

Copy link
Member

@SanjayVas SanjayVas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 2 of 2 files at r9, all commit messages.
Reviewable status: :shipit: complete! all files reviewed, all discussions resolved (waiting on @dawnwang22 and @kungfucraig)

Copy link
Contributor Author

@Marco-Premier Marco-Premier left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed 9 of 11 files at r4, 1 of 6 files at r7, 3 of 5 files at r8, 1 of 2 files at r9, 2 of 2 files at r10, all commit messages.
Reviewable status: :shipit: complete! all files reviewed, all discussions resolved (waiting on @dawnwang22 and @kungfucraig)

@Marco-Premier Marco-Premier merged commit 1a7a85b into main Nov 6, 2024
3 checks passed
@Marco-Premier Marco-Premier deleted the marcopremier/recordio-storage-client branch November 6, 2024 18:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants