3333import time
3434import urllib2
3535import warnings
36+ from datetime import datetime
3637from optparse import OptionParser
3738from sys import stderr
3839import boto
@@ -589,7 +590,9 @@ def setup_spark_cluster(master, opts):
589590
590591
591592def is_ssh_available (host , opts ):
592- "Checks if SSH is available on the host."
593+ """
594+ Check if SSH is available on a host.
595+ """
593596 try :
594597 with open (os .devnull , 'w' ) as devnull :
595598 ret = subprocess .check_call (
@@ -604,36 +607,48 @@ def is_ssh_available(host, opts):
604607
605608
def is_cluster_ssh_available(cluster_instances, opts):
    """
    Report whether every instance in the cluster accepts SSH connections.

    cluster_instances: an iterable of instance objects exposing `ip_address`
    opts: the options object forwarded unchanged to is_ssh_available()

    Returns True only if is_ssh_available() succeeds for each instance.
    Probing stops at the first instance that fails the check, and an empty
    cluster is reported as available.
    """
    return all(
        is_ssh_available(host=instance.ip_address, opts=opts)
        for instance in cluster_instances
    )
612618
613619
614- def wait_for_cluster_state (cluster_instances , cluster_state , opts ):
620+ def wait_for_cluster_state (conn , opts , cluster_instances , cluster_state ):
615621 """
622+ Wait for all the instances in the cluster to reach a designated state.
623+
616624 cluster_instances: a list of boto.ec2.instance.Instance
617625 cluster_state: a string representing the desired state of all the instances in the cluster
618626 value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as
619627 'running', 'terminated', etc.
620628 (would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250)
621629 """
622630 sys .stdout .write (
623- "Waiting for all instances in cluster to enter '{s}' state." .format (s = cluster_state )
631+ "Waiting for cluster to enter '{s}' state." .format (s = cluster_state )
624632 )
625633 sys .stdout .flush ()
626634
635+ start_time = datetime .now ()
636+
627637 num_attempts = 0
638+ conn = ec2 .connect_to_region (opts .region )
628639
629640 while True :
630- time .sleep (3 * num_attempts )
641+ time .sleep (5 * num_attempts ) # seconds
631642
632643 for i in cluster_instances :
633- s = i .update () # capture output to suppress print to screen in newer versions of boto
644+ i .update ()
645+
646+ statuses = conn .get_all_instance_status (instance_ids = [i .id for i in cluster_instances ])
634647
635648 if cluster_state == 'ssh-ready' :
636649 if all (i .state == 'running' for i in cluster_instances ) and \
650+ all (s .system_status .status == 'ok' for s in statuses ) and \
651+ all (s .instance_status .status == 'ok' for s in statuses ) and \
637652 is_cluster_ssh_available (cluster_instances , opts ):
638653 break
639654 else :
@@ -647,6 +662,12 @@ def wait_for_cluster_state(cluster_instances, cluster_state, opts):
647662
648663 sys .stdout .write ("\n " )
649664
665+ end_time = datetime .now ()
666+ print "Cluster is now in '{s}' state. Waited {t} seconds." .format (
667+ s = cluster_state ,
668+ t = (end_time - start_time ).seconds
669+ )
670+
650671
651672# Get number of local disks available for a given EC2 instance type.
652673def get_num_disks (instance_type ):
@@ -895,7 +916,7 @@ def real_main():
895916 # See: https://docs.python.org/3.5/whatsnew/2.7.html
896917 warnings .warn (
897918 "This option is deprecated and has no effect. "
898- "spark-ec2 automatically waits as long as necessary for clusters to startup ." ,
919+ "spark-ec2 automatically waits as long as necessary for clusters to start up ." ,
899920 DeprecationWarning
900921 )
901922
@@ -922,9 +943,10 @@ def real_main():
922943 else :
923944 (master_nodes , slave_nodes ) = launch_cluster (conn , opts , cluster_name )
924945 wait_for_cluster_state (
946+ conn = conn ,
947+ opts = opts ,
925948 cluster_instances = (master_nodes + slave_nodes ),
926- cluster_state = 'ssh-ready' ,
927- opts = opts
949+ cluster_state = 'ssh-ready'
928950 )
929951 setup_cluster (conn , master_nodes , slave_nodes , opts , True )
930952
@@ -951,9 +973,10 @@ def real_main():
951973 print "Deleting security groups (this will take some time)..."
952974 group_names = [cluster_name + "-master" , cluster_name + "-slaves" ]
953975 wait_for_cluster_state (
976+ conn = conn ,
977+ opts = opts ,
954978 cluster_instances = (master_nodes + slave_nodes ),
955- cluster_state = 'terminated' ,
956- opts = opts
979+ cluster_state = 'terminated'
957980 )
958981 attempt = 1
959982 while attempt <= 3 :
@@ -1055,9 +1078,10 @@ def real_main():
10551078 if inst .state not in ["shutting-down" , "terminated" ]:
10561079 inst .start ()
10571080 wait_for_cluster_state (
1081+ conn = conn ,
1082+ opts = opts ,
10581083 cluster_instances = (master_nodes + slave_nodes ),
1059- cluster_state = 'ssh-ready' ,
1060- opts = opts
1084+ cluster_state = 'ssh-ready'
10611085 )
10621086 setup_cluster (conn , master_nodes , slave_nodes , opts , False )
10631087
0 commit comments