Skip to content

Commit

Permalink
Remove unneeded concurrency things (#360)
Browse files Browse the repository at this point in the history
  • Loading branch information
taer authored Dec 1, 2023
1 parent 7126e46 commit 98bebab
Showing 1 changed file with 8 additions and 14 deletions.
22 changes: 8 additions & 14 deletions stub/src/main/java/io/grpc/kotlin/ServerCalls.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.CoroutineContext
import io.grpc.Metadata as GrpcMetadata
Expand Down Expand Up @@ -237,26 +235,22 @@ object ServerCalls {
}

val rpcJob = CoroutineScope(context).launch {
val mutex = Mutex()
val headersSent = AtomicBoolean(false) // enforces only sending headers once
var headersSent = false
val failure = runCatching {
implementation(requests).collect {
// once we have a response message, check if we've sent headers yet - if not, do so
if (headersSent.compareAndSet(false, true)) {
mutex.withLock {
call.sendHeaders(GrpcMetadata())
}
if (!headersSent) {
call.sendHeaders(GrpcMetadata())
headersSent = true
}
readiness.suspendUntilReady()
mutex.withLock { call.sendMessage(it) }
call.sendMessage(it)
}
}.exceptionOrNull()
// check headers again once we're done collecting the response flow - if we received
// no elements or threw an exception, then we wouldn't have sent them
if (failure == null && headersSent.compareAndSet(false, true)) {
mutex.withLock {
call.sendHeaders(GrpcMetadata())
}
if (failure == null && !headersSent) {
call.sendHeaders(GrpcMetadata())
}
val closeStatus = when (failure) {
null -> Status.OK
Expand All @@ -265,7 +259,7 @@ object ServerCalls {
else -> Status.fromThrowable(failure).withCause(failure)
}
val trailers = failure?.let { Status.trailersFromThrowable(it) } ?: GrpcMetadata()
mutex.withLock { call.close(closeStatus, trailers) }
call.close(closeStatus, trailers)
}

return object: ServerCall.Listener<RequestT>() {
Expand Down

0 comments on commit 98bebab

Please sign in to comment.