Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 36 additions & 23 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,99 +239,112 @@ func compare(comparison Opcode, columnValue, filterValue sqltypes.Value, collati
return false, nil
}

// filter filters the row against the plan. It returns false if the row did not match.
// The output of the filtering operation is stored in the 'result' argument because
// filtering cannot be performed in-place. The result argument must be a slice of
// length equal to ColExprs
func (plan *Plan) filter(values, result []sqltypes.Value, charsets []collations.ID) (bool, error) {
if len(result) != len(plan.ColExprs) {
return false, fmt.Errorf("expected %d values in result slice", len(plan.ColExprs))
// shouldFilter evaluates whether a binlog (before or after) image should be included in the stream based on the plan's filters.
// It returns:
// - bool: true if the row should be included in the stream (passes all filters)
// - bool: true if a vindex filter was applied (indicates sharded filtering)
func (plan *Plan) shouldFilter(values []sqltypes.Value, charsets []collations.ID) (bool, bool, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For readability it will be good to name the return variables.

hasVindex := false
if len(values) == 0 {
return false, false, nil
}
for _, filter := range plan.Filters {
switch filter.Opcode {
case VindexMatch:
ksid, err := getKeyspaceID(values, filter.Vindex, filter.VindexColumns, plan.Table.Fields)
if err != nil {
return false, err
return false, false, err
}
hasVindex = true
if !key.KeyRangeContains(filter.KeyRange, ksid) {
return false, nil
return false, true, nil
}
case IsNotNull:
if values[filter.ColNum].IsNull() {
return false, nil
return false, false, nil
}
case IsNull:
if !values[filter.ColNum].IsNull() {
return false, nil
return false, false, nil
}
case In:
if filter.Values == nil {
return false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected empty filter values when performing IN operator")
return false, false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected empty filter values when performing IN operator")
}
found := false
for _, filterValue := range filter.Values {
match, err := compare(Equal, values[filter.ColNum], filterValue, plan.env.CollationEnv(), charsets[filter.ColNum])
if err != nil {
return false, err
return false, false, err
}
if match {
found = true
break
}
}
if !found {
return false, nil
return false, false, nil
}
case NotBetween:
// Note that we do not implement filtering for BETWEEN because
// in the plan we rewrite `x BETWEEN a AND b` to `x >= a AND x <= b`
// This is the filtering for NOT BETWEEN since we don't have support
// for OR yet.
if filter.Values == nil || len(filter.Values) != 2 {
return false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected 2 filter values when performing NOT BETWEEN")
return false, false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected 2 filter values when performing NOT BETWEEN")
}
leftFilterValue, rightFilterValue := filter.Values[0], filter.Values[1]
isValueLessThanLeftFilter, err := compare(LessThan, values[filter.ColNum], leftFilterValue, plan.env.CollationEnv(), charsets[filter.ColNum])
if err != nil {
return false, err
return false, false, err
}
if isValueLessThanLeftFilter {
continue
}
isValueGreaterThanRightFilter, err := compare(GreaterThan, values[filter.ColNum], rightFilterValue, plan.env.CollationEnv(), charsets[filter.ColNum])
if err != nil || !isValueGreaterThanRightFilter {
return false, err
return false, false, err
}
default:
match, err := compare(filter.Opcode, values[filter.ColNum], filter.Value, plan.env.CollationEnv(), charsets[filter.ColNum])
if err != nil {
return false, err
return false, false, err
}
if !match {
return false, nil
return false, false, nil
}
}
}
return true, hasVindex, nil
}

// mapValues maps the raw row values onto the plan's column expressions and
// returns a freshly allocated result slice of length len(plan.ColExprs).
// For each column expression:
//   - a fixed value (ColNum == -1) is copied through unchanged;
//   - a plain column reference copies the corresponding input value;
//   - a vindex-backed expression is replaced by the computed keyspace ID.
//
// It returns an error if a column expression references a column index
// beyond the input row, or if keyspace ID computation fails.
func (plan *Plan) mapValues(values []sqltypes.Value) ([]sqltypes.Value, error) {
	result := make([]sqltypes.Value, len(plan.ColExprs))

	for i, colExpr := range plan.ColExprs {
		if colExpr.ColNum == -1 {
			// Literal/fixed value from the plan, not taken from the row.
			result[i] = colExpr.FixedValue
			continue
		}
		if colExpr.ColNum >= len(values) {
			return nil, fmt.Errorf("index out of range, colExpr.ColNum: %d, len(values): %d", colExpr.ColNum, len(values))
		}
		if colExpr.Vindex == nil {
			result[i] = values[colExpr.ColNum]
		} else {
			// keyspace_id() expression: compute the keyspace ID from the
			// vindex columns and expose it as a VARBINARY value.
			ksid, err := getKeyspaceID(values, colExpr.Vindex, colExpr.VindexColumns, plan.Table.Fields)
			if err != nil {
				return nil, err
			}
			result[i] = sqltypes.MakeTrusted(sqltypes.VarBinary, ksid)
		}
	}
	return result, nil
}

func getKeyspaceID(values []sqltypes.Value, vindex vindexes.Vindex, vindexColumns []int, fields []*querypb.Field) (key.DestinationKeyspaceID, error) {
Expand Down
10 changes: 7 additions & 3 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,6 @@ func (rs *rowStreamer) streamQuery(send func(*binlogdatapb.VStreamRowsResponse)
mysqlrow []sqltypes.Value
)

filtered := make([]sqltypes.Value, len(rs.plan.ColExprs))
lastpk := make([]sqltypes.Value, len(rs.pkColumns))
byteCount := 0
logger := logutil.NewThrottledLogger(rs.vse.GetTabletInfo(), throttledLoggerInterval)
Expand Down Expand Up @@ -441,12 +440,17 @@ func (rs *rowStreamer) streamQuery(send func(*binlogdatapb.VStreamRowsResponse)
for i, pk := range rs.pkColumns {
lastpk[i] = mysqlrow[pk]
}
// Reuse the vstreamer's filter.
ok, err := rs.plan.filter(mysqlrow, filtered, charsets)

// verify that the row should be sent
ok, _, err := rs.plan.shouldFilter(mysqlrow, charsets)
if err != nil {
return err
}
if ok {
filtered, err := rs.plan.mapValues(mysqlrow)
if err != nil {
return err
}
if rowCount >= len(rows) {
rows = append(rows, &querypb.Row{})
}
Expand Down
98 changes: 64 additions & 34 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -977,12 +977,11 @@ func (vs *vstreamer) processJournalEvent(vevents []*binlogdatapb.VEvent, plan *s
}
nextrow:
for _, row := range rows.Rows {
afterOK, afterValues, _, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns, row.JSONPartialValues)
afterValues, _, _, err := vs.getValues(plan, row.Data, rows.DataColumns, row.NullColumns, row.JSONPartialValues)
if err != nil {
return nil, vterrors.Wrap(err, "failed to extract journal from binlog event and apply filters")
}
if !afterOK {
// This can happen if someone manually deleted rows.
if len(afterValues) == 0 {
continue
}
// Exclude events that don't match the db_name.
Expand Down Expand Up @@ -1012,41 +1011,77 @@ nextrow:
return vevents, nil
}

// processRowEvent converts binlog rows into row vevents using the following steps:
// - converts the raw before and after binlog images into Values
// - finds which before or after images passes the filter criterion
// - if the target is sharded, pass only images that pass
// - if the target is not sharded, pass both images if either after or before passes
func (vs *vstreamer) processRowEvent(vevents []*binlogdatapb.VEvent, plan *streamerPlan, rows mysql.Rows) ([]*binlogdatapb.VEvent, error) {
rowChanges := make([]*binlogdatapb.RowChange, 0, len(rows.Rows))
for _, row := range rows.Rows {
// The BEFORE image does not have partial JSON values so we pass an empty bitmap.
beforeOK, beforeValues, _, err := vs.extractRowAndFilter(plan, row.Identify, rows.IdentifyColumns, row.NullIdentifyColumns, mysql.Bitmap{})
beforeRawValues, beforeCharsets, _, err := vs.getValues(plan, row.Identify, rows.IdentifyColumns, row.NullIdentifyColumns, mysql.Bitmap{})
if err != nil {
return nil, vterrors.Wrap(err, "failed to extract row's before values from binlog event and apply filters")
return nil, err
}
beforeOK, beforeHasVindex, err := plan.shouldFilter(beforeRawValues, beforeCharsets)
if err != nil {
return nil, err
}

// The AFTER image is where we may have partial JSON values, as reflected in the
// row's JSONPartialValues bitmap.
afterOK, afterValues, partial, err := vs.extractRowAndFilter(plan, row.Data, rows.DataColumns, row.NullColumns, row.JSONPartialValues)
afterRawValues, afterCharsets, partial, err := vs.getValues(plan, row.Data, rows.DataColumns, row.NullColumns, row.JSONPartialValues)
if err != nil {
return nil, vterrors.Wrap(err, "failed to extract row's after values from binlog event and apply filters")
return nil, err
}
if !beforeOK && !afterOK {
afterOK, afterHasVindex, err := plan.shouldFilter(afterRawValues, afterCharsets)
if err != nil {
return nil, err
}

hasVindex := beforeHasVindex || afterHasVindex
if !afterOK && !beforeOK {
// both before and after images are filtered out
continue
}

// at least one image passes the filter and is not a sharded filter
if !hasVindex {
// we want both images to be part of the row event if either passes and we are not in a sharded situation
afterOK = true
beforeOK = true
}

rowChange := &binlogdatapb.RowChange{}
if beforeOK {
rowChange.Before = sqltypes.RowToProto3(beforeValues)
if len(beforeRawValues) > 0 {
beforeValues, err := plan.mapValues(beforeRawValues)
if err != nil {
return nil, err
}
rowChange.Before = sqltypes.RowToProto3(beforeValues)
}
}
if afterOK {
rowChange.After = sqltypes.RowToProto3(afterValues)
if ((vs.config.ExperimentalFlags /**/ & /**/ vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage != 0) && partial) ||
(row.JSONPartialValues.Count() > 0) {

rowChange.DataColumns = &binlogdatapb.RowChange_Bitmap{
Count: int64(rows.DataColumns.Count()),
Cols: rows.DataColumns.Bits(),
if len(afterRawValues) > 0 {
afterValues, err := plan.mapValues(afterRawValues)
if err != nil {
return nil, err
}
}
if row.JSONPartialValues.Count() > 0 {
rowChange.JsonPartialValues = &binlogdatapb.RowChange_Bitmap{
Count: int64(row.JSONPartialValues.Count()),
Cols: row.JSONPartialValues.Bits(),
rowChange.After = sqltypes.RowToProto3(afterValues)
if ((vs.config.ExperimentalFlags /**/ & /**/ vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage != 0) && partial) ||
(row.JSONPartialValues.Count() > 0) {
rowChange.DataColumns = &binlogdatapb.RowChange_Bitmap{
Count: int64(rows.DataColumns.Count()),
Cols: rows.DataColumns.Bits(),
}
}
if row.JSONPartialValues.Count() > 0 {
rowChange.JsonPartialValues = &binlogdatapb.RowChange_Bitmap{
Count: int64(row.JSONPartialValues.Count()),
Cols: row.JSONPartialValues.Bits(),
}
}
}
}
Expand Down Expand Up @@ -1090,13 +1125,10 @@ func (vs *vstreamer) rebuildPlans() error {
return nil
}

// extractRowAndFilter takes the data and bitmaps from the binlog events and returns the following
// - true, if row needs to be skipped because of workflow filter rules
// - data values, array of one value per column
// - true, if the row image was partial (i.e. binlog_row_image=noblob and dml doesn't update one or more blob/text columns)
func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataColumns, nullColumns mysql.Bitmap, jsonPartialValues mysql.Bitmap) (bool, []sqltypes.Value, bool, error) {
func (vs *vstreamer) getValues(plan *streamerPlan, data []byte,
dataColumns, nullColumns mysql.Bitmap, jsonPartialValues mysql.Bitmap) ([]sqltypes.Value, []collations.ID, bool, error) {
if len(data) == 0 {
return false, nil, false, nil
return nil, nil, false, nil
}
values := make([]sqltypes.Value, dataColumns.Count())
charsets := make([]collations.ID, len(values))
Expand All @@ -1107,7 +1139,7 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo
for colNum := 0; colNum < dataColumns.Count(); colNum++ {
if !dataColumns.Bit(colNum) {
if vs.config.ExperimentalFlags /**/ & /**/ vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage == 0 {
return false, nil, false, fmt.Errorf("partial row image encountered: ensure binlog_row_image is set to 'full'")
return nil, nil, false, fmt.Errorf("partial row image encountered: ensure binlog_row_image is set to 'full'")
} else {
partial = true
}
Expand All @@ -1129,7 +1161,7 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo
if err != nil {
log.Errorf("extractRowAndFilter: %s, table: %s, colNum: %d, fields: %+v, current values: %+v",
err, plan.Table.Name, colNum, plan.Table.Fields, values)
return false, nil, false, vterrors.Wrapf(err, "failed to extract row's value for column %s from binlog event",
return nil, nil, false, vterrors.Wrapf(err, "failed to extract row's value for column %s from binlog event",
plan.Table.Fields[colNum].Name)
}
pos += l
Expand All @@ -1147,13 +1179,13 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo
if plan.Table.Fields[colNum].Type == querypb.Type_ENUM || mysqlType == mysqlbinlog.TypeEnum {
value, err = buildEnumStringValue(vs.se.Environment(), plan, colNum, value)
if err != nil {
return false, nil, false, vterrors.Wrapf(err, "failed to perform ENUM column integer to string value mapping")
return nil, nil, false, vterrors.Wrapf(err, "failed to perform ENUM column integer to string value mapping")
}
}
if plan.Table.Fields[colNum].Type == querypb.Type_SET || mysqlType == mysqlbinlog.TypeSet {
value, err = buildSetStringValue(vs.se.Environment(), plan, colNum, value)
if err != nil {
return false, nil, false, vterrors.Wrapf(err, "failed to perform SET column integer to string value mapping")
return nil, nil, false, vterrors.Wrapf(err, "failed to perform SET column integer to string value mapping")
}
}
}
Expand All @@ -1162,9 +1194,7 @@ func (vs *vstreamer) extractRowAndFilter(plan *streamerPlan, data []byte, dataCo
values[colNum] = value
valueIndex++
}
filtered := make([]sqltypes.Value, len(plan.ColExprs))
ok, err := plan.filter(values, filtered, charsets)
return ok, filtered, partial, err
return values, charsets, partial, nil
}

// addEnumAndSetMappingstoPlan sets up any necessary ENUM and SET integer to string mappings.
Expand Down
Loading
Loading