-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-25253][PYSPARK] Refactor local connection & auth code #22247
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test build #95303 has finished for PR 22247 at commit
|
python/pyspark/java_gateway.py
Outdated
| raise Exception("Unexpected reply from iterator server.") | ||
|
|
||
|
|
||
| def local_connect_and_auth(sock_info): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@squito, not a big deal but how about local_connect_and_auth (port, auth_secret) and ..
(sockfile, sock) = local_connect_and_auth(port, auth_secret)(sock_file, _) = local_connect_and_auth(java_port, auth_secret)port, auth_secret = sock_info
(sockfile, sock) = local_connect_and_auth(port, auth_secret)or
(sockfile, sock) = local_connect_and_auth(*sock_info)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done, thanks -- I used the varargs version *sock_info for the last one, i forgot about that in python.
python/pyspark/java_gateway.py
Outdated
| # Support for both IPv4 and IPv6. | ||
| # On most of IPv6-ready systems, IPv6 will take precedence. | ||
| for res in socket.getaddrinfo("127.0.0.1", port, socket.AF_UNSPEC, socket.SOCK_STREAM): | ||
| af, socktype, proto, canonname, sa = res |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: af, socktype, proto, canonname, sa = res -> af, socktype, proto, _, sa = res
| auth_secret = os.environ["PYTHON_WORKER_FACTORY_SECRET"] | ||
| sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | ||
| sock.connect(("127.0.0.1", java_port)) | ||
| sock_file = sock.makefile("rwb", 65536) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@vanzin, BTW, did you test this on Windows too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I quickly tested and seems working fine. Please ignore this comment.
jiangxb1987
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks!
| # do_server_auth() function and data serialization methods. | ||
| sockfile = sock.makefile("rwb", 65536) | ||
|
|
||
| (sockfile, sock) = local_connect_and_auth(port, auth_secret) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We must set sock timeout to None to allow barrier() call blocking forever.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, thanks! updated
vanzin
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
python/pyspark/java_gateway.py
Outdated
| sock.close() | ||
| sock = None | ||
| continue | ||
| break |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Slight shorter (and more "python-compliant"?):
- move the socket initialization (and the return) inside the try
- get rid of the continue
- use an else instead of the condition below
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
Test build #95354 has finished for PR 22247 at commit
|
|
Test build #95365 has finished for PR 22247 at commit
|
|
Test build #95380 has finished for PR 22247 at commit
|
|
Merged to master. |
|
@squito Thanks for the refactor! |
This eliminates some duplication in the code to connect to a server on localhost to talk directly to the jvm. Also it gives consistent ipv6 and error handling. Two other incidental changes, that shouldn't matter: 1) python barrier tasks perform authentication immediately (rather than waiting for the BARRIER_FUNCTION indicator) 2) for `rdd._load_from_socket`, the timeout is only increased after authentication. Closes apache#22247 from squito/py_connection_refactor. Authored-by: Imran Rashid <[email protected]> Signed-off-by: hyukjinkwon <[email protected]> (cherry picked from commit 38391c9)
This eliminates some duplication in the code to connect to a server on localhost to talk directly to the jvm. Also it gives consistent ipv6 and error handling. Two other incidental changes, that shouldn't matter: 1) python barrier tasks perform authentication immediately (rather than waiting for the BARRIER_FUNCTION indicator) 2) for `rdd._load_from_socket`, the timeout is only increased after authentication. Closes #22247 from squito/py_connection_refactor. Authored-by: Imran Rashid <[email protected]> Signed-off-by: hyukjinkwon <[email protected]> (cherry picked from commit 38391c9)
This eliminates some duplication in the code to connect to a server on localhost to talk directly to the jvm. Also it gives consistent ipv6 and error handling. Two other incidental changes, that shouldn't matter: 1) python barrier tasks perform authentication immediately (rather than waiting for the BARRIER_FUNCTION indicator) 2) for `rdd._load_from_socket`, the timeout is only increased after authentication. Closes #22247 from squito/py_connection_refactor. Authored-by: Imran Rashid <[email protected]> Signed-off-by: hyukjinkwon <[email protected]> (cherry picked from commit 38391c9) (cherry picked from commit a2a54a5)
This eliminates some duplication in the code to connect to a server on localhost to talk directly to the jvm. Also it gives consistent ipv6 and error handling. Two other incidental changes, that shouldn't matter: 1) python barrier tasks perform authentication immediately (rather than waiting for the BARRIER_FUNCTION indicator) 2) for `rdd._load_from_socket`, the timeout is only increased after authentication. Closes apache#22247 from squito/py_connection_refactor. Authored-by: Imran Rashid <[email protected]> Signed-off-by: hyukjinkwon <[email protected]> (cherry picked from commit 38391c9) (cherry picked from commit a2a54a5)
This eliminates some duplication in the code to connect to a server on localhost to talk directly to the jvm. Also it gives consistent ipv6 and error handling. Two other incidental changes, that shouldn't matter: 1) python barrier tasks perform authentication immediately (rather than waiting for the BARRIER_FUNCTION indicator) 2) for `rdd._load_from_socket`, the timeout is only increased after authentication. Closes apache#22247 from squito/py_connection_refactor. Authored-by: Imran Rashid <[email protected]> Signed-off-by: hyukjinkwon <[email protected]> (cherry picked from commit 38391c9) (cherry picked from commit a2a54a5)
This eliminates some duplication in the code to connect to a server on localhost to talk directly to the jvm. Also it gives consistent ipv6 and error handling. Two other incidental changes, that shouldn't matter:
rdd._load_from_socket, the timeout is only increased after authentication.