From aa1d3869c6c358fcea6b3cf883908b12a820a28f Mon Sep 17 00:00:00 2001 From: David Taylor Date: Tue, 8 May 2018 17:11:17 +0000 Subject: [PATCH 1/6] importccl: move import reader proc to own file Release note: none. --- pkg/ccl/importccl/csv.go | 519 ------------------------ pkg/ccl/importccl/read_import_proc.go | 546 ++++++++++++++++++++++++++ 2 files changed, 546 insertions(+), 519 deletions(-) create mode 100644 pkg/ccl/importccl/read_import_proc.go diff --git a/pkg/ccl/importccl/csv.go b/pkg/ccl/importccl/csv.go index 0fa94e772274..f7a6b3627136 100644 --- a/pkg/ccl/importccl/csv.go +++ b/pkg/ccl/importccl/csv.go @@ -11,17 +11,9 @@ package importccl import ( "bytes" "context" - "encoding/csv" - "io" "io/ioutil" - "math/rand" - "runtime" "sort" "strconv" - "sync" - "time" - - "golang.org/x/sync/errgroup" "github.com/cockroachdb/cockroach/pkg/build" "github.com/cockroachdb/cockroach/pkg/ccl/backupccl" @@ -31,13 +23,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/distsqlrun" "github.com/cockroachdb/cockroach/pkg/sql/jobs" "github.com/cockroachdb/cockroach/pkg/sql/parser" - "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" - "github.com/cockroachdb/cockroach/pkg/sql/sem/transform" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -164,318 +152,6 @@ func MakeSimpleTableDescriptor( return &tableDesc, nil } -type ctxProvider struct { - context.Context -} - -func (c ctxProvider) Ctx() context.Context { - return c -} - -// groupWorkers creates num worker go routines in an error group. -func groupWorkers(ctx context.Context, num int, f func(context.Context) error) error { - group, ctx := errgroup.WithContext(ctx) - for i := 0; i < num; i++ { - group.Go(func() error { - return f(ctx) - }) - } - return group.Wait() -} - -// readCSV sends records on ch from CSV listed by dataFiles. The key part -// of dataFiles is the unique index of the CSV file among all CSV files in -// the IMPORT. comma, if non-zero, specifies the field separator. comment, -// if non-zero, specifies the comment character. It returns the number of rows -// read. progressFn, if not nil, is periodically invoked with a percentage of -// the total progress of reading through all of the files. This percentage -// attempts to use the Size() method of ExportStorage to determine how many -// bytes must be read of the CSV files, and reports the percent of bytes read -// among all dataFiles. If any Size() fails for any file, then progress is -// reported only after each file has been read. -func readCSV( - ctx context.Context, - opts roachpb.CSVOptions, - expectedCols int, - dataFiles map[int32]string, - recordCh chan<- csvRecord, - progressFn func(float32) error, - settings *cluster.Settings, -) (int64, error) { - const batchSize = 500 - expectedColsExtra := expectedCols + 1 - done := ctx.Done() - var count int64 - if opts.Comma == 0 { - opts.Comma = ',' - } - - var totalBytes, readBytes int64 - // Attempt to fetch total number of bytes for all files. 
- for _, dataFile := range dataFiles { - conf, err := storageccl.ExportStorageConfFromURI(dataFile) - if err != nil { - return 0, err - } - es, err := storageccl.MakeExportStorage(ctx, conf, settings) - if err != nil { - return 0, err - } - sz, err := es.Size(ctx, "") - es.Close() - if sz <= 0 { - // Don't log dataFile here because it could leak auth information. - log.Infof(ctx, "could not fetch file size; falling back to per-file progress: %v", err) - totalBytes = 0 - break - } - totalBytes += sz - } - updateFromFiles := progressFn != nil && totalBytes == 0 - updateFromBytes := progressFn != nil && totalBytes > 0 - - currentFile := 0 - for dataFileIndex, dataFile := range dataFiles { - currentFile++ - select { - case <-done: - return 0, ctx.Err() - default: - } - - err := func() error { - conf, err := storageccl.ExportStorageConfFromURI(dataFile) - if err != nil { - return err - } - es, err := storageccl.MakeExportStorage(ctx, conf, settings) - if err != nil { - return err - } - defer es.Close() - f, err := es.ReadFile(ctx, "") - if err != nil { - return err - } - bc := byteCounter{r: f} - cr := csv.NewReader(&bc) - cr.Comma = opts.Comma - cr.FieldsPerRecord = -1 - cr.LazyQuotes = true - cr.Comment = opts.Comment - - batch := csvRecord{ - file: dataFile, - fileIndex: dataFileIndex, - rowOffset: 1, - r: make([][]string, 0, batchSize), - } - - for i := 1; ; i++ { - record, err := cr.Read() - if err == io.EOF || len(batch.r) >= batchSize { - // if the batch isn't empty, we need to flush it. - if len(batch.r) > 0 { - select { - case <-done: - return ctx.Err() - case recordCh <- batch: - count += int64(len(batch.r)) - } - } - // progressBytes is the number of read bytes at which to report job progress. A - // low value may cause excessive updates in the job table which can lead to - // very large rows due to MVCC saving each version. - const progressBytes = 100 << 20 - if updateFromBytes && (err == io.EOF || bc.n > progressBytes) { - readBytes += bc.n - bc.n = 0 - if err := progressFn(float32(readBytes) / float32(totalBytes)); err != nil { - return err - } - } - if err == io.EOF { - break - } - batch.rowOffset = i - batch.r = make([][]string, 0, batchSize) - } - if err != nil { - return errors.Wrapf(err, "row %d: reading CSV record", i) - } - // Ignore the first N lines. - if uint32(i) <= opts.Skip { - continue - } - if len(record) == expectedCols { - // Expected number of columns. - } else if len(record) == expectedColsExtra && record[expectedCols] == "" { - // Line has the optional trailing comma, ignore the empty field. - record = record[:expectedCols] - } else { - return errors.Errorf("row %d: expected %d fields, got %d", i, expectedCols, len(record)) - } - batch.r = append(batch.r, record) - } - return nil - }() - if err != nil { - return 0, errors.Wrap(err, dataFile) - } - if updateFromFiles { - if err := progressFn(float32(currentFile) / float32(len(dataFiles))); err != nil { - return 0, err - } - } - } - return count, nil -} - -type byteCounter struct { - r io.Reader - n int64 -} - -func (b *byteCounter) Read(p []byte) (int, error) { - n, err := b.r.Read(p) - b.n += int64(n) - return n, err -} - -type csvRecord struct { - r [][]string - file string - fileIndex int32 - rowOffset int -} - -// convertRecord converts CSV records into KV pairs and sends them on the -// kvCh chan. 
-func convertRecord( - ctx context.Context, - recordCh <-chan csvRecord, - kvCh chan<- []roachpb.KeyValue, - nullif *string, - tableDesc *sqlbase.TableDescriptor, -) error { - done := ctx.Done() - - const kvBatchSize = 1000 - padding := 2 * (len(tableDesc.Indexes) + len(tableDesc.Families)) - visibleCols := tableDesc.VisibleColumns() - - ri, err := sqlbase.MakeRowInserter(nil /* txn */, tableDesc, nil, /* fkTables */ - tableDesc.Columns, false /* checkFKs */, &sqlbase.DatumAlloc{}) - if err != nil { - return errors.Wrap(err, "make row inserter") - } - - var txCtx transform.ExprTransformContext - evalCtx := tree.EvalContext{SessionData: &sessiondata.SessionData{Location: time.UTC}} - // Although we don't yet support DEFAULT expressions on visible columns, - // we do on hidden columns (which is only the default _rowid one). This - // allows those expressions to run. - cols, defaultExprs, err := sqlbase.ProcessDefaultColumns(tableDesc.Columns, tableDesc, &txCtx, &evalCtx) - if err != nil { - return errors.Wrap(err, "process default columns") - } - - datums := make([]tree.Datum, len(visibleCols), len(cols)) - - // Check for a hidden column. This should be the unique_rowid PK if present. - hidden := -1 - for i, col := range cols { - if col.Hidden { - if col.DefaultExpr == nil || *col.DefaultExpr != "unique_rowid()" || hidden != -1 { - return errors.New("unexpected hidden column") - } - hidden = i - datums = append(datums, nil) - } - } - if len(datums) != len(cols) { - return errors.New("unexpected hidden column") - } - - kvBatch := make([]roachpb.KeyValue, 0, kvBatchSize+padding) - - computedIVarContainer := sqlbase.RowIndexedVarContainer{ - Mapping: ri.InsertColIDtoRowIndex, - Cols: tableDesc.Columns, - } - - for batch := range recordCh { - for batchIdx, record := range batch.r { - rowNum := batch.rowOffset + batchIdx - for i, v := range record { - col := visibleCols[i] - if nullif != nil && v == *nullif { - datums[i] = tree.DNull - } else { - var err error - datums[i], err = tree.ParseDatumStringAs(col.Type.ToDatumType(), v, &evalCtx) - if err != nil { - return errors.Wrapf(err, "%s: row %d: parse %q as %s", batch.file, rowNum, col.Name, col.Type.SQLString()) - } - } - } - if hidden >= 0 { - // We don't want to call unique_rowid() for the hidden PK column because - // it is not idempotent. The sampling from the first stage will be useless - // during the read phase, producing a single range split with all of the - // data. Instead, we will call our own function that mimics that function, - // but more-or-less guarantees that it will not interfere with the numbers - // that will be produced by it. The lower 15 bits mimic the node id, but as - // the CSV file number. The upper 48 bits are the line number and mimic the - // timestamp. It would take a file with many more than 2**32 lines to even - // begin approaching what unique_rowid would return today, so we assume it - // to be safe. Since the timestamp is won't overlap, it is safe to use any - // number in the node id portion. The 15 bits in that portion should account - // for up to 32k CSV files in a single IMPORT. In the case of > 32k files, - // the data is xor'd so the final bits are flipped instead of set. - datums[hidden] = tree.NewDInt(builtins.GenerateUniqueID(batch.fileIndex, uint64(rowNum))) - } - - // TODO(justin): we currently disallow computed columns in import statements. 
- var computeExprs []tree.TypedExpr - var computedCols []sqlbase.ColumnDescriptor - - row, err := sql.GenerateInsertRow( - defaultExprs, computeExprs, cols, computedCols, evalCtx, tableDesc, datums, &computedIVarContainer) - if err != nil { - return errors.Wrapf(err, "generate insert row: %s: row %d", batch.file, rowNum) - } - if err := ri.InsertRow( - ctx, - inserter(func(kv roachpb.KeyValue) { - kv.Value.InitChecksum(kv.Key) - kvBatch = append(kvBatch, kv) - }), - row, - true, /* ignoreConflicts */ - sqlbase.SkipFKs, - false, /* traceKV */ - ); err != nil { - return errors.Wrapf(err, "insert row: %s: row %d", batch.file, rowNum) - } - if len(kvBatch) >= kvBatchSize { - select { - case kvCh <- kvBatch: - case <-done: - return ctx.Err() - } - kvBatch = make([]roachpb.KeyValue, 0, kvBatchSize+padding) - } - } - } - select { - case kvCh <- kvBatch: - case <-done: - return ctx.Err() - } - return nil -} - const csvDatabaseName = "csv" func finalizeCSVBackup( @@ -858,200 +534,6 @@ func doDistributedCSVTransform( return finalizeCSVBackup(ctx, &backupDesc, parentID, tableDesc, es, p.ExecCfg()) } -var csvOutputTypes = []sqlbase.ColumnType{ - {SemanticType: sqlbase.ColumnType_BYTES}, - {SemanticType: sqlbase.ColumnType_BYTES}, -} - -func newReadImportDataProcessor( - flowCtx *distsqlrun.FlowCtx, spec distsqlrun.ReadImportDataSpec, output distsqlrun.RowReceiver, -) (distsqlrun.Processor, error) { - cp := &readImportDataProcessor{ - flowCtx: flowCtx, - inputFormat: spec.Format, - sampleSize: spec.SampleSize, - tableDesc: spec.TableDesc, - uri: spec.Uri, - output: output, - settings: flowCtx.Settings, - registry: flowCtx.JobRegistry, - progress: spec.Progress, - } - - // Check if this was was sent by an older node. - if spec.Format.Format == roachpb.IOFileFormat_Unknown { - spec.Format.Format = roachpb.IOFileFormat_CSV - spec.Format.Csv = spec.LegacyCsvOptions - } - if err := cp.out.Init(&distsqlrun.PostProcessSpec{}, csvOutputTypes, flowCtx.NewEvalCtx(), output); err != nil { - return nil, err - } - return cp, nil -} - -type readImportDataProcessor struct { - flowCtx *distsqlrun.FlowCtx - sampleSize int32 - tableDesc sqlbase.TableDescriptor - uri map[int32]string - inputFormat roachpb.IOFileFormat - out distsqlrun.ProcOutputHelper - output distsqlrun.RowReceiver - settings *cluster.Settings - registry *jobs.Registry - progress distsqlrun.JobProgress -} - -var _ distsqlrun.Processor = &readImportDataProcessor{} - -func (cp *readImportDataProcessor) OutputTypes() []sqlbase.ColumnType { - return csvOutputTypes -} - -func (cp *readImportDataProcessor) Run(ctx context.Context, wg *sync.WaitGroup) { - ctx, span := tracing.ChildSpan(ctx, "readCSVProcessor") - defer tracing.FinishSpan(span) - - if wg != nil { - defer wg.Done() - } - - group, gCtx := errgroup.WithContext(ctx) - done := gCtx.Done() - recordCh := make(chan csvRecord) - kvCh := make(chan []roachpb.KeyValue) - sampleCh := make(chan sqlbase.EncDatumRow) - - // Read CSV into CSV records - group.Go(func() error { - sCtx, span := tracing.ChildSpan(gCtx, "readcsv") - defer tracing.FinishSpan(span) - defer close(recordCh) - - job, err := cp.registry.LoadJob(gCtx, cp.progress.JobID) - if err != nil { - return err - } - - progFn := func(pct float32) error { - return job.Progressed(ctx, func(ctx context.Context, details jobs.Details) float32 { - d := details.(*jobs.Payload_Import).Import - slotpct := pct * cp.progress.Contribution - if len(d.Tables[0].SamplingProgress) > 0 { - d.Tables[0].SamplingProgress[cp.progress.Slot] = slotpct - } else { - 
d.Tables[0].ReadProgress[cp.progress.Slot] = slotpct - } - return d.Tables[0].Completed() - }) - } - - _, err = readCSV(sCtx, cp.inputFormat.Csv, len(cp.tableDesc.VisibleColumns()), - cp.uri, recordCh, progFn, cp.settings) - return err - }) - // Convert CSV records to KVs - group.Go(func() error { - sCtx, span := tracing.ChildSpan(gCtx, "convertcsv") - defer tracing.FinishSpan(span) - - defer close(kvCh) - return groupWorkers(sCtx, runtime.NumCPU(), func(ctx context.Context) error { - return convertRecord(ctx, recordCh, kvCh, cp.inputFormat.Csv.NullEncoding, &cp.tableDesc) - }) - }) - // Sample KVs - group.Go(func() error { - sCtx, span := tracing.ChildSpan(gCtx, "samplecsv") - defer tracing.FinishSpan(span) - - defer close(sampleCh) - var fn sampleFunc - if cp.sampleSize == 0 { - fn = sampleAll - } else { - sr := sampleRate{ - rnd: rand.New(rand.NewSource(rand.Int63())), - sampleSize: float64(cp.sampleSize), - } - fn = sr.sample - } - - // Populate the split-point spans which have already been imported. - var completedSpans roachpb.SpanGroup - job, err := cp.registry.LoadJob(gCtx, cp.progress.JobID) - if err != nil { - return err - } - if details, ok := job.Payload().Details.(*jobs.Payload_Import); ok { - completedSpans.Add(details.Import.Tables[0].SpanProgress...) - } - - typeBytes := sqlbase.ColumnType{SemanticType: sqlbase.ColumnType_BYTES} - for kvBatch := range kvCh { - for _, kv := range kvBatch { - // Allow KV pairs to be dropped if they belong to a completed span. - if completedSpans.Contains(kv.Key) { - continue - } - if fn(kv) { - row := sqlbase.EncDatumRow{ - sqlbase.DatumToEncDatum(typeBytes, tree.NewDBytes(tree.DBytes(kv.Key))), - sqlbase.DatumToEncDatum(typeBytes, tree.NewDBytes(tree.DBytes(kv.Value.RawBytes))), - } - select { - case <-done: - return sCtx.Err() - case sampleCh <- row: - } - } - } - } - return nil - }) - // Send sampled KVs to dist sql - group.Go(func() error { - sCtx, span := tracing.ChildSpan(gCtx, "sendcsvkv") - defer tracing.FinishSpan(span) - - for row := range sampleCh { - cs, err := cp.out.EmitRow(sCtx, row) - if err != nil { - return err - } - if cs != distsqlrun.NeedMoreRows { - return errors.New("unexpected closure of consumer") - } - } - return nil - }) - if err := group.Wait(); err != nil { - distsqlrun.DrainAndClose(ctx, cp.output, err, func(context.Context) {} /* pushTrailingMeta */) - return - } - - cp.out.Close() -} - -type sampleFunc func(roachpb.KeyValue) bool - -// sampleRate is a sampleFunc that samples a row with a probability of the -// row's size / the sample size. -type sampleRate struct { - rnd *rand.Rand - sampleSize float64 -} - -func (s sampleRate) sample(kv roachpb.KeyValue) bool { - sz := float64(len(kv.Key) + len(kv.Value.RawBytes)) - prob := sz / s.sampleSize - return prob > s.rnd.Float64() -} - -func sampleAll(kv roachpb.KeyValue) bool { - return true -} - type importResumer struct { settings *cluster.Settings res roachpb.BulkOpSummary @@ -1179,6 +661,5 @@ func importResumeHook(typ jobs.Type, settings *cluster.Settings) jobs.Resumer { func init() { sql.AddPlanHook(importPlanHook) - distsqlrun.NewReadImportDataProcessor = newReadImportDataProcessor jobs.AddResumeHook(importResumeHook) } diff --git a/pkg/ccl/importccl/read_import_proc.go b/pkg/ccl/importccl/read_import_proc.go new file mode 100644 index 000000000000..4ad49f7501b6 --- /dev/null +++ b/pkg/ccl/importccl/read_import_proc.go @@ -0,0 +1,546 @@ +// Copyright 2017 The Cockroach Authors. 
+// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package importccl + +import ( + "context" + "encoding/csv" + "io" + "math/rand" + "runtime" + "sync" + "time" + + "golang.org/x/sync/errgroup" + + "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/distsqlrun" + "github.com/cockroachdb/cockroach/pkg/sql/jobs" + "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" + "github.com/cockroachdb/cockroach/pkg/sql/sem/transform" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/pkg/errors" +) + +type ctxProvider struct { + context.Context +} + +func (c ctxProvider) Ctx() context.Context { + return c +} + +// groupWorkers creates num worker go routines in an error group. +func groupWorkers(ctx context.Context, num int, f func(context.Context) error) error { + group, ctx := errgroup.WithContext(ctx) + for i := 0; i < num; i++ { + group.Go(func() error { + return f(ctx) + }) + } + return group.Wait() +} + +// readCSV sends records on ch from CSV listed by dataFiles. The key part +// of dataFiles is the unique index of the CSV file among all CSV files in +// the IMPORT. comma, if non-zero, specifies the field separator. comment, +// if non-zero, specifies the comment character. It returns the number of rows +// read. progressFn, if not nil, is periodically invoked with a percentage of +// the total progress of reading through all of the files. This percentage +// attempts to use the Size() method of ExportStorage to determine how many +// bytes must be read of the CSV files, and reports the percent of bytes read +// among all dataFiles. If any Size() fails for any file, then progress is +// reported only after each file has been read. +func readCSV( + ctx context.Context, + opts roachpb.CSVOptions, + expectedCols int, + dataFiles map[int32]string, + recordCh chan<- csvRecord, + progressFn func(float32) error, + settings *cluster.Settings, +) (int64, error) { + const batchSize = 500 + expectedColsExtra := expectedCols + 1 + done := ctx.Done() + var count int64 + if opts.Comma == 0 { + opts.Comma = ',' + } + + var totalBytes, readBytes int64 + // Attempt to fetch total number of bytes for all files. + for _, dataFile := range dataFiles { + conf, err := storageccl.ExportStorageConfFromURI(dataFile) + if err != nil { + return 0, err + } + es, err := storageccl.MakeExportStorage(ctx, conf, settings) + if err != nil { + return 0, err + } + sz, err := es.Size(ctx, "") + es.Close() + if sz <= 0 { + // Don't log dataFile here because it could leak auth information. 
+ log.Infof(ctx, "could not fetch file size; falling back to per-file progress: %v", err) + totalBytes = 0 + break + } + totalBytes += sz + } + updateFromFiles := progressFn != nil && totalBytes == 0 + updateFromBytes := progressFn != nil && totalBytes > 0 + + currentFile := 0 + for dataFileIndex, dataFile := range dataFiles { + currentFile++ + select { + case <-done: + return 0, ctx.Err() + default: + } + + err := func() error { + conf, err := storageccl.ExportStorageConfFromURI(dataFile) + if err != nil { + return err + } + es, err := storageccl.MakeExportStorage(ctx, conf, settings) + if err != nil { + return err + } + defer es.Close() + f, err := es.ReadFile(ctx, "") + if err != nil { + return err + } + bc := byteCounter{r: f} + cr := csv.NewReader(&bc) + cr.Comma = opts.Comma + cr.FieldsPerRecord = -1 + cr.LazyQuotes = true + cr.Comment = opts.Comment + + batch := csvRecord{ + file: dataFile, + fileIndex: dataFileIndex, + rowOffset: 1, + r: make([][]string, 0, batchSize), + } + + for i := 1; ; i++ { + record, err := cr.Read() + if err == io.EOF || len(batch.r) >= batchSize { + // if the batch isn't empty, we need to flush it. + if len(batch.r) > 0 { + select { + case <-done: + return ctx.Err() + case recordCh <- batch: + count += int64(len(batch.r)) + } + } + // progressBytes is the number of read bytes at which to report job progress. A + // low value may cause excessive updates in the job table which can lead to + // very large rows due to MVCC saving each version. + const progressBytes = 100 << 20 + if updateFromBytes && (err == io.EOF || bc.n > progressBytes) { + readBytes += bc.n + bc.n = 0 + if err := progressFn(float32(readBytes) / float32(totalBytes)); err != nil { + return err + } + } + if err == io.EOF { + break + } + batch.rowOffset = i + batch.r = make([][]string, 0, batchSize) + } + if err != nil { + return errors.Wrapf(err, "row %d: reading CSV record", i) + } + // Ignore the first N lines. + if uint32(i) <= opts.Skip { + continue + } + if len(record) == expectedCols { + // Expected number of columns. + } else if len(record) == expectedColsExtra && record[expectedCols] == "" { + // Line has the optional trailing comma, ignore the empty field. + record = record[:expectedCols] + } else { + return errors.Errorf("row %d: expected %d fields, got %d", i, expectedCols, len(record)) + } + batch.r = append(batch.r, record) + } + return nil + }() + if err != nil { + return 0, errors.Wrap(err, dataFile) + } + if updateFromFiles { + if err := progressFn(float32(currentFile) / float32(len(dataFiles))); err != nil { + return 0, err + } + } + } + return count, nil +} + +type byteCounter struct { + r io.Reader + n int64 +} + +func (b *byteCounter) Read(p []byte) (int, error) { + n, err := b.r.Read(p) + b.n += int64(n) + return n, err +} + +type csvRecord struct { + r [][]string + file string + fileIndex int32 + rowOffset int +} + +// convertRecord converts CSV records into KV pairs and sends them on the +// kvCh chan. 
+func convertRecord( + ctx context.Context, + recordCh <-chan csvRecord, + kvCh chan<- []roachpb.KeyValue, + nullif *string, + tableDesc *sqlbase.TableDescriptor, +) error { + done := ctx.Done() + + const kvBatchSize = 1000 + padding := 2 * (len(tableDesc.Indexes) + len(tableDesc.Families)) + visibleCols := tableDesc.VisibleColumns() + + ri, err := sqlbase.MakeRowInserter(nil /* txn */, tableDesc, nil, /* fkTables */ + tableDesc.Columns, false /* checkFKs */, &sqlbase.DatumAlloc{}) + if err != nil { + return errors.Wrap(err, "make row inserter") + } + + var txCtx transform.ExprTransformContext + evalCtx := tree.EvalContext{SessionData: &sessiondata.SessionData{Location: time.UTC}} + // Although we don't yet support DEFAULT expressions on visible columns, + // we do on hidden columns (which is only the default _rowid one). This + // allows those expressions to run. + cols, defaultExprs, err := sqlbase.ProcessDefaultColumns(tableDesc.Columns, tableDesc, &txCtx, &evalCtx) + if err != nil { + return errors.Wrap(err, "process default columns") + } + + datums := make([]tree.Datum, len(visibleCols), len(cols)) + + // Check for a hidden column. This should be the unique_rowid PK if present. + hidden := -1 + for i, col := range cols { + if col.Hidden { + if col.DefaultExpr == nil || *col.DefaultExpr != "unique_rowid()" || hidden != -1 { + return errors.New("unexpected hidden column") + } + hidden = i + datums = append(datums, nil) + } + } + if len(datums) != len(cols) { + return errors.New("unexpected hidden column") + } + + kvBatch := make([]roachpb.KeyValue, 0, kvBatchSize+padding) + + computedIVarContainer := sqlbase.RowIndexedVarContainer{ + Mapping: ri.InsertColIDtoRowIndex, + Cols: tableDesc.Columns, + } + + for batch := range recordCh { + for batchIdx, record := range batch.r { + rowNum := batch.rowOffset + batchIdx + for i, v := range record { + col := visibleCols[i] + if nullif != nil && v == *nullif { + datums[i] = tree.DNull + } else { + var err error + datums[i], err = tree.ParseDatumStringAs(col.Type.ToDatumType(), v, &evalCtx) + if err != nil { + return errors.Wrapf(err, "%s: row %d: parse %q as %s", batch.file, rowNum, col.Name, col.Type.SQLString()) + } + } + } + if hidden >= 0 { + // We don't want to call unique_rowid() for the hidden PK column because + // it is not idempotent. The sampling from the first stage will be useless + // during the read phase, producing a single range split with all of the + // data. Instead, we will call our own function that mimics that function, + // but more-or-less guarantees that it will not interfere with the numbers + // that will be produced by it. The lower 15 bits mimic the node id, but as + // the CSV file number. The upper 48 bits are the line number and mimic the + // timestamp. It would take a file with many more than 2**32 lines to even + // begin approaching what unique_rowid would return today, so we assume it + // to be safe. Since the timestamp is won't overlap, it is safe to use any + // number in the node id portion. The 15 bits in that portion should account + // for up to 32k CSV files in a single IMPORT. In the case of > 32k files, + // the data is xor'd so the final bits are flipped instead of set. + datums[hidden] = tree.NewDInt(builtins.GenerateUniqueID(batch.fileIndex, uint64(rowNum))) + } + + // TODO(justin): we currently disallow computed columns in import statements. 
+ var computeExprs []tree.TypedExpr + var computedCols []sqlbase.ColumnDescriptor + + row, err := sql.GenerateInsertRow( + defaultExprs, computeExprs, cols, computedCols, evalCtx, tableDesc, datums, &computedIVarContainer) + if err != nil { + return errors.Wrapf(err, "generate insert row: %s: row %d", batch.file, rowNum) + } + if err := ri.InsertRow( + ctx, + inserter(func(kv roachpb.KeyValue) { + kv.Value.InitChecksum(kv.Key) + kvBatch = append(kvBatch, kv) + }), + row, + true, /* ignoreConflicts */ + sqlbase.SkipFKs, + false, /* traceKV */ + ); err != nil { + return errors.Wrapf(err, "insert row: %s: row %d", batch.file, rowNum) + } + if len(kvBatch) >= kvBatchSize { + select { + case kvCh <- kvBatch: + case <-done: + return ctx.Err() + } + kvBatch = make([]roachpb.KeyValue, 0, kvBatchSize+padding) + } + } + } + select { + case kvCh <- kvBatch: + case <-done: + return ctx.Err() + } + return nil +} + +var csvOutputTypes = []sqlbase.ColumnType{ + {SemanticType: sqlbase.ColumnType_BYTES}, + {SemanticType: sqlbase.ColumnType_BYTES}, +} + +func newReadImportDataProcessor( + flowCtx *distsqlrun.FlowCtx, spec distsqlrun.ReadImportDataSpec, output distsqlrun.RowReceiver, +) (distsqlrun.Processor, error) { + cp := &readImportDataProcessor{ + flowCtx: flowCtx, + inputFromat: spec.Format, + sampleSize: spec.SampleSize, + tableDesc: spec.TableDesc, + uri: spec.Uri, + output: output, + settings: flowCtx.Settings, + registry: flowCtx.JobRegistry, + progress: spec.Progress, + } + + // Check if this was was sent by an older node. + if spec.Format.Format == roachpb.IOFileFormat_Unknown { + spec.Format.Format = roachpb.IOFileFormat_CSV + spec.Format.Csv = spec.LegacyCsvOptions + } + if err := cp.out.Init(&distsqlrun.PostProcessSpec{}, csvOutputTypes, flowCtx.NewEvalCtx(), output); err != nil { + return nil, err + } + return cp, nil +} + +type readImportDataProcessor struct { + flowCtx *distsqlrun.FlowCtx + sampleSize int32 + tableDesc sqlbase.TableDescriptor + uri map[int32]string + inputFromat roachpb.IOFileFormat + out distsqlrun.ProcOutputHelper + output distsqlrun.RowReceiver + settings *cluster.Settings + registry *jobs.Registry + progress distsqlrun.JobProgress +} + +var _ distsqlrun.Processor = &readImportDataProcessor{} + +func (cp *readImportDataProcessor) OutputTypes() []sqlbase.ColumnType { + return csvOutputTypes +} + +func (cp *readImportDataProcessor) Run(ctx context.Context, wg *sync.WaitGroup) { + ctx, span := tracing.ChildSpan(ctx, "readCSVProcessor") + defer tracing.FinishSpan(span) + + if wg != nil { + defer wg.Done() + } + + group, gCtx := errgroup.WithContext(ctx) + done := gCtx.Done() + recordCh := make(chan csvRecord) + kvCh := make(chan []roachpb.KeyValue) + sampleCh := make(chan sqlbase.EncDatumRow) + + // Read CSV into CSV records + group.Go(func() error { + sCtx, span := tracing.ChildSpan(gCtx, "readcsv") + defer tracing.FinishSpan(span) + defer close(recordCh) + + job, err := cp.registry.LoadJob(gCtx, cp.progress.JobID) + if err != nil { + return err + } + + progFn := func(pct float32) error { + return job.Progressed(ctx, func(ctx context.Context, details jobs.Details) float32 { + d := details.(*jobs.Payload_Import).Import + slotpct := pct * cp.progress.Contribution + if len(d.Tables[0].SamplingProgress) > 0 { + d.Tables[0].SamplingProgress[cp.progress.Slot] = slotpct + } else { + d.Tables[0].ReadProgress[cp.progress.Slot] = slotpct + } + return d.Tables[0].Completed() + }) + } + + _, err = readCSV(sCtx, cp.inputFromat.Csv, len(cp.tableDesc.VisibleColumns()), + cp.uri, 
recordCh, progFn, cp.settings) + return err + }) + // Convert CSV records to KVs + group.Go(func() error { + sCtx, span := tracing.ChildSpan(gCtx, "convertcsv") + defer tracing.FinishSpan(span) + + defer close(kvCh) + return groupWorkers(sCtx, runtime.NumCPU(), func(ctx context.Context) error { + return convertRecord(ctx, recordCh, kvCh, cp.inputFromat.Csv.NullEncoding, &cp.tableDesc) + }) + }) + // Sample KVs + group.Go(func() error { + sCtx, span := tracing.ChildSpan(gCtx, "samplecsv") + defer tracing.FinishSpan(span) + + defer close(sampleCh) + var fn sampleFunc + if cp.sampleSize == 0 { + fn = sampleAll + } else { + sr := sampleRate{ + rnd: rand.New(rand.NewSource(rand.Int63())), + sampleSize: float64(cp.sampleSize), + } + fn = sr.sample + } + + // Populate the split-point spans which have already been imported. + var completedSpans roachpb.SpanGroup + job, err := cp.registry.LoadJob(gCtx, cp.progress.JobID) + if err != nil { + return err + } + if details, ok := job.Payload().Details.(*jobs.Payload_Import); ok { + completedSpans.Add(details.Import.Tables[0].SpanProgress...) + } + + typeBytes := sqlbase.ColumnType{SemanticType: sqlbase.ColumnType_BYTES} + for kvBatch := range kvCh { + for _, kv := range kvBatch { + // Allow KV pairs to be dropped if they belong to a completed span. + if completedSpans.Contains(kv.Key) { + continue + } + if fn(kv) { + row := sqlbase.EncDatumRow{ + sqlbase.DatumToEncDatum(typeBytes, tree.NewDBytes(tree.DBytes(kv.Key))), + sqlbase.DatumToEncDatum(typeBytes, tree.NewDBytes(tree.DBytes(kv.Value.RawBytes))), + } + select { + case <-done: + return sCtx.Err() + case sampleCh <- row: + } + } + } + } + return nil + }) + // Send sampled KVs to dist sql + group.Go(func() error { + sCtx, span := tracing.ChildSpan(gCtx, "sendcsvkv") + defer tracing.FinishSpan(span) + + for row := range sampleCh { + cs, err := cp.out.EmitRow(sCtx, row) + if err != nil { + return err + } + if cs != distsqlrun.NeedMoreRows { + return errors.New("unexpected closure of consumer") + } + } + return nil + }) + if err := group.Wait(); err != nil { + distsqlrun.DrainAndClose(ctx, cp.output, err, func(context.Context) {} /* pushTrailingMeta */) + return + } + + cp.out.Close() +} + +type sampleFunc func(roachpb.KeyValue) bool + +// sampleRate is a sampleFunc that samples a row with a probability of the +// row's size / the sample size. +type sampleRate struct { + rnd *rand.Rand + sampleSize float64 +} + +func (s sampleRate) sample(kv roachpb.KeyValue) bool { + sz := float64(len(kv.Key) + len(kv.Value.RawBytes)) + prob := sz / s.sampleSize + return prob > s.rnd.Float64() +} + +func sampleAll(kv roachpb.KeyValue) bool { + return true +} + +func init() { + distsqlrun.NewReadImportDataProcessor = newReadImportDataProcessor +} From 369bdc75fb84c2623478f5191691a0c531778e12 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Wed, 9 May 2018 13:35:16 +0000 Subject: [PATCH 2/6] importccl: refactor csv-specific logic vs generic file handling This refactor splits up the readCSV function, pulling the csv-speciifc logic into a csv helper called by the more generic readInputFiles function, that does the non-format-specific work of iterating and opening files and managing progress. This also refactors the processor Run a bit to remove the csv-specific logic of converter workers draining the recordCh that the reader fills, instead moving that to a csv-reading helper struct, using an interface that hopefully will allow other implementations for other formats. Release note: none. 
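
Sketched below, for illustration only, is the rough shape this refactor moves toward. The method signatures are the ones the diff gives csvInputReader (and readFile matches the readFileFunc type added here); the named interface, inputConverter, is just an illustrative label for that method set and is not something this commit declares.

    package importccl

    import (
        "context"
        "io"

        "golang.org/x/sync/errgroup"

        "github.com/cockroachdb/cockroach/pkg/roachpb"
    )

    // inputConverter is an illustrative name for the method set that
    // csvInputReader satisfies after this change; a reader for another input
    // format could implement the same three methods and reuse the generic
    // readInputFiles loop unchanged.
    type inputConverter interface {
        // start launches the conversion workers that drain the reader's
        // internal record channel and emit KV batches on kvCh.
        start(ctx context.Context, group *errgroup.Group, kvCh chan []roachpb.KeyValue)
        // readFile parses a single opened input file, batching rows onto the
        // internal record channel and reporting progress; its signature lines
        // up with the readFileFunc type expected by readInputFiles.
        readFile(ctx context.Context, input io.Reader, inputIdx int32,
            inputName string, progressFn func(finished bool) error) error
        // inputFinished is called once every file has been read so the
        // conversion workers can drain the remaining records and exit.
        inputFinished()
    }

In the processor this plays out as: construct the csv reader, call start with the shared kv channel, hand reader.readFile to readInputFiles together with the progress callback, and call inputFinished when all files are read so the conversion workers wind down cleanly.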
--- pkg/ccl/importccl/csv_test.go | 3 +- pkg/ccl/importccl/read_import_proc.go | 312 +++++++++++++++----------- 2 files changed, 178 insertions(+), 137 deletions(-) diff --git a/pkg/ccl/importccl/csv_test.go b/pkg/ccl/importccl/csv_test.go index e5a04028ddd9..d740fafa82cc 100644 --- a/pkg/ccl/importccl/csv_test.go +++ b/pkg/ccl/importccl/csv_test.go @@ -706,10 +706,11 @@ func BenchmarkConvertRecord(b *testing.B) { } }() + c := &csvInputReader{recordCh: recordCh, tableDesc: tableDesc} // start up workers. for i := 0; i < runtime.NumCPU(); i++ { group.Go(func() error { - return convertRecord(ctx, recordCh, kvCh, nil, tableDesc) + return c.convertRecord(ctx, kvCh) }) } const batchSize = 500 diff --git a/pkg/ccl/importccl/read_import_proc.go b/pkg/ccl/importccl/read_import_proc.go index 4ad49f7501b6..17016d3b298d 100644 --- a/pkg/ccl/importccl/read_import_proc.go +++ b/pkg/ccl/importccl/read_import_proc.go @@ -35,6 +35,8 @@ import ( "github.com/pkg/errors" ) +type readFileFunc func(context.Context, io.Reader, int32, string, func(finished bool) error) error + type ctxProvider struct { context.Context } @@ -54,43 +56,33 @@ func groupWorkers(ctx context.Context, num int, f func(context.Context) error) e return group.Wait() } -// readCSV sends records on ch from CSV listed by dataFiles. The key part -// of dataFiles is the unique index of the CSV file among all CSV files in -// the IMPORT. comma, if non-zero, specifies the field separator. comment, -// if non-zero, specifies the comment character. It returns the number of rows -// read. progressFn, if not nil, is periodically invoked with a percentage of -// the total progress of reading through all of the files. This percentage +// readInputFile reads each of the passed dataFiles using the passed func. The +// key part of dataFiles is the unique index of the data file among all files in +// the IMPORT. progressFn, if not nil, is periodically invoked with a percentage +// of the total progress of reading through all of the files. This percentage // attempts to use the Size() method of ExportStorage to determine how many -// bytes must be read of the CSV files, and reports the percent of bytes read +// bytes must be read of the input files, and reports the percent of bytes read // among all dataFiles. If any Size() fails for any file, then progress is // reported only after each file has been read. -func readCSV( +func readInputFiles( ctx context.Context, - opts roachpb.CSVOptions, - expectedCols int, dataFiles map[int32]string, - recordCh chan<- csvRecord, + fileFunc readFileFunc, progressFn func(float32) error, settings *cluster.Settings, -) (int64, error) { - const batchSize = 500 - expectedColsExtra := expectedCols + 1 +) error { done := ctx.Done() - var count int64 - if opts.Comma == 0 { - opts.Comma = ',' - } var totalBytes, readBytes int64 // Attempt to fetch total number of bytes for all files. 
for _, dataFile := range dataFiles { conf, err := storageccl.ExportStorageConfFromURI(dataFile) if err != nil { - return 0, err + return err } es, err := storageccl.MakeExportStorage(ctx, conf, settings) if err != nil { - return 0, err + return err } sz, err := es.Size(ctx, "") es.Close() @@ -110,96 +102,53 @@ func readCSV( currentFile++ select { case <-done: - return 0, ctx.Err() + return ctx.Err() default: } - - err := func() error { - conf, err := storageccl.ExportStorageConfFromURI(dataFile) - if err != nil { - return err - } - es, err := storageccl.MakeExportStorage(ctx, conf, settings) - if err != nil { - return err - } - defer es.Close() - f, err := es.ReadFile(ctx, "") - if err != nil { - return err - } - bc := byteCounter{r: f} - cr := csv.NewReader(&bc) - cr.Comma = opts.Comma - cr.FieldsPerRecord = -1 - cr.LazyQuotes = true - cr.Comment = opts.Comment - - batch := csvRecord{ - file: dataFile, - fileIndex: dataFileIndex, - rowOffset: 1, - r: make([][]string, 0, batchSize), - } - - for i := 1; ; i++ { - record, err := cr.Read() - if err == io.EOF || len(batch.r) >= batchSize { - // if the batch isn't empty, we need to flush it. - if len(batch.r) > 0 { - select { - case <-done: - return ctx.Err() - case recordCh <- batch: - count += int64(len(batch.r)) - } - } - // progressBytes is the number of read bytes at which to report job progress. A - // low value may cause excessive updates in the job table which can lead to - // very large rows due to MVCC saving each version. - const progressBytes = 100 << 20 - if updateFromBytes && (err == io.EOF || bc.n > progressBytes) { - readBytes += bc.n - bc.n = 0 - if err := progressFn(float32(readBytes) / float32(totalBytes)); err != nil { - return err - } - } - if err == io.EOF { - break + conf, err := storageccl.ExportStorageConfFromURI(dataFile) + if err != nil { + return err + } + es, err := storageccl.MakeExportStorage(ctx, conf, settings) + if err != nil { + return err + } + defer es.Close() + f, err := es.ReadFile(ctx, "") + if err != nil { + return err + } + defer f.Close() + bc := byteCounter{r: f} + + wrappedProgressFn := func(finished bool) error { return nil } + if updateFromBytes { + const progressBytes = 100 << 20 + wrappedProgressFn = func(finished bool) error { + // progressBytes is the number of read bytes at which to report job progress. A + // low value may cause excessive updates in the job table which can lead to + // very large rows due to MVCC saving each version. + if finished || bc.n > progressBytes { + readBytes += bc.n + bc.n = 0 + if err := progressFn(float32(readBytes) / float32(totalBytes)); err != nil { + return err } - batch.rowOffset = i - batch.r = make([][]string, 0, batchSize) - } - if err != nil { - return errors.Wrapf(err, "row %d: reading CSV record", i) - } - // Ignore the first N lines. - if uint32(i) <= opts.Skip { - continue - } - if len(record) == expectedCols { - // Expected number of columns. - } else if len(record) == expectedColsExtra && record[expectedCols] == "" { - // Line has the optional trailing comma, ignore the empty field. 
- record = record[:expectedCols] - } else { - return errors.Errorf("row %d: expected %d fields, got %d", i, expectedCols, len(record)) } - batch.r = append(batch.r, record) + return nil } - return nil - }() - if err != nil { - return 0, errors.Wrap(err, dataFile) + } + + if err := fileFunc(ctx, &bc, dataFileIndex, dataFile, wrappedProgressFn); err != nil { + return errors.Wrap(err, dataFile) } if updateFromFiles { if err := progressFn(float32(currentFile) / float32(len(dataFiles))); err != nil { - return 0, err + return err } } } - return count, nil + return nil } type byteCounter struct { @@ -213,6 +162,112 @@ func (b *byteCounter) Read(p []byte) (int, error) { return n, err } +type csvInputReader struct { + expectedCols int + recordCh chan csvRecord + opts roachpb.CSVOptions + tableDesc *sqlbase.TableDescriptor +} + +func newCSVInputReader( + ctx context.Context, + opts roachpb.CSVOptions, + tableDesc *sqlbase.TableDescriptor, + expectedCols int, +) *csvInputReader { + return &csvInputReader{ + opts: opts, + expectedCols: expectedCols, + tableDesc: tableDesc, + recordCh: make(chan csvRecord), + } +} + +func (c *csvInputReader) start( + ctx context.Context, group *errgroup.Group, kvCh chan []roachpb.KeyValue, +) { + group.Go(func() error { + sCtx, span := tracing.ChildSpan(ctx, "convertcsv") + defer tracing.FinishSpan(span) + + defer close(kvCh) + return groupWorkers(sCtx, runtime.NumCPU(), func(ctx context.Context) error { + return c.convertRecord(ctx, kvCh) + }) + }) +} + +func (c *csvInputReader) inputFinished() { + close(c.recordCh) +} + +func (c *csvInputReader) readFile( + ctx context.Context, + input io.Reader, + inputIdx int32, + inputName string, + progressFn func(finished bool) error, +) error { + done := ctx.Done() + cr := csv.NewReader(input) + if c.opts.Comma != 0 { + cr.Comma = c.opts.Comma + } + cr.FieldsPerRecord = -1 + cr.LazyQuotes = true + cr.Comment = c.opts.Comment + + const batchSize = 500 + + batch := csvRecord{ + file: inputName, + fileIndex: inputIdx, + rowOffset: 1, + r: make([][]string, 0, batchSize), + } + + var count int64 + for i := 1; ; i++ { + record, err := cr.Read() + if err == io.EOF || len(batch.r) >= batchSize { + // if the batch isn't empty, we need to flush it. + if len(batch.r) > 0 { + select { + case <-done: + return ctx.Err() + case c.recordCh <- batch: + count += int64(len(batch.r)) + } + } + if progressErr := progressFn(err == io.EOF); progressErr != nil { + return progressErr + } + if err == io.EOF { + break + } + batch.rowOffset = i + batch.r = make([][]string, 0, batchSize) + } + if err != nil { + return errors.Wrapf(err, "row %d: reading CSV record", i) + } + // Ignore the first N lines. + if uint32(i) <= c.opts.Skip { + continue + } + if len(record) == c.expectedCols { + // Expected number of columns. + } else if len(record) == c.expectedCols+1 && record[c.expectedCols] == "" { + // Line has the optional trailing comma, ignore the empty field. + record = record[:c.expectedCols] + } else { + return errors.Errorf("row %d: expected %d fields, got %d", i, c.expectedCols, len(record)) + } + batch.r = append(batch.r, record) + } + return nil +} + type csvRecord struct { r [][]string file string @@ -222,21 +277,15 @@ type csvRecord struct { // convertRecord converts CSV records into KV pairs and sends them on the // kvCh chan. 
-func convertRecord( - ctx context.Context, - recordCh <-chan csvRecord, - kvCh chan<- []roachpb.KeyValue, - nullif *string, - tableDesc *sqlbase.TableDescriptor, -) error { +func (c *csvInputReader) convertRecord(ctx context.Context, kvCh chan<- []roachpb.KeyValue) error { done := ctx.Done() const kvBatchSize = 1000 - padding := 2 * (len(tableDesc.Indexes) + len(tableDesc.Families)) - visibleCols := tableDesc.VisibleColumns() + padding := 2 * (len(c.tableDesc.Indexes) + len(c.tableDesc.Families)) + visibleCols := c.tableDesc.VisibleColumns() - ri, err := sqlbase.MakeRowInserter(nil /* txn */, tableDesc, nil, /* fkTables */ - tableDesc.Columns, false /* checkFKs */, &sqlbase.DatumAlloc{}) + ri, err := sqlbase.MakeRowInserter(nil /* txn */, c.tableDesc, nil, /* fkTables */ + c.tableDesc.Columns, false /* checkFKs */, &sqlbase.DatumAlloc{}) if err != nil { return errors.Wrap(err, "make row inserter") } @@ -246,7 +295,7 @@ func convertRecord( // Although we don't yet support DEFAULT expressions on visible columns, // we do on hidden columns (which is only the default _rowid one). This // allows those expressions to run. - cols, defaultExprs, err := sqlbase.ProcessDefaultColumns(tableDesc.Columns, tableDesc, &txCtx, &evalCtx) + cols, defaultExprs, err := sqlbase.ProcessDefaultColumns(c.tableDesc.Columns, c.tableDesc, &txCtx, &evalCtx) if err != nil { return errors.Wrap(err, "process default columns") } @@ -272,15 +321,15 @@ func convertRecord( computedIVarContainer := sqlbase.RowIndexedVarContainer{ Mapping: ri.InsertColIDtoRowIndex, - Cols: tableDesc.Columns, + Cols: c.tableDesc.Columns, } - for batch := range recordCh { + for batch := range c.recordCh { for batchIdx, record := range batch.r { rowNum := batch.rowOffset + batchIdx for i, v := range record { col := visibleCols[i] - if nullif != nil && v == *nullif { + if c.opts.NullEncoding != nil && v == *c.opts.NullEncoding { datums[i] = tree.DNull } else { var err error @@ -312,7 +361,7 @@ func convertRecord( var computedCols []sqlbase.ColumnDescriptor row, err := sql.GenerateInsertRow( - defaultExprs, computeExprs, cols, computedCols, evalCtx, tableDesc, datums, &computedIVarContainer) + defaultExprs, computeExprs, cols, computedCols, evalCtx, c.tableDesc, datums, &computedIVarContainer) if err != nil { return errors.Wrapf(err, "generate insert row: %s: row %d", batch.file, rowNum) } @@ -398,7 +447,7 @@ func (cp *readImportDataProcessor) OutputTypes() []sqlbase.ColumnType { } func (cp *readImportDataProcessor) Run(ctx context.Context, wg *sync.WaitGroup) { - ctx, span := tracing.ChildSpan(ctx, "readCSVProcessor") + ctx, span := tracing.ChildSpan(ctx, "readImportDataProcessor") defer tracing.FinishSpan(span) if wg != nil { @@ -407,23 +456,25 @@ func (cp *readImportDataProcessor) Run(ctx context.Context, wg *sync.WaitGroup) group, gCtx := errgroup.WithContext(ctx) done := gCtx.Done() - recordCh := make(chan csvRecord) kvCh := make(chan []roachpb.KeyValue) sampleCh := make(chan sqlbase.EncDatumRow) - // Read CSV into CSV records + c := newCSVInputReader(ctx, cp.inputFromat.Csv, &cp.tableDesc, len(cp.tableDesc.VisibleColumns())) + c.start(ctx, group, kvCh) + + // Read input files into kvs group.Go(func() error { - sCtx, span := tracing.ChildSpan(gCtx, "readcsv") + sCtx, span := tracing.ChildSpan(gCtx, "readImportFiles") defer tracing.FinishSpan(span) - defer close(recordCh) + defer c.inputFinished() - job, err := cp.registry.LoadJob(gCtx, cp.progress.JobID) + job, err := cp.registry.LoadJob(sCtx, cp.progress.JobID) if err != nil { return err } 
progFn := func(pct float32) error { - return job.Progressed(ctx, func(ctx context.Context, details jobs.Details) float32 { + return job.Progressed(sCtx, func(ctx context.Context, details jobs.Details) float32 { d := details.(*jobs.Payload_Import).Import slotpct := pct * cp.progress.Contribution if len(d.Tables[0].SamplingProgress) > 0 { @@ -435,23 +486,12 @@ func (cp *readImportDataProcessor) Run(ctx context.Context, wg *sync.WaitGroup) }) } - _, err = readCSV(sCtx, cp.inputFromat.Csv, len(cp.tableDesc.VisibleColumns()), - cp.uri, recordCh, progFn, cp.settings) - return err + return readInputFiles(sCtx, cp.uri, c.readFile, progFn, cp.settings) }) - // Convert CSV records to KVs - group.Go(func() error { - sCtx, span := tracing.ChildSpan(gCtx, "convertcsv") - defer tracing.FinishSpan(span) - defer close(kvCh) - return groupWorkers(sCtx, runtime.NumCPU(), func(ctx context.Context) error { - return convertRecord(ctx, recordCh, kvCh, cp.inputFromat.Csv.NullEncoding, &cp.tableDesc) - }) - }) // Sample KVs group.Go(func() error { - sCtx, span := tracing.ChildSpan(gCtx, "samplecsv") + sCtx, span := tracing.ChildSpan(gCtx, "sampleImportKVs") defer tracing.FinishSpan(span) defer close(sampleCh) @@ -500,7 +540,7 @@ func (cp *readImportDataProcessor) Run(ctx context.Context, wg *sync.WaitGroup) }) // Send sampled KVs to dist sql group.Go(func() error { - sCtx, span := tracing.ChildSpan(gCtx, "sendcsvkv") + sCtx, span := tracing.ChildSpan(gCtx, "sendImportKVs") defer tracing.FinishSpan(span) for row := range sampleCh { From e8e7ebfcdf9535cea7805e796c34dabf2a858d95 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Wed, 9 May 2018 13:38:12 +0000 Subject: [PATCH 3/6] importccl: rename csv.go to be more accurate Releaase note: none. --- pkg/ccl/importccl/{csv.go => import_stmt.go} | 0 pkg/ccl/importccl/{csv_test.go => import_stmt_test.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename pkg/ccl/importccl/{csv.go => import_stmt.go} (100%) rename pkg/ccl/importccl/{csv_test.go => import_stmt_test.go} (100%) diff --git a/pkg/ccl/importccl/csv.go b/pkg/ccl/importccl/import_stmt.go similarity index 100% rename from pkg/ccl/importccl/csv.go rename to pkg/ccl/importccl/import_stmt.go diff --git a/pkg/ccl/importccl/csv_test.go b/pkg/ccl/importccl/import_stmt_test.go similarity index 100% rename from pkg/ccl/importccl/csv_test.go rename to pkg/ccl/importccl/import_stmt_test.go From f065fdb4e3f4f7b66bc1455afeb8a062f0e2292e Mon Sep 17 00:00:00 2001 From: David Taylor Date: Wed, 9 May 2018 13:41:12 +0000 Subject: [PATCH 4/6] importccl: move csv reader to own file Release note: none. --- pkg/ccl/importccl/read_import_csv.go | 263 ++++++++++++++++++++++++++ pkg/ccl/importccl/read_import_proc.go | 241 ----------------------- 2 files changed, 263 insertions(+), 241 deletions(-) create mode 100644 pkg/ccl/importccl/read_import_csv.go diff --git a/pkg/ccl/importccl/read_import_csv.go b/pkg/ccl/importccl/read_import_csv.go new file mode 100644 index 000000000000..8857c584f402 --- /dev/null +++ b/pkg/ccl/importccl/read_import_csv.go @@ -0,0 +1,263 @@ +// Copyright 2017 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package importccl + +import ( + "context" + "encoding/csv" + "io" + "runtime" + "time" + + "golang.org/x/sync/errgroup" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" + "github.com/cockroachdb/cockroach/pkg/sql/sem/transform" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/pkg/errors" +) + +type csvInputReader struct { + expectedCols int + recordCh chan csvRecord + opts roachpb.CSVOptions + tableDesc *sqlbase.TableDescriptor +} + +func newCSVInputReader( + ctx context.Context, + opts roachpb.CSVOptions, + tableDesc *sqlbase.TableDescriptor, + expectedCols int, +) *csvInputReader { + return &csvInputReader{ + opts: opts, + expectedCols: expectedCols, + tableDesc: tableDesc, + recordCh: make(chan csvRecord), + } +} + +func (c *csvInputReader) start( + ctx context.Context, group *errgroup.Group, kvCh chan []roachpb.KeyValue, +) { + group.Go(func() error { + sCtx, span := tracing.ChildSpan(ctx, "convertcsv") + defer tracing.FinishSpan(span) + + defer close(kvCh) + return groupWorkers(sCtx, runtime.NumCPU(), func(ctx context.Context) error { + return c.convertRecord(ctx, kvCh) + }) + }) +} + +func (c *csvInputReader) inputFinished() { + close(c.recordCh) +} + +func (c *csvInputReader) readFile( + ctx context.Context, + input io.Reader, + inputIdx int32, + inputName string, + progressFn func(finished bool) error, +) error { + done := ctx.Done() + cr := csv.NewReader(input) + if c.opts.Comma != 0 { + cr.Comma = c.opts.Comma + } + cr.FieldsPerRecord = -1 + cr.LazyQuotes = true + cr.Comment = c.opts.Comment + + const batchSize = 500 + + batch := csvRecord{ + file: inputName, + fileIndex: inputIdx, + rowOffset: 1, + r: make([][]string, 0, batchSize), + } + + var count int64 + for i := 1; ; i++ { + record, err := cr.Read() + if err == io.EOF || len(batch.r) >= batchSize { + // if the batch isn't empty, we need to flush it. + if len(batch.r) > 0 { + select { + case <-done: + return ctx.Err() + case c.recordCh <- batch: + count += int64(len(batch.r)) + } + } + if progressErr := progressFn(err == io.EOF); progressErr != nil { + return progressErr + } + if err == io.EOF { + break + } + batch.rowOffset = i + batch.r = make([][]string, 0, batchSize) + } + if err != nil { + return errors.Wrapf(err, "row %d: reading CSV record", i) + } + // Ignore the first N lines. + if uint32(i) <= c.opts.Skip { + continue + } + if len(record) == c.expectedCols { + // Expected number of columns. + } else if len(record) == c.expectedCols+1 && record[c.expectedCols] == "" { + // Line has the optional trailing comma, ignore the empty field. + record = record[:c.expectedCols] + } else { + return errors.Errorf("row %d: expected %d fields, got %d", i, c.expectedCols, len(record)) + } + batch.r = append(batch.r, record) + } + return nil +} + +type csvRecord struct { + r [][]string + file string + fileIndex int32 + rowOffset int +} + +// convertRecord converts CSV records into KV pairs and sends them on the +// kvCh chan. 
+func (c *csvInputReader) convertRecord(ctx context.Context, kvCh chan<- []roachpb.KeyValue) error { + done := ctx.Done() + + const kvBatchSize = 1000 + padding := 2 * (len(c.tableDesc.Indexes) + len(c.tableDesc.Families)) + visibleCols := c.tableDesc.VisibleColumns() + + ri, err := sqlbase.MakeRowInserter(nil /* txn */, c.tableDesc, nil, /* fkTables */ + c.tableDesc.Columns, false /* checkFKs */, &sqlbase.DatumAlloc{}) + if err != nil { + return errors.Wrap(err, "make row inserter") + } + + var txCtx transform.ExprTransformContext + evalCtx := tree.EvalContext{SessionData: &sessiondata.SessionData{Location: time.UTC}} + // Although we don't yet support DEFAULT expressions on visible columns, + // we do on hidden columns (which is only the default _rowid one). This + // allows those expressions to run. + cols, defaultExprs, err := sqlbase.ProcessDefaultColumns(c.tableDesc.Columns, c.tableDesc, &txCtx, &evalCtx) + if err != nil { + return errors.Wrap(err, "process default columns") + } + + datums := make([]tree.Datum, len(visibleCols), len(cols)) + + // Check for a hidden column. This should be the unique_rowid PK if present. + hidden := -1 + for i, col := range cols { + if col.Hidden { + if col.DefaultExpr == nil || *col.DefaultExpr != "unique_rowid()" || hidden != -1 { + return errors.New("unexpected hidden column") + } + hidden = i + datums = append(datums, nil) + } + } + if len(datums) != len(cols) { + return errors.New("unexpected hidden column") + } + + kvBatch := make([]roachpb.KeyValue, 0, kvBatchSize+padding) + + computedIVarContainer := sqlbase.RowIndexedVarContainer{ + Mapping: ri.InsertColIDtoRowIndex, + Cols: c.tableDesc.Columns, + } + + for batch := range c.recordCh { + for batchIdx, record := range batch.r { + rowNum := batch.rowOffset + batchIdx + for i, v := range record { + col := visibleCols[i] + if c.opts.NullEncoding != nil && v == *c.opts.NullEncoding { + datums[i] = tree.DNull + } else { + var err error + datums[i], err = tree.ParseDatumStringAs(col.Type.ToDatumType(), v, &evalCtx) + if err != nil { + return errors.Wrapf(err, "%s: row %d: parse %q as %s", batch.file, rowNum, col.Name, col.Type.SQLString()) + } + } + } + if hidden >= 0 { + // We don't want to call unique_rowid() for the hidden PK column because + // it is not idempotent. The sampling from the first stage will be useless + // during the read phase, producing a single range split with all of the + // data. Instead, we will call our own function that mimics that function, + // but more-or-less guarantees that it will not interfere with the numbers + // that will be produced by it. The lower 15 bits mimic the node id, but as + // the CSV file number. The upper 48 bits are the line number and mimic the + // timestamp. It would take a file with many more than 2**32 lines to even + // begin approaching what unique_rowid would return today, so we assume it + // to be safe. Since the timestamp is won't overlap, it is safe to use any + // number in the node id portion. The 15 bits in that portion should account + // for up to 32k CSV files in a single IMPORT. In the case of > 32k files, + // the data is xor'd so the final bits are flipped instead of set. + datums[hidden] = tree.NewDInt(builtins.GenerateUniqueID(batch.fileIndex, uint64(rowNum))) + } + + // TODO(justin): we currently disallow computed columns in import statements. 
+ var computeExprs []tree.TypedExpr + var computedCols []sqlbase.ColumnDescriptor + + row, err := sql.GenerateInsertRow( + defaultExprs, computeExprs, cols, computedCols, evalCtx, c.tableDesc, datums, &computedIVarContainer) + if err != nil { + return errors.Wrapf(err, "generate insert row: %s: row %d", batch.file, rowNum) + } + if err := ri.InsertRow( + ctx, + inserter(func(kv roachpb.KeyValue) { + kv.Value.InitChecksum(kv.Key) + kvBatch = append(kvBatch, kv) + }), + row, + true, /* ignoreConflicts */ + sqlbase.SkipFKs, + false, /* traceKV */ + ); err != nil { + return errors.Wrapf(err, "insert row: %s: row %d", batch.file, rowNum) + } + if len(kvBatch) >= kvBatchSize { + select { + case kvCh <- kvBatch: + case <-done: + return ctx.Err() + } + kvBatch = make([]roachpb.KeyValue, 0, kvBatchSize+padding) + } + } + } + select { + case kvCh <- kvBatch: + case <-done: + return ctx.Err() + } + return nil +} diff --git a/pkg/ccl/importccl/read_import_proc.go b/pkg/ccl/importccl/read_import_proc.go index 17016d3b298d..3b88d4d54732 100644 --- a/pkg/ccl/importccl/read_import_proc.go +++ b/pkg/ccl/importccl/read_import_proc.go @@ -10,25 +10,18 @@ package importccl import ( "context" - "encoding/csv" "io" "math/rand" - "runtime" "sync" - "time" "golang.org/x/sync/errgroup" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/distsqlrun" "github.com/cockroachdb/cockroach/pkg/sql/jobs" - "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" - "github.com/cockroachdb/cockroach/pkg/sql/sem/transform" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -162,240 +155,6 @@ func (b *byteCounter) Read(p []byte) (int, error) { return n, err } -type csvInputReader struct { - expectedCols int - recordCh chan csvRecord - opts roachpb.CSVOptions - tableDesc *sqlbase.TableDescriptor -} - -func newCSVInputReader( - ctx context.Context, - opts roachpb.CSVOptions, - tableDesc *sqlbase.TableDescriptor, - expectedCols int, -) *csvInputReader { - return &csvInputReader{ - opts: opts, - expectedCols: expectedCols, - tableDesc: tableDesc, - recordCh: make(chan csvRecord), - } -} - -func (c *csvInputReader) start( - ctx context.Context, group *errgroup.Group, kvCh chan []roachpb.KeyValue, -) { - group.Go(func() error { - sCtx, span := tracing.ChildSpan(ctx, "convertcsv") - defer tracing.FinishSpan(span) - - defer close(kvCh) - return groupWorkers(sCtx, runtime.NumCPU(), func(ctx context.Context) error { - return c.convertRecord(ctx, kvCh) - }) - }) -} - -func (c *csvInputReader) inputFinished() { - close(c.recordCh) -} - -func (c *csvInputReader) readFile( - ctx context.Context, - input io.Reader, - inputIdx int32, - inputName string, - progressFn func(finished bool) error, -) error { - done := ctx.Done() - cr := csv.NewReader(input) - if c.opts.Comma != 0 { - cr.Comma = c.opts.Comma - } - cr.FieldsPerRecord = -1 - cr.LazyQuotes = true - cr.Comment = c.opts.Comment - - const batchSize = 500 - - batch := csvRecord{ - file: inputName, - fileIndex: inputIdx, - rowOffset: 1, - r: make([][]string, 0, batchSize), - } - - var count int64 - for i := 1; ; i++ { - record, err := cr.Read() - if err == io.EOF || len(batch.r) >= batchSize { 
- // if the batch isn't empty, we need to flush it. - if len(batch.r) > 0 { - select { - case <-done: - return ctx.Err() - case c.recordCh <- batch: - count += int64(len(batch.r)) - } - } - if progressErr := progressFn(err == io.EOF); progressErr != nil { - return progressErr - } - if err == io.EOF { - break - } - batch.rowOffset = i - batch.r = make([][]string, 0, batchSize) - } - if err != nil { - return errors.Wrapf(err, "row %d: reading CSV record", i) - } - // Ignore the first N lines. - if uint32(i) <= c.opts.Skip { - continue - } - if len(record) == c.expectedCols { - // Expected number of columns. - } else if len(record) == c.expectedCols+1 && record[c.expectedCols] == "" { - // Line has the optional trailing comma, ignore the empty field. - record = record[:c.expectedCols] - } else { - return errors.Errorf("row %d: expected %d fields, got %d", i, c.expectedCols, len(record)) - } - batch.r = append(batch.r, record) - } - return nil -} - -type csvRecord struct { - r [][]string - file string - fileIndex int32 - rowOffset int -} - -// convertRecord converts CSV records into KV pairs and sends them on the -// kvCh chan. -func (c *csvInputReader) convertRecord(ctx context.Context, kvCh chan<- []roachpb.KeyValue) error { - done := ctx.Done() - - const kvBatchSize = 1000 - padding := 2 * (len(c.tableDesc.Indexes) + len(c.tableDesc.Families)) - visibleCols := c.tableDesc.VisibleColumns() - - ri, err := sqlbase.MakeRowInserter(nil /* txn */, c.tableDesc, nil, /* fkTables */ - c.tableDesc.Columns, false /* checkFKs */, &sqlbase.DatumAlloc{}) - if err != nil { - return errors.Wrap(err, "make row inserter") - } - - var txCtx transform.ExprTransformContext - evalCtx := tree.EvalContext{SessionData: &sessiondata.SessionData{Location: time.UTC}} - // Although we don't yet support DEFAULT expressions on visible columns, - // we do on hidden columns (which is only the default _rowid one). This - // allows those expressions to run. - cols, defaultExprs, err := sqlbase.ProcessDefaultColumns(c.tableDesc.Columns, c.tableDesc, &txCtx, &evalCtx) - if err != nil { - return errors.Wrap(err, "process default columns") - } - - datums := make([]tree.Datum, len(visibleCols), len(cols)) - - // Check for a hidden column. This should be the unique_rowid PK if present. - hidden := -1 - for i, col := range cols { - if col.Hidden { - if col.DefaultExpr == nil || *col.DefaultExpr != "unique_rowid()" || hidden != -1 { - return errors.New("unexpected hidden column") - } - hidden = i - datums = append(datums, nil) - } - } - if len(datums) != len(cols) { - return errors.New("unexpected hidden column") - } - - kvBatch := make([]roachpb.KeyValue, 0, kvBatchSize+padding) - - computedIVarContainer := sqlbase.RowIndexedVarContainer{ - Mapping: ri.InsertColIDtoRowIndex, - Cols: c.tableDesc.Columns, - } - - for batch := range c.recordCh { - for batchIdx, record := range batch.r { - rowNum := batch.rowOffset + batchIdx - for i, v := range record { - col := visibleCols[i] - if c.opts.NullEncoding != nil && v == *c.opts.NullEncoding { - datums[i] = tree.DNull - } else { - var err error - datums[i], err = tree.ParseDatumStringAs(col.Type.ToDatumType(), v, &evalCtx) - if err != nil { - return errors.Wrapf(err, "%s: row %d: parse %q as %s", batch.file, rowNum, col.Name, col.Type.SQLString()) - } - } - } - if hidden >= 0 { - // We don't want to call unique_rowid() for the hidden PK column because - // it is not idempotent. 
The sampling from the first stage will be useless - // during the read phase, producing a single range split with all of the - // data. Instead, we will call our own function that mimics that function, - // but more-or-less guarantees that it will not interfere with the numbers - // that will be produced by it. The lower 15 bits mimic the node id, but as - // the CSV file number. The upper 48 bits are the line number and mimic the - // timestamp. It would take a file with many more than 2**32 lines to even - // begin approaching what unique_rowid would return today, so we assume it - // to be safe. Since the timestamp is won't overlap, it is safe to use any - // number in the node id portion. The 15 bits in that portion should account - // for up to 32k CSV files in a single IMPORT. In the case of > 32k files, - // the data is xor'd so the final bits are flipped instead of set. - datums[hidden] = tree.NewDInt(builtins.GenerateUniqueID(batch.fileIndex, uint64(rowNum))) - } - - // TODO(justin): we currently disallow computed columns in import statements. - var computeExprs []tree.TypedExpr - var computedCols []sqlbase.ColumnDescriptor - - row, err := sql.GenerateInsertRow( - defaultExprs, computeExprs, cols, computedCols, evalCtx, c.tableDesc, datums, &computedIVarContainer) - if err != nil { - return errors.Wrapf(err, "generate insert row: %s: row %d", batch.file, rowNum) - } - if err := ri.InsertRow( - ctx, - inserter(func(kv roachpb.KeyValue) { - kv.Value.InitChecksum(kv.Key) - kvBatch = append(kvBatch, kv) - }), - row, - true, /* ignoreConflicts */ - sqlbase.SkipFKs, - false, /* traceKV */ - ); err != nil { - return errors.Wrapf(err, "insert row: %s: row %d", batch.file, rowNum) - } - if len(kvBatch) >= kvBatchSize { - select { - case kvCh <- kvBatch: - case <-done: - return ctx.Err() - } - kvBatch = make([]roachpb.KeyValue, 0, kvBatchSize+padding) - } - } - } - select { - case kvCh <- kvBatch: - case <-done: - return ctx.Err() - } - return nil -} - var csvOutputTypes = []sqlbase.ColumnType{ {SemanticType: sqlbase.ColumnType_BYTES}, {SemanticType: sqlbase.ColumnType_BYTES}, From 374849dc605225fd457544ebfc1aa017c1cafd3f Mon Sep 17 00:00:00 2001 From: David Taylor Date: Wed, 9 May 2018 16:26:50 +0000 Subject: [PATCH 5/6] importccl: extract datum-to-kv helper from csv logic Release note: none. --- pkg/ccl/importccl/import_stmt_test.go | 3 +- pkg/ccl/importccl/read_import_csv.go | 217 ++++++++++++++++---------- pkg/ccl/importccl/read_import_proc.go | 2 +- 3 files changed, 135 insertions(+), 87 deletions(-) diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index d740fafa82cc..c9652835c300 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -26,7 +26,6 @@ import ( "golang.org/x/sync/errgroup" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" @@ -697,7 +696,7 @@ func BenchmarkConvertRecord(b *testing.B) { b.Fatal(err) } recordCh := make(chan csvRecord) - kvCh := make(chan []roachpb.KeyValue) + kvCh := make(chan kvBatch) group := errgroup.Group{} // no-op drain kvs channel. 
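Note: the hidden-column comment in the hunk above describes the ID scheme used in place of unique_rowid(): the row number occupies the upper 48 bits (standing in for the timestamp) and the CSV file index the lower 15 bits (standing in for the node id). The following is a minimal, self-contained Go sketch of that bit layout only; the names packRowID and fileIndexBits are illustrative and this is not the builtins.GenerateUniqueID implementation (in particular, it masks file indexes above 32k instead of xor'ing them).

package main

import "fmt"

// fileIndexBits is the number of low-order bits reserved for the input file
// index, matching the "15 bits mimic the node id" description above.
const fileIndexBits = 15

// packRowID packs a row number into the upper 48 bits and a file index into
// the lower 15 bits of a single integer, mirroring the layout described in
// the comment. Simplified: file indexes wider than 15 bits are masked here.
func packRowID(fileIndex int32, rowNum uint64) int64 {
	return int64(rowNum<<fileIndexBits) | int64(uint32(fileIndex)&((1<<fileIndexBits)-1))
}

func main() {
	// Row 42 of input file 3: (42 << 15) | 3 == 1376259.
	fmt.Println(packRowID(3, 42))
}

Because the row number sits in the bit positions unique_rowid() reserves for its timestamp, IDs generated this way stay far below anything unique_rowid() would return today, which is what keeps the two schemes from colliding.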
diff --git a/pkg/ccl/importccl/read_import_csv.go b/pkg/ccl/importccl/read_import_csv.go index 8857c584f402..0bcdb5382c96 100644 --- a/pkg/ccl/importccl/read_import_csv.go +++ b/pkg/ccl/importccl/read_import_csv.go @@ -15,11 +15,11 @@ import ( "runtime" "time" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "golang.org/x/sync/errgroup" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "github.com/cockroachdb/cockroach/pkg/sql/sem/transform" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" @@ -49,9 +49,7 @@ func newCSVInputReader( } } -func (c *csvInputReader) start( - ctx context.Context, group *errgroup.Group, kvCh chan []roachpb.KeyValue, -) { +func (c *csvInputReader) start(ctx context.Context, group *errgroup.Group, kvCh chan kvBatch) { group.Go(func() error { sCtx, span := tracing.ChildSpan(ctx, "convertcsv") defer tracing.FinishSpan(span) @@ -141,123 +139,174 @@ type csvRecord struct { rowOffset int } -// convertRecord converts CSV records into KV pairs and sends them on the -// kvCh chan. -func (c *csvInputReader) convertRecord(ctx context.Context, kvCh chan<- []roachpb.KeyValue) error { - done := ctx.Done() +type kvBatch []roachpb.KeyValue + +type rowConverter struct { + // stored ctx allows caching a matching Done channel between calls. + ctx context.Context + done <-chan struct{} + + // current row buf + datums []tree.Datum + + // kv destination and current batch + kvCh chan<- kvBatch + kvBatch kvBatch + batchCap int + + tableDesc *sqlbase.TableDescriptor + + // The rest of these are derived from tableDesc, just cached here. + hidden int + ri sqlbase.RowInserter + evalCtx tree.EvalContext + cols []sqlbase.ColumnDescriptor + visibleCols []sqlbase.ColumnDescriptor + defaultExprs []tree.TypedExpr + computedIVarContainer sqlbase.RowIndexedVarContainer +} + +const kvBatchSize = 1000 - const kvBatchSize = 1000 - padding := 2 * (len(c.tableDesc.Indexes) + len(c.tableDesc.Families)) - visibleCols := c.tableDesc.VisibleColumns() +func newRowConverter( + ctx context.Context, tableDesc *sqlbase.TableDescriptor, kvCh chan<- kvBatch, +) (*rowConverter, error) { + c := &rowConverter{ctx: ctx, done: ctx.Done(), tableDesc: tableDesc, kvCh: kvCh} - ri, err := sqlbase.MakeRowInserter(nil /* txn */, c.tableDesc, nil, /* fkTables */ - c.tableDesc.Columns, false /* checkFKs */, &sqlbase.DatumAlloc{}) + ri, err := sqlbase.MakeRowInserter(nil /* txn */, tableDesc, nil, /* fkTables */ + tableDesc.Columns, false /* checkFKs */, &sqlbase.DatumAlloc{}) if err != nil { - return errors.Wrap(err, "make row inserter") + return nil, errors.Wrap(err, "make row inserter") } + c.ri = ri var txCtx transform.ExprTransformContext evalCtx := tree.EvalContext{SessionData: &sessiondata.SessionData{Location: time.UTC}} // Although we don't yet support DEFAULT expressions on visible columns, // we do on hidden columns (which is only the default _rowid one). This // allows those expressions to run. 
- cols, defaultExprs, err := sqlbase.ProcessDefaultColumns(c.tableDesc.Columns, c.tableDesc, &txCtx, &evalCtx) + cols, defaultExprs, err := sqlbase.ProcessDefaultColumns(tableDesc.Columns, tableDesc, &txCtx, &evalCtx) if err != nil { - return errors.Wrap(err, "process default columns") + return nil, errors.Wrap(err, "process default columns") } + c.cols = cols + c.defaultExprs = defaultExprs - datums := make([]tree.Datum, len(visibleCols), len(cols)) + c.visibleCols = tableDesc.VisibleColumns() + c.datums = make([]tree.Datum, len(c.visibleCols), len(cols)) // Check for a hidden column. This should be the unique_rowid PK if present. - hidden := -1 + c.hidden = -1 for i, col := range cols { if col.Hidden { - if col.DefaultExpr == nil || *col.DefaultExpr != "unique_rowid()" || hidden != -1 { - return errors.New("unexpected hidden column") + if col.DefaultExpr == nil || *col.DefaultExpr != "unique_rowid()" || c.hidden != -1 { + return nil, errors.New("unexpected hidden column") } - hidden = i - datums = append(datums, nil) + c.hidden = i + c.datums = append(c.datums, nil) } } - if len(datums) != len(cols) { - return errors.New("unexpected hidden column") + if len(c.datums) != len(cols) { + return nil, errors.New("unexpected hidden column") } - kvBatch := make([]roachpb.KeyValue, 0, kvBatchSize+padding) + padding := 2 * (len(tableDesc.Indexes) + len(tableDesc.Families)) + c.batchCap = kvBatchSize + padding + c.kvBatch = make(kvBatch, 0, c.batchCap) - computedIVarContainer := sqlbase.RowIndexedVarContainer{ + c.computedIVarContainer = sqlbase.RowIndexedVarContainer{ Mapping: ri.InsertColIDtoRowIndex, - Cols: c.tableDesc.Columns, + Cols: tableDesc.Columns, + } + return c, nil +} + +func (c *rowConverter) row(fileIndex int32, rowIndex int64) error { + if c.hidden >= 0 { + // We don't want to call unique_rowid() for the hidden PK column because + // it is not idempotent. The sampling from the first stage will be useless + // during the read phase, producing a single range split with all of the + // data. Instead, we will call our own function that mimics that function, + // but more-or-less guarantees that it will not interfere with the numbers + // that will be produced by it. The lower 15 bits mimic the node id, but as + // the CSV file number. The upper 48 bits are the line number and mimic the + // timestamp. It would take a file with many more than 2**32 lines to even + // begin approaching what unique_rowid would return today, so we assume it + // to be safe. Since the timestamp is won't overlap, it is safe to use any + // number in the node id portion. The 15 bits in that portion should account + // for up to 32k CSV files in a single IMPORT. In the case of > 32k files, + // the data is xor'd so the final bits are flipped instead of set. + c.datums[c.hidden] = tree.NewDInt(builtins.GenerateUniqueID(fileIndex, uint64(rowIndex))) + } + + // TODO(justin): we currently disallow computed columns in import statements. 
+ var computeExprs []tree.TypedExpr + var computedCols []sqlbase.ColumnDescriptor + + row, err := sql.GenerateInsertRow( + c.defaultExprs, computeExprs, c.cols, computedCols, c.evalCtx, c.tableDesc, c.datums, &c.computedIVarContainer) + if err != nil { + return errors.Wrapf(err, "generate insert row") + } + if err := c.ri.InsertRow( + c.ctx, + inserter(func(kv roachpb.KeyValue) { + kv.Value.InitChecksum(kv.Key) + c.kvBatch = append(c.kvBatch, kv) + }), + row, + true, /* ignoreConflicts */ + sqlbase.SkipFKs, + false, /* traceKV */ + ); err != nil { + return errors.Wrapf(err, "insert row") + } + // If our batch is full, flush it and start a new one. + if len(c.kvBatch) >= kvBatchSize { + if err := c.sendBatch(); err != nil { + return err + } + } + return nil +} + +func (c *rowConverter) sendBatch() error { + select { + case c.kvCh <- c.kvBatch: + case <-c.done: + return c.ctx.Err() + } + c.kvBatch = make(kvBatch, 0, c.batchCap) + return nil +} + +// convertRecord converts CSV records into KV pairs and sends them on the +// kvCh chan. +func (c *csvInputReader) convertRecord(ctx context.Context, kvCh chan<- kvBatch) error { + conv, err := newRowConverter(ctx, c.tableDesc, kvCh) + if err != nil { + return err } for batch := range c.recordCh { for batchIdx, record := range batch.r { - rowNum := batch.rowOffset + batchIdx + rowNum := int64(batch.rowOffset + batchIdx) for i, v := range record { - col := visibleCols[i] + col := conv.visibleCols[i] if c.opts.NullEncoding != nil && v == *c.opts.NullEncoding { - datums[i] = tree.DNull + conv.datums[i] = tree.DNull } else { var err error - datums[i], err = tree.ParseDatumStringAs(col.Type.ToDatumType(), v, &evalCtx) + conv.datums[i], err = tree.ParseDatumStringAs(col.Type.ToDatumType(), v, &conv.evalCtx) if err != nil { return errors.Wrapf(err, "%s: row %d: parse %q as %s", batch.file, rowNum, col.Name, col.Type.SQLString()) } } } - if hidden >= 0 { - // We don't want to call unique_rowid() for the hidden PK column because - // it is not idempotent. The sampling from the first stage will be useless - // during the read phase, producing a single range split with all of the - // data. Instead, we will call our own function that mimics that function, - // but more-or-less guarantees that it will not interfere with the numbers - // that will be produced by it. The lower 15 bits mimic the node id, but as - // the CSV file number. The upper 48 bits are the line number and mimic the - // timestamp. It would take a file with many more than 2**32 lines to even - // begin approaching what unique_rowid would return today, so we assume it - // to be safe. Since the timestamp is won't overlap, it is safe to use any - // number in the node id portion. The 15 bits in that portion should account - // for up to 32k CSV files in a single IMPORT. In the case of > 32k files, - // the data is xor'd so the final bits are flipped instead of set. - datums[hidden] = tree.NewDInt(builtins.GenerateUniqueID(batch.fileIndex, uint64(rowNum))) - } - - // TODO(justin): we currently disallow computed columns in import statements. 
- var computeExprs []tree.TypedExpr - var computedCols []sqlbase.ColumnDescriptor - - row, err := sql.GenerateInsertRow( - defaultExprs, computeExprs, cols, computedCols, evalCtx, c.tableDesc, datums, &computedIVarContainer) - if err != nil { - return errors.Wrapf(err, "generate insert row: %s: row %d", batch.file, rowNum) - } - if err := ri.InsertRow( - ctx, - inserter(func(kv roachpb.KeyValue) { - kv.Value.InitChecksum(kv.Key) - kvBatch = append(kvBatch, kv) - }), - row, - true, /* ignoreConflicts */ - sqlbase.SkipFKs, - false, /* traceKV */ - ); err != nil { - return errors.Wrapf(err, "insert row: %s: row %d", batch.file, rowNum) - } - if len(kvBatch) >= kvBatchSize { - select { - case kvCh <- kvBatch: - case <-done: - return ctx.Err() - } - kvBatch = make([]roachpb.KeyValue, 0, kvBatchSize+padding) + if err := conv.row(batch.fileIndex, rowNum); err != nil { + return errors.Wrapf(err, "converting row: %s: row %d", batch.file, rowNum) } } } - select { - case kvCh <- kvBatch: - case <-done: - return ctx.Err() - } - return nil + return conv.sendBatch() } diff --git a/pkg/ccl/importccl/read_import_proc.go b/pkg/ccl/importccl/read_import_proc.go index 3b88d4d54732..9587bc379784 100644 --- a/pkg/ccl/importccl/read_import_proc.go +++ b/pkg/ccl/importccl/read_import_proc.go @@ -215,7 +215,7 @@ func (cp *readImportDataProcessor) Run(ctx context.Context, wg *sync.WaitGroup) group, gCtx := errgroup.WithContext(ctx) done := gCtx.Done() - kvCh := make(chan []roachpb.KeyValue) + kvCh := make(chan kvBatch) sampleCh := make(chan sqlbase.EncDatumRow) c := newCSVInputReader(ctx, cp.inputFromat.Csv, &cp.tableDesc, len(cp.tableDesc.VisibleColumns())) From 325258d8b865ca7d33c1842c8956c7300bb55ac9 Mon Sep 17 00:00:00 2001 From: David Taylor Date: Wed, 9 May 2018 16:42:18 +0000 Subject: [PATCH 6/6] importccl: move row converter helper out of csv file Release note: none. --- pkg/ccl/importccl/read_import_csv.go | 151 +---------------- pkg/ccl/importccl/read_import_proc.go | 227 +++++++++++++++++++++----- 2 files changed, 190 insertions(+), 188 deletions(-) diff --git a/pkg/ccl/importccl/read_import_csv.go b/pkg/ccl/importccl/read_import_csv.go index 0bcdb5382c96..83a4f232a548 100644 --- a/pkg/ccl/importccl/read_import_csv.go +++ b/pkg/ccl/importccl/read_import_csv.go @@ -13,16 +13,11 @@ import ( "encoding/csv" "io" "runtime" - "time" - "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "golang.org/x/sync/errgroup" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/sql/sem/transform" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/pkg/errors" @@ -36,10 +31,7 @@ type csvInputReader struct { } func newCSVInputReader( - ctx context.Context, - opts roachpb.CSVOptions, - tableDesc *sqlbase.TableDescriptor, - expectedCols int, + opts roachpb.CSVOptions, tableDesc *sqlbase.TableDescriptor, expectedCols int, ) *csvInputReader { return &csvInputReader{ opts: opts, @@ -139,147 +131,6 @@ type csvRecord struct { rowOffset int } -type kvBatch []roachpb.KeyValue - -type rowConverter struct { - // stored ctx allows caching a matching Done channel between calls. 
- ctx context.Context - done <-chan struct{} - - // current row buf - datums []tree.Datum - - // kv destination and current batch - kvCh chan<- kvBatch - kvBatch kvBatch - batchCap int - - tableDesc *sqlbase.TableDescriptor - - // The rest of these are derived from tableDesc, just cached here. - hidden int - ri sqlbase.RowInserter - evalCtx tree.EvalContext - cols []sqlbase.ColumnDescriptor - visibleCols []sqlbase.ColumnDescriptor - defaultExprs []tree.TypedExpr - computedIVarContainer sqlbase.RowIndexedVarContainer -} - -const kvBatchSize = 1000 - -func newRowConverter( - ctx context.Context, tableDesc *sqlbase.TableDescriptor, kvCh chan<- kvBatch, -) (*rowConverter, error) { - c := &rowConverter{ctx: ctx, done: ctx.Done(), tableDesc: tableDesc, kvCh: kvCh} - - ri, err := sqlbase.MakeRowInserter(nil /* txn */, tableDesc, nil, /* fkTables */ - tableDesc.Columns, false /* checkFKs */, &sqlbase.DatumAlloc{}) - if err != nil { - return nil, errors.Wrap(err, "make row inserter") - } - c.ri = ri - - var txCtx transform.ExprTransformContext - evalCtx := tree.EvalContext{SessionData: &sessiondata.SessionData{Location: time.UTC}} - // Although we don't yet support DEFAULT expressions on visible columns, - // we do on hidden columns (which is only the default _rowid one). This - // allows those expressions to run. - cols, defaultExprs, err := sqlbase.ProcessDefaultColumns(tableDesc.Columns, tableDesc, &txCtx, &evalCtx) - if err != nil { - return nil, errors.Wrap(err, "process default columns") - } - c.cols = cols - c.defaultExprs = defaultExprs - - c.visibleCols = tableDesc.VisibleColumns() - c.datums = make([]tree.Datum, len(c.visibleCols), len(cols)) - - // Check for a hidden column. This should be the unique_rowid PK if present. - c.hidden = -1 - for i, col := range cols { - if col.Hidden { - if col.DefaultExpr == nil || *col.DefaultExpr != "unique_rowid()" || c.hidden != -1 { - return nil, errors.New("unexpected hidden column") - } - c.hidden = i - c.datums = append(c.datums, nil) - } - } - if len(c.datums) != len(cols) { - return nil, errors.New("unexpected hidden column") - } - - padding := 2 * (len(tableDesc.Indexes) + len(tableDesc.Families)) - c.batchCap = kvBatchSize + padding - c.kvBatch = make(kvBatch, 0, c.batchCap) - - c.computedIVarContainer = sqlbase.RowIndexedVarContainer{ - Mapping: ri.InsertColIDtoRowIndex, - Cols: tableDesc.Columns, - } - return c, nil -} - -func (c *rowConverter) row(fileIndex int32, rowIndex int64) error { - if c.hidden >= 0 { - // We don't want to call unique_rowid() for the hidden PK column because - // it is not idempotent. The sampling from the first stage will be useless - // during the read phase, producing a single range split with all of the - // data. Instead, we will call our own function that mimics that function, - // but more-or-less guarantees that it will not interfere with the numbers - // that will be produced by it. The lower 15 bits mimic the node id, but as - // the CSV file number. The upper 48 bits are the line number and mimic the - // timestamp. It would take a file with many more than 2**32 lines to even - // begin approaching what unique_rowid would return today, so we assume it - // to be safe. Since the timestamp is won't overlap, it is safe to use any - // number in the node id portion. The 15 bits in that portion should account - // for up to 32k CSV files in a single IMPORT. In the case of > 32k files, - // the data is xor'd so the final bits are flipped instead of set. 
- c.datums[c.hidden] = tree.NewDInt(builtins.GenerateUniqueID(fileIndex, uint64(rowIndex))) - } - - // TODO(justin): we currently disallow computed columns in import statements. - var computeExprs []tree.TypedExpr - var computedCols []sqlbase.ColumnDescriptor - - row, err := sql.GenerateInsertRow( - c.defaultExprs, computeExprs, c.cols, computedCols, c.evalCtx, c.tableDesc, c.datums, &c.computedIVarContainer) - if err != nil { - return errors.Wrapf(err, "generate insert row") - } - if err := c.ri.InsertRow( - c.ctx, - inserter(func(kv roachpb.KeyValue) { - kv.Value.InitChecksum(kv.Key) - c.kvBatch = append(c.kvBatch, kv) - }), - row, - true, /* ignoreConflicts */ - sqlbase.SkipFKs, - false, /* traceKV */ - ); err != nil { - return errors.Wrapf(err, "insert row") - } - // If our batch is full, flush it and start a new one. - if len(c.kvBatch) >= kvBatchSize { - if err := c.sendBatch(); err != nil { - return err - } - } - return nil -} - -func (c *rowConverter) sendBatch() error { - select { - case c.kvCh <- c.kvBatch: - case <-c.done: - return c.ctx.Err() - } - c.kvBatch = make(kvBatch, 0, c.batchCap) - return nil -} - // convertRecord converts CSV records into KV pairs and sends them on the // kvCh chan. func (c *csvInputReader) convertRecord(ctx context.Context, kvCh chan<- kvBatch) error { diff --git a/pkg/ccl/importccl/read_import_proc.go b/pkg/ccl/importccl/read_import_proc.go index 9587bc379784..addd28419f2b 100644 --- a/pkg/ccl/importccl/read_import_proc.go +++ b/pkg/ccl/importccl/read_import_proc.go @@ -13,7 +13,12 @@ import ( "io" "math/rand" "sync" + "time" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" + "github.com/cockroachdb/cockroach/pkg/sql/sem/transform" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "golang.org/x/sync/errgroup" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" @@ -98,47 +103,52 @@ func readInputFiles( return ctx.Err() default: } - conf, err := storageccl.ExportStorageConfFromURI(dataFile) - if err != nil { - return err - } - es, err := storageccl.MakeExportStorage(ctx, conf, settings) - if err != nil { - return err - } - defer es.Close() - f, err := es.ReadFile(ctx, "") - if err != nil { - return err - } - defer f.Close() - bc := byteCounter{r: f} - - wrappedProgressFn := func(finished bool) error { return nil } - if updateFromBytes { - const progressBytes = 100 << 20 - wrappedProgressFn = func(finished bool) error { - // progressBytes is the number of read bytes at which to report job progress. A - // low value may cause excessive updates in the job table which can lead to - // very large rows due to MVCC saving each version. - if finished || bc.n > progressBytes { - readBytes += bc.n - bc.n = 0 - if err := progressFn(float32(readBytes) / float32(totalBytes)); err != nil { - return err + if err := func() error { + conf, err := storageccl.ExportStorageConfFromURI(dataFile) + if err != nil { + return err + } + es, err := storageccl.MakeExportStorage(ctx, conf, settings) + if err != nil { + return err + } + defer es.Close() + f, err := es.ReadFile(ctx, "") + if err != nil { + return err + } + defer f.Close() + bc := byteCounter{r: f} + + wrappedProgressFn := func(finished bool) error { return nil } + if updateFromBytes { + const progressBytes = 100 << 20 + wrappedProgressFn = func(finished bool) error { + // progressBytes is the number of read bytes at which to report job progress. 
A + // low value may cause excessive updates in the job table which can lead to + // very large rows due to MVCC saving each version. + if finished || bc.n > progressBytes { + readBytes += bc.n + bc.n = 0 + if err := progressFn(float32(readBytes) / float32(totalBytes)); err != nil { + return err + } } + return nil } - return nil } - } - if err := fileFunc(ctx, &bc, dataFileIndex, dataFile, wrappedProgressFn); err != nil { - return errors.Wrap(err, dataFile) - } - if updateFromFiles { - if err := progressFn(float32(currentFile) / float32(len(dataFiles))); err != nil { - return err + if err := fileFunc(ctx, &bc, dataFileIndex, dataFile, wrappedProgressFn); err != nil { + return errors.Wrap(err, dataFile) } + if updateFromFiles { + if err := progressFn(float32(currentFile) / float32(len(dataFiles))); err != nil { + return err + } + } + return nil + }(); err != nil { + return err } } return nil @@ -155,6 +165,147 @@ func (b *byteCounter) Read(p []byte) (int, error) { return n, err } +type kvBatch []roachpb.KeyValue + +type rowConverter struct { + // stored ctx allows caching a matching Done channel between calls. + ctx context.Context + done <-chan struct{} + + // current row buf + datums []tree.Datum + + // kv destination and current batch + kvCh chan<- kvBatch + kvBatch kvBatch + batchCap int + + tableDesc *sqlbase.TableDescriptor + + // The rest of these are derived from tableDesc, just cached here. + hidden int + ri sqlbase.RowInserter + evalCtx tree.EvalContext + cols []sqlbase.ColumnDescriptor + visibleCols []sqlbase.ColumnDescriptor + defaultExprs []tree.TypedExpr + computedIVarContainer sqlbase.RowIndexedVarContainer +} + +const kvBatchSize = 1000 + +func newRowConverter( + ctx context.Context, tableDesc *sqlbase.TableDescriptor, kvCh chan<- kvBatch, +) (*rowConverter, error) { + c := &rowConverter{ctx: ctx, done: ctx.Done(), tableDesc: tableDesc, kvCh: kvCh} + + ri, err := sqlbase.MakeRowInserter(nil /* txn */, tableDesc, nil, /* fkTables */ + tableDesc.Columns, false /* checkFKs */, &sqlbase.DatumAlloc{}) + if err != nil { + return nil, errors.Wrap(err, "make row inserter") + } + c.ri = ri + + var txCtx transform.ExprTransformContext + evalCtx := tree.EvalContext{SessionData: &sessiondata.SessionData{Location: time.UTC}} + // Although we don't yet support DEFAULT expressions on visible columns, + // we do on hidden columns (which is only the default _rowid one). This + // allows those expressions to run. + cols, defaultExprs, err := sqlbase.ProcessDefaultColumns(tableDesc.Columns, tableDesc, &txCtx, &evalCtx) + if err != nil { + return nil, errors.Wrap(err, "process default columns") + } + c.cols = cols + c.defaultExprs = defaultExprs + + c.visibleCols = tableDesc.VisibleColumns() + c.datums = make([]tree.Datum, len(c.visibleCols), len(cols)) + + // Check for a hidden column. This should be the unique_rowid PK if present. 
+ c.hidden = -1 + for i, col := range cols { + if col.Hidden { + if col.DefaultExpr == nil || *col.DefaultExpr != "unique_rowid()" || c.hidden != -1 { + return nil, errors.New("unexpected hidden column") + } + c.hidden = i + c.datums = append(c.datums, nil) + } + } + if len(c.datums) != len(cols) { + return nil, errors.New("unexpected hidden column") + } + + padding := 2 * (len(tableDesc.Indexes) + len(tableDesc.Families)) + c.batchCap = kvBatchSize + padding + c.kvBatch = make(kvBatch, 0, c.batchCap) + + c.computedIVarContainer = sqlbase.RowIndexedVarContainer{ + Mapping: ri.InsertColIDtoRowIndex, + Cols: tableDesc.Columns, + } + return c, nil +} + +func (c *rowConverter) row(fileIndex int32, rowIndex int64) error { + if c.hidden >= 0 { + // We don't want to call unique_rowid() for the hidden PK column because + // it is not idempotent. The sampling from the first stage will be useless + // during the read phase, producing a single range split with all of the + // data. Instead, we will call our own function that mimics that function, + // but more-or-less guarantees that it will not interfere with the numbers + // that will be produced by it. The lower 15 bits mimic the node id, but as + // the CSV file number. The upper 48 bits are the line number and mimic the + // timestamp. It would take a file with many more than 2**32 lines to even + // begin approaching what unique_rowid would return today, so we assume it + // to be safe. Since the timestamp is won't overlap, it is safe to use any + // number in the node id portion. The 15 bits in that portion should account + // for up to 32k CSV files in a single IMPORT. In the case of > 32k files, + // the data is xor'd so the final bits are flipped instead of set. + c.datums[c.hidden] = tree.NewDInt(builtins.GenerateUniqueID(fileIndex, uint64(rowIndex))) + } + + // TODO(justin): we currently disallow computed columns in import statements. + var computeExprs []tree.TypedExpr + var computedCols []sqlbase.ColumnDescriptor + + row, err := sql.GenerateInsertRow( + c.defaultExprs, computeExprs, c.cols, computedCols, c.evalCtx, c.tableDesc, c.datums, &c.computedIVarContainer) + if err != nil { + return errors.Wrapf(err, "generate insert row") + } + if err := c.ri.InsertRow( + c.ctx, + inserter(func(kv roachpb.KeyValue) { + kv.Value.InitChecksum(kv.Key) + c.kvBatch = append(c.kvBatch, kv) + }), + row, + true, /* ignoreConflicts */ + sqlbase.SkipFKs, + false, /* traceKV */ + ); err != nil { + return errors.Wrapf(err, "insert row") + } + // If our batch is full, flush it and start a new one. + if len(c.kvBatch) >= kvBatchSize { + if err := c.sendBatch(); err != nil { + return err + } + } + return nil +} + +func (c *rowConverter) sendBatch() error { + select { + case c.kvCh <- c.kvBatch: + case <-c.done: + return c.ctx.Err() + } + c.kvBatch = make(kvBatch, 0, c.batchCap) + return nil +} + var csvOutputTypes = []sqlbase.ColumnType{ {SemanticType: sqlbase.ColumnType_BYTES}, {SemanticType: sqlbase.ColumnType_BYTES}, @@ -218,8 +369,8 @@ func (cp *readImportDataProcessor) Run(ctx context.Context, wg *sync.WaitGroup) kvCh := make(chan kvBatch) sampleCh := make(chan sqlbase.EncDatumRow) - c := newCSVInputReader(ctx, cp.inputFromat.Csv, &cp.tableDesc, len(cp.tableDesc.VisibleColumns())) - c.start(ctx, group, kvCh) + c := newCSVInputReader(cp.inputFromat.Csv, &cp.tableDesc, len(cp.tableDesc.VisibleColumns())) + c.start(gCtx, group, kvCh) // Read input files into kvs group.Go(func() error {