From e86221fc41f28bd94f749e29ee4592a07aca604f Mon Sep 17 00:00:00 2001
From: Gary Elliott
Date: Mon, 28 Nov 2016 15:40:26 -0500
Subject: [PATCH] bigtable/bttest: RowKeyFilter support in emulator.

Change-Id: Icf731374ae65551877f5b311e002607f8f5198d9
Reviewed-on: https://code-review.googlesource.com/9565
Reviewed-by: Jonathan Amsterdam
---
 bigtable/bigtable_test.go | 31 ++++++++++++++++++++++-
 bigtable/bttest/inmem.go  | 53 ++++++++++++++++++++++++++++++---------
 2 files changed, 71 insertions(+), 13 deletions(-)

diff --git a/bigtable/bigtable_test.go b/bigtable/bigtable_test.go
index a8e0489693cb..646d68149b33 100644
--- a/bigtable/bigtable_test.go
+++ b/bigtable/bigtable_test.go
@@ -193,7 +193,8 @@ func TestClientIntegration(t *testing.T) {
 	readTests := []struct {
 		desc   string
 		rr     RowSet
-		filter Filter // may be nil
+		filter Filter     // may be nil
+		limit  ReadOption // may be nil
 
 		// We do the read, grab all the cells, turn them into "--",
 		// sort that list, and join with a comma.
@@ -248,12 +249,40 @@ func TestClientIntegration(t *testing.T) {
 			filter: ColumnRangeFilter("follows", "h", ""),
 			want:   "gwashington-jadams-1,jadams-tjefferson-1,tjefferson-jadams-1,tjefferson-wmckinley-1,wmckinley-tjefferson-1",
 		},
+		{
+			desc:   "read with RowKeyFilter",
+			rr:     RowRange{},
+			filter: RowKeyFilter(".*wash.*"),
+			want:   "gwashington-jadams-1",
+		},
+		{
+			desc:   "read with RowKeyFilter, no matches",
+			rr:     RowRange{},
+			filter: RowKeyFilter(".*xxx.*"),
+			want:   "",
+		},
+		{
+			desc:   "read with FamilyFilter, no matches",
+			rr:     RowRange{},
+			filter: FamilyFilter(".*xxx.*"),
+			want:   "",
+		},
+		{
+			desc:   "read with ColumnFilter + row limit",
+			rr:     RowRange{},
+			filter: ColumnFilter(".*j.*"), // matches "jadams" and "tjefferson"
+			limit:  LimitRows(2),
+			want:   "gwashington-jadams-1,jadams-tjefferson-1",
+		},
 	}
 	for _, tc := range readTests {
 		var opts []ReadOption
 		if tc.filter != nil {
 			opts = append(opts, RowFilter(tc.filter))
 		}
+		if tc.limit != nil {
+			opts = append(opts, tc.limit)
+		}
 		var elt []string
 		err := tbl.ReadRows(context.Background(), tc.rr, func(r Row) bool {
 			for _, ris := range r {
diff --git a/bigtable/bttest/inmem.go b/bigtable/bttest/inmem.go
index abc090f4e6fc..a3129a2d619d 100644
--- a/bigtable/bttest/inmem.go
+++ b/bigtable/bttest/inmem.go
@@ -308,13 +308,18 @@ func (s *server) ReadRows(req *btpb.ReadRowsRequest, stream btpb.Bigtable_ReadRo
 	sort.Sort(byRowKey(rows))
 
 	limit := int(req.RowsLimit)
-	for i, r := range rows {
-		if limit > 0 && i >= limit {
+	count := 0
+	for _, r := range rows {
+		if limit > 0 && count >= limit {
 			return nil
 		}
-		if err := streamRow(stream, r, req.Filter); err != nil {
+		streamed, err := streamRow(stream, r, req.Filter)
+		if err != nil {
 			return err
 		}
+		if streamed {
+			count++
+		}
 	}
 	return nil
 }
@@ -334,13 +339,17 @@ func addRows(start, end string, tbl *table, rowSet map[string]*row) {
 	}
 }
 
-func streamRow(stream btpb.Bigtable_ReadRowsServer, r *row, f *btpb.RowFilter) error {
+// streamRow filters the given row and sends it via the given stream.
+// Returns true if at least one cell matched the filter and was streamed, false otherwise.
+func streamRow(stream btpb.Bigtable_ReadRowsServer, r *row, f *btpb.RowFilter) (bool, error) {
 	r.mu.Lock()
 	nr := r.copy()
 	r.mu.Unlock()
 	r = nr
 
-	filterRow(f, r)
+	if !filterRow(f, r) {
+		return false, nil
+	}
 
 	rrr := &btpb.ReadRowsResponse{}
 	for col, cells := range r.cells {
@@ -366,13 +375,14 @@ func streamRow(stream btpb.Bigtable_ReadRowsServer, r *row, f *btpb.RowFilter) e
 		rrr.Chunks[len(rrr.Chunks)-1].RowStatus = &btpb.ReadRowsResponse_CellChunk_CommitRow{true}
 	}
 
-	return stream.Send(rrr)
+	return true, stream.Send(rrr)
 }
 
-// filterRow modifies a row with the given filter.
-func filterRow(f *btpb.RowFilter, r *row) {
+// filterRow modifies a row with the given filter. Returns true if at least one cell from the row matches,
+// false otherwise.
+func filterRow(f *btpb.RowFilter, r *row) bool {
 	if f == nil {
-		return
+		return true
 	}
 	// Handle filters that apply beyond just including/excluding cells.
 	switch f := f.Filter.(type) {
@@ -380,7 +390,7 @@ func filterRow(f *btpb.RowFilter, r *row) {
 		for _, sub := range f.Chain.Filters {
 			filterRow(sub, r)
 		}
-		return
+		return true
 	case *btpb.RowFilter_Interleave_:
 		srs := make([]*row, 0, len(f.Interleave.Filters))
 		for _, sub := range f.Interleave.Filters {
@@ -399,7 +409,7 @@ func filterRow(f *btpb.RowFilter, r *row) {
 		for _, cs := range r.cells {
 			sort.Sort(byDescTS(cs))
 		}
-		return
+		return true
 	case *btpb.RowFilter_CellsPerColumnLimitFilter:
 		lim := int(f.CellsPerColumnLimitFilter)
 		for col, cs := range r.cells {
@@ -407,15 +417,28 @@ func filterRow(f *btpb.RowFilter, r *row) {
 				r.cells[col] = cs[:lim]
 			}
 		}
-		return
+		return true
+	case *btpb.RowFilter_RowKeyRegexFilter:
+		pat := string(f.RowKeyRegexFilter)
+		rx, err := regexp.Compile(pat)
+		if err != nil {
+			log.Printf("Bad rowkey_regex_filter pattern %q: %v", pat, err)
+			return false
+		}
+		if !rx.MatchString(r.key) {
+			return false
+		}
 	}
 
 	// Any other case, operate on a per-cell basis.
+	cellCount := 0
 	for key, cs := range r.cells {
 		i := strings.Index(key, ":") // guaranteed to exist
 		fam, col := key[:i], key[i+1:]
 		r.cells[key] = filterCells(f, fam, col, cs)
+		cellCount += len(r.cells[key])
 	}
+	return cellCount > 0
 }
 
 func filterCells(f *btpb.RowFilter, fam, col string, cs []cell) []cell {
@@ -434,6 +457,12 @@ func includeCell(f *btpb.RowFilter, fam, col string, cell cell) bool {
 	}
 	// TODO(dsymonds): Implement many more filters.
 	switch f := f.Filter.(type) {
+	case *btpb.RowFilter_CellsPerColumnLimitFilter:
+		// Don't log, row-level filter
+		return true
+	case *btpb.RowFilter_RowKeyRegexFilter:
+		// Don't log, row-level filter
+		return true
 	default:
 		log.Printf("WARNING: don't know how to handle filter of type %T (ignoring it)", f)
 		return true