Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store namespace in predicate as a hex separated by a hyphen to prevent json marshal issues #8601

Merged
merged 1 commit into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dgraph/cmd/alpha/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func TestTransactionBasic(t *testing.T) {
require.Equal(t, 2, len(mr.preds))
var parsedPreds []string
for _, pred := range mr.preds {
p := strings.Split(pred, "-")[1]
p := strings.SplitN(pred, "-", 2)[1]
parsedPreds = append(parsedPreds, x.ParseAttr(p))
}
sort.Strings(parsedPreds)
Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/alpha/upsert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type QueryResult struct {

func splitPreds(ps []string) []string {
for i, p := range ps {
ps[i] = x.ParseAttr(strings.Split(p, "-")[1])
ps[i] = x.ParseAttr(strings.SplitN(p, "-", 2)[1])
}

return ps
Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/zero/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,15 +365,15 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
checkPreds := func() error {
// Check if any of these tablets is being moved. If so, abort the transaction.
for _, pkey := range src.Preds {
splits := strings.Split(pkey, "-")
splits := strings.SplitN(pkey, "-", 2)
if len(splits) < 2 {
return errors.Errorf("Unable to find group id in %s", pkey)
}
gid, err := strconv.Atoi(splits[0])
if err != nil {
return errors.Wrapf(err, "unable to parse group id from %s", pkey)
}
pred := strings.Join(splits[1:], "-")
pred := splits[1]
tablet := s.ServingTablet(pred)
if tablet == nil {
return errors.Errorf("Tablet for %s is nil", pred)
Expand Down
11 changes: 1 addition & 10 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,16 +1110,7 @@ func filterTablets(ctx context.Context, ms *pb.MembershipState) error {
return errors.Errorf("Namespace not found in JWT.")
}
if namespace == x.GalaxyNamespace {
// For galaxy namespace, we don't want to filter out the predicates. We only format the
// namespace to human readable form.
for _, group := range ms.Groups {
tablets := make(map[string]*pb.Tablet)
for tabletName, tablet := range group.Tablets {
tablet.Predicate = x.FormatNsAttr(tablet.Predicate)
tablets[x.FormatNsAttr(tabletName)] = tablet
}
group.Tablets = tablets
}
// For galaxy namespace, we don't want to filter out the predicates.
return nil
}
for _, group := range ms.GetGroups() {
Expand Down
13 changes: 6 additions & 7 deletions graphql/admin/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ func resolveState(ctx context.Context, q schema.Query) *resolve.Resolved {
u := jsonpb.Unmarshaler{}
var ms pb.MembershipState
err = u.Unmarshal(bytes.NewReader(resp.GetJson()), &ms)

if err != nil {
return resolve.EmptyResult(q, err)
}

// map to graphql response structure
state := convertToGraphQLResp(ms)
ns, _ := x.ExtractNamespace(ctx)
// map to graphql response structure. Only guardian of galaxy can list the namespaces.
state := convertToGraphQLResp(ms, ns == x.GalaxyNamespace)
b, err := json.Marshal(state)
if err != nil {
return resolve.EmptyResult(q, err)
Expand All @@ -77,7 +77,7 @@ func resolveState(ctx context.Context, q schema.Query) *resolve.Resolved {
// values and not the keys. For pb.MembershipState.Group, the keys are the group IDs
// and pb.Group didn't contain this ID, so we are creating a custom clusterGroup type,
// which is same as pb.Group and also contains the ID for the group.
func convertToGraphQLResp(ms pb.MembershipState) membershipState {
func convertToGraphQLResp(ms pb.MembershipState, listNs bool) membershipState {
var state membershipState

// namespaces stores set of namespaces
Expand All @@ -92,9 +92,8 @@ func convertToGraphQLResp(ms pb.MembershipState) membershipState {
var tablets = make([]*pb.Tablet, 0, len(v.Tablets))
for name, v1 := range v.Tablets {
tablets = append(tablets, v1)
val, err := x.ExtractNamespaceFromPredicate(name)
if err == nil {
namespaces[val] = struct{}{}
if listNs {
namespaces[x.ParseNamespace(name)] = struct{}{}
}
}
state.Groups = append(state.Groups, clusterGroup{
Expand Down
18 changes: 7 additions & 11 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,16 +592,15 @@ func (r *rebuilder) Run(ctx context.Context) error {

glog.V(1).Infof(
"Rebuilding index for predicate %s: Starting process. StartTs=%d. Prefix=\n%s\n",
x.FormatNsAttr(r.attr), r.startTs, hex.Dump(r.prefix))
r.attr, r.startTs, hex.Dump(r.prefix))

// Counter is used here to ensure that all keys are committed at different timestamp.
// We set it to 1 in case there are no keys found and NewStreamAt is called with ts=0.
var counter uint64 = 1

tmpWriter := tmpDB.NewManagedWriteBatch()
stream := pstore.NewStreamAt(r.startTs)
stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):",
x.FormatNsAttr(r.attr))
stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):", r.attr)
stream.Prefix = r.prefix
stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
// We should return quickly if the context is no longer valid.
Expand Down Expand Up @@ -663,21 +662,19 @@ func (r *rebuilder) Run(ctx context.Context) error {
return err
}
glog.V(1).Infof("Rebuilding index for predicate %s: building temp index took: %v\n",
x.FormatNsAttr(r.attr), time.Since(start))
r.attr, time.Since(start))

// Now we write all the created posting lists to disk.
glog.V(1).Infof("Rebuilding index for predicate %s: writing index to badger",
x.FormatNsAttr(r.attr))
glog.V(1).Infof("Rebuilding index for predicate %s: writing index to badger", r.attr)
start = time.Now()
defer func() {
glog.V(1).Infof("Rebuilding index for predicate %s: writing index took: %v\n",
x.FormatNsAttr(r.attr), time.Since(start))
r.attr, time.Since(start))
}()

writer := pstore.NewManagedWriteBatch()
tmpStream := tmpDB.NewStreamAt(counter)
tmpStream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (2/2):",
x.FormatNsAttr(r.attr))
tmpStream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (2/2):", r.attr)
tmpStream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
l, err := ReadPostingList(key, itr)
if err != nil {
Expand Down Expand Up @@ -720,8 +717,7 @@ func (r *rebuilder) Run(ctx context.Context) error {
if err := tmpStream.Orchestrate(ctx); err != nil {
return err
}
glog.V(1).Infof("Rebuilding index for predicate %s: Flushing all writes.\n",
x.FormatNsAttr(r.attr))
glog.V(1).Infof("Rebuilding index for predicate %s: Flushing all writes.\n", r.attr)
return writer.Flush()
}

Expand Down
16 changes: 8 additions & 8 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ func TestAddMutation_mrjn2(t *testing.T) {
}

func TestAddMutation_gru(t *testing.T) {
key := x.DataKey("question.tag", 0x01)
key := x.DataKey(x.GalaxyAttr("question.tag"), 0x01)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

Expand Down Expand Up @@ -592,7 +592,7 @@ func TestAddMutation_gru(t *testing.T) {
}

func TestAddMutation_gru2(t *testing.T) {
key := x.DataKey("question.tag", 0x100)
key := x.DataKey(x.GalaxyAttr("question.tag"), 0x100)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

Expand Down Expand Up @@ -639,7 +639,7 @@ func TestAddMutation_gru2(t *testing.T) {
func TestAddAndDelMutation(t *testing.T) {
// Ensure each test uses unique key since we don't clear the postings
// after each test
key := x.DataKey("dummy_key", 0x927)
key := x.DataKey(x.GalaxyAttr("dummy_key"), 0x927)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

Expand Down Expand Up @@ -878,7 +878,7 @@ func createMultiPartList(t *testing.T, size int, addFacet bool) (*List, int) {
defer setMaxListSize(maxListSize)
maxListSize = 5000

key := x.DataKey(uuid.New().String(), 1331)
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
commits := 0
Expand Down Expand Up @@ -926,7 +926,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) {
defer setMaxListSize(maxListSize)
maxListSize = 10000

key := x.DataKey(uuid.New().String(), 1331)
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
commits := 0
Expand Down Expand Up @@ -1087,7 +1087,7 @@ func TestBinSplit(t *testing.T) {
defer func() {
maxListSize = originalListSize
}()
key := x.DataKey(uuid.New().String(), 1331)
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
for i := 1; i <= size; i++ {
Expand Down Expand Up @@ -1268,7 +1268,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
maxListSize = 5000

// Add entries to the maps.
key := x.DataKey(uuid.New().String(), 1331)
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
for i := 1; i <= size; i++ {
Expand Down Expand Up @@ -1407,7 +1407,7 @@ func TestRecursiveSplits(t *testing.T) {

// Create a list that should be split recursively.
size := int(1e5)
key := x.DataKey(uuid.New().String(), 1331)
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
commits := 0
Expand Down
20 changes: 14 additions & 6 deletions systest/backup/filesystem/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var (
alphaBackupDir = "/data/backups"
oldBackupDir1 = "/data/to_restore/1"
oldBackupDir2 = "/data/to_restore/2"
oldBackupDir3 = "/data/to_restore/3"
alphaContainers = []string{
"alpha1",
"alpha2",
Expand Down Expand Up @@ -92,7 +93,8 @@ func TestBackupOfOldRestore(t *testing.T) {
common.DirSetup(t)
common.CopyOldBackupDir(t)

conn, err := grpc.Dial(testutil.SockAddr, grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
conn, err := grpc.Dial(testutil.SockAddr,
grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))
require.NoError(t, err)
Expand All @@ -105,7 +107,8 @@ func TestBackupOfOldRestore(t *testing.T) {
sendRestoreRequest(t, oldBackupDir1)
testutil.WaitForRestore(t, dg, testutil.SockAddrHttp)

resp, err := dg.NewTxn().Query(context.Background(), `{ authors(func: has(Author.name)) { count(uid) } }`)
q := `{ authors(func: has(Author.name)) { count(uid) } }`
resp, err := dg.NewTxn().Query(context.Background(), q)
require.NoError(t, err)
require.JSONEq(t, "{\"authors\":[{\"count\":1}]}", string(resp.Json))

Expand All @@ -117,7 +120,7 @@ func TestBackupOfOldRestore(t *testing.T) {
sendRestoreRequest(t, alphaBackupDir)
testutil.WaitForRestore(t, dg, testutil.SockAddrHttp)

resp, err = dg.NewTxn().Query(context.Background(), `{ authors(func: has(Author.name)) { count(uid) } }`)
resp, err = dg.NewTxn().Query(context.Background(), q)
require.NoError(t, err)
require.JSONEq(t, "{\"authors\":[{\"count\":1}]}", string(resp.Json))
}
Expand Down Expand Up @@ -151,16 +154,19 @@ func TestRestoreOfOldBackup(t *testing.T) {
require.NoError(t, err)
require.JSONEq(t, r, string(resp.Json))
}

queryAndCheck("p1", 0)
queryAndCheck("p2", 2)
queryAndCheck("p3", 0)
queryAndCheck("p4", 2)
}
t.Run("backup of 20.11", func(t *testing.T) { test(oldBackupDir2) })
t.Run("backup of 21.03", func(t *testing.T) { test(oldBackupDir3) })
}

func TestBackupFilesystem(t *testing.T) {
conn, err := grpc.Dial(testutil.SockAddr, grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
conn, err := grpc.Dial(testutil.SockAddr,
grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))

Expand Down Expand Up @@ -432,15 +438,17 @@ func runBackupInternal(t *testing.T, forceFull bool, numExpectedFiles,

var data interface{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(&data))
require.Equal(t, "Success", testutil.JsonGet(data, "data", "backup", "response", "code").(string))
require.Equal(t, "Success",
testutil.JsonGet(data, "data", "backup", "response", "code").(string))
taskId := testutil.JsonGet(data, "data", "backup", "taskId").(string)
testutil.WaitForTask(t, taskId, true, testutil.SockAddrHttp)

// Verify that the right amount of files and directories were created.
common.CopyToLocalFs(t)

files := x.WalkPathFunc(copyBackupDir, func(path string, isdir bool) bool {
return !isdir && strings.HasSuffix(path, ".backup") && strings.HasPrefix(path, "data/backups_copy/dgraph.")
return !isdir && strings.HasSuffix(path, ".backup") &&
strings.HasPrefix(path, "data/backups_copy/dgraph.")
})
require.Equal(t, numExpectedFiles, len(files))

Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"Manifests":[{"type":"full","since":0,"read_ts":9,"groups":{"1":["\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p1","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.p_query","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.xid","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.schema","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.drop.op","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.type","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p3","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p2"],"2":[],"3":[]},"backup_id":"quirky_kapitsa4","backup_num":1,"version":2103,"path":"dgraph.20210517.095641.969","encrypted":false,"drop_operations":null,"compression":"snappy"},{"type":"incremental","since":0,"read_ts":21,"groups":{"1":["\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.drop.op","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p1","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.schema","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.p_query","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p3","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.xid","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p4","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p2","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.type"],"2":[],"3":[]},"backup_id":"quirky_kapitsa4","backup_num":2,"version":2103,"path":"dgraph.20210517.095716.130","encrypted":false,"drop_operations":[{"drop_op":1}],"compression":"snappy"},{"type":"incremental","since":0,"read_ts":26,"groups":{"1":["\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p4","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p2","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.type","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p1","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.schema","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.p_query","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p3","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.drop.op","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.xid"],"2":[],"3":[]},"backup_id":"quirky_kapitsa4","backup_num":3,"version":2103,"path":"dgraph.20210517.095726.320","encrypted":false,"drop_operations":[{"drop_op":2,"drop_value":"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p3"}],"compression":"snappy"}]}
4 changes: 0 additions & 4 deletions worker/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ func (m *Manifest) getPredsInGroup(gid uint32) predicateSet {

predSet := make(predicateSet)
for _, pred := range preds {
if m.Version == 0 {
// For older versions, preds set will contain attribute without namespace.
pred = x.NamespaceAttr(x.GalaxyNamespace, pred)
}
predSet[pred] = struct{}{}
}
return predSet
Expand Down
8 changes: 4 additions & 4 deletions worker/backup_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest) error {
m := Manifest{
ReadTs: req.ReadTs,
Groups: predMap,
Version: x.DgraphVersion,
Version: x.ManifestVersion,
DropOperations: dropOperations,
Path: dir,
Compression: "snappy",
Expand Down Expand Up @@ -555,14 +555,14 @@ func (pr *BackupProcessor) CompleteBackup(ctx context.Context, m *Manifest) erro
return err
}

manifest, err := GetManifest(handler, uri)
manifest, err := GetManifestNoUpgrade(handler, uri)
if err != nil {
return err
}
manifest.Manifests = append(manifest.Manifests, m)

if err := createManifest(handler, uri, manifest); err != nil {
return errors.Wrap(err, "Complete backup failed")
if err := CreateManifest(handler, uri, manifest); err != nil {
return errors.Wrap(err, "complete backup failed")
}
glog.Infof("Backup completed OK.")
return nil
Expand Down
Loading