Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce DropPrefix API into Dgraph #3060

Merged
merged 9 commits into from
Feb 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func NewNode(rc *pb.RaftContext, store *raftwal.DiskStorage) *Node {
peers: make(map[uint64]string),
requestCh: make(chan linReadReq),
}
n.Applied.Init()
n.Applied.Init(nil)
// This should match up to the Applied index set above.
n.Applied.SetDoneUntil(n.Cfg.Applied)
glog.Infof("Setting raft.Config to: %+v\n", n.Cfg)
Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/alpha/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func ipInIPWhitelistRanges(ipString string) bool {
return false
}

for _, ipRange := range worker.Config.WhiteListedIPRanges {
for _, ipRange := range x.WorkerConfig.WhiteListedIPRanges {
if bytes.Compare(ip, ipRange.Lower) >= 0 && bytes.Compare(ip, ipRange.Upper) <= 0 {
return true
}
Expand Down
18 changes: 9 additions & 9 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,12 @@ func setupCustomTokenizers() {
// and returns a slice of []IPRange.
//
// e.g. "144.142.126.222:144.142.126.244,144.142.126.254,192.168.0.0/16,host.docker.internal"
func getIPsFromString(str string) ([]worker.IPRange, error) {
func getIPsFromString(str string) ([]x.IPRange, error) {
if str == "" {
return []worker.IPRange{}, nil
return []x.IPRange{}, nil
}

var ipRanges []worker.IPRange
var ipRanges []x.IPRange
rangeStrings := strings.Split(str, ",")

for _, s := range rangeStrings {
Expand All @@ -201,15 +201,15 @@ func getIPsFromString(str string) ([]worker.IPRange, error) {
// or IPv6 address like fd03:b188:0f3c:9ec4::babe:face
ipAddr := net.ParseIP(s)
if ipAddr != nil {
ipRanges = append(ipRanges, worker.IPRange{Lower: ipAddr, Upper: ipAddr})
ipRanges = append(ipRanges, x.IPRange{Lower: ipAddr, Upper: ipAddr})
} else {
ipAddrs, err := net.LookupIP(s)
if err != nil {
return nil, fmt.Errorf("invalid IP address or hostname: %s", s)
}

for _, addr := range ipAddrs {
ipRanges = append(ipRanges, worker.IPRange{Lower: addr, Upper: addr})
ipRanges = append(ipRanges, x.IPRange{Lower: addr, Upper: addr})
}
}
} else {
Expand All @@ -226,7 +226,7 @@ func getIPsFromString(str string) ([]worker.IPRange, error) {
rangeHi[addrLen-i] |= ^network.Mask[maskLen-i]
}

ipRanges = append(ipRanges, worker.IPRange{Lower: rangeLo, Upper: rangeHi})
ipRanges = append(ipRanges, x.IPRange{Lower: rangeLo, Upper: rangeHi})
}
case len(tuple) == 2:
// string is range like a.b.c.d:w.x.y.z
Expand All @@ -239,7 +239,7 @@ func getIPsFromString(str string) ([]worker.IPRange, error) {
} else if bytes.Compare(rangeLo, rangeHi) > 0 {
return nil, fmt.Errorf("inverted IP address range: %s", s)
} else {
ipRanges = append(ipRanges, worker.IPRange{Lower: rangeLo, Upper: rangeHi})
ipRanges = append(ipRanges, x.IPRange{Lower: rangeLo, Upper: rangeHi})
}
default:
return nil, fmt.Errorf("invalid IP address range: %s", s)
Expand Down Expand Up @@ -482,7 +482,7 @@ func run() {

ips, err := getIPsFromString(Alpha.Conf.GetString("whitelist"))
x.Check(err)
worker.Config = worker.Options{
x.WorkerConfig = x.WorkerOptions{
ExportPath: Alpha.Conf.GetString("export"),
NumPendingProposals: Alpha.Conf.GetInt("pending_proposals"),
Tracing: Alpha.Conf.GetFloat64("trace"),
Expand Down Expand Up @@ -519,7 +519,7 @@ func run() {
}
}
otrace.ApplyConfig(otrace.Config{
DefaultSampler: otrace.ProbabilitySampler(worker.Config.Tracing)})
DefaultSampler: otrace.ProbabilitySampler(x.WorkerConfig.Tracing)})

// Posting will initialize index which requires schema. Hence, initialize
// schema before calling posting.Init().
Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/alpha/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/query"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -1674,7 +1674,7 @@ func TestTypeMutationAndQuery(t *testing.T) {
}

func TestIPStringParsing(t *testing.T) {
var addrRange []worker.IPRange
var addrRange []x.IPRange
var err error

addrRange, err = getIPsFromString("144.142.126.222:144.142.126.244")
Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/zero/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (o *Oracle) Init() {
o.keyCommit = make(map[string]uint64)
o.subscribers = make(map[int]chan *pb.OracleDelta)
o.updates = make(chan *pb.OracleDelta, 100000) // Keeping 1 second worth of updates.
o.doneUntil.Init()
o.doneUntil.Init(nil)
go o.sendDeltasToSubscribers()
}

Expand Down
11 changes: 7 additions & 4 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,10 +553,13 @@ func (s *Server) ShouldServe(
var proposal pb.ZeroProposal
// Multiple Groups might be assigned to same tablet, so during proposal we will check again.
tablet.Force = false
if x.IsAclPredicate(tablet.Predicate) {
// force all the acl predicates to be allocated to group 1
// this is to make it eaiser to stream ACL updates to all alpha servers
// since they only need to open one pipeline to receive updates for all ACL predicates
if x.IsReservedPredicate(tablet.Predicate) {
// Force all the reserved predicates to be allocated to group 1.
// This is to make it eaiser to stream ACL updates to all alpha servers
// since they only need to open one pipeline to receive updates for all
// ACL predicates.
// This will also make it easier to restore the reserved predicates after
// a DropAll operation.
tablet.GroupId = 1
}
proposal.Tablet = tablet
Expand Down
7 changes: 3 additions & 4 deletions edgraph/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"time"

"github.com/dgraph-io/dgraph/posting"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
)

Expand Down Expand Up @@ -89,9 +88,9 @@ func setConfVar(conf Options) {
x.Conf.Set("allotted_memory", newFloat(conf.AllottedMemory))

// Set some vars from worker.Config.
x.Conf.Set("tracing", newFloat(worker.Config.Tracing))
x.Conf.Set("num_pending_proposals", newInt(worker.Config.NumPendingProposals))
x.Conf.Set("expand_edge", newIntFromBool(worker.Config.ExpandEdge))
x.Conf.Set("tracing", newFloat(x.WorkerConfig.Tracing))
x.Conf.Set("num_pending_proposals", newInt(x.WorkerConfig.NumPendingProposals))
x.Conf.Set("expand_edge", newIntFromBool(x.WorkerConfig.ExpandEdge))
}

func SetConfiguration(newConfig Options) {
Expand Down
65 changes: 7 additions & 58 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,20 +445,6 @@ func deleteEntries(prefix []byte, remove func(key []byte) bool) error {
})
}

func deleteAllEntries(prefix []byte) error {
return deleteEntries(prefix, func(key []byte) bool {
return true
})
}

// deleteAllTokens deletes the index for the given attribute. All tokenizers are
// used by this function.
func deleteAllTokens(attr string) error {
pk := x.ParsedKey{Attr: attr}
prefix := pk.IndexPrefix()
return deleteAllEntries(prefix)
}

// deleteTokensFor deletes the index for the given attribute and token.
func deleteTokensFor(attr, tokenizerName string) error {
pk := x.ParsedKey{Attr: attr}
Expand All @@ -469,22 +455,21 @@ func deleteTokensFor(attr, tokenizerName string) error {
}
prefix = append(prefix, tokenizer.Identifier())

return deleteAllEntries(prefix)
return pstore.DropPrefix(prefix)
}

func deleteReverseEdges(attr string) error {
pk := x.ParsedKey{Attr: attr}
prefix := pk.ReversePrefix()
return deleteAllEntries(prefix)
return pstore.DropPrefix(prefix)
}

func deleteCountIndex(attr string) error {
pk := x.ParsedKey{Attr: attr}
if err := deleteAllEntries(pk.CountPrefix(false)); err != nil {
if err := pstore.DropPrefix(pk.CountPrefix(false)); err != nil {
return err
}

return deleteAllEntries(pk.CountPrefix(true))
return pstore.DropPrefix(pk.CountPrefix(true))
}

// Index rebuilding logic here.
Expand Down Expand Up @@ -983,52 +968,16 @@ func rebuildListType(ctx context.Context, rb *IndexRebuild) error {

// DeleteAll deletes all entries in the posting list.
func DeleteAll() error {
return deleteEntries(nil, func(key []byte) bool {
pk := x.Parse(key)
if pk == nil {
return true
} else if pk.IsSchema() {
// Don't delete schema for _predicate_
return !x.IsReservedPredicate(pk.Attr)
}
return true
})
return pstore.DropAll()
}

// DeletePredicate deletes all entries and indices for a given predicate.
func DeletePredicate(ctx context.Context, attr string) error {
glog.Infof("Dropping predicate: [%s]", attr)
pk := x.ParsedKey{
Attr: attr,
}
prefix := pk.DataPrefix()
// Delete all data postings for the given predicate.
err := deleteEntries(prefix, func(key []byte) bool {
return true
})
if err != nil {
prefix := x.PredicatePrefix(attr)
if err := pstore.DropPrefix(prefix); err != nil {
return err
}

// TODO - We will still have the predicate present in <uid, _predicate_> posting lists.
indexed := schema.State().IsIndexed(attr)
reversed := schema.State().IsReversed(attr)
if indexed {
if err := deleteAllTokens(attr); err != nil {
return err
}
} else if reversed {
if err := deleteReverseEdges(attr); err != nil {
return err
}
}

hasCountIndex := schema.State().HasCount(attr)
if hasCountIndex {
if err := deleteCountIndex(attr); err != nil {
return err
}
}

return schema.State().Delete(attr)
}
9 changes: 9 additions & 0 deletions query/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ func setSchema(schema string) {
}
}

func dropPredicate(pred string) {
err := client.Alter(context.Background(), &api.Operation{
DropAttr: pred,
})
if err != nil {
panic(fmt.Sprintf("Could not drop predicate. Got error %v", err.Error()))
}
}

func processQuery(t *testing.T, ctx context.Context, query string) (string, error) {
txn := client.NewTxn()
defer txn.Discard(ctx)
Expand Down
4 changes: 2 additions & 2 deletions query/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ import (
)

func ApplyMutations(ctx context.Context, m *pb.Mutations) (*api.TxnContext, error) {
if worker.Config.ExpandEdge {
if x.WorkerConfig.ExpandEdge {
edges, err := expandEdges(ctx, m)
if err != nil {
return nil, x.Wrapf(err, "While adding pb.edges")
}
m.Edges = edges
} else {
for _, mu := range m.Edges {
if mu.Attr == x.Star && !worker.Config.ExpandEdge {
if mu.Attr == x.Star && !x.WorkerConfig.ExpandEdge {
return nil, x.Errorf("Expand edge (--expand_edge) is set to false." +
" Cannot perform S * * deletion.")
}
Expand Down
4 changes: 2 additions & 2 deletions query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -1852,7 +1852,7 @@ func expandSubgraph(ctx context.Context, sg *SubGraph) ([]*SubGraph, error) {
for i := 0; i < len(sg.Children); i++ {
child := sg.Children[i]

if !worker.Config.ExpandEdge && child.Attr == "_predicate_" {
if !x.WorkerConfig.ExpandEdge && child.Attr == "_predicate_" {
return out,
x.Errorf("Cannot ask for _predicate_ when ExpandEdge(--expand_edge) is false.")
}
Expand All @@ -1862,7 +1862,7 @@ func expandSubgraph(ctx context.Context, sg *SubGraph) ([]*SubGraph, error) {
continue
}

if !worker.Config.ExpandEdge {
if !x.WorkerConfig.ExpandEdge {
return out,
x.Errorf("Cannot run expand() query when ExpandEdge(--expand_edge) is false.")
}
Expand Down
Loading