Skip to content

Commit

Permalink
Merge branch 'master' into kt_jvm_proto_helper
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesward authored Dec 8, 2023
2 parents 165b622 + 98bebab commit c0997e0
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 17 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/gradle_arm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
- uses: actions/checkout@v4
- name: test_on_arm
continue-on-error: true
uses: uraimo/run-on-arch-action@v2.5.1
uses: uraimo/run-on-arch-action@v2.6.0
with:
arch: aarch64
distro: ubuntu18.04
Expand All @@ -29,7 +29,7 @@ jobs:
./gradlew --console=plain build
- name: test_on_arm_retry
if: steps.test_on_arm.outcome=='failure'
uses: uraimo/run-on-arch-action@v2.5.1
uses: uraimo/run-on-arch-action@v2.6.0
with:
arch: aarch64
distro: ubuntu18.04
Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
kotlin("jvm") version "1.8.0" apply false
id("com.google.protobuf") version "0.9.4" apply false
id("org.gradle.test-retry") version "1.5.6"
id("org.gradle.test-retry") version "1.5.7"
id("io.github.gradle-nexus.publish-plugin") version "1.3.0"
}

Expand Down
22 changes: 8 additions & 14 deletions stub/src/main/java/io/grpc/kotlin/ServerCalls.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.CoroutineContext
import io.grpc.Metadata as GrpcMetadata
Expand Down Expand Up @@ -237,26 +235,22 @@ object ServerCalls {
}

val rpcJob = CoroutineScope(context).launch {
val mutex = Mutex()
val headersSent = AtomicBoolean(false) // enforces only sending headers once
var headersSent = false
val failure = runCatching {
implementation(requests).collect {
// once we have a response message, check if we've sent headers yet - if not, do so
if (headersSent.compareAndSet(false, true)) {
mutex.withLock {
call.sendHeaders(GrpcMetadata())
}
if (!headersSent) {
call.sendHeaders(GrpcMetadata())
headersSent = true
}
readiness.suspendUntilReady()
mutex.withLock { call.sendMessage(it) }
call.sendMessage(it)
}
}.exceptionOrNull()
// check headers again once we're done collecting the response flow - if we received
// no elements or threw an exception, then we wouldn't have sent them
if (failure == null && headersSent.compareAndSet(false, true)) {
mutex.withLock {
call.sendHeaders(GrpcMetadata())
}
if (failure == null && !headersSent) {
call.sendHeaders(GrpcMetadata())
}
val closeStatus = when (failure) {
null -> Status.OK
Expand All @@ -265,7 +259,7 @@ object ServerCalls {
else -> Status.fromThrowable(failure).withCause(failure)
}
val trailers = failure?.let { Status.trailersFromThrowable(it) } ?: GrpcMetadata()
mutex.withLock { call.close(closeStatus, trailers) }
call.close(closeStatus, trailers)
}

return object: ServerCall.Listener<RequestT>() {
Expand Down

0 comments on commit c0997e0

Please sign in to comment.