Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BaseException at I/O corrupts Connection #2499

Closed
ikonst opened this issue Dec 10, 2022 · 52 comments
Closed

BaseException at I/O corrupts Connection #2499

ikonst opened this issue Dec 10, 2022 · 52 comments

Comments

@ikonst
Copy link

ikonst commented Dec 10, 2022

tl;dr repeat of #360

My codebase uses gevent (w/monkey-patching) for concurrent workers, and schedules a gevent.Timeout to force a deadline on the workers. Since a timeout causes an exception to be raised from an arbitrary "yield point", there's risk of corrupting shared state if code is not written to be exception-safe.

The code talks to redis over a client that's shared between the workers. Socket I/O is a "yield point". Sending a command successfully, but then failing to read the entire response off the socket, gets Connection into an undefined state: a subsequent command would try to parse a response intended for a previous command, and in many cases would fail. Here's what a typical error would look like:

  File "site-packages/redis/client.py", line 3401, in parse_response
    result = Redis.parse_response(
  File "site-packages/redis/client.py", line 774, in parse_response
    return self.response_callbacks[command_name](response, **options)
  File "site-packages/redis/client.py", line 528, in <lambda>
    'SET': lambda r: r and nativestr(r) == 'OK',
  File "site-packages/redis/_compat.py", line 135, in nativestr
    return x if isinstance(x, str) else x.decode('utf-8', 'replace')
AttributeError: 'int' object has no attribute 'decode'

As you can see, we're trying to parse an integer response to a previous command (e.g. ":12345\r\n") as a string response to a SET command.

The except Exception block in redis.connection.Connection.read_response is intended to handle all sorts of parsing errors by disconnecting the connection:

except Exception:
self.disconnect()
raise

but perhaps it could be changed to except: since e.g. gevent.Timeout is intentionally a BaseException to suggest that it should not be suppressed (and we are indeed not suppressing it).

@ikonst
Copy link
Author

ikonst commented Dec 10, 2022

Digging through history:

@ikonst
Copy link
Author

ikonst commented Dec 11, 2022

P.S. I can think of a few alternatives to self.disconnect() where the onus on recovery is on the next caller rather than the current one (thus we don't have to do risky things in an exception handler).

  1. After send_command, increase a self._expected_responses counter. In parse_response, decrease it. Before send_command, ensure it's down to zero by running _ = self.parse_response() as many times as needed.
  2. Without maintaining a counter, try and flush the socket by reading all that's on it before send_command. I suspect it's race-y since nothing guarantees that the previous command's response won't arrive right after that flush.

I suspect we chose to self.disconnect() because overhead of connecting to redis is low, and an exception might suggest some other form of persistent corruption?

Update: After considering I've realized this is fundamentally unsafe to continue using the socket, since we don't know how much data was sent by sendall nor how much was read off the socket by recv.

@ikonst
Copy link
Author

ikonst commented Dec 11, 2022

I need to better understand motivation of #2104 since it's directly at odds with this issue. It appears to focus on PubSub. Since PubSub makes the connection receive ad-hoc responses (in addition to responses to commands), a disconnection probably causes a brief data loss.

Since PubSub responses are designed to be unambiguous — response is a list whose first element indicates message type, e.g. "subscribe" for a response to user command, or "message" for a server-pushed message — a possible compromise is that PubSub triggers a special mode in Connection which disables disconnects on BaseException.

@kristjanvalur
Copy link
Contributor

Generally you want to avoid logic on BaseExceptions. They are either fatal (thread exit) or intended as signalling exceptions, such as timeout. Code running in BaseException handlers can cause trouble when it, itself, raises exceptions.

Typically what happens is that the BaseException is unhandled and then dealt with at a higher level, such as application level, by discarding the connection. It is most unusual to do this at such a low level.

I think that the motivation behind the change in the Synchronous api was to better reflect that coding practice. Leave BaseExceptions in place, because they usually should be left alone. Handlers then do the right thing at the place where the error is caught.

Normally, I'd suggest you catch the gevent.Timout exception higher up. You are catching it somewhere, otherwise you'd not be able to timeout. You can then decide what to do. Drop the connection? Retry reading the response? It is an application level decision. The low level api should not be making that choice for you.
However, timeout semantics in the Synchronous api are all over the place and it may well be that the behaviour you describe is best.

@Chronial
Copy link
Contributor

Chronial commented Dec 11, 2022

Generally you want to avoid logic on BaseExceptions.

This is simply not true and does not reflect a sensible coding practice. A BaseException means that the stackframe was left at an unexpected point and should be handled exactly as such. If you have code for which this matters, you need to react to the BaseException.

Typically what happens is that the BaseException is unhandled and then dealt with at a higher level, such as application level, by discarding the connection. It is most unusual to do this at such a low level.

You seem to be missing the point of the exception handler in the send_packed_command function: Transmission of data failed in the middle of a command, the redis protocol (which the Connection object encapsulates) is now in an undefined state. The connection is completely useless, because you are de-synchronized from the server. There is nothing you can do to remedy this – the connection needs to be closed.

If you had actually taken the time to look at the history of the code you where modifying in #2104as @ikonst did – you would already know all of this.

Normally, I'd suggest you catch the gevent.Timout exception higher up. [...] You can then decide what to do. Drop the connection?

You can not "deal with [this] at a higher level", because you now lack the necessary information to understand what happened. There is no way to know from the outside that the connection is garbage. To fix this, instead of closing the connection, the Exception handler could set a is_broken_needs_closing flag on the Connection object, but I fail to see what the point of that would be?

In #2103 you wrote

The Timout should leave the connection in the state that it was, [...]

which is wishing for the impossible. You interrupted an operation while it was executing – unless you have a transactional system like in a database, you can't go back to the "state that it was".

I understand your use-case in #2104 – since the connection is mostly idle during pubsub, you don't think you'll interrupt an ongoing transmission, but assume that the timeout will happen while the connection is idle. But:

  • This is only ever possible in this one specific case of pubsub: Usually redis connections are not "idle" during the execution of command. While fixing your specific issue, you just strictly broke the code for everybody else
  • You also broke your own use case, because you might still interrupt the code while data is being transmitted, which would again leave the connection in a broken state. Your are simply hoping for that not to happen

Your whole approach is broken: You can't just use an external package to force timeout exceptions into network protocols and assume that connections will remain in a usable state. redis-py needs to handle the timeout itself. I just had a look the API here, and you will find that it does exactly that: The get_message method of PubSub takes a timeout parameter. That is what you should have been using all along instead of breaking the low-level implementation of the Connection class.

@ikonst
Copy link
Author

ikonst commented Dec 11, 2022

Code running in BaseException handlers can cause trouble when it, itself, raises exceptions.

I agree that exception handling is sensitive for BaseException. However, finally clauses effectively do that, so this is done more often that it may seem.

Also, not sure if we can rely on docs for best practices, but asyncio.CancelledError says "In almost all situations the exception must be re-raised." (not "never catch this").

Normally, I'd suggest you catch the gevent.Timout exception higher up. You are catching it somewhere, otherwise you'd not be able to timeout. You can then decide what to do. Drop the connection? Retry reading the response? It is an application level decision. The low level api should not be making that choice for you.

This is unfortunately not possible, since the application code has a RedisClient object but has no access to the underlying connection. In the case of a connection pool, it doesn't even know which connection is the "undefined state" one. (The exception is an arbitrary one, so it can't carry additional context to the app code.)

@kristjanvalur
Copy link
Contributor

If the application doesn't have the connection, someone else has. This code exists on the Connection object. The connection itself should not decide that it has failed if it encounters, e.g. a timeout. It is whoever is using the connection, in whatever manner it is doing that, who should. If you have a Redis which is picking arbitrary connections out of a pool, and returning them, it is this Redis which knows how it is using the connections and that a Timeout is actually fatal. How does the rest of your callstack look? There is almost certainly a better place, higher up, where it can better be decided that an exception invalidates this connection.

@Chronial
Copy link
Contributor

The connection itself should not decide that it has failed if it encounters, e.g. a timeout

Did you actually read what I wrote? The connection is the object that gets irreparably broken, which only the object itself can know. Why should the object not "decide" that?

This is not about timeouts. This is about low-level networking operations failing. In your case because you forced a timeout exception into them.

@kristjanvalur
Copy link
Contributor

kristjanvalur commented Dec 12, 2022

A forced timeout is not a failing network exception. It is resumable. Even under gevent. The connection is still in a perfectly valid state. The read_response() method can be re-invoked on the Connection after a timeout. The connection itself is not "irrepairably broken".

Recall, a Connection can be used for three different things, IIRC:

  1. It can be used in a connection pool, where one is picked for executing each command, and retrieving a response.
  2. It can be used directly, by an application, without a pool.
  3. It can be used by a PubSub instance. Again, a single connection, managed throughout.

In only the first case must it be closed and flushed if a full command-response is not performed. This is not something which the Connection itself should be deciding, it is whichever of the layers above, which is using the connection (i.e. ConnectionPool), should be doing.

Having said that, there are actually two different timeouts in operation in Redis. One is the "socket timeout" which is applied to each operation, and ususally considered in the same way as other socket errors, i.e., "something happened on the network and we should just consider it as fatal", and then there is the user "timeout" which is a single operation timeout, applied to each operation from the api, but for the user to decide how to respond to. This does complicate things. In the Async code I have cleaned up the handling of these two so that they don't interfere with each other and are processed in a cleaner way.

@Chronial
Copy link
Contributor

Chronial commented Dec 12, 2022

A forced timeout is not a failing network exception. It is resumable. Even under gevent. The connection is still in a perfectly valid state. The read_response() method can be re-invoked on the Connection after a timeout. The connection itself is not "irrepairably broken".

This is not correct. We are also not really talking about "timeouts" – the code you changed had nothing to do with timeouts. What you are claiming (with your words and your code change) is: "Any BaseException that is not an Exception thrown during socket operations leaves the socket and redis protocol in a valid state". Once again, that is not true.

Recall, a Connection can be used for three different things, IIRC: [...] In only the [connection pool] case must it be closed and flushed [...]

This is not correct. I do not see any connection pools in the code example I provided (3 years ago) in #1128, do you?

Having said that, there are actually two different timeouts in operation in Redis.

Just to be very clear here: You forcing exceptions into the middle of IO operations is not a timeout. It is simply an unexpected exception.

But this discussion doesn't really seem to go anywhere. Can you maybe instead just answer two simple questions:

  1. Why can you not just use the already existing get_message API for your timed wait requirements?
  2. How do you suggest code like my example in Connections are left in an invalid state on interrupt #1128 is supposed to be written after your change? Of course in reality, the brpop is deep with in a call tree that does many redis operations and many other things.

@kristjanvalur
Copy link
Contributor

There is a very aggressive tone in this thread which makes me disinclined to respond. However, please allow me to answer your comments in turn in a calm voice.

  1. Base exceptions have nothing to do with the state of the socket. They are usually signalling exceptions. No base exception you have mentioned thus far will leave the connection in an unknown state. One should normally only handle exceptions that are known, and leave others. Socket errors, for example, are handled.
  2. I'm sorry if I haven't read up on defects provided three years ago. But let's see... Yes, your Redis instance is using a ConnectionPool.
  3. Generally, exceptions should be handled at the layer where they are meaningful. the Connection object only has knowledge about certain exceptions which it knows that make the connection defunct, such as SocketError, et al.
  4. The changes in the Synchronous API part were made as a clean-up effort, to make the synchronous logic better reflect the Asynchronous and to adhere to good architecture.
  5. My suggestion here is that the Redis object be fixed to not resubmit connections in an incomplete state to the pool.

This regression should, of course, have been caught at the time of my PR, and probably would have been if unittest coverage had been adequate. There aren't many regression tests present. The changes were made in good faith.

If you would kindly provide me with a Traceback which demonstrates your error, I'll be happy to create a PR, with regression tests, as appropriate. The callstack would help guide me to make the correct design. Please put a breakpoint at the place where previously there would have been a "disconnect()" call.

We should, of course, ensure that the software works as intended, but it is important to do so in an architecturally sound way. The logic of a Redis instance should not be pushed down into the bowels of a Connection object.

Cheers!

@ikonst
Copy link
Author

ikonst commented Dec 12, 2022

@kristjanvalur, agreed. I also noticed the aggressive tone (e.g. "Your whole approach is broken", "this discussion doesn't really seem to go anywhere"). It understandably comes out of frustration over this issue being reintroduced twice (in 3.0 and in 4.2), but let's assume best intentions, good faith, and keep looking forward. Overall not a lot of people actually contribute to open source.

If you would kindly provide me with a Traceback which demonstrates your error, I'll be happy to create a PR, with regression tests, as appropriate. The callstack would help guide me to make the correct design. Please put a breakpoint at the place where previously there would have been a "disconnect()" call.

I'm adding a regression test in #2505. Let's get it merged before we address the problem.

You forcing exceptions into the middle of IO operations is not a timeout.

FWIW in event-loop async frameworks like asyncio or gevent, I'm not sure "IO timeouts" and "generic timeouts" are materially different.

Why can you not just use the already existing get_message API for your timed wait requirements?

If what I say above ☝️ is right and timeouts are timeouts, then fundamentally some operations are safe to interrupt and some aren't. get_message might be safe because it's "passive" (pull) rather than "active" (request-response) and whatever message you're interrupted while reading, you'd read later.

In effect, this reenforces @kristjanvalur's point that the knowledge on whether something is interruptible belongs to a higher layer.

@Chronial
Copy link
Contributor

@kristjanvalur

I'm sorry if I haven't read up on defects provided three years ago.

When you changed the exception handler, did it not seem strange to you that someone would handle a BaseException there? You are not supposed to "read up on defects provided three years ago". But I think it's sensible that if you change a piece of code that you don't fully understand, to have a look at its history first to at least try to understand why it's there.

But @ikonst is of course correct – me airing my grievances over this by participating in this discussion in an overly aggressive tone is not a productive approach. I apologize for that.

In case @ikonst's regression test in 2500 doesn't clarify the situation:

No base exception you have mentioned thus far will leave the connection in an unknown state.

The KeyboardInterrupt raised in the example in #1128 leaves the connection of r in an unknown state. This is not caused by the connection pool – passing single_connection_client=True to the Redis constructor doesn't change the outcome.

If you would kindly provide me with a Traceback which demonstrates your error, I'll be happy to create a PR, with regression tests, as appropriate. The callstack would help guide me to make the correct design. Please put a breakpoint at the place where previously there would have been a "disconnect()" call.

That's the traceback:

Traceback (most recent call last):
  File "<stdin>", line 3, in <module>
  File ".../lib/python3.9/site-packages/redis/commands/core.py", line 2512, in brpop
    return self.execute_command("BRPOP", *keys)
  File ".../lib/python3.9/site-packages/redis/client.py", line 1258, in execute_command
    return conn.retry.call_with_retry(
  File ".../lib/python3.9/site-packages/redis/retry.py", line 46, in call_with_retry
    return do()
  File ".../lib/python3.9/site-packages/redis/client.py", line 1259, in <lambda>
    lambda: self._send_command_parse_response(
  File ".../lib/python3.9/site-packages/redis/client.py", line 1235, in _send_command_parse_response
    return self.parse_response(conn, command_name, **options)
  File ".../lib/python3.9/site-packages/redis/client.py", line 1275, in parse_response
    response = connection.read_response()
  File ".../lib/python3.9/site-packages/redis/connection.py", line 812, in read_response
    response = self._parser.read_response(disable_decoding=disable_decoding)
  File ".../lib/python3.9/site-packages/redis/connection.py", line 318, in read_response
    raw = self._buffer.readline()
  File ".../lib/python3.9/site-packages/redis/connection.py", line 249, in readline
    self._read_from_socket()
  File ".../lib/python3.9/site-packages/redis/connection.py", line 192, in _read_from_socket
    data = self._sock.recv(socket_read_size)
KeyboardInterrupt

The disconnect call used to here.

A good argument could by made that it is actually not enough, as an exception injected into _send_command_parse_response before the call to read_response will also break the connection and was never handled. I guess that was never reported because the probability of that happening is way lower than it happening during the IO calls.

@ikonst
Copy link
Author

ikonst commented Dec 12, 2022

First, let's agree that all things being equal, we'd rather execute the least code possible to handle a BaseException. If a complex recovery is necessary, it's best to "earmark" that connection as needing extra care, rather than perform that extra care in the error handler. Overall, writing interrupt-safe code is hard.

The RESP protocol, while elegantly simple, doesn't seem to address cancellations. It's like the "touch-move" rule in chess.

When a send is interrupted:

  • If it returns -1, it means nothing was sent over the wire and we're in a defined state.
  • If it returns 0 < n < len(payload), then we're in undefined state. There's no way to signal a proverbial "^C" over RESP. For example, if we meant to send "EXISTS foo\r\n", we might've only sent "EXI", and next time the app code calls us, it might be "GET bar\r\n" and so Redis will effectively get "EXIGET bar\r\n". We could address it by having a write buffer and using the next opportunity to first flush the commands we couldn't send before, but that risks sending a command that'd be stale according to the app logic, so I'm not sure we'd be following the principle of least astonishment here.
  • If it returns len(payload), then we're in a defined state, but the client should be mindful to skip the response on the next operation. Generalized, a client (who sent the command) should keep a counter of expected responses for that connection, and skip all but the last one.

(This is all made complicated by the fact we use sendall with a pipeline of commands, but let's solve for the simple case before we even go there.)

If we chose to add an 'expected responses' counter, it'd have to be per-connection rather than a single counter on Redis itself. Architecturally it'd be probably easier to add a property to Connection than keep a separate mapping in Redis. But alas I don't see it addressing the 0 < send("command\r\n") < len("command\r\n") scenario, and the low overhead of reconnecting probably trumps the complexity.

--

As for PubSub, the situation there is a bit better:

  • The PubSub RESP is designed in such a way that the responses "echo" the request, so you can tell if the response pertains to your request or not. This is obviously required since with PubSub you'd be getting pushed messages in addition to responses.
  • For the same reason, get_message is probably interruptible, and I could see @kristjanvalur 's surprise at getting disconnected even though the connection is in defined state.
  • I'm still not seeing how 0 < send("command\r\n") < len("command\r\n") is addressed there.

--

P.S. I went through this with the assumption of interruption points being I/O & yields (in gevent) or awaits (in asyncio). I'm sure a KeyboardInterrupt that can happen at any opcode boundary brings surprises of its own.

@Chronial
Copy link
Contributor

What I don't understand about all of this is why there should be special handling to make get_message more interruptible. It is never safely interruptible: You always risk dropping messages. Since we are dropping messages anyways, we could also just disconnect the connection and keep things simple.

For the use-case of not blocking indefinitely on get_message, it already takes a timeout parameter that respects the timeout and doesn't loose messages.

@ikonst
Copy link
Author

ikonst commented Dec 12, 2022

What I don't understand about all of this is why there should be special handling to make get_message more interruptible. It is never safely interruptible: You always risk dropping messages.

IMO get_message is safely interruptible (in the gevent/asyncio case, not sure about KeyboardInterrupt). It reads messages through the parser, which has a read buffer. Once bytes are read from the socket, they're put on the buffer: no bytes are lost, so you'd never miss any messages.

For the use-case of not blocking indefinitely on get_message, it already takes a timeout parameter that respects the timeout and doesn't loose messages.

It's true that you can pass timeout to get_message, but it's not inconceivable that a timeout would occur as part of a broader desire to restrict the execution of a particular block of code. (Internally, all event-loop timeouts are the same.)

with Timeout(worker_timeout_sec):
  do_some_work()
  ...
  message = pubsub.get_message()
  ..

Saying that one must use timeout= just because it's been made available is a bit of "you're holding it wrong" :)

@Chronial
Copy link
Contributor

Chronial commented Dec 12, 2022

IMO get_message is safely interruptible (in the gevent/asyncio case, not sure about KeyboardInterrupt). It reads messages through the parser, which has a read buffer. Once bytes are read from the socket, they're put on the buffer: no bytes are lost, so you'd never miss any messages.

Are you sure about that? Both get_message and listen of the asyncio implementation have a second await after the data is read from any buffers – these can raise the CancelledError and the message would be lost. Or am I missing something?

For gevent I had too look deeper. Having never used it, I assume gevent will only raise the Timeout on IO-Operations? Unfortunately, receiving of a single message is split across multiple IO operations: The wire format looks like this:

*3
$7
message
$6
second
$5
Hello

This is read in a loop, with multiple IO operations, which can thus be interrupted in the middle. So it's actually worse than I thought: It will not only drop the message, but actually also corrupt the connection. The asyncio implementation is the same.

@kristjanvalur
Copy link
Contributor

You say "corrupt the connection" a lot. This is only true in the particular use case, seen in the "Send command, read response" pattern implemented by the Redis class at that layer. In a sense, it isn't the connection which is corrupt, it is simply unusable in the context of this particular API, because the api itself (send command, receive response) is not resumable. There are other patterns possible in using a Connection object and in those, the connection isn't corrupt even if it is interrupted during IO.

Much of the work I recently did with Async IO was in ensuring that all of the code were interruptible, so that timeouts could be safely handled at the highest possible level, and without littering lower level code with various explicit timeout handlers. This is the way things are usually done with Async, and incidentally, also how Gevent and Stackless Python do things. I am one of the core developers of Stackless, and very familiar with interruptible IO.

Now I fully accept that in the process I may have caused things to break. Such is the way of software development. And I'm happy to try to fix them. But I'm sure you understand if I prefer to fix them in "a better" way if possible, one which is friendly to all use cases and doesn't result in unnecessary coupling between layers.

@Chronial
Copy link
Contributor

Chronial commented Dec 12, 2022

Corrupt is not the right word – the correct word is "desynchronized" and once the connection (also more precisely: the protocol) is in that state it is not usable by anyone anymore. There are protocols that allow resynchronization, a popular example is utf-8, but the redis protocol does not have that feature.

Let's assume your usage example with pubsub:

Imagine someone else sends the message hi\r\nthere\r\n+how\r\nare\r\nyou on channel "channel1". The server will put this data into our socket: *3\r\n$7\r\nmessage\r\n$8\r\nchannel1\r\n$33\r\nhi\r\nthere\r\n+how\r\nare\r\nyou\r\n. The redis client now calls readline on the stream. This will retrieve the *3, next it does a readline to get the $7. This is now followed by a readexactly(7 + 2) to get message etc. But after reading $33, we get interrupted and the function is exited prematurely via exception.

The stream now contains hi\r\nthere\r\n+how\r\nare\r\nyou\r\n. On the next get_message or listen call on the PubSub object, we now do readline() on the stream. This gets us hi. Since the first byte of that is h, redis-py now raises InvalidResponse. The same will happen again for the next get_messagethere is also invalid. The next call shows how this can get much worse, because the next line in the buffer +how looks almost like a valid redis response. This will raise a ValueError in the attempt to parse how as an int. Depending on what's in the buffer, this can get a lot worse.

Do you not agree that this a fatal problem and should never be allowed to happen?

@ikonst
Copy link
Author

ikonst commented Dec 12, 2022

I thought that at this stage we're all in agreement about the nature of the problems:

  1. Sending incomplete commands.
  2. Reading incomplete responses.

... and yes, about (2) I stand corrected. Looking at the code, while there is a "read buffer", there are multiple opportunities to drop data. 😔 A safer implementation would perhaps take a "cursor" into the SocketBuffer and read data w/o mutating the SocketBuffer.bytes_read, only advancing it once an entire response was read.

@ikonst
Copy link
Author

ikonst commented Dec 12, 2022

p.s. re get_message, unless I'm missing something, then passing timeout= is simply wrapping *Parser.read_response in an async timeout, i.e. the timeout can occur at any place a cancellation can happen, since internally it does nothing more than asyncio.Handle.cancel():

if read_timeout is not None:
async with async_timeout.timeout(read_timeout):
response = await self._parser.read_response(
disable_decoding=disable_decoding
)

So whatever safety get_message lacks w.r.t gevent exceptions or asyncio cancellations, it also lacks if you're "correctly" pass timeout to it. You can't win.

@Chronial
Copy link
Contributor

Chronial commented Dec 12, 2022

Note that the lines you quoted were recently added by @kristjanvalur, but this was already broken before – just somewhere else.

But the synchronous implementation (and thus gevent) does not suffer from this and is implemented correctly. It first waits up to timeout for available data and then reads the message without interruption.

The same could probably be done in the async implementation. This would also get rid of the whole async-timeout dependency.

@ikonst
Copy link
Author

ikonst commented Dec 12, 2022

But the synchronous implementation (and thus gevent) does not suffer from this and is implemented correctly. It first waits up to timeout for available data and then reads the message without interruption.

Hmm, it waits up to timeout for some available data, then proceeds to read the entire message, possibly blocking, but probably not for long if at all, since redis is fast?

That is, if you schedule a gevent timeout, it could still trip one of the recvs made inside the parser, but if you abstain from this "risky approach" and pass timeout then you should be safe, even if maybe a bit blocking?

@Chronial
Copy link
Contributor

Hmm, it waits up to timeout for some available data, then proceeds to read the entire message, possibly blocking, but probably not for long if at all, since redis is fast?

Yeah, but in general, calling that API I would not expect it return after timeout, but simply not to wait any longer than timeout – which is exactly what it does.

That is, if you schedule a gevent timeout, it could still trip one of the recvs made inside the parser, but if you abstain from this "risky approach" and pass timeout then you should be safe, even if maybe a bit blocking?

Exactly.

@ikonst
Copy link
Author

ikonst commented Dec 12, 2022

... which is exactly what it does.

In general, yes, I agree. A timeout sensibly applies to the recv but not to the parsing. (In this case, it doesn't quite apply, since there are still recvs in the parser.)

@kristjanvalur
Copy link
Contributor

Re: your discussion about timeouts in the synchronous code. Yes, this is the reason why I removed the "can_read()" implementation from the Async code. There is no can_read() anymore, only direct recv calls which then can timeout. Timeouts are correctly passed up through the call stack and handled at the site where a timeout is requested. No pre-check for readibility followed by a subsequent read.
can_read() is unfortunatly needed in blocking code. But since its presence means that not all recv() calls are covered by TimeoutError handlers, it will not necessarily play nice with gevent code.

@Chronial
Copy link
Contributor

Chronial commented Dec 13, 2022

In general, yes, I agree. A timeout sensibly applies to the recv but not to the parsing. (In this case, it doesn't quite apply, since there are still recvs in the parser.)

Think about this: If a large message was send a minute ago and you call get_message(timeout=0.01) – would you not expect to receive the message even if transferring it takes longer than 10 ms? IMO the timeout should strictly apply only to the idle time waiting for new messages.

@ikonst
Copy link
Author

ikonst commented Dec 13, 2022

@Chronial Fair enough, it does make sense. I didn't use Redis for pub-sub, but I used SQS and its WaitTimeSeconds works more like how you describe IIUC.

In my case, I'm scheduling the timeout far above in the call-stack, in some generalized "framework" code where I don't know about Redis or anything else. It is to implement deadline propagation. The timeout is "wrapping" an entire HTTP request handler, and imposes the deadline reported by the caller.

@Chronial
Copy link
Contributor

In my case, I'm scheduling the timeout far above in the call-stack, in some generalized "framework" code where I don't know about Redis or anything else. It is to implement deadline propagation. The timeout is "wrapping" an entire HTTP request handler, and imposes the deadline reported by the caller.

That's similar to what we do, too (as part of redis-tasks), and is IMO a sensible reason to force timeout exceptions deep into callstacks. I think redis-py should support that and be well-behaved in such a context. But I'm not convinced that it makes sense to invest effort and code complexity to try and keep a forcefully interrupted connection alive in a usable state. The last years seem to show that disconnecting such connections is good enough.

@ikonst
Copy link
Author

ikonst commented Dec 13, 2022

At least in the stateless (not-pub-sub) connection case, clearly so (and anyhow there's no safe way to do it given that you cannot send a command atomically or abort a command that's half-sent).

@kristjanvalur
Copy link
Contributor

If get_message is broken, it needs fixing. And without the "can_read" hack. With async, the message reader needs to maintain state, such as it does with Hiredis.

@kristjanvalur
Copy link
Contributor

So, looking at the code, I concur, that the Python parser is non-restartable. If interrupted while parsing a message, it will not be able to retry.
I think it is possible to fix this, however, in a way similar to how the Hiredis parser works.

@Chronial
Copy link
Contributor

Chronial commented Dec 13, 2022

If get_message is broken, it needs fixing. And without the "can_read" hack.

can_read is not a hack, but the correct way to implement get_message. The timeout should only affect the waiting for a new message, never the receiving or parsing of the message.

I think it is possible to fix this, however, in a way similar to how the Hiredis parser works.

What makes you sure that the Hiredis parser is not affected by this? You seem to be assuming that interrupted async socket reads have a guarantee against data-loss – do you have any source for this?

My expectation would be that the cancellation exception can be raised at any await and any return value that would have been passed through that await will disappear.

@ikonst
Copy link
Author

ikonst commented Dec 13, 2022

@Chronial Implemented correctly, an exception should only be raised at an await if it's still in a waiting state, precisely for the reason you're mentioning. I can't find any documentation in asyncio to indicate whether it's guaranteed to work like this.

That's how the underlying send syscall works: if it gets interrupted by a signal, it'd return the bytes read so far rather than -1 as not to lose data (you can probably check errno == -EINTR ?).

@kristjanvalur
Copy link
Contributor

Interrupted async socket reads are guaranteed against data loss.
The interrupt never happens at the system call boundary. The IO is non-blocking. The interruption happens in the event loop where a waiting task is awoken, without doing any system IO. Trust me on this, I´ve been doing asynchronous non-blocking socket IO in python for almost twenty years now :)
When doing timeout code with async IO, be that asyncio, stackless python, gevent, there should be no need to perform can_read like hacks. all of them can work with event-loop driven timeout handlers.

I'll gladly admit to being slightly overzealous when simplifying the PythonParser. Being so used to Hiredis parser I failed to notice that it is non-restartable in case of an interrupt. But fear not, I'll be fixing that presently. All is good.

@Chronial
Copy link
Contributor

... there should be no need to perform can_read like hacks. all of them can work with event-loop driven timeout handlers.

The correct API for get_message is still that the timeout should only affect the waiting period, not the receiving, as I explained above. If you are distracted by the fact that the parameter is called timeout, think of it as wait_timeout or wait_for instead. I don't think actually renaming it is worth it, since that would be a breaking change.

I do not see how you could possibly implement that API with the can_read function. Am I missing something?

@kristjanvalur
Copy link
Contributor

kristjanvalur commented Dec 14, 2022

When you say "the correct api", could you back that up somehow? What convention is this that a timeout, applied to an operation, should affect only some internal part of it and not the whole operation?

The business with the "can_read()" is IMHO completely broken because using that makes no guarantees that a message can be retrieved within the timeout period. It is a necessary evil for synchronous code to help make at least some expectation of sanity without heavy bookkeeping of timeout "remaining" for each individual timeout call.

The difference between "waiting" and "reading" is essentially none. the whole message appears as a single entity. There is no utility in trying to distinguish between a "read" time and a "wait" time. If you process an operation within a timout, you expect it to finish within that time.

@Chronial
Copy link
Contributor

When you say "the correct api", could you back that up somehow? What convention is this that a timeout, applied to an operation, should affect only some internal part of it and not the whole operation?

get_message does not need a timeout parameter – if you want to timeout the operation, just wrap the call in a timeout. As I said above, the parameter is not perfectly named. @ikonst gave an example for this convention.

The difference between "waiting" and "reading" is essentially none. the whole message appears as a single entity. There is no utility in trying to distinguish between a "read" time and a "wait" time.

I feel like I always need to refer back to my comments multiple times for you to actually read them. That's not nice.

@ikonst
Copy link
Author

ikonst commented Dec 14, 2022

In the "happy case" the difference between "wait_for" and "timeout" should be negligible, so we should pick what's easier to implement. If can_read is a thorn in our side, IMO it's OK to get rid of it.

In the pathological case, where you reach the 'can read' point within 10ms, but then 'read entire message' phase stalls for 10 minutes... I'd personally think the caller would be happier with the "timeout" behavior rather than the "wait_for" behavior, e.g. if the motivation was "I need to ping another server once in a while, so don't hold me up too long".

--

On a side note, I've mentioned AWS SQS as a precedent, where you provide a WaitTimeSeconds parameter to the server. It's hard to tell how it's implemented on their server-side, but their docs give us a hint:

Reduce empty responses by allowing Amazon SQS to wait until a message is available in a queue before sending a response. Unless the connection times out, the response to the ReceiveMessage request contains at least one of the available messages, up to the maximum number of messages specified in the ReceiveMessage action. In rare cases, you might receive empty responses even when a queue still contains messages, especially if you specify a low value for the ReceiveMessageWaitTimeSeconds parameter.

This suggests to me that internally they limit the execution time of waiting and message retrieval. Of course it shouldn't matter that much to us what AWS chose to do in an unrelated tool...

@kristjanvalur
Copy link
Contributor

Okay, I believe I have found a good solution. Please see #2506
It basically reverts the change to allow BaseExceptions through, and automatically disconnects the Connection on any error, unless a `disconnect_on_error=False argument is provided.

I have also provided #2510 and #2512, two alternative ways to make PythonParser restartable, both for sync and async code, which should ensure correct timeout handling in async code where we want to be able to resume (PubSub), and making it possible to do the same in sync code. Note that HiredisParser was already restartable.

@kristjanvalur
Copy link
Contributor

kristjanvalur commented Dec 15, 2022

In general, yes, I agree. A timeout sensibly applies to the recv but not to the parsing. (In this case, it doesn't quite apply, since there are still recvs in the parser.)

Think about this: If a large message was send a minute ago and you call get_message(timeout=0.01) – would you not expect to receive the message even if transferring it takes longer than 10 ms? IMO the timeout should strictly apply only to the idle time waiting for new messages.

Sorry for not responding to all the comments. I find it stressful to keep up with a conversation which has so much negative energy in it. But I'm doing my best. To answer your question:

No, I emphatically would not. I would expect the call to wait 10ms and then return. An API which accepts a timeout and then chooses to ignore the timeout is not very useful. For all we know, the whole message may never arrive, because the connection was cut short in the middle of transmission. The fact that the current sync api behaves in this way should, IMHO, be considered a wart which we need not duplicate if we can avoid it.

Please note that I am talking about an operation timeout. You, providing a timeout to an operation explicitly. This should be considered differently to the socket timeout which is sometimes applied as a default to all socket operations and is a kind of emergency brake. when the "socket timeout" triggers, one should consider the connection defunct and discard it. But an "operation timeout" only means that an operation didn't succeed in time, and should be retried.

I guess that we could improve the "socket timeout" to just apply to individual read operations, and be fatal, rather than affect the entire message parsing. That way it would not trip for each read operation (in your long transfer example) even if if would trip for the entire transfer.

Would you like me to add a PR where we move the fatal "socket timeout" down to individual operations and thus deal with a trickling operation? I can do that but would prefer to do that once #2510 or #2512 are merged or rejected. Or, I could piggyback this on them.

@Andrew-Chen-Wang
Copy link
Contributor

For future readers who encounters an error of can_read being missing, simply install hiredis and redis-py will automatically pick up on it.

@kristjanvalur
Copy link
Contributor

For future readers who encounters an error of can_read being missing, simply install hiredis and redis-py will automatically pick up on it.

I don't understand, can_read was removed from both PythonParser and HiredisParser. It was an internal method, not really part of the API and so one really shouldn't encounter it as missing, unless one was doing something strange.

@Andrew-Chen-Wang
Copy link
Contributor

Yes they're both removed; however, PythonParser raises this exception. Back when we were maintaining aioredis, we encountered several issues with the PythonParser and frequently abandoned it.

Feel free to take a look here for the impl that led to this error (to reproduce, simply uninstall hiredis when running pytest): Andrew-Chen-Wang/django-async-redis#5

@kristjanvalur
Copy link
Contributor

Feel free to take a look here for the impl that led to this error (to reproduce, simply uninstall hiredis when running pytest): Andrew-Chen-Wang/django-async-redis#5

Could you furnish me with a call stack or simple repro steps for the problem? I'm not a Django guy. There is no mention of can_read() in any of redis/asyncio package anywhere, neither is it being used or defined.

@Andrew-Chen-Wang
Copy link
Contributor

Andrew-Chen-Wang commented Jan 6, 2023

Feel free to take a look here for the impl that led to this error (to reproduce, simply uninstall hiredis when running pytest): Andrew-Chen-Wang/django-async-redis#5

Could you furnish me with a call stack or simple repro steps for the problem? I'm not a Django guy. There is no mention of can_read() in any of redis/asyncio package anywhere, neither is it being used or defined.

stack is too large. Instead, here are repro steps, assuming Redis is running on localhost:

  1. git clone https://github.com/Andrew-Chen-Wang/django-async-redis.git && git checkout new-redis-package
  2. virtualenv venv && source venv/bin/activate && pip install -r requirements.txt
  3. pip uninstall hiredis in order to test PythonParser
  4. pytest tests/

@kristjanvalur
Copy link
Contributor

Ran the tests and didn't see the problem. 4.4.1 was released yesterday, so maybe that is the reason. 4.4.0 shouldn't have had this issue either.
There are two other problems with the testsuite:

  • the redis cache isn't cleared between runs so some of the tests checking for keys are seeing old keys. Probably simplest to make those tests that rely on it start by clearing the keyspace.
  • A bunch of tests get the "got Future attached to a different loop" error. This happens with connection caching in redis, since you may get a conenction from a previous test run. You will want to either clear the connection cache, or make your event loop (pytest-asynco) context last the entire session:
@pytest.fixture(scope="session")
def event_loop():
    policy = asyncio.get_event_loop_policy()
    loop = policy.new_event_loop()
    yield loop
    loop.close()

Cheers!

@Andrew-Chen-Wang
Copy link
Contributor

Thanks for taking a look at the library @kristjanvalur and yes those were definitely the problems in general! Appreciate your time and effort:)

I'm also unable to reproduce the steps, before your suggestions, that led me to can_read error initially. Will come back with call stack if I'm able to reproduce it again.

@Chronial
Copy link
Contributor

Chronial commented May 8, 2023

This affected ChatGPT?

#2665 seems to be the async version of this bug report.

@ikonst
Copy link
Author

ikonst commented Jul 17, 2023

@Chronial Definitely an attention-grabbing post mortem these days :D

When it happened back in our servers at Lyft's Bikes & Scooters, it was very intermittent — it took maybe 1 day for a single python node to get into a broken state (for a single connection in a pool, so even the same node didn't always exhibit a problem), and often someone would term that node to "fix" it, and move on.

Then one day I took a deep dive, pinned py-redis < 4.4 and added a local regression test to prevent a kind soul from upgrading blindly, but not even a post mortem...

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging a pull request may close this issue.

5 participants