14 changes: 12 additions & 2 deletions pkg/lightning/mydump/csv_parser.go
@@ -398,7 +398,7 @@ func (parser *CSVParser) readUntil(chars *byteSet) ([]byte, byte, error) {
 	var buf []byte
 	for {
 		buf = append(buf, parser.buf...)
-		if len(buf) > LargestEntryLimit {
+		if parser.checkRowLen && parser.pos-parser.rowStartPos+int64(len(buf)) > int64(LargestEntryLimit) {
 			return buf, 0, errors.New("size of row cannot exceed the max value of txn-entry-size-limit")
 		}
 		parser.buf = nil
@@ -442,7 +442,7 @@ outside:
 		// end of a line, the substring can still be dropped by rule 2.
 		if len(parser.startingBy) > 0 && !foundStartingByThisLine {
 			oldPos := parser.pos
-			content, _, err := parser.ReadUntilTerminator()
+			content, _, err := parser.readUntilTerminator()
 			if err != nil {
 				if !(errors.Cause(err) == io.EOF) {
 					return nil, err
@@ -629,6 +629,8 @@ func (parser *CSVParser) replaceEOF(err error, replaced error) error {

 // ReadRow reads a row from the datafile.
 func (parser *CSVParser) ReadRow() error {
+	parser.beginRowLenCheck()
+	defer parser.endRowLenCheck()
 	row := &parser.lastRow
 	row.Length = 0
 	row.RowID++
@@ -679,6 +681,8 @@ func (parser *CSVParser) ReadRow() error {

 // ReadColumns reads the columns of this CSV file.
 func (parser *CSVParser) ReadColumns() error {
+	parser.beginRowLenCheck()
+	defer parser.endRowLenCheck()
 	columns, err := parser.readRecord(nil)
 	if err != nil {
 		return errors.Trace(err)
@@ -705,6 +709,12 @@ func (parser *CSVParser) ReadColumns() error {
 // Note that the terminator string pattern may be the content of a field, which
 // means it's inside quotes. Caller should make sure to handle this case.
 func (parser *CSVParser) ReadUntilTerminator() ([]byte, int64, error) {
+	parser.beginRowLenCheck()
+	defer parser.endRowLenCheck()
+	return parser.readUntilTerminator()
+}
+
+func (parser *CSVParser) readUntilTerminator() ([]byte, int64, error) {
 	var ret []byte
 	for {
 		content, firstByte, err := parser.readUntil(&parser.newLineByteSet)
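
The heart of the change is the new condition in readUntil: parser.pos - parser.rowStartPos counts every byte consumed since ReadRow (or ReadColumns / ReadUntilTerminator) began the current row, and len(buf) adds what is still being accumulated, so the limit now bounds the whole row instead of one field's buffer. Splitting ReadUntilTerminator into an exported wrapper plus an unexported readUntilTerminator matters for the same reason: the internal call site in the startingBy logic now uses the unexported form, so it does not re-bracket and cannot reset rowStartPos mid-row. Below is a minimal, self-contained sketch of the bracketing pattern; rowLimitParser, readByte, and the tiny 8-byte limit are illustrative stand-ins, not tidb's actual types.

// Sketch only: a toy parser showing the begin/defer-end row-length bracket.
package main

import (
	"errors"
	"fmt"
)

const largestEntryLimit = 8 // deliberately tiny, for demonstration

type rowLimitParser struct {
	data        []byte
	pos         int64 // total bytes consumed so far
	rowStartPos int64 // value of pos when the current row began
	checkRowLen bool
}

func (p *rowLimitParser) beginRowLenCheck() {
	p.checkRowLen = true
	p.rowStartPos = p.pos
}

func (p *rowLimitParser) endRowLenCheck() {
	p.checkRowLen = false
}

// readByte stands in for readUntil: the size check is computed from the
// row's starting position, so it bounds the whole row, not one field.
func (p *rowLimitParser) readByte() (byte, error) {
	if p.checkRowLen && p.pos-p.rowStartPos+1 > largestEntryLimit {
		return 0, errors.New("size of row cannot exceed the max value of txn-entry-size-limit")
	}
	b := p.data[p.pos]
	p.pos++
	return b, nil
}

// ReadRow brackets the whole row, mirroring CSVParser.ReadRow in the diff.
func (p *rowLimitParser) ReadRow() ([]byte, error) {
	p.beginRowLenCheck()
	defer p.endRowLenCheck() // cleared on every exit path, including errors
	var row []byte
	for {
		b, err := p.readByte()
		if err != nil {
			return nil, err
		}
		if b == '\n' {
			return row, nil
		}
		row = append(row, b)
	}
}

func main() {
	p := &rowLimitParser{data: []byte("a,b,c,ddddddddd\n")}
	_, err := p.ReadRow()
	fmt.Println(err) // the 16-byte row exceeds the 8-byte demo limit
}
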
57 changes: 43 additions & 14 deletions pkg/lightning/mydump/csv_parser_test.go
@@ -838,20 +838,49 @@ func TestTooLargeRow(t *testing.T) {
 			Delimiter: `"`,
 		},
 	}
-	var testCase bytes.Buffer
-	testCase.WriteString("a,b,c,d")
-	// WARN: will take up 10KB memory here.
-	mydump.LargestEntryLimit = 10 * 1024
-	for i := 0; i < mydump.LargestEntryLimit; i++ {
-		testCase.WriteByte('d')
-	}
-	charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
-	require.NoError(t, err)
-	parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(testCase.String()), int64(config.ReadBlockSize), ioWorkersForCSV, false, charsetConvertor)
-	require.NoError(t, err)
-	e := parser.ReadRow()
-	require.Error(t, e)
-	require.Contains(t, e.Error(), "size of row cannot exceed the max value of txn-entry-size-limit")
+	bak := mydump.LargestEntryLimit
+	t.Cleanup(func() {
+		mydump.LargestEntryLimit = bak
+	})
+	mydump.LargestEntryLimit = 1024
+	t.Run("too long field", func(t *testing.T) {
+		var dataBuf bytes.Buffer
+		dataBuf.WriteString("a,b,c,d")
+		for i := 0; i < mydump.LargestEntryLimit; i++ {
+			dataBuf.WriteByte('d')
+		}
+		require.Greater(t, dataBuf.Len(), mydump.LargestEntryLimit)
+		charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
+		require.NoError(t, err)
+		parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(dataBuf.String()), int64(config.ReadBlockSize), ioWorkersForCSV, false, charsetConvertor)
+		require.NoError(t, err)
+		e := parser.ReadRow()
+		require.Error(t, e)
+		require.Contains(t, e.Error(), "size of row cannot exceed the max value of txn-entry-size-limit")
+	})
+
+	t.Run("field is short, but whole row too long", func(t *testing.T) {
+		var dataBuf bytes.Buffer
+		for i := 0; i < 16; i++ {
+			if i > 0 {
+				dataBuf.WriteByte(',')
+			}
+			for j := 0; j < mydump.LargestEntryLimit/16; j++ {
+				dataBuf.WriteByte('d')
+			}
+		}
+		for i := 0; i < mydump.LargestEntryLimit-dataBuf.Len()+16; i++ {
+			dataBuf.WriteByte('d')
+		}
+		require.Greater(t, dataBuf.Len(), mydump.LargestEntryLimit)
+		charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
+		require.NoError(t, err)
+		parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(dataBuf.String()), int64(config.ReadBlockSize), ioWorkersForCSV, false, charsetConvertor)
+		require.NoError(t, err)
+		e := parser.ReadRow()
+		require.Error(t, e)
+		require.Contains(t, e.Error(), "size of row cannot exceed the max value of txn-entry-size-limit")
+	})
 }
 
 func TestSpecialChars(t *testing.T) {
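
Two details in the reworked test are worth spelling out. With mydump.LargestEntryLimit lowered to 1024, the second subtest writes 16 comma-separated 64-byte fields (16*64 + 15 separators = 1039 bytes) and then pads 1024 - 1039 + 16 = 1 extra byte, so the 1040-byte row keeps every individual field far under the limit while the row as a whole exceeds it, which is exactly the case the old per-field len(buf) check missed. And because LargestEntryLimit is package-level state, the test saves it and restores it through t.Cleanup so the override cannot leak into other tests. A generic sketch of that save/override/restore idiom follows; Limit is a stand-in for any package-level tunable, not a real mydump symbol.

package pkg

import "testing"

// Limit plays the role of a package-level tunable such as LargestEntryLimit.
var Limit = 10 * 1024

func TestWithSmallLimit(t *testing.T) {
	bak := Limit
	t.Cleanup(func() { Limit = bak }) // runs even if the test fails or panics
	Limit = 1024                      // small override keeps the test fast and light on memory

	if Limit != 1024 {
		t.Fatalf("expected overridden limit, got %d", Limit)
	}
}
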
15 changes: 15 additions & 0 deletions pkg/lightning/mydump/parser.go
@@ -61,11 +61,17 @@ type blockParser struct {

 	// cache
 	remainBuf *bytes.Buffer
+	// appendBuf holds the cached parsable data left over after the last
+	// readBlock; everything inside it was still unparsed when that readBlock
+	// returned. For the data currently being parsed, use buf.
 	appendBuf *bytes.Buffer
 
 	// the Logger associated with this parser for reporting failure
 	Logger log.Logger
 	metrics *metric.Metrics
 
+	checkRowLen bool
+	rowStartPos int64
 }

func makeBlockParser(
@@ -187,6 +193,15 @@ func NewChunkParser(
 	}
 }
 
+func (parser *blockParser) beginRowLenCheck() {
+	parser.checkRowLen = true
+	parser.rowStartPos = parser.pos
+}
+
+func (parser *blockParser) endRowLenCheck() {
+	parser.checkRowLen = false
+}
+
 // SetPos changes the reported position and row ID.
 func (parser *blockParser) SetPos(pos int64, rowID int64) error {
 	p, err := parser.reader.Seek(pos, io.SeekStart)
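
Because beginRowLenCheck and endRowLenCheck live on blockParser, which the concrete parsers embed, CSVParser gets the bracketing for free, and the deferred endRowLenCheck guarantees the flag is cleared even when a read fails partway through a row. From the caller's side the new error simply surfaces from ReadRow; the sketch below shows a plausible read loop under the assumption that the parser exposes ReadRow and LastRow (the real mydump.Parser interface is larger, and LastRow's real return type is mydump.Row, simplified here to []string).

package main

import (
	"fmt"
	"io"

	"github.com/pingcap/errors"
)

// rowReader is the small subset of the parser this sketch relies on.
type rowReader interface {
	ReadRow() error
	LastRow() []string
}

// drainRows reads rows until EOF, surfacing the oversized-row error.
func drainRows(p rowReader) error {
	for {
		err := p.ReadRow()
		if errors.Cause(err) == io.EOF { // same EOF test as in the diff above
			return nil
		}
		if err != nil {
			// "size of row cannot exceed the max value of txn-entry-size-limit"
			// arrives here once a row trips the check
			return errors.Trace(err)
		}
		fmt.Println(p.LastRow()) // hand the row downstream
	}
}

// fakeParser lets the sketch run without the real CSV parser.
type fakeParser struct {
	rows [][]string
	i    int
	last []string
}

func (f *fakeParser) ReadRow() error {
	if f.i >= len(f.rows) {
		return io.EOF
	}
	f.last = f.rows[f.i]
	f.i++
	return nil
}

func (f *fakeParser) LastRow() []string { return f.last }

func main() {
	_ = drainRows(&fakeParser{rows: [][]string{{"a", "b"}, {"c", "d"}}})
}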