From 4c6216d8c2f171a27aca601c5f7b25ae130678ae Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Fri, 27 Dec 2024 17:50:57 +0800 Subject: [PATCH 1/3] lightning: check the size of the whole CSV row against LargestEntryLimit --- pkg/lightning/mydump/csv_parser.go | 7 ++- pkg/lightning/mydump/csv_parser_test.go | 57 +++++++++++++++++++------ pkg/lightning/mydump/parser.go | 15 +++++++ 3 files changed, 64 insertions(+), 15 deletions(-) diff --git a/pkg/lightning/mydump/csv_parser.go b/pkg/lightning/mydump/csv_parser.go index 8d48496089bd9..edf98f5deacfa 100644 --- a/pkg/lightning/mydump/csv_parser.go +++ b/pkg/lightning/mydump/csv_parser.go @@ -398,7 +398,8 @@ func (parser *CSVParser) readUntil(chars *byteSet) ([]byte, byte, error) { var buf []byte for { buf = append(buf, parser.buf...) - if len(buf) > LargestEntryLimit { + roughRowSize := parser.pos - parser.rowStartPos + int64(len(buf)) + if parser.checkRowLen && roughRowSize > int64(LargestEntryLimit) { return buf, 0, errors.New("size of row cannot exceed the max value of txn-entry-size-limit") } parser.buf = nil @@ -629,6 +630,8 @@ func (parser *CSVParser) replaceEOF(err error, replaced error) error { // ReadRow reads a row from the datafile. func (parser *CSVParser) ReadRow() error { + parser.beginRowLenCheck() + defer parser.endRowLenCheck() row := &parser.lastRow row.Length = 0 row.RowID++ @@ -679,6 +682,8 @@ func (parser *CSVParser) ReadRow() error { // ReadColumns reads the columns of this CSV file. 
func (parser *CSVParser) ReadColumns() error { + parser.beginRowLenCheck() + defer parser.endRowLenCheck() columns, err := parser.readRecord(nil) if err != nil { return errors.Trace(err) diff --git a/pkg/lightning/mydump/csv_parser_test.go b/pkg/lightning/mydump/csv_parser_test.go index 5fc60ee073e8e..30b5b082bfa34 100644 --- a/pkg/lightning/mydump/csv_parser_test.go +++ b/pkg/lightning/mydump/csv_parser_test.go @@ -838,20 +838,49 @@ func TestTooLargeRow(t *testing.T) { Delimiter: `"`, }, } - var testCase bytes.Buffer - testCase.WriteString("a,b,c,d") - // WARN: will take up 10KB memory here. - mydump.LargestEntryLimit = 10 * 1024 - for i := 0; i < mydump.LargestEntryLimit; i++ { - testCase.WriteByte('d') - } - charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace) - require.NoError(t, err) - parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(testCase.String()), int64(config.ReadBlockSize), ioWorkersForCSV, false, charsetConvertor) - require.NoError(t, err) - e := parser.ReadRow() - require.Error(t, e) - require.Contains(t, e.Error(), "size of row cannot exceed the max value of txn-entry-size-limit") + bak := mydump.LargestEntryLimit + t.Cleanup(func() { + mydump.LargestEntryLimit = bak + }) + mydump.LargestEntryLimit = 1024 + t.Run("too long field", func(t *testing.T) { + var dataBuf bytes.Buffer + dataBuf.WriteString("a,b,c,d") + for i := 0; i < mydump.LargestEntryLimit; i++ { + dataBuf.WriteByte('d') + } + require.Greater(t, dataBuf.Len(), mydump.LargestEntryLimit) + charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace) + require.NoError(t, err) + parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(dataBuf.String()), int64(config.ReadBlockSize), ioWorkersForCSV, false, charsetConvertor) + require.NoError(t, err) + e := parser.ReadRow() + require.Error(t, e) + require.Contains(t, e.Error(), 
"size of row cannot exceed the max value of txn-entry-size-limit") + }) + + t.Run("field is short, but whole row too long", func(t *testing.T) { + var dataBuf bytes.Buffer + for i := 0; i < 16; i++ { + if i > 0 { + dataBuf.WriteByte(',') + } + for j := 0; j < mydump.LargestEntryLimit/16; j++ { + dataBuf.WriteByte('d') + } + } + for i := 0; i < mydump.LargestEntryLimit-dataBuf.Len()+16; i++ { + dataBuf.WriteByte('d') + } + require.Greater(t, dataBuf.Len(), mydump.LargestEntryLimit) + charsetConvertor, err := mydump.NewCharsetConvertor(cfg.DataCharacterSet, cfg.DataInvalidCharReplace) + require.NoError(t, err) + parser, err := mydump.NewCSVParser(context.Background(), &cfg.CSV, mydump.NewStringReader(dataBuf.String()), int64(config.ReadBlockSize), ioWorkersForCSV, false, charsetConvertor) + require.NoError(t, err) + e := parser.ReadRow() + require.Error(t, e) + require.Contains(t, e.Error(), "size of row cannot exceed the max value of txn-entry-size-limit") + }) } func TestSpecialChars(t *testing.T) { diff --git a/pkg/lightning/mydump/parser.go b/pkg/lightning/mydump/parser.go index 959301984ad07..a47ee4ba76ff0 100644 --- a/pkg/lightning/mydump/parser.go +++ b/pkg/lightning/mydump/parser.go @@ -61,11 +61,17 @@ type blockParser struct { // cache remainBuf *bytes.Buffer + // holds the cached parsable data after last readBlock. all data inside is + // unparsed at the moment of the return of last readBlock, for current unparsed + // data, use buf. appendBuf *bytes.Buffer // the Logger associated with this parser for reporting failure Logger log.Logger metrics *metric.Metrics + + checkRowLen bool + rowStartPos int64 } func makeBlockParser( @@ -187,6 +193,15 @@ func NewChunkParser( } } +func (parser *blockParser) beginRowLenCheck() { + parser.checkRowLen = true + parser.rowStartPos = parser.pos +} + +func (parser *blockParser) endRowLenCheck() { + parser.checkRowLen = false +} + // SetPos changes the reported position and row ID. 
func (parser *blockParser) SetPos(pos int64, rowID int64) error { p, err := parser.reader.Seek(pos, io.SeekStart) From 94b09ca3eee37a92dc09ba5c5666dade8fb1a050 Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Fri, 27 Dec 2024 17:54:19 +0800 Subject: [PATCH 2/3] lightning: apply the row length check in ReadUntilTerminator via an unexported readUntilTerminator helper --- pkg/lightning/mydump/csv_parser.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/lightning/mydump/csv_parser.go b/pkg/lightning/mydump/csv_parser.go index edf98f5deacfa..93f00dda23379 100644 --- a/pkg/lightning/mydump/csv_parser.go +++ b/pkg/lightning/mydump/csv_parser.go @@ -443,7 +443,7 @@ outside: // end of a line, the substring can still be dropped by rule 2. if len(parser.startingBy) > 0 && !foundStartingByThisLine { oldPos := parser.pos - content, _, err := parser.ReadUntilTerminator() + content, _, err := parser.readUntilTerminator() if err != nil { if !(errors.Cause(err) == io.EOF) { return nil, err @@ -710,6 +710,12 @@ func (parser *CSVParser) ReadColumns() error { // Note that the terminator string pattern may be the content of a field, which // means it's inside quotes. Caller should make sure to handle this case. 
func (parser *CSVParser) ReadUntilTerminator() ([]byte, int64, error) { + parser.beginRowLenCheck() + defer parser.endRowLenCheck() + return parser.readUntilTerminator() +} + +func (parser *CSVParser) readUntilTerminator() ([]byte, int64, error) { var ret []byte for { content, firstByte, err := parser.readUntil(&parser.newLineByteSet) From 86e2d6d61774dfab6abde1b25831a09343093b1b Mon Sep 17 00:00:00 2001 From: D3Hunter Date: Fri, 27 Dec 2024 17:59:41 +0800 Subject: [PATCH 3/3] lightning: inline roughRowSize into the readUntil row size limit condition --- pkg/lightning/mydump/csv_parser.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/lightning/mydump/csv_parser.go b/pkg/lightning/mydump/csv_parser.go index 93f00dda23379..673888f147419 100644 --- a/pkg/lightning/mydump/csv_parser.go +++ b/pkg/lightning/mydump/csv_parser.go @@ -398,8 +398,7 @@ func (parser *CSVParser) readUntil(chars *byteSet) ([]byte, byte, error) { var buf []byte for { buf = append(buf, parser.buf...) - roughRowSize := parser.pos - parser.rowStartPos + int64(len(buf)) - if parser.checkRowLen && roughRowSize > int64(LargestEntryLimit) { + if parser.checkRowLen && parser.pos-parser.rowStartPos+int64(len(buf)) > int64(LargestEntryLimit) { return buf, 0, errors.New("size of row cannot exceed the max value of txn-entry-size-limit") } parser.buf = nil