Merge branch '5.x' into 7.x
AymericDu committed Aug 19, 2022
2 parents 7938ed5 + ae90093 commit 68ff7d1
Showing 15 changed files with 184 additions and 38 deletions.
7 changes: 7 additions & 0 deletions bin/oio-blob-rebuilder
@@ -2,6 +2,7 @@

# oio-blob-rebuilder.py
# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
+ # Copyright (C) 2022 OVH SAS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@@ -103,6 +104,11 @@ tube for broken chunks events.
'elsewhere. This option is useful if the chunks you are '
'rebuilding are not actually missing but are corrupted. '
'(default=%s)' % BlobRebuilder.DEFAULT_TRY_CHUNK_DELETE)
+ parser.add_argument(
+ '--read-all-available-sources', action='store_true',
+ help='For objects using erasure-coding, connect to all apparently '
+ 'available chunks, to have backups in case one of them is '
+ 'silently corrupt.')
parser.add_argument(
'--allow-frozen-container', action='store_true',
help="Allow rebuilding a chunk in a frozen container.")
@@ -153,6 +159,7 @@ def main():
# local
conf['concurrency'] = args.concurrency
conf['items_per_second'] = args.chunks_per_second
+ conf['read_all_available_sources'] = args.read_all_available_sources
conf['try_chunk_delete'] = args.delete_faulty_chunks
# distributed
conf['distributed_beanstalkd_worker_tube'] = args.distributed_tube
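The new flag simply flows from argparse into the worker configuration built by main(). A minimal sketch of that configuration, not part of the diff: the keys come from the lines above, while the namespace and values are illustrative assumptions.

# Typical invocation (other options elided):
#   oio-blob-rebuilder --read-all-available-sources [other options]
#
# Inside main(), the flag ends up in the worker configuration:
conf = {
    'namespace': 'OPENIO',                 # assumed namespace name
    'concurrency': 1,                      # from --concurrency
    'items_per_second': 30,                # from --chunks-per-second
    'read_all_available_sources': True,    # new key added by this commit
    'try_chunk_delete': False,             # from --delete-faulty-chunks
}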
48 changes: 37 additions & 11 deletions oio/api/ec.py
@@ -1,5 +1,5 @@
# Copyright (C) 2015-2020 OpenIO SAS, as part of OpenIO SDS
- # Copyright (C) 2021 OVH SAS
+ # Copyright (C) 2021-2022 OVH SAS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -34,6 +34,7 @@
from oio.common.http import HeadersDict, parse_content_range, \
ranges_from_http_header, headers_from_object_metadata
from oio.common.logger import get_logger
+ from oio.common.storage_method import ECDriverError
from oio.common.utils import fix_ranges, monotonic_time


@@ -341,7 +342,7 @@ def put_in_queue(fragment_iterator, queue):
ec_start = monotonic_time()
try:
segment = self.storage_method.driver.decode(data)
- except exceptions.ECError:
+ except ECDriverError:
# something terrible happened
self.logger.exception(
"ERROR decoding fragments (reqid=%s)", self.reqid)
@@ -754,7 +755,7 @@ def __init__(self, sysmeta, meta_chunk, global_checksum, storage_method,
self.meta_chunk = meta_chunk
self.global_checksum = global_checksum
# Unlike plain replication, we cannot use the checksum returned
- # by rawx services, whe have to compute the checksum client-side.
+ # by rawx services, we have to compute the checksum client-side.
self.checksum = hashlib.new(self.chunk_checksum_algo or 'md5')
self.connection_timeout = connection_timeout or io.CONNECTION_TIMEOUT
self.write_timeout = write_timeout or io.CHUNK_TIMEOUT
@@ -1090,13 +1091,14 @@ def stream(self):
class ECRebuildHandler(object):
def __init__(self, meta_chunk, missing, storage_method,
connection_timeout=None, read_timeout=None,
- **_kwargs):
+ read_all_available_sources=False, **kwargs):
self.meta_chunk = meta_chunk
self.missing = missing
self.storage_method = storage_method
self.connection_timeout = connection_timeout or io.CONNECTION_TIMEOUT
self.read_timeout = read_timeout or io.CHUNK_TIMEOUT
- self.logger = _kwargs.get('logger', LOGGER)
+ self.logger = kwargs.get('logger', LOGGER)
+ self.read_all_available_sources = read_all_available_sources

def _get_response(self, chunk, headers):
resp = None
@@ -1175,7 +1177,10 @@ def rebuild(self):
self.logger.warning(
'Use chunk(s) without size information to rebuild a chunk')

- rebuild_iter = self._make_rebuild_iter(resps[:nb_data])
+ if self.read_all_available_sources:
+ rebuild_iter = self._make_rebuild_iter(resps)
+ else:
+ rebuild_iter = self._make_rebuild_iter(resps[:nb_data])
return assumed_chunk_size, rebuild_iter

def _make_rebuild_iter(self, resps):
@@ -1197,18 +1202,39 @@ def frag_iter():
pile.spawn(_get_frag, resp)
try:
with Timeout(self.read_timeout):
- frag = [frag for frag in pile]
+ in_frags = [frag for frag in pile]
except Timeout as to:
self.logger.error('ERROR while rebuilding: %s', to)
except Exception:
self.logger.exception('ERROR while rebuilding')
break
- if not all(frag):
+ if not all(in_frags):
break
- rebuilt_frag = self._reconstruct(frag)
+ ok_frags = self._filter_broken_fragments(in_frags)
+ rebuilt_frag = self._reconstruct(ok_frags)
yield rebuilt_frag

return frag_iter()

- def _reconstruct(self, frag):
- return self.storage_method.driver.reconstruct(frag, [self.missing])[0]
+ def _filter_broken_fragments(self, frags):
+ """
+ Try to read and check each fragment's EC metadata.
+ :returns: the list of fragments whose metadata is ok
+ """
+ frag_md_list = []
+ ok_frags = []
+ for i, frag in enumerate(frags):
+ try:
+ frag_md = self.storage_method.driver.get_metadata(frag)
+ frag_md_list.append(frag_md)
+ ok_frags.append(frag)
+ except ECDriverError as err:
+ self.logger.error(
+ "Fragment %d in error, discarding it: %s", i, err)
+ # FIXME(FVE): here we should call verify_stripe_metadata(frag_md_list)
+ # but it does not work and I don't know why.
+ return ok_frags
+
+ def _reconstruct(self, frags):
+ return self.storage_method.driver.reconstruct(frags, [self.missing])[0]
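With --read-all-available-sources, rebuild() now feeds every reachable fragment to the decoder instead of only the first nb_data of them, and _filter_broken_fragments() drops any fragment whose EC metadata cannot be read before reconstruction. A self-contained sketch of that idea using pyeclib directly; the EC parameters, backend name and test data below are arbitrary assumptions, not the values OpenIO SDS uses.

from pyeclib.ec_iface import ECDriver, ECDriverError

driver = ECDriver(k=6, m=3, ec_type='liberasurecode_rs_vand')
fragments = driver.encode(b'some object data' * 1024)

missing_index = 0                  # the fragment we pretend to have lost
available = fragments[1:]          # every other apparently available source

# Keep only the fragments whose EC metadata can be read, mirroring
# ECRebuildHandler._filter_broken_fragments() above.
ok_frags = []
for i, frag in enumerate(available):
    try:
        driver.get_metadata(frag)
        ok_frags.append(frag)
    except ECDriverError:
        print('fragment %d is unreadable, discarding it' % i)

# With k=6, any 6 healthy fragments are enough; reading more than 6
# gives backups in case one of them is silently corrupt.
rebuilt = driver.reconstruct(ok_frags, [missing_index])[0]
assert rebuilt == fragments[missing_index]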
8 changes: 5 additions & 3 deletions oio/blob/operator.py
@@ -1,5 +1,5 @@
# Copyright (C) 2019 OpenIO SAS, as part of OpenIO SDS
- # Copyright (C) 2021 OVH SAS
+ # Copyright (C) 2021-2022 OVH SAS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@@ -47,7 +48,8 @@ def __init__(self, conf, logger=None):

def rebuild(self, container_id, content_id, chunk_id_or_pos,
rawx_id=None, try_chunk_delete=False,
- allow_frozen_container=True, allow_same_rawx=True):
+ allow_frozen_container=True, allow_same_rawx=True,
+ read_all_available_sources=False):
"""
Try to find the chunk in the metadata of the specified object,
then rebuild it.
@@ -86,7 +87,8 @@ def rebuild(self, container_id, content_id, chunk_id_or_pos,
chunk_id, service_id=rawx_id,
allow_frozen_container=allow_frozen_container,
allow_same_rawx=allow_same_rawx,
- chunk_pos=chunk_pos)
+ chunk_pos=chunk_pos,
+ read_all_available_sources=read_all_available_sources)

if try_chunk_delete:
try:
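A hypothetical call site for the updated rebuild() signature. The conf dict, the class name ChunkOperator and the identifiers are assumptions for illustration; only the keyword arguments come from the diff.

from oio.blob.operator import ChunkOperator

operator = ChunkOperator({'namespace': 'OPENIO'})   # assumed minimal conf
operator.rebuild(
    container_id='<container id hex>',              # placeholder
    content_id='<content id hex>',                  # placeholder
    chunk_id_or_pos='<chunk id hex>',               # placeholder
    try_chunk_delete=False,
    read_all_available_sources=True)                # new keyword in this commit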
9 changes: 7 additions & 2 deletions oio/blob/rebuilder.py
@@ -1,5 +1,5 @@
# Copyright (C) 2019 OpenIO SAS, as part of OpenIO SDS
- # Copyright (C) 2021 OVH SAS
+ # Copyright (C) 2021-2022 OVH SAS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@@ -38,6 +38,7 @@ class BlobRebuilder(Tool):
DEFAULT_RDIR_TIMEOUT = 60.0
DEFAULT_ALLOW_FROZEN_CT = False
DEFAULT_ALLOW_SAME_RAWX = True
+ DEFAULT_READ_ALL_AVAILABLE_SOURCES = False
DEFAULT_TRY_CHUNK_DELETE = False
DEFAULT_DRY_RUN = False

@@ -255,6 +256,9 @@ def __init__(self, tool, queue_workers, queue_reply):
'allow_frozen_container', self.tool.DEFAULT_ALLOW_FROZEN_CT))
self.allow_same_rawx = true_value(self.tool.conf.get(
'allow_same_rawx', self.tool.DEFAULT_ALLOW_SAME_RAWX))
+ self.read_all_available_sources = true_value(self.tool.conf.get(
+ 'read_all_available_sources',
+ self.tool.DEFAULT_READ_ALL_AVAILABLE_SOURCES))
self.try_chunk_delete = true_value(self.tool.conf.get(
'try_chunk_delete', self.tool.DEFAULT_TRY_CHUNK_DELETE))
self.dry_run = true_value(self.tool.conf.get(
@@ -280,7 +284,8 @@ def _process_item(self, item):
rawx_id=self.tool.rawx_id,
try_chunk_delete=self.try_chunk_delete,
allow_frozen_container=self.allow_frozen_container,
- allow_same_rawx=self.allow_same_rawx)
+ allow_same_rawx=self.allow_same_rawx,
+ read_all_available_sources=self.read_all_available_sources)
except OioException as exc:
if not isinstance(exc, OrphanChunk):
raise RetryLater(exc)
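The worker reads the new setting from its configuration with true_value(), next to the existing rebuild options. A sketch of how these keys might appear in a rebuilder configuration; the string spellings are an assumption about what true_value() accepts.

conf = {
    'allow_frozen_container': 'false',
    'allow_same_rawx': 'true',
    'read_all_available_sources': 'true',   # new key, defaults to False
    'try_chunk_delete': 'false',
    'dry_run': 'false',
}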
9 changes: 8 additions & 1 deletion oio/cli/admin/xcute/rawx.py
@@ -1,5 +1,5 @@
# Copyright (C) 2019-2020 OpenIO SAS, as part of OpenIO SDS
- # Copyright (C) 2021 OVH SAS
+ # Copyright (C) 2021-2022 OVH SAS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@@ -51,6 +51,11 @@ def get_parser(self, prog_name):
'rebuilt elsewhere. This option is useful if the chunks '
'you are rebuilding are not actually missing but are '
'corrupted.')
+ parser.add_argument(
+ '--read-all-available-sources', action='store_true',
+ help='For objects using erasure-coding, connect to all apparently '
+ 'available chunks, to have backups in case one of them is '
+ 'silently corrupt.')
parser.add_argument(
'--allow-frozen-container', action='store_true',
help='Allow rebuilding a chunk in a frozen container.')
@@ -74,6 +79,8 @@ def get_job_config(self, parsed_args):
'rdir_timeout': parsed_args.rdir_timeout,
'rawx_timeout': parsed_args.rawx_timeout,
'dry_run': parsed_args.dry_run,
+ 'read_all_available_sources':
+ parsed_args.read_all_available_sources,
'try_chunk_delete': parsed_args.delete_faulty_chunks,
'allow_frozen_container': parsed_args.allow_frozen_container,
'set_incident_date': parsed_args.set_incident_date,
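get_job_config() above turns the parsed CLI arguments into the xcute job configuration. A sketch of the resulting dictionary; the keys are the ones visible in the diff, while the values and the command path in the comment are assumptions.

# Presumed CLI (command path is an assumption based on the module location):
#   openio-admin xcute rawx rebuild <rawx service id> --read-all-available-sources
job_config = {
    'rdir_timeout': 60.0,
    'rawx_timeout': 60.0,
    'dry_run': False,
    'read_all_available_sources': True,   # new key added by this commit
    'try_chunk_delete': False,
    'allow_frozen_container': False,
    'set_incident_date': False,
}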
4 changes: 3 additions & 1 deletion oio/cli/object/client.py
@@ -1,4 +1,5 @@
# Copyright (C) 2017 OpenIO SAS, as part of OpenIO SDS
+ # Copyright (C) 2022 OVH SAS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@@ -26,6 +27,7 @@ def make_client(instance):
endpoint=instance.get_endpoint('storage'),
namespace=instance.namespace,
admin_mode=instance.admin_mode,
- perfdata=instance.cli_conf().get('perfdata')
+ perfdata=instance.cli_conf().get('perfdata'),
+ logger=instance.logger
)
return client
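make_client() now forwards the CLI's logger to the storage API client. A minimal stand-alone equivalent, assuming the usual ObjectStorageApi constructor; the namespace and endpoint are placeholders.

import logging

from oio import ObjectStorageApi

logger = logging.getLogger('openio.cli')
api = ObjectStorageApi(
    'OPENIO',                           # placeholder namespace
    endpoint='http://127.0.0.1:6006',   # placeholder oioproxy endpoint
    logger=logger)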
4 changes: 2 additions & 2 deletions oio/content/content.py
@@ -1,5 +1,5 @@
# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
- # Copyright (C) 2020-2021 OVH SAS
+ # Copyright (C) 2020-2022 OVH SAS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -238,7 +238,7 @@ def _create_object(self, **kwargs):
**kwargs)

def rebuild_chunk(self, chunk_id, service_id=None, allow_same_rawx=False,
- chunk_pos=None, allow_frozen_container=False):
+ chunk_pos=None, allow_frozen_container=False, **kwargs):
raise NotImplementedError()

def create(self, stream, **kwargs):
9 changes: 6 additions & 3 deletions oio/content/ec.py
@@ -1,5 +1,5 @@
# Copyright (C) 2015-2020 OpenIO SAS, as part of OpenIO SDS
- # Copyright (C) 2021 OVH SAS
+ # Copyright (C) 2021-2022 OVH SAS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -29,7 +29,8 @@
class ECContent(Content):
def rebuild_chunk(self, chunk_id, service_id=None,
allow_same_rawx=False, chunk_pos=None,
- allow_frozen_container=False):
+ allow_frozen_container=False,
+ read_all_available_sources=False):
# Identify the chunk to rebuild
candidates = self.chunks.filter(id=chunk_id)
if service_id is not None:
@@ -71,7 +72,9 @@ def rebuild_chunk(self, chunk_id, service_id=None,

# Regenerate the lost chunk's data, from existing chunks
handler = ECRebuildHandler(
- chunks.raw(), current_chunk.subpos, self.storage_method)
+ chunks.raw(), current_chunk.subpos, self.storage_method,
+ read_all_available_sources=read_all_available_sources,
+ logger=self.logger)
expected_chunk_size, stream = handler.rebuild()

# Actually create the spare chunk
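A hypothetical end-to-end use from the content layer. ContentFactory, its get() method and the identifiers are assumptions for illustration; only the rebuild_chunk() keyword comes from the diff.

from oio.content.factory import ContentFactory

factory = ContentFactory({'namespace': 'OPENIO'})            # assumed conf
content = factory.get('<container id>', '<content id>')      # placeholders
content.rebuild_chunk(
    '<chunk id>',
    allow_same_rawx=True,
    read_all_available_sources=True)   # new keyword added by this commit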
4 changes: 2 additions & 2 deletions oio/content/plain.py
@@ -1,5 +1,5 @@
# Copyright (C) 2015-2020 OpenIO SAS, as part of OpenIO SDS
- # Copyright (C) 2021 OVH SAS
+ # Copyright (C) 2021-2022 OVH SAS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -54,7 +54,7 @@ def create(self, stream, **kwargs):

def rebuild_chunk(self, chunk_id, service_id=None,
allow_same_rawx=False, chunk_pos=None,
- allow_frozen_container=False):
+ allow_frozen_container=False, **_kwargs):
# Identify the chunk to rebuild
candidates = self.chunks.filter(id=chunk_id)
if service_id is not None:
19 changes: 14 additions & 5 deletions oio/xcute/jobs/blob_rebuilder.py
@@ -1,4 +1,5 @@
# Copyright (C) 2019-2020 OpenIO SAS, as part of OpenIO SDS
+ # Copyright (C) 2022 OVH SAS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -33,6 +34,8 @@ def __init__(self, conf, job_params, logger=None):
self.rawx_timeout = job_params['rawx_timeout']
self.allow_frozen_container = job_params['allow_frozen_container']
self.allow_same_rawx = job_params['allow_same_rawx']
+ self.read_all_available_sources = \
+ job_params['read_all_available_sources']
self.try_chunk_delete = job_params['try_chunk_delete']
self.dry_run = job_params['dry_run']

@@ -52,11 +55,12 @@ def process(self, task_id, task_payload, reqid=None):
self.logger.debug('[reqid=%s] Rebuilding %s', reqid, chunk_id)
try:
chunk_size = self.chunk_operator.rebuild(
- container_id, content_id, chunk_id,
- rawx_id=self.service_id,
- try_chunk_delete=self.try_chunk_delete,
- allow_frozen_container=self.allow_frozen_container,
- allow_same_rawx=self.allow_same_rawx)
+ container_id, content_id, chunk_id,
+ rawx_id=self.service_id,
+ try_chunk_delete=self.try_chunk_delete,
+ allow_frozen_container=self.allow_frozen_container,
+ allow_same_rawx=self.allow_same_rawx,
+ read_all_available_sources=self.read_all_available_sources)
except (ContentNotFound, OrphanChunk):
return {'orphan_chunks': 1}

@@ -74,6 +78,7 @@ class RawxRebuildJob(XcuteRdirJob):
DEFAULT_TRY_CHUNK_DELETE = False
DEFAULT_ALLOW_FROZEN_CT = False
DEFAULT_DECLARE_INCIDENT_DATE = False
+ DEFAULT_READ_ALL_AVAILABLE_SOURCES = False

@classmethod
def sanitize_params(cls, job_params):
@@ -98,6 +103,10 @@ def sanitize_params(cls, job_params):
job_params.get('allow_same_rawx'),
cls.DEFAULT_ALLOW_SAME_RAWX)

+ sanitized_job_params['read_all_available_sources'] = boolean_value(
+ job_params.get('read_all_available_sources'),
+ cls.DEFAULT_READ_ALL_AVAILABLE_SOURCES)

sanitized_job_params['try_chunk_delete'] = boolean_value(
job_params.get('try_chunk_delete'),
cls.DEFAULT_TRY_CHUNK_DELETE)