Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into feature/embedded
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomasz Zdybał committed Jul 11, 2017
2 parents a8b69e3 + 8ecb978 commit 35ffe62
Show file tree
Hide file tree
Showing 24 changed files with 362 additions and 278 deletions.
2 changes: 1 addition & 1 deletion client/mutations.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func (d *Dgraph) NodeXid(xid string, storeXid bool) (Node, error) {
}
n := Node{uid: uid}
if storeXid && isNew {
e := n.Edge("_xid_")
e := n.Edge("xid")
x.Check(e.SetValueString(xid))
d.BatchSet(e)
}
Expand Down
14 changes: 13 additions & 1 deletion cmd/dgraphloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ import (
"google.golang.org/grpc/credentials"

"github.com/dgraph-io/dgraph/client"
"github.com/dgraph-io/dgraph/protos"
"github.com/dgraph-io/dgraph/rdf"
"github.com/dgraph-io/dgraph/schema"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/x"

"github.com/pkg/profile"
Expand All @@ -40,7 +42,7 @@ var (
dgraph = flag.String("d", "127.0.0.1:9080", "Dgraph gRPC server address")
concurrent = flag.Int("c", 100, "Number of concurrent requests to make to Dgraph")
numRdf = flag.Int("m", 1000, "Number of RDF N-Quads to send as part of a mutation.")
storeXid = flag.Bool("x", false, "Store xids by adding corresponding _xid_ edges")
storeXid = flag.Bool("x", false, "Store xids by adding corresponding xid edges")
mode = flag.String("profile.mode", "", "enable profiling mode, one of [cpu, mem, mutex, block]")
clientDir = flag.String("cd", "c", "Directory to store xid to uid mapping")
blockRate = flag.Int("block", 0, "Block profiling rate")
Expand Down Expand Up @@ -242,6 +244,16 @@ func main() {

filesList := strings.Split(*files, ",")
x.AssertTrue(len(filesList) > 0)
if *storeXid {
if err := dgraphClient.AddSchema(protos.SchemaUpdate{
Predicate: "xid",
ValueType: uint32(types.StringID),
Tokenizer: []string{"hash"},
Directive: protos.SchemaUpdate_INDEX,
}); err != nil {
log.Fatal("While adding schema to batch ", err)
}
}
if len(*schemaFile) > 0 {
processSchemaFile(*schemaFile, dgraphClient)
}
Expand Down
2 changes: 1 addition & 1 deletion contrib/freebase/spielberg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
func TestSpielberg(t *testing.T) {
q := `
{
me(func:eq(_xid_,"m.06pj8")) {
me(func:eq(xid,"m.06pj8")) {
name@en
director.film (first: 4, orderasc: initial_release_date) {
name@en
Expand Down
2 changes: 1 addition & 1 deletion contrib/indextest/allof_the.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
me(func:eq(_xid_,"m.07mv_h")) {
me(func:eq(xid,"m.07mv_h")) {
director.film {
directed_by {
director.film @filter(allofterms(name, "the")) {
Expand Down
2 changes: 1 addition & 1 deletion contrib/indextest/allof_the_a.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
me(func:eq(_xid_,"m.07mv_h")) {
me(func:eq(xid,"m.07mv_h")) {
director.film {
directed_by {
director.film @filter(allofterms(name, "the") and allofterms(name, "a")) {
Expand Down
2 changes: 1 addition & 1 deletion contrib/indextest/allof_the_first.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
me(func:eq(_xid_,"m.07mv_h")) {
me(func:eq(xid,"m.07mv_h")) {
director.film {
directed_by {
director.film(first: 10) @filter(allofterms(name, "the")) {
Expand Down
2 changes: 1 addition & 1 deletion contrib/indextest/basic.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
me(func:eq(_xid_,"m.07mv_h")) {
me(func:eq(xid,"m.07mv_h")) {
director.film {
directed_by {
director.film {
Expand Down
2 changes: 1 addition & 1 deletion contrib/indextest/releasedate.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
me(func:eq(_xid_,"m.07mv_h")) {
me(func:eq(xid,"m.07mv_h")) {
director.film {
directed_by {
director.film {
Expand Down
2 changes: 1 addition & 1 deletion contrib/indextest/releasedate_geq.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
me(func:eq(_xid_,"m.07mv_h")) {
me(func:eq(xid,"m.07mv_h")) {
director.film {
directed_by {
director.film @filter(ge(initial_release_date, "2000-01-05")) {
Expand Down
2 changes: 1 addition & 1 deletion contrib/indextest/releasedate_sort.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
me(func:eq(_xid_,"m.07mv_h")) {
me(func:eq(xid,"m.07mv_h")) {
director.film {
directed_by {
director.film(orderasc: initial_release_date) {
Expand Down
2 changes: 1 addition & 1 deletion contrib/indextest/releasedate_sort_first_offset.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
me(func:eq(_xid_,"m.07mv_h")) {
me(func:eq(xid,"m.07mv_h")) {
director.film {
directed_by {
director.film(orderasc: initial_release_date, first: 5, offset: 10) {
Expand Down
6 changes: 3 additions & 3 deletions gql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,11 +482,11 @@ func Parse(r Request) (res Result, rerr error) {
return res, err
}

l := lex.Lexer{Input: query}
l.Run(lexTopLevel)
lexer := lex.Lexer{Input: query}
lexer.Run(lexTopLevel)

var qu *GraphQuery
it := l.NewIterator()
it := lexer.NewIterator()
fmap := make(fragmentMap)
for it.Next() {
item := it.Item()
Expand Down
42 changes: 22 additions & 20 deletions rdf/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,24 +321,26 @@ func isNewline(r rune) bool {
}

var typeMap = map[string]types.TypeID{
"xs:string": types.StringID,
"xs:date": types.DateTimeID,
"xs:dateTime": types.DateTimeID,
"xs:int": types.IntID,
"xs:boolean": types.BoolID,
"xs:double": types.FloatID,
"xs:float": types.FloatID,
"xs:base64Binary": types.BinaryID,
"geo:geojson": types.GeoID,
"pwd:password": types.PasswordID,
"http://www.w3.org/2001/XMLSchema#string": types.StringID,
"http://www.w3.org/2001/XMLSchema#dateTime": types.DateTimeID,
"http://www.w3.org/2001/XMLSchema#date": types.DateTimeID,
"http://www.w3.org/2001/XMLSchema#int": types.IntID,
"http://www.w3.org/2001/XMLSchema#integer": types.IntID,
"http://www.w3.org/2001/XMLSchema#boolean": types.BoolID,
"http://www.w3.org/2001/XMLSchema#double": types.FloatID,
"http://www.w3.org/2001/XMLSchema#float": types.FloatID,
"http://www.w3.org/2001/XMLSchema#gYear": types.DateTimeID,
"http://www.w3.org/2001/XMLSchema#gYearMonth": types.DateTimeID,
"xs:string": types.StringID,
"xs:date": types.DateTimeID,
"xs:dateTime": types.DateTimeID,
"xs:int": types.IntID,
"xs:positiveInteger": types.IntID,
"xs:boolean": types.BoolID,
"xs:double": types.FloatID,
"xs:float": types.FloatID,
"xs:base64Binary": types.BinaryID,
"geo:geojson": types.GeoID,
"pwd:password": types.PasswordID,
"http://www.w3.org/2001/XMLSchema#string": types.StringID,
"http://www.w3.org/2001/XMLSchema#dateTime": types.DateTimeID,
"http://www.w3.org/2001/XMLSchema#date": types.DateTimeID,
"http://www.w3.org/2001/XMLSchema#int": types.IntID,
"http://www.w3.org/2001/XMLSchema#positiveInteger": types.IntID,
"http://www.w3.org/2001/XMLSchema#integer": types.IntID,
"http://www.w3.org/2001/XMLSchema#boolean": types.BoolID,
"http://www.w3.org/2001/XMLSchema#double": types.FloatID,
"http://www.w3.org/2001/XMLSchema#float": types.FloatID,
"http://www.w3.org/2001/XMLSchema#gYear": types.DateTimeID,
"http://www.w3.org/2001/XMLSchema#gYearMonth": types.DateTimeID,
}
6 changes: 3 additions & 3 deletions worker/assign.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ func AssignUidsOverNetwork(ctx context.Context, num *protos.Num) (*protos.Assign
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Not leader of group: %d. Sending to: %d", leaseGid, lid)
}
p := pools().get(addr)
conn, err := p.Get()
p, err := pools().get(addr)
if err != nil {
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Error while retrieving connection: %+v", err)
}
return &emptyAssignedIds, err
}
defer p.Put(conn)
defer pools().release(p)
conn := p.Get()
if tr, ok := trace.FromContext(ctx); ok {
tr.LazyPrintf("Calling AssignUids for group: %d, addr: %s", leaseGid, addr)
}
Expand Down
154 changes: 83 additions & 71 deletions worker/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"log"
"sync"
"sync/atomic"

"github.com/dgraph-io/dgraph/protos"
"github.com/dgraph-io/dgraph/x"
Expand All @@ -35,11 +36,16 @@ var (
errNoConnection = fmt.Errorf("No connection exists")
)

// Pool is used to manage the grpc client connections for communicating with
// other worker instances.
// "pool" is used to manage the grpc client connection(s) for communicating with other
// worker instances. Right now it just holds one of them.
type pool struct {
conns chan *grpc.ClientConn
Addr string
// A "pool" now consists of one connection. gRPC uses HTTP2 transport to combine
// messages in the same TCP stream.
conn *grpc.ClientConn

Addr string
// Requires a lock on poolsi.
refcount int64
}

type poolsi struct {
Expand All @@ -58,104 +64,110 @@ func pools() *poolsi {
return pi
}

func (p *poolsi) any() *pool {
func (p *poolsi) any() (*pool, error) {
p.RLock()
defer p.RUnlock()
for _, pool := range p.all {
return pool
pool.AddOwner()
return pool, nil
}
return nil
return nil, errNoConnection
}

func (p *poolsi) get(addr string) *pool {
func (p *poolsi) get(addr string) (*pool, error) {
p.RLock()
defer p.RUnlock()
pool, _ := p.all[addr]
return pool
pool, ok := p.all[addr]
if !ok {
return nil, errNoConnection
}
pool.AddOwner()
return pool, nil
}

func (p *poolsi) connect(addr string) {
// Returns a pool that you should call put() on.
func (p *poolsi) connect(addr string) (*pool, bool) {
if addr == Config.MyAddr {
return
return nil, false
}
p.RLock()
_, has := p.all[addr]
existingPool, has := p.all[addr]
if has {
p.RUnlock()
existingPool.AddOwner()
return existingPool, true
}
p.RUnlock()

pool, err := newPool(addr)
// TODO: Rename newPool to newConn, rename pool.
// TODO: This can get triggered with totally bogus config.
x.Checkf(err, "Unable to connect to host %s", addr)

p.Lock()
existingPool, has = p.all[addr]
if has {
return
p.Unlock()
destroyPool(pool)
existingPool.refcount++
return existingPool, true
}
p.all[addr] = pool
pool.AddOwner() // matches p.put() run by caller
p.Unlock()

// No need to block this thread just to print some messages.
pool.AddOwner() // matches p.put() in goroutine
go func() {
defer p.release(pool)
err = TestConnection(pool)
if err != nil {
log.Printf("Connection to %q fails, got error: %v\n", addr, err)
// Don't return -- let's still put the empty pool in the map. Its users
// have to handle errors later anyway.
} else {
fmt.Printf("Connection with %q healthy.\n", addr)
}
}()

return pool, true
}

// TestConnection tests if we can run an Echo query on a connection.
func TestConnection(p *pool) error {
conn := p.Get()

pool := newPool(addr, 5)
query := new(protos.Payload)
query.Data = make([]byte, 10)
x.Check2(rand.Read(query.Data))

conn, err := pool.Get()
x.Checkf(err, "Unable to connect")

c := protos.NewWorkerClient(conn)
resp, err := c.Echo(context.Background(), query)
if err != nil {
log.Printf("While trying to connect to %q, got error: %v\n", addr, err)
// Don't return -- let's still put the empty pool in the map. Its users
// have to handle errors later anyway.
} else {
x.AssertTrue(bytes.Equal(resp.Data, query.Data))
x.Check(pool.Put(conn))
fmt.Printf("Connection with %q successful.\n", addr)
return err
}

p.Lock()
defer p.Unlock()
_, has = p.all[addr]
if has {
return
}
p.all[addr] = pool
// If a server is sending bad echos, do we have to freak out and die?
x.AssertTruef(bytes.Equal(resp.Data, query.Data),
"non-matching Echo response value from %v", p.Addr)
return nil
}

// NewPool initializes an instance of Pool which is used to connect with other
// workers. The pool instance also has a buffered channel,conn with capacity
// maxCap that stores the connections.
func newPool(addr string, maxCap int) *pool {
p := new(pool)
p.Addr = addr
p.conns = make(chan *grpc.ClientConn, maxCap)
conn, err := p.dialNew()
// NewPool creates a new "pool" with one gRPC connection, refcount 0.
func newPool(addr string) (*pool, error) {
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
log.Fatal(err)
return nil
return nil, err
}
p.conns <- conn
return p
// The pool hasn't been added to poolsi yet, so it gets no refcount.
return &pool{conn: conn, Addr: addr, refcount: 0}, nil
}

func (p *pool) dialNew() (*grpc.ClientConn, error) {
return grpc.Dial(p.Addr, grpc.WithInsecure())
// Get returns the connection to use from the pool of connections.
func (p *pool) Get() *grpc.ClientConn {
return p.conn
}

// Get returns a connection from the pool of connections or a new connection if
// the pool is empty.
func (p *pool) Get() (*grpc.ClientConn, error) {
if p == nil {
return nil, errNoConnection
}

select {
case conn := <-p.conns:
return conn, nil
default:
return p.dialNew()
}
}

// Put returns a connection to the pool or closes and discards the connection
// incase the pool channel is at capacity.
func (p *pool) Put(conn *grpc.ClientConn) error {
select {
case p.conns <- conn:
return nil
default:
return conn.Close()
}
// AddOwner adds 1 to the refcount for the pool (atomically).
func (p *pool) AddOwner() {
atomic.AddInt64(&p.refcount, 1)
}
Loading

0 comments on commit 35ffe62

Please sign in to comment.