Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion python/pyspark/accumulators.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
def handle(self):
from pyspark.accumulators import _accumulatorRegistry
auth_token = self.server.auth_token

def poll(func):
while not self.server.server_shutdown:
# Poll every 1 second for new data -- don't block in case of shutdown.
Expand All @@ -254,13 +255,15 @@ def authenticate_and_accum_updates():
# we've authenticated, we can break out of the first loop now
return True
else:
raise Exception("The value of the provided token to the AccumulatorServer is not correct.")
raise Exception(
"The value of the provided token to the AccumulatorServer is not correct.")

# first we keep polling till we've received the authentication token
poll(authenticate_and_accum_updates)
# now we've authenticated, don't need to check for the token anymore
poll(accum_updates)


class AccumulatorServer(SocketServer.TCPServer):

def __init__(self, server_address, RequestHandlerClass, auth_token):
Expand All @@ -278,6 +281,7 @@ def shutdown(self):
SocketServer.TCPServer.shutdown(self)
self.server_close()


def _start_update_server(auth_token):
"""Start a TCP server to receive accumulator updates in a daemon thread, and returns it"""
server = AccumulatorServer(("localhost", 0), _UpdateRequestHandler, auth_token)
Expand Down