Skip to content

Commit

Permalink
Merge pull request #411 from yokofly/feature/allow-direct-insert
Browse files Browse the repository at this point in the history
Feature/allow direct insert
  • Loading branch information
flarco authored Oct 24, 2024
2 parents fd10903 + 93c855f commit e7486ac
Showing 1 changed file with 136 additions and 0 deletions.
136 changes: 136 additions & 0 deletions core/sling/task_run_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas
return 0, err
}

allowDirectInsert := cast.ToBool(os.Getenv("SLING_ALLOW_DIRECT_INSERT"))

if allowDirectInsert {
return t.writeDirectly(cfg, df, tgtConn)
}

// Initialize target and temp tables
targetTable, err := initializeTargetTable(cfg, tgtConn)
if err != nil {
Expand Down Expand Up @@ -345,6 +351,136 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas
return cnt, nil
}

// writeDirectly streams the dataflow straight into the final target table,
// skipping the usual temp-table staging step. It is reached only when the
// SLING_ALLOW_DIRECT_INSERT env var is truthy (checked by the caller).
//
// Flow: initialize/create the target table, run pre-SQL, bulk-import inside
// a transaction, validate row counts (and optionally checksums), commit,
// then run post-SQL. Returns the number of rows streamed into the table.
//
// NOTE(review): pre-SQL (and table creation) run before the transaction is
// opened, so they are not rolled back on a later failure — presumably
// intentional to match the staged write path; confirm against caller.
func (t *TaskExecution) writeDirectly(cfg *Config, df *iop.Dataflow, tgtConn database.Connection) (cnt uint64, err error) {
	// Initialize target table
	targetTable, err := initializeTargetTable(cfg, tgtConn)
	if err != nil {
		return 0, err
	}

	// Ensure schema exists
	if err := ensureSchemaExists(tgtConn, targetTable.Schema); err != nil {
		return 0, err
	}

	// Pause dataflow to set up DDL and handlers before any rows stream.
	if paused := df.Pause(); !paused {
		// err is nil at this point — create the error instead of wrapping
		// a nil cause (original wrapped nil via g.Error(err, ...)).
		err = g.Error("could not pause streams to infer columns")
		return 0, err
	}

	// Prepare dataflow (infers columns from sample data).
	sampleData, err := prepareDataflow(t, df, tgtConn)
	if err != nil {
		return 0, err
	}

	// Set table keys (primary / update / custom table keys).
	if err := targetTable.SetKeys(cfg.Source.PrimaryKey(), cfg.Source.UpdateKey, cfg.Target.Options.TableKeys); err != nil {
		err = g.Error(err, "could not set keys for "+targetTable.FullName())
		return 0, err
	}

	// Execute pre-SQL (runs outside the load transaction — see note above).
	if err := executeSQL(t, tgtConn, cfg.Target.Options.PreSQL, "pre"); err != nil {
		return cnt, err
	}

	// Create final table from the inferred sample columns (no temp table).
	if err := createTable(t, tgtConn, targetTable, sampleData, false); err != nil {
		return 0, err
	}

	cfg.Target.Columns = sampleData.Columns
	df.Columns = sampleData.Columns
	setStage("Direct Insert - prepare-final")

	// Begin transaction for final table operations
	txOptions := determineTxOptions(tgtConn.GetType())
	if err := tgtConn.BeginContext(df.Context.Ctx, &txOptions); err != nil {
		err = g.Error(err, "could not open transaction to write to final table")
		return 0, err
	}

	// Safety net: roll back on any early return. Rollback after a successful
	// Commit is a no-op, so this is safe on the happy path too.
	defer tgtConn.Rollback()

	// Configure column handlers (if applicable)
	if err := configureColumnHandlers(t, cfg, df, tgtConn, targetTable); err != nil {
		return 0, err
	}

	df.Unpause() // Resume dataflow
	t.SetProgress("streaming data directly into final table")

	// Set batch limit if specified
	if batchLimit := cfg.Target.Options.BatchLimit; batchLimit != nil {
		df.SetBatchLimit(*batchLimit)
	}

	// Bulk import data directly into final table
	cnt, err = tgtConn.BulkImportFlow(targetTable.FullName(), df)
	if err != nil {
		tgtConn.Rollback()
		err = g.Error(err, "could not insert into "+targetTable.FullName())
		return 0, err
	}

	// Validate data: the table count must match the stream count exactly.
	tCnt, err := tgtConn.GetCount(targetTable.FullName())
	if err != nil {
		err = g.Error(err, "could not get count from final table %s", targetTable.FullName())
		return 0, err
	}
	if cnt != tCnt {
		// err is nil here (GetCount succeeded) — create, don't wrap nil.
		err = g.Error("inserted into final table but table count (%d) != stream count (%d). Records missing/mismatch. Aborting", tCnt, cnt)
		return 0, err
	} else if tCnt == 0 && len(sampleData.Rows) > 0 {
		// err is nil here as well — create, don't wrap nil.
		err = g.Error("Loaded 0 records while sample data has %d records. Exiting.", len(sampleData.Rows))
		return 0, err
	}

	// Handle empty data case: bail out (deferred rollback discards the empty
	// tx) unless empty tables are explicitly allowed via env vars.
	if cnt == 0 && !cast.ToBool(os.Getenv("SLING_ALLOW_EMPTY_TABLES")) && !cast.ToBool(os.Getenv("SLING_ALLOW_EMPTY")) {
		g.Warn("No data or records found in stream. Nothing to do. To allow Sling to create empty tables, set SLING_ALLOW_EMPTY=TRUE")
		return 0, nil
	} else if cnt > 0 {
		// FIXME: find root cause of why columns don't sync while streaming
		df.SyncColumns()

		// Aggregate stats from stream processors
		df.Inferred = !cfg.sourceIsFile() // Re-infer if source is file
		df.SyncStats()

		// Checksum Comparison, data quality. Limit to env var SLING_CHECKSUM_ROWS, cause sums get too high
		if val := cast.ToUint64(os.Getenv("SLING_CHECKSUM_ROWS")); val > 0 && df.Count() <= val {
			err = tgtConn.CompareChecksums(targetTable.FullName(), df.Columns)
			if err != nil {
				return
			}
		}
	}

	// Commit final transaction
	if err := tgtConn.Commit(); err != nil {
		err = g.Error(err, "could not commit final transaction")
		return 0, err
	}

	// Execute post-SQL (after commit, outside the transaction).
	if err := executeSQL(t, tgtConn, cfg.Target.Options.PostSQL, "post"); err != nil {
		return cnt, err
	}

	// Finalize progress. setStage is hoisted so it runs exactly once on
	// both the error and success paths (was duplicated in both branches).
	setStage("6 - closing")
	if err := df.Err(); err != nil {
		return cnt, err
	}

	return cnt, nil
}

func determineTxOptions(dbType dbio.Type) sql.TxOptions {
switch dbType {
case dbio.TypeDbSnowflake, dbio.TypeDbDuckDb:
Expand Down

0 comments on commit e7486ac

Please sign in to comment.