Skip to content
This repository was archived by the owner on Dec 4, 2024. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions tests/jobs/python/long_running.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from __future__ import print_function


import sys
import time
from pyspark.sql import SparkSession


if __name__ == "__main__":
    # Usage: long_running [partitions] [run_time_sec]
    #
    # Runs a Spark job in which every partition just sleeps for
    # run_time_sec seconds, then prints a completion marker.
    # Defaults: 5 partitions, 600 seconds.
    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 5
    run_time_sec = int(sys.argv[2]) if len(sys.argv) > 2 else 600

    spark = SparkSession \
        .builder \
        .appName("Long-Running Spark Job") \
        .getOrCreate()

    def processPartition(partition):
        """Sleep for run_time_sec; the partition's records are not read."""
        print('Start processing partition')
        time.sleep(run_time_sec)
        print('Done processing partition')

    try:
        n = 100000 * partitions
        data = spark.sparkContext.parallelize(range(1, n + 1), partitions)

        data.foreachPartition(processPartition)
        print('Job completed successfully')
    finally:
        # Always release the session, even when the job itself fails.
        spark.stop()
89 changes: 89 additions & 0 deletions tests/test_recovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import logging
import os
import pytest
import re
import shakedown
import time

import utils
from utils import SPARK_PACKAGE_NAME


# Logger named after this test module.
LOGGER = logging.getLogger(__name__)
# Directory containing this file; job script paths are built relative to it.
THIS_DIR = os.path.dirname(os.path.abspath(__file__))
# Service name used to look up the long-running job's framework and its tasks.
LONG_RUNNING_FW_NAME = "Long-Running Spark Job"
# Passed as the job's first argument and used as the number of running tasks to wait for.
LONG_RUNNING_FW_NUM_TASKS = 1
# 15 minutes, in seconds.
MASTER_CONNECTION_TIMEOUT_SEC = 15 * 60
# Job run time in seconds: the connection timeout plus a further 15 minutes.
LONG_RUNNING_RUN_TIME_SEC = MASTER_CONNECTION_TIMEOUT_SEC + (15 * 60)


def setup_module(module):
    """Module-level setup: delegate to utils.require_spark().

    The `module` argument is accepted but not used.
    """
    utils.require_spark()


def teardown_module(module):
    """Module-level teardown: uninstall the Spark package and wait for it.

    The `module` argument is accepted but not used.
    """
    shakedown.uninstall_package_and_wait(SPARK_PACKAGE_NAME)


@pytest.mark.skip(reason="Waiting for upstream change, https://issues.apache.org/jira/browse/SPARK-21419")
@pytest.mark.recovery
def test_disconnect_from_master():
    """Check that a long-running job completes after losing its master connection.

    Steps: upload and submit long_running.py, wait for the executor task to
    run, block traffic to the driver's framework port, sleep past the
    connection timeout, restore the iptables rules, then check the job output.
    """
    python_script_path = os.path.join(THIS_DIR, 'jobs', 'python', 'long_running.py')
    python_script_url = utils.upload_file(python_script_path)
    task_id = utils.submit_job(python_script_url,
                               "{} {}".format(LONG_RUNNING_FW_NUM_TASKS, LONG_RUNNING_RUN_TIME_SEC),
                               ["--conf", "spark.mesos.driver.failoverTimeout=1800",
                                "--conf", "spark.cores.max=1"])

    # Wait until executor is running
    LOGGER.info("Waiting for executor task to be RUNNING...")
    shakedown.wait_for(lambda: utils.is_service_ready(LONG_RUNNING_FW_NAME, LONG_RUNNING_FW_NUM_TASKS),
                       ignore_exceptions=False,
                       timeout_seconds=600)

    # Block the driver's connection to Mesos master
    framework_info = shakedown.get_service(LONG_RUNNING_FW_NAME)
    (driver_host, port) = _parse_fw_pid_host_port(framework_info["pid"])
    _block_master_connection(driver_host, port)
    try:
        # The connection will timeout after 15 minutes of inactivity.
        # Add 5 minutes to make sure the master has detected the disconnection.
        # The framework will be considered disconnected => failover_timeout kicks in.
        wait_sec = MASTER_CONNECTION_TIMEOUT_SEC + 5 * 60
        # Log the duration actually slept, not just the nominal timeout.
        LOGGER.info("Waiting {} seconds for connection with master to timeout...".format(wait_sec))
        time.sleep(wait_sec)
    finally:
        # Restore the connection. The driver should reconnect.
        # Done in `finally` so the host's iptables rules are never left
        # modified if the wait is interrupted or fails.
        _unblock_master_connection(driver_host)

    # The executor and driver should finish.
    utils.check_job_output(task_id, "Job completed successfully")

    # Due to https://issues.apache.org/jira/browse/MESOS-5180, the driver does not re-register, so
    # teardown won't occur until the failover_timeout period ends. The framework remains "Inactive".
    # Uncomment when the bug is fixed:
    #_wait_for_completed_framework(LONG_RUNNING_FW_NAME, 60)


def _parse_fw_pid_host_port(pid):
# Framework pid looks like: "scheduler-cd28f2eb-3aec-4060-a731-f5be1f5186c4@10.0.1.7:37085"
regex = r"([^@]+)@([^:]+):(\d+)"
match = re.search(regex, pid)
return match.group(2), int(match.group(3))


def _block_master_connection(host, port):
    """Save the iptables rules on `host`, then reject incoming TCP traffic to `port`."""
    LOGGER.info("Blocking connection with master")
    # Snapshot the current rules before changing them.
    shakedown.network.save_iptables(host)
    # Reject incoming packets from master (TCP packets destined for `port`).
    reject_rule = '-I INPUT -p tcp --dport {} -j REJECT'.format(port)
    shakedown.network.run_iptables(host, reject_rule)


def _unblock_master_connection(host):
    """Restore the iptables rules on `host` via shakedown.network.restore_iptables."""
    LOGGER.info("Unblocking connection with master")
    shakedown.network.restore_iptables(host)


def _wait_for_completed_framework(fw_name, timeout_seconds):
    """Wait until utils.is_framework_completed(fw_name) holds, up to `timeout_seconds`."""
    def framework_completed():
        return utils.is_framework_completed(fw_name)

    shakedown.wait_for(framework_completed,
                       ignore_exceptions=False,
                       timeout_seconds=timeout_seconds)
21 changes: 5 additions & 16 deletions tests/test_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def setup_module(module):
if utils.hdfs_enabled():
utils.require_hdfs()
utils.require_spark()
_upload_file(os.environ["SCALA_TEST_JAR_PATH"])
utils.upload_file(os.environ["SCALA_TEST_JAR_PATH"])


def teardown_module(module):
Expand All @@ -42,7 +42,7 @@ def test_jar():
spark_job_runner_args = '{} dcos \\"*\\" spark:only 2 --auth-token={}'.format(
master_url,
shakedown.dcos_acs_token())
jar_url = _upload_file(os.getenv('TEST_JAR_PATH'))
jar_url = utils.upload_file(os.getenv('TEST_JAR_PATH'))
utils.run_tests(jar_url,
spark_job_runner_args,
"All tests passed",
Expand All @@ -62,9 +62,9 @@ def test_teragen():
@pytest.mark.sanity
def test_python():
python_script_path = os.path.join(THIS_DIR, 'jobs', 'python', 'pi_with_include.py')
python_script_url = _upload_file(python_script_path)
python_script_url = utils.upload_file(python_script_path)
py_file_path = os.path.join(THIS_DIR, 'jobs', 'python', 'PySparkTestInclude.py')
py_file_url = _upload_file(py_file_path)
py_file_url = utils.upload_file(py_file_path)
utils.run_tests(python_script_url,
"30",
"Pi is roughly 3",
Expand Down Expand Up @@ -97,7 +97,7 @@ def test_kerberos():
@pytest.mark.sanity
def test_r():
r_script_path = os.path.join(THIS_DIR, 'jobs', 'R', 'dataframe.R')
r_script_url = _upload_file(r_script_path)
r_script_url = utils.upload_file(r_script_path)
utils.run_tests(r_script_url,
'',
"Justin")
Expand Down Expand Up @@ -186,16 +186,5 @@ def _run_janitor(service_name):
auth=shakedown.dcos_acs_token()))


def _upload_file(file_path):
    """Upload `file_path` through the s3 helper and return s3.http_url() of its base name.

    Reads the S3_BUCKET and S3_PREFIX environment variables for the log message.
    """
    bucket = os.environ['S3_BUCKET']
    prefix = os.environ['S3_PREFIX']
    LOGGER.info("Uploading {} to s3://{}/{}".format(file_path, bucket, prefix))

    s3.upload_file(file_path)

    return s3.http_url(os.path.basename(file_path))

def _scala_test_jar_url():
    """Return s3.http_url() for the base name of the SCALA_TEST_JAR_PATH jar."""
    jar_path = os.environ["SCALA_TEST_JAR_PATH"]
    return s3.http_url(os.path.basename(jar_path))
32 changes: 29 additions & 3 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
import re
import requests
import s3
import shakedown
import subprocess
import urllib
Expand Down Expand Up @@ -111,7 +112,11 @@ def _wait_for_hdfs():


def _is_hdfs_ready(expected_tasks = DEFAULT_HDFS_TASK_COUNT):
running_tasks = [t for t in shakedown.get_service_tasks(HDFS_SERVICE_NAME) \
return is_service_ready(HDFS_SERVICE_NAME, expected_tasks)


def is_service_ready(service_name, expected_tasks):
    """Tell whether the service has at least `expected_tasks` tasks in state TASK_RUNNING."""
    running_count = sum(
        1 for task in shakedown.get_service_tasks(service_name)
        if task['state'] == 'TASK_RUNNING')
    return running_count >= expected_tasks

Expand All @@ -138,7 +143,11 @@ def _get_spark_options(options = None):


def run_tests(app_url, app_args, expected_output, args=[]):
task_id = _submit_job(app_url, app_args, args)
task_id = submit_job(app_url, app_args, args)
check_job_output(task_id, expected_output)


def check_job_output(task_id, expected_output):
LOGGER.info('Waiting for task id={} to complete'.format(task_id))
shakedown.wait_for_task_completion(task_id)
stdout = _task_log(task_id)
Expand Down Expand Up @@ -167,7 +176,19 @@ def create_secret(name, value):
dcos.http.put(url, data=json.dumps(data))


def _submit_job(app_url, app_args, args=[]):
def upload_file(file_path):
    """Upload a local file through the s3 helper and return s3.http_url() of its base name.

    The S3_BUCKET and S3_PREFIX environment variables are read for the
    log message and must be set.
    """
    s3_bucket = os.environ['S3_BUCKET']
    s3_prefix = os.environ['S3_PREFIX']
    LOGGER.info("Uploading {} to s3://{}/{}".format(file_path, s3_bucket, s3_prefix))

    s3.upload_file(file_path)

    uploaded_name = os.path.basename(file_path)
    return s3.http_url(uploaded_name)


def submit_job(app_url, app_args, args=[]):
if is_strict():
args += ["--conf", 'spark.mesos.driverEnv.MESOS_MODULES=file:///opt/mesosphere/etc/mesos-scheduler-modules/dcos_authenticatee_module.json']
args += ["--conf", 'spark.mesos.driverEnv.MESOS_AUTHENTICATEE=com_mesosphere_dcos_ClassicRPCAuthenticatee']
Expand All @@ -193,3 +214,8 @@ def _task_log(task_id, filename=None):
LOGGER.info("Running {}".format(cmd))
stdout = subprocess.check_output(cmd, shell=True).decode('utf-8')
return stdout


def is_framework_completed(fw_name):
    """Return True when shakedown.get_service(fw_name, True) finds no service."""
    # The framework is not Active or Inactive
    # NOTE(review): the positional True presumably includes inactive
    # frameworks in the lookup -- confirm against shakedown.get_service.
    service = shakedown.get_service(fw_name, True)
    return service is None