Skip to content

Commit

Permalink
RecordReader: properly return io.ErrUnexpectedEOF
Browse files Browse the repository at this point in the history
This error was stripped and instead io.EOF was returned for both regex
reading and delim reading. Properly returning ErrUnexpectedEOF allows
for actual error messages.
  • Loading branch information
twmb committed May 5, 2022
1 parent 779f582 commit 2018d20
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 7 deletions.
23 changes: 17 additions & 6 deletions pkg/kgo/record_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1461,21 +1461,21 @@ func (r *RecordReader) next(rec *Record) error {
default:
err = r.readDelim(fn.read.delim) // we *always* fall back to delim parsing
}
if err != nil {
if !errors.Is(err, io.EOF) {
return err
}
switch err {
default:
return err
case nil:
case io.EOF, io.ErrUnexpectedEOF:
r.done = true
// We guarantee that all noread parses are at
// the front, so if we io.EOF on the first
// non-noread, then we bubble it up.
if len(r.buf) == 0 && (i == 0 || r.fns[i-1].read.noread) {
return io.EOF
}
if i != len(r.fns)-1 {
if i != len(r.fns)-1 || err == io.ErrUnexpectedEOF {
return io.ErrUnexpectedEOF
}
err = nil
}

if fn.parse == nil {
Expand Down Expand Up @@ -1522,6 +1522,9 @@ func (r *RecordReader) readRe(re *regexp.Regexp) error {
reader := reReader{r: r}
loc := re.FindReaderIndex(&reader)
if loc == nil {
if reader.err == io.EOF && len(reader.peek) > 0 {
return fmt.Errorf("regexp text mismatch, saw %q", reader.peek)
}
return reader.err
}
n := loc[1] // we ensure the regexp begins with ^, so we only need the end
Expand Down Expand Up @@ -1568,13 +1571,21 @@ func (r *RecordReader) readDelim(d []byte) error {
for {
peek, err := r.r.Peek(len(d))
if err != nil {
// If we peek an io.EOF, we were looking for our delim
// and hit the end. This is unexpected.
if err == io.EOF {
err = io.ErrUnexpectedEOF
}
return err
}
if !bytes.Equal(peek, d) {
// We did not find our delim. Skip the first char
// then continue again.
r.buf = append(r.buf, peek[0])
r.r.Discard(1)
continue
}
// We found our delim. We discard it and return.
r.r.Discard(len(d))
return nil
}
Expand Down
87 changes: 86 additions & 1 deletion pkg/kgo/record_formatter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func TestRecordReader(t *testing.T) {
layout string
in string
exp []*Record
expErr bool
}{
{
layout: "%v",
Expand Down Expand Up @@ -415,6 +416,81 @@ func TestRecordReader(t *testing.T) {
},
},

{
layout: `%v`,
in: "foo\n",
exp: []*Record{
StringRecord("foo\n"),
},
},

{
layout: `%v{re#...#}`,
in: "abc123",
exp: []*Record{
StringRecord("abc"),
StringRecord("123"),
},
},

{
layout: `%v{re#....#}`,
in: "abc123",
exp: []*Record{
StringRecord("abc1"),
},
expErr: true,
},

{
layout: `%v{re#....#}`,
in: "abc",
exp: []*Record{},
expErr: true,
},

{
layout: `%v\n`,
in: "foo",
expErr: true,
},

{
layout: `%V{4}%v`,
in: "fo",
expErr: true,
},

{
layout: `asdf%vasdf`,
in: "asdf123addd",
expErr: true,
},

{
layout: `asdf%v`,
in: "asd",
expErr: true,
},

{
layout: `asdf%v`,
in: "asddd",
expErr: true,
},

{
layout: `asdf%v`,
in: "asddd",
expErr: true,
},

{
layout: `%o{hex8}`,
in: "az",
expErr: true,
},

//
} {
t.Run(test.layout, func(t *testing.T) {
Expand All @@ -433,7 +509,16 @@ func TestRecordReader(t *testing.T) {
t.Errorf("%d:\ngot %#v\nexp %#v", i, rec, exp)
}
}
if _, err := r.ReadRecord(); !errors.Is(err, io.EOF) {

_, err = r.ReadRecord()
// If we are expecting an error, we expect this final read to
// not be io.EOF.
if test.expErr {
if errors.Is(err, io.EOF) {
t.Error("was expecting an error, got io.EOF")
}
return
} else if !errors.Is(err, io.EOF) {
t.Errorf("got err %v != io.EOF after exhausting records", err)
}
})
Expand Down

0 comments on commit 2018d20

Please sign in to comment.