diff --git a/br/pkg/lightning/mydump/csv_parser.go b/br/pkg/lightning/mydump/csv_parser.go index 61cc71616ceb1..2094d66652aea 100644 --- a/br/pkg/lightning/mydump/csv_parser.go +++ b/br/pkg/lightning/mydump/csv_parser.go @@ -344,7 +344,7 @@ func (parser *CSVParser) readUntil(chars *byteSet) ([]byte, byte, error) { var buf []byte for { buf = append(buf, parser.buf...) - if len(buf) > LargestEntryLimit { + if parser.checkRowLen && parser.pos-parser.rowStartPos+int64(len(buf)) > int64(LargestEntryLimit) { return buf, 0, errors.New("size of row cannot exceed the max value of txn-entry-size-limit") } parser.buf = nil @@ -513,6 +513,8 @@ func (parser *CSVParser) replaceEOF(err error, replaced error) error { // ReadRow reads a row from the datafile. func (parser *CSVParser) ReadRow() error { + parser.beginRowLenCheck() + defer parser.endRowLenCheck() row := &parser.lastRow row.Length = 0 row.RowID++ @@ -563,6 +565,8 @@ func (parser *CSVParser) ReadRow() error { // ReadColumns reads the columns of this CSV file. func (parser *CSVParser) ReadColumns() error { + parser.beginRowLenCheck() + defer parser.endRowLenCheck() columns, err := parser.readRecord(nil) if err != nil { return errors.Trace(err) @@ -582,6 +586,8 @@ func (parser *CSVParser) ReadColumns() error { // returns the file offset beyond the terminator. // This function is used in strict-format dividing a CSV file. 
func (parser *CSVParser) ReadUntilTerminator() (int64, error) { + parser.beginRowLenCheck() + defer parser.endRowLenCheck() for { _, firstByte, err := parser.readUntil(&parser.newLineByteSet) if err != nil { diff --git a/br/pkg/lightning/mydump/csv_parser_test.go b/br/pkg/lightning/mydump/csv_parser_test.go index da06c15ed39d9..da76c7157d03b 100644 --- a/br/pkg/lightning/mydump/csv_parser_test.go +++ b/br/pkg/lightning/mydump/csv_parser_test.go @@ -688,20 +688,49 @@ func TestTooLargeRow(t *testing.T) { Delimiter: `"`, }, } - var testCase bytes.Buffer - testCase.WriteString("a,b,c,d") - // WARN: will take up 10KB memory here. - mydump.LargestEntryLimit = 10 * 1024 - for i := 0; i < mydump.LargestEntryLimit; i++ { - testCase.WriteByte('d') - } - charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace) - require.NoError(t, err) - parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(testCase.String()), int64(config.ReadBlockSize), ioWorkers, false, charsetConvertor) - require.NoError(t, err) - e := parser.ReadRow() - require.Error(t, e) - require.Contains(t, e.Error(), "size of row cannot exceed the max value of txn-entry-size-limit") + bak := mydump.LargestEntryLimit + t.Cleanup(func() { + mydump.LargestEntryLimit = bak + }) + mydump.LargestEntryLimit = 1024 + t.Run("too long field", func(t *testing.T) { + var dataBuf bytes.Buffer + dataBuf.WriteString("a,b,c,d") + for i := 0; i < mydump.LargestEntryLimit; i++ { + dataBuf.WriteByte('d') + } + require.Greater(t, dataBuf.Len(), mydump.LargestEntryLimit) + charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace) + require.NoError(t, err) + parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(dataBuf.String()), int64(config.ReadBlockSize), ioWorkers, false, charsetConvertor) + require.NoError(t, err) + e := parser.ReadRow() + require.Error(t, e) + 
require.Contains(t, e.Error(), "size of row cannot exceed the max value of txn-entry-size-limit") + }) + + t.Run("field is short, but whole row too long", func(t *testing.T) { + var dataBuf bytes.Buffer + for i := 0; i < 16; i++ { + if i > 0 { + dataBuf.WriteByte(',') + } + for j := 0; j < mydump.LargestEntryLimit/16; j++ { + dataBuf.WriteByte('d') + } + } + for i := 0; i < mydump.LargestEntryLimit-dataBuf.Len()+16; i++ { + dataBuf.WriteByte('d') + } + require.Greater(t, dataBuf.Len(), mydump.LargestEntryLimit) + charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace) + require.NoError(t, err) + parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(dataBuf.String()), int64(config.ReadBlockSize), ioWorkers, false, charsetConvertor) + require.NoError(t, err) + e := parser.ReadRow() + require.Error(t, e) + require.Contains(t, e.Error(), "size of row cannot exceed the max value of txn-entry-size-limit") + }) } func TestSpecialChars(t *testing.T) { diff --git a/br/pkg/lightning/mydump/parser.go b/br/pkg/lightning/mydump/parser.go index 0ac82ce189d71..53d478d2749f4 100644 --- a/br/pkg/lightning/mydump/parser.go +++ b/br/pkg/lightning/mydump/parser.go @@ -54,11 +54,17 @@ type blockParser struct { // cache remainBuf *bytes.Buffer + // appendBuf holds the cached parsable data left over after the last readBlock. + // All data inside was unparsed at the moment that readBlock returned; for the + // data currently being parsed, use buf. appendBuf *bytes.Buffer + // the Logger associated with this parser for reporting failure Logger log.Logger metrics *metric.Metrics + + checkRowLen bool + rowStartPos int64 } func makeBlockParser( @@ -164,6 +170,15 @@ func NewChunkParser( } } +func (parser *blockParser) beginRowLenCheck() { + parser.checkRowLen = true + parser.rowStartPos = parser.pos +} + +func (parser *blockParser) endRowLenCheck() { + parser.checkRowLen = false +} + // SetPos changes the reported position and row ID. 
func (parser *blockParser) SetPos(pos int64, rowID int64) error { p, err := parser.reader.Seek(pos, io.SeekStart)