Add file based semaphore queue

oysols committed Sep 7, 2020
1 parent 4e9dc54 commit 8913e37
Showing 14 changed files with 440 additions and 40 deletions.
3 changes: 2 additions & 1 deletion Makefile
@@ -8,9 +8,10 @@ test:
	python3 tests/test_taskrunner.py
	python3 tests/test_failed_import.py
	python3 tests/test_statesnapshot.py
+	python3 tests/test_semaphore.py

tasks:
-	python3 -m minimalci.taskrunner
+	python3 -m minimalci.taskrunner --commit $(SHA)

check:
	mypy minimalci --strict
2 changes: 2 additions & 0 deletions docker-compose.yml
@@ -24,4 +24,6 @@ services:
      - GITHUB_CLIENT_SECRET
      - GITHUB_AUTHORIZED_USERS # user1,user2,user3

+     - ISOLATE_PYTHON=true # Only required for running minimalci on itself
+
    restart: always
173 changes: 173 additions & 0 deletions minimalci/semaphore.py
@@ -0,0 +1,173 @@
from typing import Iterator, List, Any, Dict, Tuple, ContextManager, Sequence, Optional, TypeVar
import json
import subprocess
import contextlib
import time
import inspect
import threading
from queue import Queue

from minimalci import semaphore_subprocess


# Multiple simultaneous locks


LockLikeType = TypeVar("LockLikeType", ContextManager[Any], threading.Lock)


def _aquire_lock_and_block_until_event(
    lock: LockLikeType,
    lock_aquired_queue: "Queue[Optional[LockLikeType]]",
    release_event: threading.Event,
) -> None:
    """Acquire the lock (blocking), put it on the queue, and hold it until the release event is received"""
    try:
        with lock:
            lock_aquired_queue.put(lock)
            release_event.wait()
    except Exception as e:
        print(f"Error getting lock: {e}")
        lock_aquired_queue.put(None)


@contextlib.contextmanager
def aquire_either_lock(locks: Sequence[LockLikeType]) -> Iterator[LockLikeType]:
"""Aquire and yield the first of two locks
Get all locks in threads, yield first lock aquired and release all other locks immediately
Usage:
with aquire_either_lock([lock_a, lock_b]) as lock:
do_stuff()
"""
if not locks:
raise Exception("No locks provided")
release_events = [threading.Event() for lock in locks]
lock_aquired_queue: Queue[LockLikeType] = Queue()
# Try to get all locks in threads
for lock, release_event in zip(locks, release_events):
threading.Thread(
target=_aquire_lock_and_block_until_event,
args=(lock, lock_aquired_queue, release_event),
name=threading.current_thread().name, # For parsing print output in taskrunner
daemon=True,
).start()
# Wait for first lock aquired
aquired_lock = lock_aquired_queue.get()
if aquired_lock is None:
[release_event.set() for release_event in release_events]
raise Exception("Error getting lock")
# Make sure all other locks are released as soon as they are aquired
for lock, release_event in zip(locks, release_events):
if lock != aquired_lock:
release_event.set()
# Yield the aquired lock to caller
try:
yield aquired_lock
finally:
# Release the lock
release_events[locks.index(aquired_lock)].set()


# File based semaphore queue


def kill_thread(process: "subprocess.Popen[Any]", kill_signal: threading.Event) -> None:
    while not (kill_signal.is_set() or process.poll() is not None):
        kill_signal.wait(timeout=5)
    if process.poll() is None:
        process.terminate()


@contextlib.contextmanager
def semaphore_queue(path: str, self_description: str = "", verbose: bool = False, kill_signal: threading.Event = threading.Event()) -> Iterator[None]:
"""File based remote or local semaphore with self healing queue
Usage:
import semaphore
with semaphore.semaphore_queue("user@remote_host:./semaphore.queue"):
do_stuff()
Semaphore file format
{"concurrency": int, "queue": {"pid": int, "description": str}}
path can be remote or local
user@remote_host:./my/semaphore/path
./my/semaphore/path
Requires passwordless SSH access for remote connections
Sends python code to python subprocess on remote/local host
Communicates with magic string to indicate aquiry of semaphore.
Design goal:
Semaphore without relying on central authority.
Ease of use.
Do not require setup on remote hosts.
Gracefully handle and recover from failure conditions.
Support Linux and MacOS
Only rely on single simple file.
"""
if ":" in path:
host, filename = path.split(":")
command = ["ssh", host]
else:
filename = path
command = ["bash", "-ce"] # Run in shell for consistency with ssh version
command += [f"python3 -u - {filename} --self-description={self_description}"]
semaphore_process_source = inspect.getsource(semaphore_subprocess)
while True:
process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
threading.Thread(
target=kill_thread,
args=(process, kill_signal),
daemon=True,
name=threading.current_thread().name,
).start()
try:
process.stdin.write(semaphore_process_source.encode()) # type: ignore
process.stdin.close() # type: ignore
for raw_line in iter(process.stdout.readline, b""): # type: ignore
line = raw_line.decode().strip()
if verbose:
if line.startswith(semaphore_subprocess.MESSAGE_PREFIX):
message = line[len(semaphore_subprocess.MESSAGE_PREFIX):]
print(message)
if line == semaphore_subprocess.SEMAPHORE_AQUIRED:
if verbose:
print(f"Semaphore aquired {path}")
try:
yield
finally:
if verbose:
print(f"Semaphore released {path}")
process.terminate()
process.wait()
return
if kill_signal.is_set() or process.wait() == 0: # Required to handle KeyboardInterrupt
raise Exception("Killed while waiting for semaphore")
print("Semaphore process crashed")
time.sleep(10)
print("Retrying semaphore")
finally:
process.terminate()
process.wait()


def read_queue(path: str) -> Tuple[int, List[Dict[str, Any]]]:
    """Read a remote or local semaphore queue"""
    if ":" in path:
        host, filename = path.split(":", 1)
        command = ["ssh", host]
    else:
        filename = path
        command = ["bash", "-ce"]  # Run in a shell for consistency with the ssh version
    command += [f"python3 -u - {filename} --read"]
    process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    semaphore_process_source = inspect.getsource(semaphore_subprocess)
    process.stdin.write(semaphore_process_source.encode())  # type: ignore
    process.stdin.close()  # type: ignore
    raw_output = process.stdout.read()  # type: ignore
    concurrency, queue = json.loads(raw_output)
    return concurrency, queue
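
For example, two remote queues can be combined with aquire_either_lock so that a task runs on whichever host frees up first. A minimal usage sketch (the host names, paths, and do_stuff below are placeholders; passwordless SSH access is assumed, as noted in the docstring):

    from minimalci import semaphore

    # Each call returns an unentered context manager; nothing is acquired yet
    queue_a = semaphore.semaphore_queue("user@host-a:./semaphore.queue")
    queue_b = semaphore.semaphore_queue("user@host-b:./semaphore.queue")

    # Blocks until one of the two semaphores is acquired; the other is
    # released again as soon as its thread manages to acquire it
    with semaphore.aquire_either_lock([queue_a, queue_b]):
        do_stuff()

    # Inspect a queue without joining it
    concurrency, queue = semaphore.read_queue("user@host-a:./semaphore.queue")
    print(concurrency, [entry["description"] for entry in queue])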
124 changes: 124 additions & 0 deletions minimalci/semaphore_subprocess.py
@@ -0,0 +1,124 @@
from typing import List, Any, Dict, Tuple
import json
import subprocess
import fcntl
import time
import os
from pathlib import Path
import signal
import argparse


# This file is intentionally kept Python 3.5 compatible since it may run on outdated hosts


SEMAPHORE_AQUIRED = "SEMAPHORE_AQUIRED"
MESSAGE_PREFIX = "MESSAGE:"


def read_and_update_queue(
    filename: str,
    add_self: bool = False,
    remove_self: bool = False,
    self_description: str = "",
) -> Tuple[int, List[Dict[str, Any]]]:
    """Takes a lock on the queue file, verifies the state of the queued PIDs, and adds or removes its own PID"""
    with open(filename, "r+") as f:
        # Take file lock
        fcntl.flock(f.fileno(), fcntl.LOCK_EX)
        # Load queue
        raw_data = f.read()
        try:
            data = json.loads(raw_data)
            concurrency = data["concurrency"]
            queue = data["queue"]
        except Exception:
            raise Exception("Queue parse error", raw_data)
        # Check which PIDs are still running
        try:
            output = subprocess.check_output(
                ["ps", "-o", "pid,state", *[str(entry["pid"]) for entry in queue]]
            ).decode().strip()
            running_pids = [
                int(line.strip().split()[0])
                for line in output.splitlines()[1:]
                if line.strip().split()[1] != "Z"  # Zombie process
            ]
        except subprocess.CalledProcessError:
            running_pids = []
        # Prune queue down to only running PIDs
        verified_queue = [entry for entry in queue if entry["pid"] in running_pids]
        self_pid = os.getpid()
        if add_self:
            # Insert self_pid if it is not already in the queue
            if self_pid not in [entry["pid"] for entry in verified_queue]:
                verified_queue.append(
                    {"pid": self_pid, "description": self_description}
                )
        if remove_self:
            # Remove the self_pid entry from the queue
            verified_queue = [entry for entry in verified_queue if entry["pid"] != self_pid]
        if verified_queue != queue:
            # Write queue
            f.seek(0)
            new_data = {"concurrency": concurrency, "queue": verified_queue}
            f.write(json.dumps(new_data, indent=4))
            f.truncate()
        return concurrency, verified_queue


def signal_handler(*args: Any) -> None:
    raise Exception


def wait_in_queue(filename: str, self_description: str = "") -> None:
"""Reads and updates queue with own PID
Prunes dead PIDs
Communicates aquisition by magic string on stdout
Aquisition is determined by its pids order in the queue less than concurrency setting
"""
signal.signal(signal.SIGTERM, signal_handler) # Handle SIGTERM gracefully

if not Path(filename).is_file(): # Create queue first time for ease of use
Path(filename).write_text('{"concurrency": 1, "queue": []}')

try:
last_message = ""
while True:
concurrency, queue = read_and_update_queue(filename, add_self=True, self_description=self_description)
# Get number in queue
self_pid = os.getpid()
my_index = [entry["pid"] for entry in queue].index(self_pid)
# Check if should run
if my_index < concurrency:
break # Semaphore aquired
new_message = MESSAGE_PREFIX + "Position in queue: {} (concurrency {})".format(my_index, concurrency)
if new_message != last_message:
print(new_message)
last_message = new_message
else:
print() # Force crash if parent process is dead
time.sleep(1)

print(SEMAPHORE_AQUIRED) # Signal that we have aquired semaphore
while True:
time.sleep(1)
print() # Force crash if parent process is dead
except KeyboardInterrupt:
pass
finally:
read_and_update_queue(filename, remove_self=True)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Acquire semaphore from file queue')
    parser.add_argument('filename', help='Path to queue')
    parser.add_argument('--self-description', help='Description to add to self in queue', default='')
    parser.add_argument('--read', help='Read queue and return as JSON', action="store_true")
    args = parser.parse_args()

    if args.read:
        concurrency, queue = read_and_update_queue(args.filename)
        print(json.dumps([concurrency, queue]))
    else:
        wait_in_queue(args.filename, args.self_description)
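
For illustration, a semaphore file with two waiting processes might look like the following (the PIDs and descriptions are hypothetical). Since every waiter re-reads and re-verifies the file on each poll, raising the concurrency is just a matter of editing the number in place:

    {
        "concurrency": 1,
        "queue": [
            {
                "pid": 12345,
                "description": "build 17"
            },
            {
                "pid": 12346,
                "description": "build 18"
            }
        ]
    }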
2 changes: 1 addition & 1 deletion minimalci/taskrunner.py
@@ -13,7 +13,7 @@
import argparse
import signal

-from .executors import global_kill_signal_handler, Stash
+from .executors import global_kill_signal_handler
from .tasks import Task, State

