Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci-bats-macos.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ jobs:
- name: Install Maven
working-directory: ./.ci_bin
run: |
curl -LO https://dlcdn.apache.org/maven/maven-3/3.9.9/binaries/apache-maven-3.9.9-bin.tar.gz
tar -xf apache-maven-3.9.9-bin.tar.gz
echo "$(pwd)/apache-maven-3.9.9/bin" >> $GITHUB_PATH
curl -LO https://dlcdn.apache.org/maven/maven-3/3.9.10/binaries/apache-maven-3.9.10-bin.tar.gz
tar -xf apache-maven-3.9.10-bin.tar.gz
echo "$(pwd)/apache-maven-3.9.10/bin" >> $GITHUB_PATH
- name: Install Hadoop
working-directory: ./.ci_bin
run: |
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/ci-bats-unix-remote.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ jobs:
if: ${{ env.use_credentials != 'true' }}
working-directory: ./.ci_bin
run: |
curl -LO https://dlcdn.apache.org/maven/maven-3/3.9.9/binaries/apache-maven-3.9.9-bin.tar.gz
tar -xf apache-maven-3.9.9-bin.tar.gz
echo "$(pwd)/apache-maven-3.9.9/bin" >> $GITHUB_PATH
curl -LO https://dlcdn.apache.org/maven/maven-3/3.9.10/binaries/apache-maven-3.9.10-bin.tar.gz
tar -xf apache-maven-3.9.10-bin.tar.gz
echo "$(pwd)/apache-maven-3.9.10/bin" >> $GITHUB_PATH
- name: Install Hadoop
if: ${{ env.use_credentials != 'true' }}
working-directory: ./.ci_bin
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/ci-bats-unix.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ jobs:
- name: Install Maven
working-directory: ./.ci_bin
run: |
curl -LO https://dlcdn.apache.org/maven/maven-3/3.9.9/binaries/apache-maven-3.9.9-bin.tar.gz
tar -xf apache-maven-3.9.9-bin.tar.gz
echo "$(pwd)/apache-maven-3.9.9/bin" >> $GITHUB_PATH
curl -LO https://dlcdn.apache.org/maven/maven-3/3.9.10/binaries/apache-maven-3.9.10-bin.tar.gz
tar -xf apache-maven-3.9.10-bin.tar.gz
echo "$(pwd)/apache-maven-3.9.10/bin" >> $GITHUB_PATH
- name: Install Hadoop
working-directory: ./.ci_bin
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/merge-perf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ jobs:
TMPDIR=$gw/tmp
./${{ env.SCRIPT_DIR}}/setup.sh $TMPDIR $DATADIR

# small python script times merge, we suppres errcodes but print error messages
# small python script times merge, we suppress errcodes but print error messages
cd $TMPDIR
python3 -c "import time, subprocess, sys; start = time.time(); res=subprocess.run(['dolt', 'merge', '--squash', 'main'], capture_output=True); err = res.stdout + res.stderr if res.returncode != 0 else ''; latency = time.time() -start; print(latency); sys.stderr.write(str(err))" 1> lat.log 2>err.log
latency=$(cat lat.log)
Expand Down
12 changes: 6 additions & 6 deletions go/libraries/doltcore/merge/merge_prolly_rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func mergeProllyTable(
if err != nil {
return nil, nil, err
}
valueMerger := newValueMerger(mergedSch, tm.leftSch, tm.rightSch, tm.ancSch, leftRows.Pool(), tm.ns)
valueMerger := NewValueMerger(mergedSch, tm.leftSch, tm.rightSch, tm.ancSch, leftRows.Pool(), tm.ns)

if !valueMerger.leftMapping.IsIdentityMapping() {
mergeInfo.LeftNeedsRewrite = true
Expand Down Expand Up @@ -397,7 +397,7 @@ func threeWayDiffer(ctx context.Context, tm *TableMerger, valueMerger *valueMerg
leftRows.Tuples(),
rightRows.Tuples(),
ancRows.Tuples(),
valueMerger.tryMerge,
valueMerger.TryMerge,
valueMerger.keyless,
diffInfo,
leftRows.Tuples().Order,
Expand Down Expand Up @@ -1681,7 +1681,7 @@ type valueMerger struct {
ns tree.NodeStore
}

func newValueMerger(merged, leftSch, rightSch, baseSch schema.Schema, syncPool pool.BuffPool, ns tree.NodeStore) *valueMerger {
func NewValueMerger(merged, leftSch, rightSch, baseSch schema.Schema, syncPool pool.BuffPool, ns tree.NodeStore) *valueMerger {
leftMapping, rightMapping, baseMapping := generateSchemaMappings(merged, leftSch, rightSch, baseSch)

baseToLeftMapping, baseToRightMapping, baseToResultMapping := generateSchemaMappings(baseSch, leftSch, rightSch, merged)
Expand Down Expand Up @@ -1753,11 +1753,11 @@ func findNonPKColumnMappingByTagOrName(sch schema.Schema, col schema.Column) int
}
}

// tryMerge performs a cell-wise merge given left, right, and base cell value
// TryMerge performs a cell-wise merge given left, right, and base cell value
// tuples. It returns the merged cell value tuple and a bool indicating if a
// conflict occurred. tryMerge should only be called if left and right produce
// conflict occurred. TryMerge should only be called if left and right produce
// non-identical diffs against base.
func (m *valueMerger) tryMerge(ctx *sql.Context, left, right, base val.Tuple) (val.Tuple, bool, error) {
func (m *valueMerger) TryMerge(ctx *sql.Context, left, right, base val.Tuple) (val.Tuple, bool, error) {
// If we're merging a keyless table and the keys match, but the values are different,
// that means that the row data is the same, but the cardinality has changed, and if the
// cardinality has changed in different ways on each merge side, we can't auto resolve.
Expand Down
43 changes: 38 additions & 5 deletions go/libraries/doltcore/merge/merge_rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/dolthub/dolt/go/libraries/utils/set"
"github.com/dolthub/dolt/go/store/atomicerr"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
)
Expand Down Expand Up @@ -83,6 +84,34 @@ type TableMerger struct {
recordViolations bool
}

func (tm TableMerger) GetNewValueMerger(mergeSch schema.Schema, leftRows prolly.Map) *valueMerger {
return NewValueMerger(mergeSch, tm.leftSch, tm.rightSch, tm.ancSch, leftRows.Pool(), leftRows.NodeStore())
}

func rowsFromTable(ctx context.Context, tbl *doltdb.Table) (prolly.Map, error) {
rd, err := tbl.GetRowData(ctx)
if err != nil {
return prolly.Map{}, err
}
rows, err := durable.ProllyMapFromIndex(rd)
if err != nil {
return prolly.Map{}, err
}
return rows, nil
}

func (tm TableMerger) LeftRows(ctx context.Context) (prolly.Map, error) {
return rowsFromTable(ctx, tm.leftTbl)
}

func (tm TableMerger) RightRows(ctx context.Context) (prolly.Map, error) {
return rowsFromTable(ctx, tm.rightTbl)
}

func (tm TableMerger) AncRows(ctx context.Context) (prolly.Map, error) {
return rowsFromTable(ctx, tm.ancTbl)
}

func (tm TableMerger) tableHashes(ctx context.Context) (left, right, anc hash.Hash, err error) {
if tm.leftTbl != nil {
if left, err = tm.leftTbl.HashOf(); err != nil {
Expand Down Expand Up @@ -114,6 +143,10 @@ func (tm TableMerger) tableHashes(ctx context.Context) (left, right, anc hash.Ha
return
}

func (tm TableMerger) SchemaMerge(ctx *sql.Context, tblName doltdb.TableName) (schema.Schema, SchemaConflict, MergeInfo, tree.ThreeWayDiffInfo, error) {
return SchemaMerge(ctx, tm.vrw.Format(), tm.leftSch, tm.rightSch, tm.ancSch, tblName)
}

type RootMerger struct {
left doltdb.RootValue
right doltdb.RootValue
Expand Down Expand Up @@ -171,19 +204,19 @@ func (rm *RootMerger) MergeTable(
opts editor.Options,
mergeOpts MergeOpts,
) (*MergedResult, *MergeStats, error) {
tm, err := rm.makeTableMerger(ctx, tblName, mergeOpts)
tm, err := rm.MakeTableMerger(ctx, tblName, mergeOpts)
if err != nil {
return nil, nil, err
}

// short-circuit here if we can
finished, finishedRootObj, stats, err := rm.maybeShortCircuit(ctx, tm, mergeOpts)
finished, finishedRootObj, stats, err := rm.MaybeShortCircuit(ctx, tm, mergeOpts)
if finished != nil || stats != nil || err != nil {
return &MergedResult{table: finished, rootObj: finishedRootObj}, stats, err
}

// Calculate a merge of the schemas, but don't apply it yet
mergeSch, schConflicts, mergeInfo, diffInfo, err := SchemaMerge(ctx, tm.vrw.Format(), tm.leftSch, tm.rightSch, tm.ancSch, tblName)
mergeSch, schConflicts, mergeInfo, diffInfo, err := tm.SchemaMerge(ctx, tblName)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -233,7 +266,7 @@ func (rm *RootMerger) MergeTable(
return &MergedResult{table: tbl, rootObj: rootObj}, stats, nil
}

func (rm *RootMerger) makeTableMerger(ctx context.Context, tblName doltdb.TableName, mergeOpts MergeOpts) (*TableMerger, error) {
func (rm *RootMerger) MakeTableMerger(ctx context.Context, tblName doltdb.TableName, mergeOpts MergeOpts) (*TableMerger, error) {
recordViolations := true
if mergeOpts.RecordViolationsForTables != nil {
if _, ok := mergeOpts.RecordViolationsForTables[tblName.ToLower()]; !ok {
Expand Down Expand Up @@ -335,7 +368,7 @@ func (rm *RootMerger) makeTableMerger(ctx context.Context, tblName doltdb.TableN
return &tm, nil
}

func (rm *RootMerger) maybeShortCircuit(ctx context.Context, tm *TableMerger, opts MergeOpts) (*doltdb.Table, doltdb.RootObject, *MergeStats, error) {
func (rm *RootMerger) MaybeShortCircuit(ctx context.Context, tm *TableMerger, opts MergeOpts) (*doltdb.Table, doltdb.RootObject, *MergeStats, error) {
// If we need to re-verify all constraints as part of this merge, then we can't short
// circuit considering any tables, so return immediately
if opts.ReverifyAllConstraints {
Expand Down
4 changes: 2 additions & 2 deletions go/libraries/doltcore/merge/row_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,9 @@ func TestRowMerge(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
v := newValueMerger(test.mergedSch, test.leftSch, test.rightSch, test.baseSch, syncPool, nil)
v := NewValueMerger(test.mergedSch, test.leftSch, test.rightSch, test.baseSch, syncPool, nil)

merged, ok, err := v.tryMerge(ctx, test.row, test.mergeRow, test.ancRow)
merged, ok, err := v.TryMerge(ctx, test.row, test.mergeRow, test.ancRow)
assert.NoError(t, err)
assert.Equal(t, test.expectConflict, !ok)
vD := test.mergedSch.GetValueDescriptor(v.ns)
Expand Down
Loading
Loading