Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't call rb_str_set_len while the GVL is released. #88

Merged
merged 2 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
290 changes: 174 additions & 116 deletions ext/zlib/zlib.c
Original file line number Diff line number Diff line change
Expand Up @@ -674,9 +674,7 @@ zstream_expand_buffer(struct zstream *z)
rb_obj_reveal(z->buf, rb_cString);
}

rb_mutex_unlock(z->mutex);
rb_protect(rb_yield, z->buf, &state);
rb_mutex_lock(z->mutex);
rb_protect(rb_yield, z->buf, &state);

if (ZSTREAM_REUSE_BUFFER_P(z)) {
rb_str_modify(z->buf);
Expand Down Expand Up @@ -720,15 +718,14 @@ zstream_expand_buffer_into(struct zstream *z, unsigned long size)
}
}

static void *
zstream_expand_buffer_protect(void *ptr)
static int
zstream_expand_buffer_protect(struct zstream *z)
{
struct zstream *z = (struct zstream *)ptr;
int state = 0;

rb_protect((VALUE (*)(VALUE))zstream_expand_buffer, (VALUE)z, &state);

return (void *)(VALUE)state;
return state;
}

static int
Expand Down Expand Up @@ -1023,57 +1020,14 @@ zstream_ensure_end(VALUE v)
}

static void *
zstream_run_func(void *ptr)
zstream_run_once(void *_arguments)
{
struct zstream_run_args *args = (struct zstream_run_args *)ptr;
int err, state, flush = args->flush;
struct zstream *z = args->z;
uInt n;

err = Z_OK;
while (!args->interrupt) {
n = z->stream.avail_out;
err = z->func->run(&z->stream, flush);
rb_str_set_len(z->buf, ZSTREAM_BUF_FILLED(z) + (n - z->stream.avail_out));

if (err == Z_STREAM_END) {
z->flags &= ~ZSTREAM_FLAG_IN_STREAM;
z->flags |= ZSTREAM_FLAG_FINISHED;
break;
}

if (err != Z_OK && err != Z_BUF_ERROR)
break;

if (z->stream.avail_out > 0) {
z->flags |= ZSTREAM_FLAG_IN_STREAM;
break;
}

if (z->stream.avail_in == 0 && z->func == &inflate_funcs) {
/* break here because inflate() return Z_BUF_ERROR when avail_in == 0. */
/* but deflate() could be called with avail_in == 0 (there's hidden buffer
in zstream->state) */
z->flags |= ZSTREAM_FLAG_IN_STREAM;
break;
}

if (args->stream_output) {
state = (int)(VALUE)rb_thread_call_with_gvl(zstream_expand_buffer_protect,
(void *)z);
}
else {
state = zstream_expand_buffer_non_stream(z);
}
struct zstream_run_args *arguments = (struct zstream_run_args *)_arguments;
struct zstream *z = arguments->z;

if (state) {
err = Z_OK; /* buffer expanded but stream processing was stopped */
args->jump_state = state;
break;
}
}
uintptr_t error = z->func->run(&z->stream, arguments->flush);

return (void *)(VALUE)err;
return (void*)error;
}

/*
Expand All @@ -1088,6 +1042,86 @@ zstream_unblock_func(void *ptr)
args->interrupt = 1;
}

/*
 * "Begin" half of the rb_ensure pair used by zstream_run_func.
 *
 * Temporarily locks z->buf with rb_str_locktmp, then performs a single
 * zstream_run_once call through rb_nogvl (when RB_NOGVL_UBF_ASYNC_SAFE is
 * defined) or rb_thread_call_without_gvl (otherwise), registering
 * zstream_unblock_func as the unblock function. Whatever zstream_run_once
 * returns is handed back converted to a VALUE.
 *
 * _arguments is a struct zstream_run_args pointer cast to VALUE.
 */
static VALUE
zstream_run_once_begin(VALUE _arguments)
{
    struct zstream_run_args *run_args = (struct zstream_run_args *)_arguments;
    void *result;

    rb_str_locktmp(run_args->z->buf);

#ifdef RB_NOGVL_UBF_ASYNC_SAFE
    result = rb_nogvl(zstream_run_once, (void *)run_args,
                      zstream_unblock_func, (void *)run_args,
                      RB_NOGVL_UBF_ASYNC_SAFE);
#else
    result = rb_thread_call_without_gvl(zstream_run_once, (void *)run_args,
                                        zstream_unblock_func, (void *)run_args);
#endif

    return (VALUE)result;
}

/*
 * "Ensure" half of the rb_ensure pair used by zstream_run_func: releases the
 * temporary lock on z->buf with rb_str_unlocktmp. Always returns Qnil.
 *
 * _arguments is a struct zstream_run_args pointer cast to VALUE.
 */
static VALUE
zstream_run_once_ensure(VALUE _arguments)
{
    struct zstream_run_args *run_args = (struct zstream_run_args *)_arguments;

    rb_str_unlocktmp(run_args->z->buf);
    return Qnil;
}

/*
 * Runs the stream's processing step repeatedly until it has to stop.
 *
 * Each iteration performs one step via zstream_run_once_begin (paired with
 * zstream_run_once_ensure through rb_ensure) and then extends the length of
 * z->buf by the amount avail_out shrank during that step.
 *
 * The loop ends when:
 *   - args->interrupt is set,
 *   - the step returns Z_STREAM_END (the stream is flagged as finished),
 *   - the step returns anything other than Z_OK or Z_BUF_ERROR,
 *   - output space is still available after the step,
 *   - an inflate stream has no input left, or
 *   - expanding the output buffer reports a non-zero state, which is then
 *     stored in args->jump_state.
 *
 * Returns the code of the last step, or Z_OK when no step ran or when the
 * loop was stopped by the buffer expansion.
 */
static int
zstream_run_func(struct zstream_run_args *args)
{
    struct zstream *z = args->z;
    int err = Z_OK;

    while (!args->interrupt) {
        const uInt avail_before = z->stream.avail_out;
        int expand_state;

        err = (int)(VALUE)rb_ensure(zstream_run_once_begin, (VALUE)args,
                                    zstream_run_once_ensure, (VALUE)args);

        /* Grow the string by however much avail_out decreased in this step. */
        rb_str_set_len(z->buf, ZSTREAM_BUF_FILLED(z) + (avail_before - z->stream.avail_out));

        if (err == Z_STREAM_END) {
            z->flags &= ~ZSTREAM_FLAG_IN_STREAM;
            z->flags |= ZSTREAM_FLAG_FINISHED;
            break;
        }

        if (!(err == Z_OK || err == Z_BUF_ERROR)) {
            break;
        }

        /*
         * Stop while still "in stream" when either output space remains, or
         * an inflate stream has run out of input: inflate() returns
         * Z_BUF_ERROR when avail_in == 0, whereas deflate() may still be
         * called with avail_in == 0 (there is a hidden buffer in
         * zstream->state).
         */
        if (z->stream.avail_out > 0 ||
            (z->stream.avail_in == 0 && z->func == &inflate_funcs)) {
            z->flags |= ZSTREAM_FLAG_IN_STREAM;
            break;
        }

        expand_state = args->stream_output
            ? zstream_expand_buffer_protect(z)
            : zstream_expand_buffer_non_stream(z);

        if (expand_state != 0) {
            err = Z_OK; /* buffer expanded but stream processing was stopped */
            args->jump_state = expand_state;
            break;
        }
    }

    return err;
}

static VALUE
zstream_run_try(VALUE value_arg)
{
Expand Down Expand Up @@ -1126,14 +1160,7 @@ zstream_run_try(VALUE value_arg)
}

loop:
#ifndef RB_NOGVL_UBF_ASYNC_SAFE
err = (int)(VALUE)rb_thread_call_without_gvl(zstream_run_func, (void *)args,
zstream_unblock_func, (void *)args);
#else
err = (int)(VALUE)rb_nogvl(zstream_run_func, (void *)args,
zstream_unblock_func, (void *)args,
RB_NOGVL_UBF_ASYNC_SAFE);
#endif
err = zstream_run_func(args);

/* retry if no exception is thrown */
if (err == Z_OK && args->interrupt) {
Expand Down Expand Up @@ -1184,7 +1211,6 @@ zstream_run_ensure(VALUE value_arg)

/* Remove ZSTREAM_IN_PROGRESS flag to signal that this zstream is not in use. */
z->flags &= ~ZSTREAM_IN_PROGRESS;
rb_mutex_unlock(z->mutex);

return Qnil;
}
Expand All @@ -1201,7 +1227,7 @@ zstream_run(struct zstream *z, Bytef *src, long len, int flush)
.jump_state = 0,
.stream_output = !ZSTREAM_IS_GZFILE(z) && rb_block_given_p(),
};
rb_mutex_lock(z->mutex);

rb_ensure(zstream_run_try, (VALUE)&args, zstream_run_ensure, (VALUE)&args);
if (args.jump_state)
rb_jump_tag(args.jump_state);
Expand Down Expand Up @@ -1770,6 +1796,22 @@ do_deflate(struct zstream *z, VALUE src, int flush)
}
}

/*
 * Arguments for rb_deflate_deflate_body, bundled into one struct so that a
 * pointer to it can be passed as the single VALUE argument of a callback.
 */
struct rb_zlib_deflate_arguments {
struct zstream *z; /* stream handed to do_deflate and zstream_detach_buffer */
VALUE src; /* input value passed to do_deflate */
int flush; /* flush argument passed to do_deflate */
};

/*
 * Callback form of a deflate call.
 *
 * +args+ is a struct rb_zlib_deflate_arguments pointer cast to VALUE. Its
 * members are forwarded to do_deflate, after which the result of
 * zstream_detach_buffer for the same stream is returned.
 */
static VALUE
rb_deflate_deflate_body(VALUE args)
{
    struct rb_zlib_deflate_arguments *call = (struct rb_zlib_deflate_arguments *)args;
    struct zstream *z = call->z;

    do_deflate(z, call->src, call->flush);

    return zstream_detach_buffer(z);
}

/*
* Document-method: Zlib::Deflate#deflate
*
Expand Down Expand Up @@ -1801,11 +1843,10 @@ rb_deflate_deflate(int argc, VALUE *argv, VALUE obj)
{
struct zstream *z = get_zstream(obj);
VALUE src, flush;

rb_scan_args(argc, argv, "11", &src, &flush);
do_deflate(z, src, ARG_FLUSH(flush));
struct rb_zlib_deflate_arguments arguments = {z, src, ARG_FLUSH(flush)};

return zstream_detach_buffer(z);
return rb_mutex_synchronize(z->mutex, rb_deflate_deflate_body, (VALUE)&arguments);
}

/*
Expand Down Expand Up @@ -2101,56 +2142,19 @@ rb_inflate_add_dictionary(VALUE obj, VALUE dictionary)
return obj;
}

/*
* Document-method: Zlib::Inflate#inflate
*
* call-seq:
* inflate(deflate_string, buffer: nil) -> String
* inflate(deflate_string, buffer: nil) { |chunk| ... } -> nil
*
* Inputs +deflate_string+ into the inflate stream and returns the output from
* the stream. Calling this method, both the input and the output buffer of
* the stream are flushed. If string is +nil+, this method finishes the
* stream, just like Zlib::ZStream#finish.
*
* If a block is given consecutive inflated chunks from the +deflate_string+
* are yielded to the block and +nil+ is returned.
*
* If a :buffer keyword argument is given and not nil:
*
* * The :buffer keyword should be a String, and will be used as the output buffer.
* Using this option can reuse the memory required during inflation.
* * When not passing a block, the return value will be the same object as the
* :buffer keyword argument.
* * When passing a block, the yielded chunks will be the same value as the
* :buffer keyword argument.
*
* Raises a Zlib::NeedDict exception if a preset dictionary is needed to
* decompress. Set the dictionary by Zlib::Inflate#set_dictionary and then
* call this method again with an empty string to flush the stream:
*
* inflater = Zlib::Inflate.new
*
* begin
* out = inflater.inflate compressed
* rescue Zlib::NeedDict
* # ensure the dictionary matches the stream's required dictionary
* raise unless inflater.adler == Zlib.adler32(dictionary)
*
* inflater.set_dictionary dictionary
* inflater.inflate ''
* end
*
* # ...
*
* inflater.close
*
* See also Zlib::Inflate.new
*/
/*
 * Arguments for rb_inflate_inflate_body, bundled into one struct so that a
 * pointer to it can be passed as the single VALUE argument of a callback.
 */
struct rb_zlib_inflate_arguments {
struct zstream *z; /* stream obtained from the receiver via get_zstream */
int argc; /* argument count of the original method call */
VALUE *argv; /* argument vector of the original method call */
};

static VALUE
rb_inflate_inflate(int argc, VALUE* argv, VALUE obj)
rb_inflate_inflate_body(VALUE _arguments)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My preference would be to only prefix with underscore if the argument is not used at all. But I'll leave that up to you.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to leave it as I've done it. I understand your position. For me, _arguments is immediately converted to a known type. _arguments should not be used in the function except for this one conversion.

{
struct zstream *z = get_zstream(obj);
struct rb_zlib_inflate_arguments *arguments = (struct rb_zlib_inflate_arguments*)_arguments;
struct zstream *z = arguments->z;
int argc = arguments->argc;
VALUE *argv = arguments->argv;
VALUE dst, src, opts, buffer = Qnil;

if (OPTHASH_GIVEN_P(opts)) {
Expand Down Expand Up @@ -2205,6 +2209,60 @@ rb_inflate_inflate(int argc, VALUE* argv, VALUE obj)
return dst;
}

/*
* Document-method: Zlib::Inflate#inflate
*
* call-seq:
* inflate(deflate_string, buffer: nil) -> String
* inflate(deflate_string, buffer: nil) { |chunk| ... } -> nil
*
* Inputs +deflate_string+ into the inflate stream and returns the output from
* the stream. Calling this method, both the input and the output buffer of
* the stream are flushed. If string is +nil+, this method finishes the
* stream, just like Zlib::ZStream#finish.
*
* If a block is given consecutive inflated chunks from the +deflate_string+
* are yielded to the block and +nil+ is returned.
*
* If a :buffer keyword argument is given and not nil:
*
* * The :buffer keyword should be a String, and will be used as the output buffer.
* Using this option can reuse the memory required during inflation.
* * When not passing a block, the return value will be the same object as the
* :buffer keyword argument.
* * When passing a block, the yielded chunks will be the same value as the
* :buffer keyword argument.
*
* Raises a Zlib::NeedDict exception if a preset dictionary is needed to
* decompress. Set the dictionary by Zlib::Inflate#set_dictionary and then
* call this method again with an empty string to flush the stream:
*
* inflater = Zlib::Inflate.new
*
* begin
* out = inflater.inflate compressed
* rescue Zlib::NeedDict
* # ensure the dictionary matches the stream's required dictionary
* raise unless inflater.adler == Zlib.adler32(dictionary)
*
* inflater.set_dictionary dictionary
* inflater.inflate ''
* end
*
* # ...
*
* inflater.close
*
* See also Zlib::Inflate.new
*/
static VALUE
rb_inflate_inflate(int argc, VALUE* argv, VALUE obj)
{
    struct zstream *stream = get_zstream(obj);

    /* Pack the call's arguments so the body can run as a one-argument callback. */
    struct rb_zlib_inflate_arguments call = {
        .z = stream,
        .argc = argc,
        .argv = argv,
    };

    /* Run the actual work through rb_mutex_synchronize on the stream's mutex. */
    return rb_mutex_synchronize(stream->mutex, rb_inflate_inflate_body, (VALUE)&call);
}

/*
* call-seq: << string
*
Expand Down
4 changes: 2 additions & 2 deletions test/zlib/test_zlib.rb
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ def test_recursive_deflate
zd = Zlib::Deflate.new

s = SecureRandom.random_bytes(1024**2)
assert_raise(Zlib::InProgressError) do
assert_raise(ThreadError) do
zd.deflate(s) do
zd.deflate(s)
end
Expand All @@ -563,7 +563,7 @@ def test_recursive_inflate

s = Zlib.deflate(SecureRandom.random_bytes(1024**2))

assert_raise(Zlib::InProgressError) do
assert_raise(ThreadError) do
zi.inflate(s) do
zi.inflate(s)
end
Expand Down