diff --git a/pkg/lightning/mydump/csv_parser.go b/pkg/lightning/mydump/csv_parser.go
index 8d48496089bd9..673888f147419 100644
--- a/pkg/lightning/mydump/csv_parser.go
+++ b/pkg/lightning/mydump/csv_parser.go
@@ -398,7 +398,7 @@ func (parser *CSVParser) readUntil(chars *byteSet) ([]byte, byte, error) {
 	var buf []byte
 	for {
 		buf = append(buf, parser.buf...)
-		if len(buf) > LargestEntryLimit {
+		if parser.checkRowLen && parser.pos-parser.rowStartPos+int64(len(buf)) > int64(LargestEntryLimit) {
 			return buf, 0, errors.New("size of row cannot exceed the max value of txn-entry-size-limit")
 		}
 		parser.buf = nil
@@ -442,7 +442,7 @@ outside:
 		// end of a line, the substring can still be dropped by rule 2.
 		if len(parser.startingBy) > 0 && !foundStartingByThisLine {
 			oldPos := parser.pos
-			content, _, err := parser.ReadUntilTerminator()
+			content, _, err := parser.readUntilTerminator()
 			if err != nil {
 				if !(errors.Cause(err) == io.EOF) {
 					return nil, err
@@ -629,6 +629,8 @@ func (parser *CSVParser) replaceEOF(err error, replaced error) error {
 
 // ReadRow reads a row from the datafile.
 func (parser *CSVParser) ReadRow() error {
+	parser.beginRowLenCheck()
+	defer parser.endRowLenCheck()
 	row := &parser.lastRow
 	row.Length = 0
 	row.RowID++
@@ -679,6 +681,8 @@ func (parser *CSVParser) ReadRow() error {
 
 // ReadColumns reads the columns of this CSV file.
 func (parser *CSVParser) ReadColumns() error {
+	parser.beginRowLenCheck()
+	defer parser.endRowLenCheck()
 	columns, err := parser.readRecord(nil)
 	if err != nil {
 		return errors.Trace(err)
@@ -705,6 +709,12 @@ func (parser *CSVParser) ReadColumns() error {
 // Note that the terminator string pattern may be the content of a field, which
 // means it's inside quotes. Caller should make sure to handle this case.
 func (parser *CSVParser) ReadUntilTerminator() ([]byte, int64, error) {
+	parser.beginRowLenCheck()
+	defer parser.endRowLenCheck()
+	return parser.readUntilTerminator()
+}
+
+func (parser *CSVParser) readUntilTerminator() ([]byte, int64, error) {
 	var ret []byte
 	for {
 		content, firstByte, err := parser.readUntil(&parser.newLineByteSet)
diff --git a/pkg/lightning/mydump/csv_parser_test.go b/pkg/lightning/mydump/csv_parser_test.go
index 5fc60ee073e8e..30b5b082bfa34 100644
--- a/pkg/lightning/mydump/csv_parser_test.go
+++ b/pkg/lightning/mydump/csv_parser_test.go
@@ -838,20 +838,49 @@ func TestTooLargeRow(t *testing.T) {
 			Delimiter: `"`,
 		},
 	}
-	var testCase bytes.Buffer
-	testCase.WriteString("a,b,c,d")
-	// WARN: will take up 10KB memory here.
-	mydump.LargestEntryLimit = 10 * 1024
-	for i := 0; i < mydump.LargestEntryLimit; i++ {
-		testCase.WriteByte('d')
-	}
-	charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
-	require.NoError(t, err)
-	parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(testCase.String()), int64(config.ReadBlockSize), ioWorkersForCSV, false, charsetConvertor)
-	require.NoError(t, err)
-	e := parser.ReadRow()
-	require.Error(t, e)
-	require.Contains(t, e.Error(), "size of row cannot exceed the max value of txn-entry-size-limit")
+	bak := mydump.LargestEntryLimit
+	t.Cleanup(func() {
+		mydump.LargestEntryLimit = bak
+	})
+	mydump.LargestEntryLimit = 1024
+	t.Run("too long field", func(t *testing.T) {
+		var dataBuf bytes.Buffer
+		dataBuf.WriteString("a,b,c,d")
+		for i := 0; i < mydump.LargestEntryLimit; i++ {
+			dataBuf.WriteByte('d')
+		}
+		require.Greater(t, dataBuf.Len(), mydump.LargestEntryLimit)
+		charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
+		require.NoError(t, err)
+		parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(dataBuf.String()), int64(config.ReadBlockSize), ioWorkersForCSV, false, charsetConvertor)
+		require.NoError(t, err)
+		e := parser.ReadRow()
+		require.Error(t, e)
+		require.Contains(t, e.Error(), "size of row cannot exceed the max value of txn-entry-size-limit")
+	})
+
+	t.Run("field is short, but whole row too long", func(t *testing.T) {
+		var dataBuf bytes.Buffer
+		for i := 0; i < 16; i++ {
+			if i > 0 {
+				dataBuf.WriteByte(',')
+			}
+			for j := 0; j < mydump.LargestEntryLimit/16; j++ {
+				dataBuf.WriteByte('d')
+			}
+		}
+		for i := 0; i < mydump.LargestEntryLimit-dataBuf.Len()+16; i++ {
+			dataBuf.WriteByte('d')
+		}
+		require.Greater(t, dataBuf.Len(), mydump.LargestEntryLimit)
+		charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace)
+		require.NoError(t, err)
+		parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(dataBuf.String()), int64(config.ReadBlockSize), ioWorkersForCSV, false, charsetConvertor)
+		require.NoError(t, err)
+		e := parser.ReadRow()
+		require.Error(t, e)
+		require.Contains(t, e.Error(), "size of row cannot exceed the max value of txn-entry-size-limit")
+	})
 }
 
 func TestSpecialChars(t *testing.T) {
diff --git a/pkg/lightning/mydump/parser.go b/pkg/lightning/mydump/parser.go
index 959301984ad07..a47ee4ba76ff0 100644
--- a/pkg/lightning/mydump/parser.go
+++ b/pkg/lightning/mydump/parser.go
@@ -61,11 +61,17 @@ type blockParser struct {
 
 	// cache
 	remainBuf *bytes.Buffer
+	// holds the cached parsable data after the last readBlock. All data
+	// inside is unparsed at the moment the last readBlock returns; for the
+	// current unparsed data, use buf.
 	appendBuf *bytes.Buffer
 
 	// the Logger associated with this parser for reporting failure
 	Logger  log.Logger
 	metrics *metric.Metrics
+
+	checkRowLen bool
+	rowStartPos int64
 }
 
 func makeBlockParser(
@@ -187,6 +193,15 @@ func NewChunkParser(
 	}
 }
 
+func (parser *blockParser) beginRowLenCheck() {
+	parser.checkRowLen = true
+	parser.rowStartPos = parser.pos
+}
+
+func (parser *blockParser) endRowLenCheck() {
+	parser.checkRowLen = false
+}
+
 // SetPos changes the reported position and row ID.
 func (parser *blockParser) SetPos(pos int64, rowID int64) error {
 	p, err := parser.reader.Seek(pos, io.SeekStart)
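A note on the mechanism, for reviewers. The old guard `len(buf) > LargestEntryLimit` only bounded the bytes buffered within a single `readUntil` call (roughly one field), so a row composed of many short fields could exceed the limit without ever tripping it. The patch instead records the stream position when a row begins (`beginRowLenCheck`) and compares the distance from `rowStartPos` against the limit; the unexported `readUntilTerminator` exists so that `readRecord`'s line-skipping for `startingBy` does not restart that measurement mid-row. Below is a minimal, self-contained sketch of the position-based check, not the parser's actual code: only the names `checkRowLen`, `rowStartPos`, and `LargestEntryLimit` come from this diff, and the rest is invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// LargestEntryLimit is a tiny demo limit; the real one is derived from
// txn-entry-size-limit.
const LargestEntryLimit = 16

type rowLenChecker struct {
	pos         int64 // bytes consumed from the stream so far
	rowStartPos int64 // stream position where the current row began
	checkRowLen bool  // only enforce the limit between beginRow/endRow
}

func (c *rowLenChecker) beginRow() {
	c.checkRowLen = true
	c.rowStartPos = c.pos
}

func (c *rowLenChecker) endRow() {
	c.checkRowLen = false
}

// consumeField advances pos past one field and its delimiter, then applies
// the row-level check: the size is measured from the start of the row, so
// many small fields can trip it even though no single field is oversized.
func (c *rowLenChecker) consumeField(field string) error {
	c.pos += int64(len(field)) + 1 // +1 for the comma/terminator
	if c.checkRowLen && c.pos-c.rowStartPos > LargestEntryLimit {
		return errors.New("size of row cannot exceed the max value of txn-entry-size-limit")
	}
	return nil
}

func main() {
	c := &rowLenChecker{}
	c.beginRow()
	defer c.endRow()
	// Six 3-byte fields: each is well under the 16-byte demo limit, so a
	// per-field check (len(buf) > LargestEntryLimit) would never fire, but
	// the cumulative row size passes the limit and the row is rejected.
	for _, f := range strings.Split("aaa,bbb,ccc,ddd,eee,fff", ",") {
		if err := c.consumeField(f); err != nil {
			fmt.Println(err)
			return
		}
	}
	fmt.Println("row accepted")
}
```

Running the sketch prints the txn-entry-size-limit error once the cumulative row size crosses the limit, which is exactly the scenario exercised by the new "field is short, but whole row too long" test case.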