|
12 | 12 |
13 | 13 | # Find system memory in KB and compute Spark's default limit from that
14 | 14 | mem_command = "cat /proc/meminfo | grep MemTotal | awk '{print $2}'"
   | 15 | +cpu_command = "nproc"
15 | 16 |
16 | 17 | master_ram_kb = int(
17 | 18 |     os.popen(mem_command).read().strip())
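
For reference, a minimal standalone sketch of the same probes (illustrative only, not part of the commit); both commands print a single number, so stripping the output and converting with int() is enough:

    import os

    def probe_int(command):
        # Run a shell command and parse its one-line numeric output.
        return int(os.popen(command).read().strip())

    local_ram_kb = probe_int("cat /proc/meminfo | grep MemTotal | awk '{print $2}'")
    local_cpus = probe_int("nproc")
    print(local_ram_kb, local_cpus)  # e.g. 16326428 4 on a 16 GB, 4-core box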
|
20 | 21 |
21 | 22 | slave_mem_command = "ssh -t -o StrictHostKeyChecking=no %s %s" %\
22 | 23 |     (first_slave, mem_command)
   | 24 | +
   | 25 | +slave_cpu_command = "ssh -t -o StrictHostKeyChecking=no %s %s" %\
   | 26 | +    (first_slave, cpu_command)
   | 27 | +
23 | 28 | slave_ram_kb = int(os.popen(slave_mem_command).read().strip())
24 | 29 |
   | 30 | +slave_cpus = int(os.popen(slave_cpu_command).read().strip())
   | 31 | +
25 | 32 | system_ram_kb = min(slave_ram_kb, master_ram_kb)
26 | 33 |
27 | 34 | system_ram_mb = system_ram_kb / 1024
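
As a worked example of the sizing above (illustrative numbers; assuming Python 2, where / on ints truncates): the cluster is sized to the smaller of the master and the first slave, then converted to MB:

    # Illustrative values, not taken from the source.
    master_ram_kb = 62916564                           # ~60 GB master
    slave_ram_kb = 31446516                            # ~30 GB slave
    system_ram_kb = min(slave_ram_kb, master_ram_kb)   # 31446516 (the slave)
    system_ram_mb = system_ram_kb / 1024               # 30709 (truncating division)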
|
42 | 49 | # Set tachyon_mb to spark_mb for now.
43 | 50 | tachyon_mb = spark_mb
44 | 51 |
   | 52 | +worker_instances = int(os.getenv("SPARK_WORKER_INSTANCES", 1))
   | 53 | +# Distribute CPU cores equally among the worker instances
   | 54 | +worker_cores = max(slave_cpus / worker_instances, 1)
   | 55 | +
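
The core split added above truncates and is clamped so each worker gets at least one core; for example (illustrative values, Python 2 division):

    # Illustrative values, not taken from the source.
    slave_cpus = 8
    worker_cores = max(slave_cpus / 3, 1)    # 3 worker instances  -> 2 cores each
    worker_cores = max(slave_cpus / 16, 1)   # 16 worker instances -> clamped to 1 core
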
45 | 56 | template_vars = {
46 | 57 |     "master_list": os.getenv("MASTERS"),
47 | 58 |     "active_master": os.getenv("MASTERS").split("\n")[0],

50 | 61 |     "mapred_local_dirs": os.getenv("MAPRED_LOCAL_DIRS"),
51 | 62 |     "spark_local_dirs": os.getenv("SPARK_LOCAL_DIRS"),
52 | 63 |     "default_spark_mem": "%dm" % spark_mb,
   | 64 | +    "spark_worker_instances": "%d" % worker_instances,
   | 65 | +    "spark_worker_cores": "%d" % worker_cores,
   | 66 | +    "spark_master_opts": os.getenv("SPARK_MASTER_OPTS"),
53 | 67 |     "spark_version": os.getenv("SPARK_VERSION"),
54 | 68 |     "shark_version": os.getenv("SHARK_VERSION"),
55 | 69 |     "hadoop_major_version": os.getenv("HADOOP_MAJOR_VERSION"),
|