Skip to content

Commit

Permalink
fc-ceph/osds: extract OSDs into separate units to avoid shared fate
Browse files Browse the repository at this point in the history
We had hosts run out of memory and the OSDs were all subsumed in the
same cgroup which caused them to be killed together because systemd
values consistency in a unit's state ...

There might be interesting cases here if NixOS decides to
activate/deactivate anything, but hopefully the current setup will cause
the "group unit" to catch this. I can imagine cases where this might
not be the case, but I can't show that they actually exist currently.

Re PL-131646
Co-authored-by: [email protected]
  • Loading branch information
ctheune authored and osnyx committed Sep 19, 2023
1 parent 80747a2 commit 53a52a4
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 48 deletions.
30 changes: 29 additions & 1 deletion nixos/roles/ceph/osd.nix
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ in
flyingcircus.services.ceph.cluster_network = head fclib.network.stb.v4.networks;

systemd.services.fc-ceph-osds = rec {
description = "Start/stop local Ceph OSDs (via fc-ceph)";
description = "All locally known Ceph OSDs (via fc-ceph managed units)";
wantedBy = [ "multi-user.target" ];
# Ceph requires the IPs to be properly attached to interfaces so it
# knows where to bind to the public and cluster networks.
Expand Down Expand Up @@ -216,6 +216,34 @@ in
};
};

# Template unit: one instance per OSD id ("fc-ceph-osd@<id>"). Each OSD
# gets its own unit (and thus its own cgroup) so OSDs do not share fate,
# e.g. a single OOM kill no longer takes down all OSDs on the host.
systemd.services."fc-ceph-osd@" = rec {
  description = "Ceph OSD %i";
  # Ceph requires the IPs to be properly attached to interfaces so it
  # knows where to bind to the public and cluster networks.
  wants = [ "network.target" ];
  after = wants;

  environment = {
    # Make daemon output appear in the journal immediately.
    PYTHONUNBUFFERED = "1";
  };

  # OSD restarts are orchestrated manually/by fc-ceph, not on rebuilds.
  restartIfChanged = false;

  serviceConfig = {
    # fc-ceph daemonizes the OSD and reports its PID via the pid file.
    Type = "forking";
    Restart = "always";
    PIDFile = "/run/ceph/osd.%i.pid";
    ExecStart = ''
      ${fc-ceph}/bin/fc-ceph osd activate --as-systemd-unit %i
    '';
    ExecStop = ''
      ${fc-ceph}/bin/fc-ceph osd deactivate --as-systemd-unit %i
    '';
  };

};


})
(lib.mkIf (role.enable && role.config == "") {
flyingcircus.services.ceph.extraSettingsSections.osd = lib.recursiveUpdate
Expand Down
10 changes: 10 additions & 0 deletions pkgs/fc/ceph/src/fc/ceph/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ def main(args=sys.argv[1:]):
parser_activate = osd_sub.add_parser(
"activate", help="Activate one or more OSDs."
)
parser_activate.add_argument(
"--as-systemd-unit",
action="store_true",
help="Flag if we are being called from the systemd unit startup.",
)
parser_activate.add_argument(
"ids",
help="IDs of OSD to activate. Use `all` to activate all local OSDs.",
Expand All @@ -102,6 +107,11 @@ def main(args=sys.argv[1:]):
parser_deactivate = osd_sub.add_parser(
"deactivate", help="Deactivate an OSD."
)
parser_deactivate.add_argument(
"--as-systemd-unit",
action="store_true",
help="Flag if we are being called from the systemd unit startup.",
)
parser_deactivate.add_argument(
"ids",
help="IDs of OSD to deactivate. "
Expand Down
187 changes: 140 additions & 47 deletions pkgs/fc/ceph/src/fc/ceph/osd/nautilus.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import time
import traceback
from subprocess import CalledProcessError
from typing import Optional

from fc.ceph.util import kill, mount_status, run

Expand Down Expand Up @@ -51,7 +52,7 @@ def _list_local_osd_ids(self):
vgs = run.json.vgs("-S", r"vg_name=~^vgosd\-[0-9]+$")
return [int(vg["vg_name"].replace("vgosd-", "", 1)) for vg in vgs]

def _parse_ids(self, ids, allow_non_local=False):
def _parse_ids(self, ids: str, allow_non_local=False):
if ids == "all":
return self.local_osd_ids
ids = [int(x) for x in ids.split(",")]
Expand All @@ -72,7 +73,9 @@ def _parse_ids(self, ids, allow_non_local=False):
)
return ids

def create_filestore(self, device, journal, journal_size, crush_location):
def create_filestore(
self, device: str, journal: str, journal_size: str, crush_location: str
):
assert "=" in crush_location
assert journal in ["internal", "external"]
assert os.path.exists(device)
Expand Down Expand Up @@ -109,18 +112,55 @@ def create_bluestore(self, device, wal, crush_location):
osd = BlueStoreOSD(id_)
osd.create(device, wal, crush_location)

def activate(self, ids: str, as_systemd_unit: bool = False):
    """Activate one or more OSDs.

    `ids` is a comma-separated list of OSD ids, or the special value
    "all" for every locally present OSD.

    With `as_systemd_unit` we are running *inside* a `fc-ceph-osd@<id>`
    template unit and perform the actual activation for exactly one OSD;
    otherwise we delegate to systemd by starting the per-OSD units.
    """
    # Special-case optimisation: when starting "all" OSDs we do not need
    # to block on each individual unit start.
    nonblocking_start = ids == "all"
    ids = self._parse_ids(ids)

    if as_systemd_unit:
        # A template unit instance manages exactly one OSD.
        if len(ids) > 1:
            raise RuntimeError("Only single OSDs may be called as a unit.")
        id_ = ids[0]
        osd = OSD(id_)
        try:
            self._activate_single(osd, as_systemd_unit)
        except Exception:
            # NOTE(review): swallowing the error means the forking unit
            # can still report success to systemd — confirm intended.
            traceback.print_exc()
    else:
        for id_ in ids:
            osd = OSD(id_)
            self._activate_single(
                osd, as_systemd_unit=False, nonblocking=nonblocking_start
            )

def destroy(self, ids, unsafe_destroy, force_objectstore_type=None):
def _activate_single(
    self,
    osd: "GenericOSD",
    as_systemd_unit: bool = False,
    nonblocking: bool = False,
):
    """Entry point for low-level OSD operations that need to activate
    themselves again, without having to know about systemd units.

    Errors are intentionally allowed to bubble up to the caller.
    """
    if as_systemd_unit:
        # FIXME: shouldn't this better be handled via systemd requirements?
        run.systemctl("start", "fc-blockdev")
        osd.activate()
    elif nonblocking:
        run.systemctl("start", "--no-block", f"fc-ceph-osd@{osd.id}")
    else:
        run.systemctl("start", f"fc-ceph-osd@{osd.id}")

def destroy(
self,
ids: str,
unsafe_destroy: bool,
force_objectstore_type: Optional[str] = None,
):
ids = self._parse_ids(ids, allow_non_local=f"DESTROY {ids}")

for id_ in ids:
Expand All @@ -130,36 +170,69 @@ def destroy(self, ids, unsafe_destroy, force_objectstore_type=None):
except Exception:
traceback.print_exc()

def deactivate(
    self, ids: str, as_systemd_unit: bool = False, flush: bool = False
):
    """Deactivate one or more OSDs.

    With `as_systemd_unit` we are running inside a `fc-ceph-osd@<id>`
    unit and stop exactly one OSD directly; otherwise the per-OSD units
    are stopped, in parallel (one thread per OSD). `flush` additionally
    flushes the OSD journal after shutdown.
    """
    ids = self._parse_ids(ids)

    if as_systemd_unit:
        if len(ids) > 1:
            raise RuntimeError("Only single OSDs may be called as a unit.")
        id_ = ids[0]
        osd = OSD(id_)
        self._deactivate_single(osd, as_systemd_unit, flush)
    else:
        threads = []
        for id_ in ids:
            try:
                osd = OSD(id_)
                # Pass `osd` via `args` instead of closing over the loop
                # variable: a late-binding lambda would let all threads
                # see whatever `osd` was rebound to last, potentially
                # deactivating the same OSD repeatedly.
                thread = threading.Thread(
                    target=self._deactivate_single,
                    args=(osd, as_systemd_unit, flush),
                )
                thread.start()
                threads.append(thread)
            except Exception:
                traceback.print_exc()

        for thread in threads:
            thread.join()

def _deactivate_single(
self,
osd: "GenericOSD",
as_systemd_unit: bool = False,
flush: bool = False,
):
"""
entry point for low-level OSD operations that need to activate themselfs again,
without having to know about systemd units"""
if as_systemd_unit:
osd.deactivate()
else:
run.systemctl("stop", f"fc-ceph-osd@{osd.id}")
if flush:
osd.flush()

def reactivate(self, ids: str):
    """Deactivate and re-activate OSDs one at a time.

    Waits for a clean cluster before each OSD to limit redundancy loss
    during the rolling restart. Per-OSD failures are printed and the
    remaining OSDs are still processed.
    """
    ids = self._parse_ids(ids)

    for id_ in ids:
        osd = OSD(id_)
        wait_for_clean_cluster()
        try:
            self._deactivate_single(osd, as_systemd_unit=False, flush=False)
            self._activate_single(osd, as_systemd_unit=False)
        except Exception:
            traceback.print_exc()

def rebuild(
self, ids, journal_size, unsafe_destroy, target_objectstore_type=None
self,
ids: str,
journal_size: str,
unsafe_destroy: bool,
target_objectstore_type: Optional[str] = None,
):
ids = self._parse_ids(ids)

Expand All @@ -172,7 +245,7 @@ def rebuild(
except Exception:
traceback.print_exc()

def prepare_journal(self, device):
def prepare_journal(self, device: str):
if not os.path.exists(device):
print(f"Device does not exist: {device}")
try:
Expand Down Expand Up @@ -277,7 +350,7 @@ def activate(self):

resource.setrlimit(resource.RLIMIT_NOFILE, (270000, 270000))

def deactivate(self, flush=True):
def deactivate(self):
print(f"Stopping OSD {self.id} ...")
kill(self.pid_file)

Expand Down Expand Up @@ -351,7 +424,9 @@ def create(
self._create_crush_and_auth(self.data_lv, crush_location)

# 6. activate OSD
self.activate()
osdmanager = OSDManager()
# rebuilds are never triggered from inside a systemd unit
osdmanager._activate_single(self, as_systemd_unit=False)

def _create_journal(journal_location, journal_size):
"""Hook function for creating journal/ WAL/ other similar devices.
Expand Down Expand Up @@ -421,9 +496,14 @@ def purge(self, unsafe_destroy):
sys.exit(10)

print(f"Destroying OSD {self.id} ...")
osdmanager = OSDManager()

try:
self.deactivate(flush=False)
# purging is never done from inside a single systemd unit and thus left to
# the OSD manager
osdmanager._deactivate_single(
self, as_systemd_unit=False, flush=False
)
except Exception as e:
print(e)

Expand Down Expand Up @@ -597,21 +677,21 @@ def activate(self):
# fmt: on
)

def deactivate(self):
    """Shut the OSD down without deleting it, so it can be relocated
    somewhere else; journal flushing is the separate `flush()` step."""
    # deactivate (shutdown osd, remove things but don't delete it, make
    # the osd able to be relocated somewhere else)
    super().deactivate()

def flush(self):
    """Flush the FileStore OSD's external journal into the data store.

    Must run after `deactivate()` (the daemon must be stopped); used
    before destroying/migrating the journal device.
    """
    print(f"Flushing journal for OSD {self.id} ...")
    run.ceph_osd(
        # fmt: off
        "-i", str(self.id),
        "--flush-journal",
        "--osd-data", self.datadir,
        "--osd-journal", self._locate_journal_lv(),
        # fmt: on
    )

def _create_journal(self, journal, journal_size):
# External journal
Expand Down Expand Up @@ -672,7 +752,12 @@ def purge(self, unsafe_destroy):
except ValueError:
pass

def rebuild(self, journal_size, unsafe_destroy, target_objectstore_type):
def rebuild(
self,
journal_size: str,
unsafe_destroy: bool,
target_objectstore_type: Optional[str],
):
"""Fully destroy and create the FileStoreOSD again with the same properties,
optionally converting it to another OSD type.
"""
Expand Down Expand Up @@ -791,13 +876,16 @@ def activate(self):
# fmt: on
)

def deactivate(self):
    """Shut the Bluestore OSD down without deleting it."""
    # deactivate (shutdown osd, remove things but don't delete it
    # FIXME: this is not sufficient for migrating the OSD to another host if it has
    # an external WAL, that requires a manual outmigration command PL-130677
    super().deactivate()

def flush(self):
    """No-op counterpart to FileStore's journal flush."""
    # trivial journal flushing is not implemented for Bluestore OSDs,
    # WAL management is a bit more complex
    pass

def _create_journal(self, wal, __size_is_ignored):
# External WAL
Expand Down Expand Up @@ -872,7 +960,12 @@ def purge(self, unsafe_destroy):
except ValueError:
pass

def rebuild(self, journal_size, unsafe_destroy, target_objectstore_type):
def rebuild(
self,
journal_size: str,
unsafe_destroy: bool,
target_objectstore_type: Optional[str],
):
"""Fully destroy and create the FileStoreOSD again with the same properties,
optionally converting it to another OSD type.
"""
Expand Down
2 changes: 2 additions & 0 deletions tests/ceph-nautilus.nix
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,9 @@ in
with subtest("Initialize first OSD (bluestore)"):
host1.execute('systemctl status [email protected] > /dev/kmsg 2>&1')
host1.execute('fc-ceph osd create-bluestore /dev/vdc > /dev/kmsg 2>&1')
host1.execute('systemctl status [email protected] > /dev/kmsg 2>&1')
with subtest("Initialize second MON and OSD (filestore)"):
host2.succeed('fc-ceph osd prepare-journal /dev/vdb')
Expand Down

0 comments on commit 53a52a4

Please sign in to comment.