Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/amber/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ val googleServiceDependencies = Seq(

/////////////////////////////////////////////////////////////////////////////
// Arrow related
val arrowVersion = "10.0.0"
val arrowVersion = "11.0.0"
val arrowDependencies = Seq(
// https://mvnrepository.com/artifact/org.apache.arrow/flight-grpc
"org.apache.arrow" % "flight-grpc" % arrowVersion,
Expand Down
2 changes: 1 addition & 1 deletion core/amber/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ loguru==0.5.3
packaging==20.9
pluggy==0.13.1
py==1.10.0
pyarrow==10.0.0
pyarrow==11.0.0
pyparsing==2.4.7
pytest==6.2.4
python-dateutil==2.8.1
Expand Down
2 changes: 1 addition & 1 deletion core/amber/src/main/python/core/proxy/proxy_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ def do_action(self, context: ServerCallContext, action: Action) -> Iterator[Resu
encoded = result
else:
encoded = str(result).encode("utf-8")
yield Result(py_buffer(encoded))
else:
raise KeyError("Unknown action {!r}".format(action_name))
yield Result(py_buffer(encoded))

@logger.catch(reraise=True)
def register(self, name: str, action: Callable, description: str = "") -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,19 @@ class PythonProxyClient(portNumber: Int, val actorId: ActorVirtualIdentity)
): Result = {
val controlMessage = PythonControlMessage(from, payload)
val action: Action = new Action("control", controlMessage.toByteArray)

logger.debug(s"sending control $controlMessage")
flightClient.doAction(action).next()
// Arrow allows multiple results from the Action call return as a stream (interator).
// In Arrow 11, it alerts if the results are not consumed fully.
val results = flightClient.doAction(action)
// As we do our own Async RPC management, we are currently not using results from Action call.
// In the future, this results can include credits for flow control purpose.
val result = results.next()

// However, we will only expect exactly one result for now.
assert(!results.hasNext)

result
}

def sendControlV1(from: ActorVirtualIdentity, payload: ControlPayload): Unit = {
Expand Down