Skip to content

Commit

Permalink
Try to rebuild with another destination if there is a conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
AymericDu committed Jun 10, 2021
1 parent 194f0ac commit 757c5b8
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 27 deletions.
3 changes: 3 additions & 0 deletions oio/api/io.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright (C) 2015-2020 OpenIO SAS, as part of OpenIO SDS
# Copyright (C) 2021 OVH SAS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
Expand Down Expand Up @@ -680,6 +681,8 @@ def quorum_or_fail(self, successes, failures):
raise exc.SourceReadTimeout(new_exc)
elif isinstance(err, (exc.OioTimeout, green.OioTimeout)):
raise exc.OioTimeout(new_exc)
elif err == 'HTTP 409':
raise exc.Conflict(new_exc)
raise new_exc


Expand Down
4 changes: 3 additions & 1 deletion oio/blob/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright (C) 2015-2020 OpenIO SAS, as part of OpenIO SDS
# Copyright (C) 2021 OVH SAS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
Expand Down Expand Up @@ -119,7 +120,8 @@ def chunk_put(self, url, meta, data, **kwargs):
else 'chunk_hash']
writer = ReplicatedMetachunkWriter(
meta, [chunk], FakeChecksum(checksum),
storage_method, quorum=1, perfdata=self.perfdata)
storage_method, quorum=1, perfdata=self.perfdata,
logger=self.logger)
bytes_transferred, chunk_hash, _ = writer.stream(data, None)
return bytes_transferred, chunk_hash

Expand Down
4 changes: 2 additions & 2 deletions oio/content/content.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def __init__(self, conf, container_id, metadata, chunks, storage_method,
self.chunks = ChunksHelper(chunks)
self.storage_method = storage_method
self.logger = logger or get_logger(self.conf)
self.blob_client = (blob_client or BlobClient(conf))
self.blob_client = blob_client or BlobClient(conf, logger=self.logger)
self.container_client = (container_client
or ContainerClient(self.conf,
logger=self.logger))
Expand Down Expand Up @@ -165,7 +165,7 @@ def _get_spare_chunk(self, chunks_notin, chunks_broken, position,
bal = ensure_better_chunk_qualities(chunks_broken, quals)
break
except (exc.ClientException, exc.SpareChunkException) as err:
self.logger.info(
self.logger.warning(
"Failed to find spare chunk (attempt %d/%d): %s",
attempt + 1, max_attempts, err)
last_exc = err
Expand Down
4 changes: 3 additions & 1 deletion oio/content/factory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright (C) 2015-2018 OpenIO SAS, as part of OpenIO SDS
# Copyright (C) 2021 OVH SAS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
Expand Down Expand Up @@ -32,7 +33,8 @@ def __init__(self, conf, container_client=None, blob_client=None,
self.logger = logger or get_logger(conf)
self.container_client = container_client or \
ContainerClient(conf, logger=self.logger, **kwargs)
self.blob_client = blob_client or BlobClient(conf, **kwargs)
self.blob_client = blob_client or \
BlobClient(conf, logger=self.logger, **kwargs)

def _get(self, container_id, meta, chunks,
account=None, container_name=None, **kwargs):
Expand Down
56 changes: 47 additions & 9 deletions oio/content/plain.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright (C) 2015-2020 OpenIO SAS, as part of OpenIO SDS
# Copyright (C) 2021 OVH SAS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
Expand All @@ -13,13 +14,14 @@
# You should have received a copy of the GNU Lesser General Public
# License along with this library.

import sys
from six import reraise

from oio.api.replication import ReplicatedWriteHandler
from oio.common.storage_functions import _sort_chunks, fetch_stream
from oio.common.storage_method import STORAGE_METHODS
from oio.content.content import Content, Chunk
from oio.common import exceptions as exc
from oio.common.exceptions import UnrecoverableContent
from oio.common.exceptions import Conflict, OrphanChunk, UnrecoverableContent
from oio.common.storage_functions import _get_weighted_random_score
from oio.common.utils import group_chunk_errors

Expand Down Expand Up @@ -59,7 +61,7 @@ def rebuild_chunk(self, chunk_id, service_id=None,
candidates = candidates.filter(host=service_id)
current_chunk = candidates.one()
if current_chunk is None and chunk_pos is None:
raise exc.OrphanChunk("Chunk not found in content")
raise OrphanChunk("Chunk not found in content")
if chunk_pos is None:
chunk_pos = current_chunk.pos

Expand Down Expand Up @@ -98,16 +100,52 @@ def rebuild_chunk(self, chunk_id, service_id=None,
errors = list()
for src in duplicate_chunks:
try:
self.blob_client.chunk_copy(
src.url, spare_url, chunk_id=chunk_id,
fullpath=self.full_path, cid=self.container_id,
path=self.path, version=self.version,
content_id=self.content_id)
first = True
while True:
try:
self.blob_client.chunk_copy(
src.url, spare_url, chunk_id=chunk_id,
fullpath=self.full_path, cid=self.container_id,
path=self.path, version=self.version,
content_id=self.content_id)
break
except Conflict as exc:
if not first:
raise
storage_method = STORAGE_METHODS.load(
self.chunk_method)
if len(duplicate_chunks) \
> storage_method.expected_chunks - 2:
raise
# Now that chunk IDs are predictable,
# it is possible to have conflicts
# with another chunk being rebuilt at the same time.
exc_info = sys.exc_info()
first = False
self.logger.warning(
'The chunk destination is already used '
'by another chunk, '
'retrying to use another destination: %s', exc)
try:
chunk_notin = current_chunk.raw().copy()
chunk_notin['url'] = spare_url
chunks_notin = duplicate_chunks + [
Chunk(chunk_notin)]
spare_urls, _quals = self._get_spare_chunk(
chunks_notin, broken_list,
position=current_chunk.pos)
spare_url = spare_urls[0]
continue
except Exception as exc2:
self.logger.warning(
'Failed to find another destination: %s',
exc2)
reraise(exc_info[0], exc_info[1], exc_info[2])
self.logger.debug('Chunk copied from %s to %s, registering it',
src.url, spare_url)
break
except Exception as err:
self.logger.warn(
self.logger.warning(
"Failed to copy chunk from %s to %s: %s %s", src.url,
spare_url, type(err), err)
errors.append((src.url, err))
Expand Down
14 changes: 6 additions & 8 deletions oio/event/beanstalk.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
# Copyright (C) 2021 OVH SAS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
Expand Down Expand Up @@ -605,17 +606,14 @@ def wait_until_empty(self, tube, timeout=float('inf'), poll_interval=0.2,
"""
        Wait until the specified tube is empty, or the timeout expires.
"""
# TODO(FVE): check tube stats to ensure some jobs have passed through
# and then get rid of the initial_delay
# peek-ready requires "use", not "watch"
self.use(tube)
deadline = time.time() + timeout
if initial_delay > 0.0:
time.sleep(initial_delay)
job_id, _ = self.peek_ready()
deadline = time.time() + timeout
while job_id is not None and time.time() < deadline:
stats = self.stats_tube(tube)
while (stats['current-jobs-ready'] or stats['current-jobs-reserved']) \
and time.time() < deadline:
time.sleep(poll_interval)
job_id, _ = self.peek_ready()
stats = self.stats_tube(tube)

def wait_for_ready_job(self, tube, timeout=float('inf'),
poll_interval=0.2):
Expand Down
12 changes: 6 additions & 6 deletions tests/functional/event/filters/test_filters.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-

# Copyright (C) 2017-2020 OpenIO SAS, as part of OpenIO SDS
# Copyright (C) 2021 OVH SAS
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
Expand All @@ -16,7 +17,6 @@
# License along with this library.

import time
import subprocess
from random import choice
from six.moves.urllib_parse import quote
from oio.account.client import AccountClient
Expand Down Expand Up @@ -96,11 +96,11 @@ def _is_chunks_created(self, previous, after, pos_created, time_point):
return True

def _rebuild(self, event, job_id=0):
self.blob_rebuilder = subprocess.Popen(
['oio-blob-rebuilder', self.ns,
'--beanstalkd=' + self.queue_url])
time.sleep(3)
self.blob_rebuilder.kill()
bt = Beanstalk.from_url(self.conf['queue_url'])
bt.wait_until_empty(BlobRebuilder.DEFAULT_BEANSTALKD_WORKER_TUBE,
timeout=0.5)
bt.close()
time.sleep(2)

def _remove_chunks(self, chunks, content_id):
if not chunks:
Expand Down

0 comments on commit 757c5b8

Please sign in to comment.