-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-25253][PYSPARK] Refactor local connection & auth code #22247
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -134,7 +134,7 @@ def killChild(): | |
| return gateway | ||
|
|
||
|
|
||
| def do_server_auth(conn, auth_secret): | ||
| def _do_server_auth(conn, auth_secret): | ||
| """ | ||
| Performs the authentication protocol defined by the SocketAuthHelper class on the given | ||
| file-like object 'conn'. | ||
|
|
@@ -147,6 +147,39 @@ def do_server_auth(conn, auth_secret): | |
| raise Exception("Unexpected reply from iterator server.") | ||
|
|
||
|
|
||
| def local_connect_and_auth(sock_info): | ||
| """ | ||
| Connect to local host, authenticate with it, and return a (sockfile,sock) for that connection. | ||
| Handles IPV4 & IPV6, does some error handling. | ||
| :param sock_info: a tuple of (port, auth_secret) for connecting | ||
| :return: a tuple with (sockfile, sock) | ||
| """ | ||
| port, auth_secret = sock_info | ||
| sock = None | ||
| errors = [] | ||
| # Support for both IPv4 and IPv6. | ||
| # On most of IPv6-ready systems, IPv6 will take precedence. | ||
| for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM): | ||
| af, socktype, proto, canonname, sa = res | ||
|
||
| sock = socket.socket(af, socktype, proto) | ||
| try: | ||
| sock.settimeout(15) | ||
| sock.connect(sa) | ||
| except socket.error as e: | ||
| emsg = _exception_message(e) | ||
| errors.append("tried to connect to %s, but an error occured: %s" % (sa, emsg)) | ||
| sock.close() | ||
| sock = None | ||
| continue | ||
| break | ||
|
||
| if not sock: | ||
| raise Exception("could not open socket: %s" % errors) | ||
|
|
||
| sockfile = sock.makefile("rwb", 65536) | ||
| _do_server_auth(sockfile, auth_secret) | ||
| return (sockfile, sock) | ||
|
|
||
|
|
||
| def ensure_callback_server_started(gw): | ||
| """ | ||
| Start callback server if not already started. The callback server is needed if the Java | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -27,7 +27,7 @@ | |
|
|
||
| from pyspark.accumulators import _accumulatorRegistry | ||
| from pyspark.broadcast import Broadcast, _broadcastRegistry | ||
| from pyspark.java_gateway import do_server_auth | ||
| from pyspark.java_gateway import local_connect_and_auth | ||
| from pyspark.taskcontext import BarrierTaskContext, TaskContext | ||
| from pyspark.files import SparkFiles | ||
| from pyspark.rdd import PythonEvalType | ||
|
|
@@ -364,8 +364,5 @@ def process(): | |
| # Read information about how to connect back to the JVM from the environment. | ||
| java_port = int(os.environ["PYTHON_WORKER_FACTORY_PORT"]) | ||
| auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"] | ||
| sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
| sock.connect(("127.0.0.1", java_port)) | ||
| sock_file = sock.makefile("rwb", 65536) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @vanzin, BTW, did you test this on Windows too?
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I quickly tested it and it seems to be working fine. Please ignore this comment. |
||
| do_server_auth(sock_file, auth_secret) | ||
| (sock_file, _) = local_connect_and_auth((java_port, auth_secret)) | ||
| main(sock_file, sock_file) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@squito, not a big deal, but how about
`local_connect_and_auth(port, auth_secret)` and .. or
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, thanks -- I used the varargs version
`*sock_info` for the last one; I forgot about that in Python.