Skip to content

Commit

Permalink
Merge branch 'ex13' into ex14
Browse files Browse the repository at this point in the history
  • Loading branch information
Rconybea committed Feb 12, 2024
2 parents 1f22460 + d415ce5 commit 0820712
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 48 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2252,15 +2252,15 @@ inflate_zstream::inflate_chunk2() {
/* fallthru */
case Z_DATA_ERROR:
case Z_MEM_ERROR:
throw std::runtime_error(tostr("zstreambuf::inflate_chunk: error [", err, "] from zlib inflate"));
throw std::runtime_error(tostr("inflate_zstream::inflate_chunk: error [", err, "] from zlib inflate"));
}
uint8_t * z_post = zstream_.next_in;
uint8_t * uc_post = zstream_.next_out;
return pair<span_type, span_type>(span_type(z_pre, z_post),
span_type(uc_pre, uc_post));
} /*inflate_chunk2*/
}
```

5. class `deflate_zstream`
Expand Down Expand Up @@ -2385,7 +2385,7 @@ deflate_zstream::deflate_chunk2(bool final_flag) {
(final_flag ? Z_FINISH : 0) /*flush*/);
if (err == Z_STREAM_ERROR)
throw runtime_error("basic_zstreambuf::sync: impossible zlib deflate returned Z_STREAM_ERROR");
throw runtime_error("deflate_zstream::sync: impossible zlib deflate returned Z_STREAM_ERROR");
uint8_t * uc_post = zstream_.next_in;
uint8_t * z_post = zstream_.next_out;
Expand Down
4 changes: 2 additions & 2 deletions compression/inflate_zstream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ inflate_zstream::inflate_chunk () {
/* fallthru */
case Z_DATA_ERROR:
case Z_MEM_ERROR:
throw std::runtime_error(tostr("zstreambuf::inflate_chunk: error [", err, "] from zlib inflate"));
throw std::runtime_error(tostr("inflate_zstream::inflate_chunk: error [", err, "] from zlib inflate"));
}

uint8_t * z_post = zstream_.next_in;
uint8_t * uc_post = zstream_.next_out;

return pair<span_type, span_type>(span_type(z_pre, z_post),
span_type(uc_pre, uc_post));
} /*inflate_chunk2*/
}
52 changes: 49 additions & 3 deletions zstream/include/zstream/zstream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,31 @@

#include "zstreambuf.hpp"
#include <iostream>
#include <fstream>

/* note: We want to allow out-of-memory-order initialization here.
* 1. We (presumably) must initialize .rdbuf before passing it to basic_iostream's ctor
* 2. Since we inherit basic_iostream, its memory will precede .rdbuf
*
* Example 1 (compress)
*
* // zstream = basic_zstream<char>, in this file following basic_zstream decl
* zstream zs(64*1024, "path/to/foo.gz", ios::out);
*
* zs << "some text to be compressed" << endl;
*
* zs.close();
*
* Example 2 (uncompress)
*
* zstream zs(64*1024, "path/to/foo.gz", ios::in);
*
* while (!zs.eof()) {
* std::string x;
* zs >> x;
*
* cout << "input: [" << x << "]" << endl;
* }
*/
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wreorder"
Expand All @@ -21,12 +42,33 @@ class basic_zstream : public std::basic_iostream<CharT, Traits> {
using off_type = typename Traits::off_type;
using zstreambuf_type = basic_zstreambuf<CharT, Traits>;

static constexpr std::streamsize c_default_buffer_size = 64 * 1024;

public:
basic_zstream(std::streamsize buf_z, std::unique_ptr<std::streambuf> native_sbuf)
:
rdbuf_(buf_z, std::move(native_sbuf)),
basic_zstream(std::streamsize buf_z,
std::unique_ptr<std::streambuf> native_sbuf,
std::ios::openmode mode)
: rdbuf_(buf_z, std::move(native_sbuf), mode),
std::basic_iostream<CharT, Traits>(&rdbuf_)
{}
/* convenience ctor; apply default buffer size */
basic_zstream(std::unique_ptr<std::streambuf> native_sbuf,
std::ios::openmode mode)
: basic_zstream(c_default_buffer_size, std::move(native_sbuf), mode) {}
/* convenience ctor; creates filebuf attached to filename and opens it */
basic_zstream(std::streamsize buf_z,
char const * filename,
std::ios::openmode mode = std::ios::in)
: rdbuf_(buf_z,
std::unique_ptr<std::streambuf>((new std::filebuf())->open(filename,
std::ios::binary | mode)),
mode),
std::basic_iostream<CharT, Traits>(&rdbuf_)
{}
/* convenience ctor; apply default buffer size */
basic_zstream(char const * filename,
std::ios::openmode mode = std::ios::in)
: basic_zstream(c_default_buffer_size, filename, mode) {}
~basic_zstream() = default;

zstreambuf_type * rdbuf() { return &rdbuf_; }
Expand All @@ -53,6 +95,10 @@ class basic_zstream : public std::basic_iostream<CharT, Traits> {
this->rdbuf_.close();
}

# ifndef NDEBUG
void set_debug_flag(bool x) { rdbuf_.set_debug_flag(x); }
# endif

private:
basic_zstreambuf<CharT, Traits> rdbuf_;
}; /*basic_zstream*/
Expand Down
83 changes: 78 additions & 5 deletions zstream/include/zstream/zstreambuf.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ operator<< (std::ostream & os, hex_view const & ins) {
return os;
}

/* implementation of streambuf that provides output to, and input from, a compressed stream */
/* implementation of streambuf that provides output to, and input from, a compressed stream
*/
template <typename CharT, typename Traits = std::char_traits<CharT>>
class basic_zstreambuf : public std::basic_streambuf<CharT, Traits> {
public:
Expand All @@ -75,8 +76,10 @@ class basic_zstreambuf : public std::basic_streambuf<CharT, Traits> {

public:
basic_zstreambuf(size_type buf_z = 64 * 1024,
std::unique_ptr<std::streambuf> native_sbuf = std::unique_ptr<std::streambuf>())
std::unique_ptr<std::streambuf> native_sbuf = std::unique_ptr<std::streambuf>(),
std::ios::openmode mode = std::ios::in)
:
openmode_{mode},
in_zs_{aligned_upper_bound(buf_z), alignment()},
out_zs_{aligned_upper_bound(buf_z), alignment()},
native_sbuf_{std::move(native_sbuf)}
Expand Down Expand Up @@ -108,6 +111,12 @@ class basic_zstreambuf : public std::basic_streambuf<CharT, Traits> {
this->sync_impl(true /*final_flag*/);

this->closed_flag_ = true;

/* .native_sbuf may need to flush (e.g. if it's actually a filebuf).
* The only way to invoke that behavior through the basic_streambuf api
* is to invoke destructor, so that's what we do here
*/
this->native_sbuf_.reset();
}
}

Expand Down Expand Up @@ -137,6 +146,11 @@ class basic_zstreambuf : public std::basic_streambuf<CharT, Traits> {
std::swap(native_sbuf_, x.native_sbuf_);
}

# ifndef NDEBUG
/* control per-instance debug output */
void set_debug_flag(bool x) { debug_flag_ = x; }
# endif

protected:
/* estimates #of characters n available for input -- .underflow() will not be called
* or throw exception until at least n chars are extracted.
Expand All @@ -158,9 +172,17 @@ class basic_zstreambuf : public std::basic_streambuf<CharT, Traits> {
virtual int_type underflow() override final {
/* control here: .input buffer (i.e. .in_zs.uc_input_buf) has been entirely consumed */

# ifndef NDEBUG
if (debug_flag_)
std::cerr << "zstreambuf::underflow: enter" << std::endl;
# endif

if ((openmode_ & std::ios::in) == 0)
throw std::runtime_error("basic_zstreambuf::underflow: expected ios::in bit set when reading from streambuf");

std::streambuf * nsbuf = native_sbuf_.get();

/* any previous output from .in_zs has already been consumed (otherwise not in underflow state) */
/* any previous output from .in_zs must have already been consumed (otherwise not in underflow state) */
in_zs_.uc_consume_all();

while (true) {
Expand All @@ -176,6 +198,11 @@ class basic_zstreambuf : public std::basic_streambuf<CharT, Traits> {

/* .in_zs needs to know how much we filled */
in_zs_.z_produce(zspan.prefix(n));

# ifndef NDEBUG
if(debug_flag_)
std::cerr << "zstreambuf::underflow: read " << n << " compressed bytes (allowing space for " << zspan.size() << ")" << std::endl;
# endif
} else {
/* it's possible previous inflate_chunk filled uncompressed output
* without consuming any compressed input, in which case can have z_avail empty
Expand Down Expand Up @@ -217,6 +244,11 @@ class basic_zstreambuf : public std::basic_streambuf<CharT, Traits> {
*/
virtual int
sync() override final {
# ifndef NDEBUG
if (debug_flag_)
std::cerr << "zstreambuf::sync: enter" << std::endl;
# endif

return this->sync_impl(false /*!final_flag*/);
}

Expand All @@ -225,9 +257,19 @@ class basic_zstreambuf : public std::basic_streambuf<CharT, Traits> {
*/
virtual std::streamsize
xsputn(CharT const * s, std::streamsize n_arg) override final {
# ifndef NDEBUG
if (debug_flag_) {
std::cerr << "zstreambuf::xsputn: enter" << std::endl;
std::cerr << hex_view(s, s+n_arg, true) << std::endl;
}
# endif

if (closed_flag_)
throw std::runtime_error("basic_zstreambuf::xsputn: attempted write to closed stream");

if ((openmode_ & std::ios::out) == 0)
throw std::runtime_error("basic_zstreambuf::xsputn: expected ios::out bit set when writing to streambuf");

std::streamsize n = n_arg;

std::size_t i_loop = 0;
Expand Down Expand Up @@ -274,14 +316,26 @@ class basic_zstreambuf : public std::basic_streambuf<CharT, Traits> {
*
* final_flag = true: compressed stream is irrevocably complete -- no further output may be written
* final_flag = false: after .sync_impl() returns may still have un-synced output in .output_zs
*
* TODO: sync for input (e.g. consider tailing a file)
*/
int
sync_impl(bool final_flag) {
# ifndef NDEBUG
if (debug_flag_)
std::cerr << "zstreambuf::sync_impl: enter: :final_flag " << final_flag << std::endl;
# endif

if (closed_flag_) {
/* implies attempt to write more output after call to .close() promised not to */
return -1;
}

if ((openmode_ & std::ios::out) == 0) {
/* nothing to do if not using stream for output */
return 0;
}

std::streambuf * nsbuf = native_sbuf_.get();

/* consume all available uncompressed output
Expand All @@ -299,8 +353,14 @@ class basic_zstreambuf : public std::basic_streambuf<CharT, Traits> {
out_zs_.deflate_chunk(final_flag);
auto zspan = out_zs_.z_contents();

if (nsbuf->sputn(reinterpret_cast<char *>(zspan.lo()), zspan.size()) < static_cast<std::streamsize>(zspan.size()))
throw std::runtime_error("zstreambuf::sync_impl: partial write!");
std::streamsize n_written = nsbuf->sputn(reinterpret_cast<char *>(zspan.lo()),
zspan.size());

if (n_written < static_cast<std::streamsize>(zspan.size())) {
throw std::runtime_error(tostr("zstreambuf::sync_impl: partial write",
" :attempted ", zspan.size(),
" :wrote ", n_written));
}

out_zs_.z_consume(zspan);

Expand Down Expand Up @@ -369,6 +429,15 @@ class basic_zstreambuf : public std::basic_streambuf<CharT, Traits> {
* .pbeg, .pend ------------> .out_zs -------------------------------> .native_sbuf
*/

/* we need to know if intending to use this zstreambuf for output:
* (i) compressing an empty input sequence produces non-empty output (since will create a 20-byte gzip header)
* Therefore:
* (a) zstream("foo.gz", ios::out) should create valid foo.gz representing an empty sequence.
* (b) .sync_impl(true) needs to know whether to do this, since it will also be called when intending
* this zstreambuf for input only
*/
std::ios::openmode openmode_;

/* set irrevocably on .close() */
bool closed_flag_ = false;

Expand All @@ -384,6 +453,10 @@ class basic_zstreambuf : public std::basic_streambuf<CharT, Traits> {

/* i/o for compressed data */
std::unique_ptr<std::streambuf> native_sbuf_;

# ifndef NDEBUG
bool debug_flag_ = false;
# endif
}; /*basic_zstreambuf*/

using zstreambuf = basic_zstreambuf<char>;
Expand Down
Loading

0 comments on commit 0820712

Please sign in to comment.