diff --git a/core/amber/build.sbt b/core/amber/build.sbt index 9c6f175ecbd..fecedfd0a91 100644 --- a/core/amber/build.sbt +++ b/core/amber/build.sbt @@ -127,7 +127,7 @@ val googleServiceDependencies = Seq( ///////////////////////////////////////////////////////////////////////////// // Arrow related -val arrowVersion = "10.0.0" +val arrowVersion = "11.0.0" val arrowDependencies = Seq( // https://mvnrepository.com/artifact/org.apache.arrow/flight-grpc "org.apache.arrow" % "flight-grpc" % arrowVersion, diff --git a/core/amber/requirements.txt b/core/amber/requirements.txt index 714dfbccbd7..2df0799abc3 100644 --- a/core/amber/requirements.txt +++ b/core/amber/requirements.txt @@ -9,7 +9,7 @@ loguru==0.5.3 packaging==20.9 pluggy==0.13.1 py==1.10.0 -pyarrow==10.0.0 +pyarrow==11.0.0 pyparsing==2.4.7 pytest==6.2.4 python-dateutil==2.8.1 diff --git a/core/amber/src/main/python/core/proxy/proxy_server.py b/core/amber/src/main/python/core/proxy/proxy_server.py index 21e6fc0ce42..23a4e119272 100644 --- a/core/amber/src/main/python/core/proxy/proxy_server.py +++ b/core/amber/src/main/python/core/proxy/proxy_server.py @@ -202,9 +202,9 @@ def do_action(self, context: ServerCallContext, action: Action) -> Iterator[Resu encoded = result else: encoded = str(result).encode("utf-8") + yield Result(py_buffer(encoded)) else: raise KeyError("Unknown action {!r}".format(action_name)) - yield Result(py_buffer(encoded)) @logger.catch(reraise=True) def register(self, name: str, action: Callable, description: str = "") -> None: diff --git a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonProxyClient.scala b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonProxyClient.scala index f90550aaa29..e0ee447e68b 100644 --- a/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonProxyClient.scala +++ b/core/amber/src/main/scala/edu/uci/ics/amber/engine/architecture/pythonworker/PythonProxyClient.scala @@ -100,8 +100,19 @@ class PythonProxyClient(portNumber: Int, val actorId: ActorVirtualIdentity) ): Result = { val controlMessage = PythonControlMessage(from, payload) val action: Action = new Action("control", controlMessage.toByteArray) + logger.debug(s"sending control $controlMessage") - flightClient.doAction(action).next() + // Arrow allows multiple results from the Action call return as a stream (interator). + // In Arrow 11, it alerts if the results are not consumed fully. + val results = flightClient.doAction(action) + // As we do our own Async RPC management, we are currently not using results from Action call. + // In the future, this results can include credits for flow control purpose. + val result = results.next() + + // However, we will only expect exactly one result for now. + assert(!results.hasNext) + + result } def sendControlV1(from: ActorVirtualIdentity, payload: ControlPayload): Unit = {