4141# A URL prefix from which to fetch AMI information
4242AMI_PREFIX = "https://raw.github.com/mesos/spark-ec2/v2/ami-list"
4343
44+
4445class UsageError (Exception ):
4546 pass
4647
@@ -124,7 +125,7 @@ def parse_args():
124125 help = "The SSH user you want to connect as (default: root)" )
125126 parser .add_option (
126127 "--delete-groups" , action = "store_true" , default = False ,
127- help = "When destroying a cluster, delete the security groups that were created. " )
128+ help = "When destroying a cluster, delete the security groups that were created" )
128129 parser .add_option (
129130 "--use-existing-master" , action = "store_true" , default = False ,
130131 help = "Launch fresh slaves, but use an existing stopped master if possible" )
@@ -138,9 +139,7 @@ def parse_args():
138139 parser .add_option (
139140 "--user-data" , type = "string" , default = "" ,
140141 help = "Path to a user-data file (most AMI's interpret this as an initialization script)" )
141- parser .add_option (
142- "--security-group-prefix" , type = "string" , default = None ,
143- help = "Use this prefix for the security group rather than the cluster name." )
142+
144143
145144 (opts , args ) = parser .parse_args ()
146145 if len (args ) != 2 :
@@ -287,12 +286,8 @@ def launch_cluster(conn, opts, cluster_name):
287286 user_data_content = user_data_file .read ()
288287
289288 print "Setting up security groups..."
290- if opts .security_group_prefix is None :
291- master_group = get_or_make_group (conn , cluster_name + "-master" )
292- slave_group = get_or_make_group (conn , cluster_name + "-slaves" )
293- else :
294- master_group = get_or_make_group (conn , opts .security_group_prefix + "-master" )
295- slave_group = get_or_make_group (conn , opts .security_group_prefix + "-slaves" )
289+ master_group = get_or_make_group (conn , cluster_name + "-master" )
290+ slave_group = get_or_make_group (conn , cluster_name + "-slaves" )
296291 if master_group .rules == []: # Group was just now created
297292 master_group .authorize (src_group = master_group )
298293 master_group .authorize (src_group = slave_group )
@@ -316,11 +311,12 @@ def launch_cluster(conn, opts, cluster_name):
316311 slave_group .authorize ('tcp' , 60060 , 60060 , '0.0.0.0/0' )
317312 slave_group .authorize ('tcp' , 60075 , 60075 , '0.0.0.0/0' )
318313
319- # Check if instances are already running with the cluster name
314+ # Check if instances are already running in our groups
320315 existing_masters , existing_slaves = get_existing_cluster (conn , opts , cluster_name ,
321316 die_on_error = False )
322317 if existing_slaves or (existing_masters and not opts .use_existing_master ):
323- print >> stderr , ("ERROR: There are already instances for name: %s " % cluster_name )
318+ print >> stderr , ("ERROR: There are already instances running in " +
319+ "group %s or %s" % (master_group .name , slave_group .name ))
324320 sys .exit (1 )
325321
326322 # Figure out Spark AMI
@@ -385,13 +381,9 @@ def launch_cluster(conn, opts, cluster_name):
385381 for r in reqs :
386382 id_to_req [r .id ] = r
387383 active_instance_ids = []
388- outstanding_request_ids = []
389384 for i in my_req_ids :
390- if i in id_to_req :
391- if id_to_req [i ].state == "active" :
392- active_instance_ids .append (id_to_req [i ].instance_id )
393- else :
394- outstanding_request_ids .append (i )
385+ if i in id_to_req and id_to_req [i ].state == "active" :
386+ active_instance_ids .append (id_to_req [i ].instance_id )
395387 if len (active_instance_ids ) == opts .slaves :
396388 print "All %d slaves granted" % opts .slaves
397389 reservations = conn .get_all_instances (active_instance_ids )
@@ -400,8 +392,8 @@ def launch_cluster(conn, opts, cluster_name):
400392 slave_nodes += r .instances
401393 break
402394 else :
403- print "%d of %d slaves granted, waiting longer for request ids including %s " % (
404- len (active_instance_ids ), opts .slaves , outstanding_request_ids [ 0 : 10 ] )
395+ print "%d of %d slaves granted, waiting longer" % (
396+ len (active_instance_ids ), opts .slaves )
405397 except :
406398 print "Canceling spot instance requests"
407399 conn .cancel_spot_instance_requests (my_req_ids )
@@ -460,66 +452,45 @@ def launch_cluster(conn, opts, cluster_name):
460452
461453 # Give the instances descriptive names
462454 for master in master_nodes :
463- name = '{cn}- master-{iid}' . format ( cn = cluster_name , iid = master . id )
464- tag_instance ( master , name )
465-
455+ master . add_tag (
456+ key = 'Name' ,
457+ value = '{cn}-master-{iid}' . format ( cn = cluster_name , iid = master . id ))
466458 for slave in slave_nodes :
467- name = '{cn}-slave-{iid}' .format (cn = cluster_name , iid = slave .id )
468- tag_instance (slave , name )
459+ slave .add_tag (
460+ key = 'Name' ,
461+ value = '{cn}-slave-{iid}' .format (cn = cluster_name , iid = slave .id ))
469462
470463 # Return all the instances
471464 return (master_nodes , slave_nodes )
472465
473- def tag_instance (instance , name ):
474- for i in range (0 , 5 ):
475- try :
476- instance .add_tag (key = 'Name' , value = name )
477- except :
478- print "Failed attempt %i of 5 to tag %s" % ((i + 1 ), name )
479- if (i == 5 ):
480- raise "Error - failed max attempts to add name tag"
481- time .sleep (5 )
482466
483467# Get the EC2 instances in an existing cluster if available.
484468# Returns a tuple of lists of EC2 instance objects for the masters and slaves
485469def get_existing_cluster (conn , opts , cluster_name , die_on_error = True ):
486470 print "Searching for existing cluster " + cluster_name + "..."
487- # Search all the spot instance requests, and copy any tags from the spot instance request to the cluster.
488- spot_instance_requests = conn .get_all_spot_instance_requests ()
489- for req in spot_instance_requests :
490- if req .state != u'active' :
491- continue
492- name = req .tags .get (u'Name' , "" )
493- if name .startswith (cluster_name ):
494- reservations = conn .get_all_instances (instance_ids = [req .instance_id ])
495- for res in reservations :
496- active = [i for i in res .instances if is_active (i )]
497- for instance in active :
498- if (instance .tags .get (u'Name' ) == None ):
499- tag_instance (instance , name )
500- # Now proceed to detect master and slaves instances.
501471 reservations = conn .get_all_instances ()
502472 master_nodes = []
503473 slave_nodes = []
504474 for res in reservations :
505475 active = [i for i in res .instances if is_active (i )]
506476 for inst in active :
507- name = inst . tags . get ( u'Name' , "" )
508- if name . startswith ( cluster_name + "-master" ) :
477+ group_names = [ g . name for g in inst . groups ]
478+ if group_names == [ cluster_name + "-master" ] :
509479 master_nodes .append (inst )
510- elif name . startswith ( cluster_name + "-slave" ) :
480+ elif group_names == [ cluster_name + "-slaves" ] :
511481 slave_nodes .append (inst )
512482 if any ((master_nodes , slave_nodes )):
513483 print ("Found %d master(s), %d slaves" % (len (master_nodes ), len (slave_nodes )))
514484 if master_nodes != [] or not die_on_error :
515485 return (master_nodes , slave_nodes )
516486 else :
517487 if master_nodes == [] and slave_nodes != []:
518- print >> sys .stderr , "ERROR: Could not find master in with name " + cluster_name + "-master"
488+ print >> sys .stderr , "ERROR: Could not find master in group " + cluster_name + "-master"
519489 else :
520490 print >> sys .stderr , "ERROR: Could not find any existing cluster"
521491 sys .exit (1 )
522492
493+
523494# Deploy configuration files and run setup scripts on a newly launched
524495# or started EC2 cluster.
525496def setup_cluster (conn , master_nodes , slave_nodes , opts , deploy_ssh_key ):
@@ -856,10 +827,7 @@ def real_main():
856827 # Delete security groups as well
857828 if opts .delete_groups :
858829 print "Deleting security groups (this will take some time)..."
859- if opts .security_group_prefix is None :
860- group_names = [cluster_name + "-master" , cluster_name + "-slaves" ]
861- else :
862- group_names = [opts .security_group_prefix + "-master" , opts .security_group_prefix + "-slaves" ]
830+ group_names = [cluster_name + "-master" , cluster_name + "-slaves" ]
863831
864832 attempt = 1
865833 while attempt <= 3 :
0 commit comments