[Refactor]: Retry client #19

Open · wants to merge 3 commits into base: main

43 changes: 43 additions & 0 deletions src/curp/curp_test.py
@@ -2,8 +2,10 @@
Test for curp client
"""

from datetime import timedelta
import pytest
from src.curp.unary import UnaryBuilder, UnaryConfig
from src.curp.retry import Retry, RetryConfig
from src.rpc.type import CurpError
from curp_command_pb2 import ProposeId
from api.xline.xline_command_pb2 import Command, CommandResponse, SyncResponse
@@ -235,3 +237,44 @@ async def test_unary_propose_return_early_err():
        await unary.repeatable_propose(ProposeId(seq_num=9), Command())
    except CurpError as e:
        assert isinstance(e, CurpError)


@pytest.mark.asyncio
async def test_retry_propose_return_no_retry_error():
    """
    Test retry propose returning a non-retryable error
    """
    all_members = {
        0: ["127.0.0.1:48081"],
        1: ["127.0.0.1:48082"],
        2: ["127.0.0.1:48083"],
        3: ["127.0.0.1:48084"],
        4: ["127.0.0.1:48085"],
    }
    config = UnaryConfig(1, 2)
    unary = UnaryBuilder(all_members, config).set_leader_state(0, 1).build()
    retry = Retry(unary, RetryConfig.new_fixed(timedelta(milliseconds=100), 3))
    try:
        await retry.propose(Command(), use_fast_path=False)
    except CurpError as e:
        assert e.inner.HasField("ShuttingDown")


@pytest.mark.asyncio
async def test_retry_propose_return_retry_error():
    """
    Test retry propose returning a retryable error
    """
    all_members = {
        0: ["127.0.0.1:48081"],
        1: ["127.0.0.1:48082"],
        2: ["127.0.0.1:48083"],
        3: ["127.0.0.1:48084"],
        4: ["127.0.0.1:48085"],
    }
    config = UnaryConfig(1, 2)
    unary = UnaryBuilder(all_members, config).set_leader_state(0, 1).build()
    retry = Retry(unary, RetryConfig.new_fixed(timedelta(milliseconds=100), 3))
    try:
        await retry.propose(Command(), use_fast_path=False)
    except CurpError as e:
        assert e.inner.HasField("Internal")
164 changes: 164 additions & 0 deletions src/curp/retry.py
@@ -0,0 +1,164 @@
"""
Retry client
"""

from __future__ import annotations
import logging
import asyncio
from enum import Enum
from datetime import timedelta
from dataclasses import dataclass
from src.curp.unary import Unary
from src.rpc.type import CurpError
from api.curp.curp_command_pb2 import CurpError as _CurpError
from api.xline.xline_command_pb2 import Command, CommandResponse, SyncResponse


class BackoffConfig(Enum):
    """
    Backoff config
    """

    # A fixed delay backoff
    FIXED = 1
    # An exponential delay backoff
    EXPONENTIAL = 2


@dataclass
class RetryConfig:
    """
    Retry config to control the retry policy
    """

    # Backoff config
    backoff: BackoffConfig
    # Initial delay
    delay: timedelta
    # Max delay for exponential backoff
    max_delay: timedelta | None
    # Retry count
    count: int

    @classmethod
    def new_fixed(cls, delay: timedelta, count: int) -> RetryConfig:
        """
        Create a fixed-delay retry config
        """
        if count <= 0:
            raise RuntimeError("retry count should be larger than 0")
        return cls(BackoffConfig.FIXED, delay, None, count)

    @classmethod
    def new_exponential(cls, delay: timedelta, max_delay: timedelta, count: int) -> RetryConfig:
        """
        Create an exponential-delay retry config
        """
        if count <= 0:
            raise RuntimeError("retry count should be larger than 0")
        return cls(BackoffConfig.EXPONENTIAL, delay, max_delay, count)

    def init_backoff(self) -> Backoff:
        """
        Create a backoff process
        """
        return Backoff(self, self.delay, self.count)


@dataclass
class Backoff:
    """
    Backoff tool
    """

    # The retry config
    config: RetryConfig
    # Current delay
    cur_delay: timedelta
    # Remaining retry count
    count: int

    def next_delay(self) -> timedelta | None:
        """
        Get the next delay duration; None means the retries are exhausted.
        """
        if self.count == 0:
            return None
        self.count -= 1
        cur = self.cur_delay
        if self.config.backoff == BackoffConfig.EXPONENTIAL:
            if self.config.max_delay is None:
                raise RuntimeError("max_delay must be set when using exponential backoff")
            self.cur_delay = min(cur * 2, self.config.max_delay)
        return cur


@dataclass
class Retry:
    """
    The retry client automatically retries the requests of the inner client API
    """

    # Inner client
    inner: Unary
    # Retry config
    config: RetryConfig

    async def propose(self, cmd: Command, use_fast_path: bool) -> tuple[CommandResponse, SyncResponse | None]:
        """
        Send a propose to the whole cluster. Set `use_fast_path` to `False` to fall back to
        ordered requests (even if the requests are commutative).
        """
        backoff = self.config.init_backoff()
        last_err: CurpError | None = None

        while True:
            delay = backoff.next_delay()
            if delay is None:
                break

            propose_id = self.inner.gen_propose_id()

            try:
                return await self.inner.repeatable_propose(propose_id, cmd, use_fast_path)
            except CurpError as e:
                # errors that should not be retried
                if (
                    e.inner.HasField("Duplicated")
                    or e.inner.HasField("ShuttingDown")
                    or e.inner.HasField("InvalidConfig")
                    or e.inner.HasField("NodeNotExists")
                    or e.inner.HasField("NodeAlreadyExists")
                    or e.inner.HasField("LearnerNotCatchUp")
                ):
                    raise e
                # errors that may be retried
                if (
                    e.inner.HasField("ExpiredClientId")
                    or e.inner.HasField("KeyConflict")
                    or e.inner.HasField("Internal")
                ):
                    pass
                # update the leader state if we got an RPC transport error
                if e.inner.HasField("RpcTransport"):
                    try:
                        await self.inner.fetch_leader_id()
                    except CurpError as err:
                        logging.warning("fetch leader failed, error %s", err)
                # update the cluster state if we got WrongClusterVersion
                if e.inner.HasField("WrongClusterVersion"):
                    # the inner client automatically updates the cluster state in fetch_cluster
                    try:
                        await self.inner.fetch_cluster()
                    except CurpError as err:
                        logging.warning("fetch cluster failed, error %s", err)
                # update the leader state if we got a Redirect
                if e.inner.HasField("redirect"):
                    leader_id = e.inner.redirect.leader_id
                    term = e.inner.redirect.term
                    self.inner.update_leader(leader_id, term)

                last_err = e
                await asyncio.sleep(delay.total_seconds())

        raise CurpError(_CurpError(Internal=f"request timeout, last error: {last_err}"))
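For context, here is a minimal usage sketch of the new retry client, assembled from the pieces in this PR. The cluster addresses and config values mirror the tests above; a running cluster at those addresses is assumed, otherwise `propose` eventually raises the timeout `CurpError` from the end of `retry.py`:

```python
import asyncio
from datetime import timedelta

from src.curp.unary import UnaryBuilder, UnaryConfig
from src.curp.retry import Retry, RetryConfig
from api.xline.xline_command_pb2 import Command


async def main() -> None:
    # Five members at 127.0.0.1:48081-48085, as in the tests above.
    all_members = {i: [f"127.0.0.1:{48081 + i}"] for i in range(5)}
    unary = UnaryBuilder(all_members, UnaryConfig(1, 2)).set_leader_state(0, 1).build()

    # Fixed backoff: wait 100 ms between attempts, give up after 3 attempts.
    retry = Retry(unary, RetryConfig.new_fixed(timedelta(milliseconds=100), 3))

    # The retry client transparently retries retryable CurpErrors
    # (ExpiredClientId, KeyConflict, Internal, ...) and re-raises the rest.
    cmd_res, sync_res = await retry.propose(Command(), use_fast_path=False)


asyncio.run(main())
```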
31 changes: 31 additions & 0 deletions src/curp/retry_test.py
@@ -0,0 +1,31 @@
"""
Retry test
"""

from datetime import timedelta
from src.curp.retry import RetryConfig


def test_fixed_backoff_works():
    """
    Test fixed backoff works
    """
    config = RetryConfig.new_fixed(timedelta(seconds=1), 3)
    backoff = config.init_backoff()
    assert backoff.next_delay() == timedelta(seconds=1)
    assert backoff.next_delay() == timedelta(seconds=1)
    assert backoff.next_delay() == timedelta(seconds=1)
    assert backoff.next_delay() is None


def test_exponential_backoff_works():
    """
    Test exponential backoff works
    """
    config = RetryConfig.new_exponential(timedelta(seconds=1), timedelta(seconds=5), 4)
    backoff = config.init_backoff()
    assert backoff.next_delay() == timedelta(seconds=1)
    assert backoff.next_delay() == timedelta(seconds=2)
    assert backoff.next_delay() == timedelta(seconds=4)
    assert backoff.next_delay() == timedelta(seconds=5)  # 8 > 5, capped at max_delay
    assert backoff.next_delay() is None
6 changes: 6 additions & 0 deletions src/curp/unary.py
@@ -216,6 +216,12 @@ async def fetch_cluster(self) -> FetchClusterResponse:
            raise curp_err

        raise CurpError(_CurpError(RpcTransport=Empty()))

    def update_leader(self, leader_id: int | None, term: int) -> bool:
        """
        Update leader
        """
        return self.check_and_update_leader(self.__state, leader_id, term)

Review comment: It seems that there is no lock protecting check_and_update_leader.

Review comment: You may add an additional lock here to protect update_leader.
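Picking up the reviewer's suggestion above, a lock around the leader-state update could look roughly like the sketch below. The simplified `LeaderState` class and its `lock` attribute are illustrative assumptions, not part of this PR or of unary.py:

```python
import threading
from dataclasses import dataclass, field


@dataclass
class LeaderState:
    """Simplified stand-in for the unary client's internal leader state."""

    leader_id: int | None = None
    term: int = 0
    # Guards the check-and-update below against concurrent callers.
    lock: threading.Lock = field(default_factory=threading.Lock)

    def check_and_update(self, leader_id: int | None, term: int) -> bool:
        """Accept the new leader only if the reported term is not stale."""
        with self.lock:
            if term < self.term:
                return False
            self.leader_id = leader_id
            self.term = term
            return True
```

In a single-threaded asyncio client the synchronous update cannot be interleaved by the event loop, so the lock mainly documents intent and keeps the update safe if the client is ever shared across threads.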

    def check_and_update(self, res: FetchClusterResponse) -> None:
        """