Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various bug fixes #2457

Merged
merged 14 commits into from
Jun 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 37 additions & 8 deletions conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,31 @@ type Node struct {
messages chan sendmsg
RaftContext *intern.RaftContext
Store *raftwal.DiskStorage
Rand *rand.Rand

// applied is used to keep track of the applied RAFT proposals.
// The stages are proposed -> committed (accepted by cluster) ->
// applied (to PL) -> synced (to BadgerDB).
Applied x.WaterMark
}

type lockedSource struct {
lk sync.Mutex
src rand.Source
}

func (r *lockedSource) Int63() int64 {
r.lk.Lock()
defer r.lk.Unlock()
return r.src.Int63()
}

func (r *lockedSource) Seed(seed int64) {
r.lk.Lock()
defer r.lk.Unlock()
r.src.Seed(seed)
}

func NewNode(rc *intern.RaftContext, store *raftwal.DiskStorage) *Node {
n := &Node{
Id: rc.Id,
Expand Down Expand Up @@ -85,6 +103,7 @@ func NewNode(rc *intern.RaftContext, store *raftwal.DiskStorage) *Node {
RaftContext: rc,
messages: make(chan sendmsg, 100),
Applied: x.WaterMark{Name: fmt.Sprintf("Applied watermark")},
Rand: rand.New(&lockedSource{src: rand.NewSource(time.Now().UnixNano())}),
}
n.Applied.Init()
// TODO: n_ = n is a hack. We should properly init node, and make it part of the server struct.
Expand Down Expand Up @@ -167,12 +186,21 @@ func (n *Node) Send(m raftpb.Message) {
x.AssertTruef(n.Id != m.To, "Sending message to itself")
data, err := m.Marshal()
x.Check(err)
select {
case n.messages <- sendmsg{to: m.To, data: data}:
// pass
default:
// ignore
}

// As long as leadership is stable, any attempted Propose() calls should be reflected in the
// next raft.Ready.Messages. Leaders will send MsgApps to the followers; followers will send
// MsgProp to the leader. It is up to the transport layer to get those messages to their
// destination. If a MsgApp gets dropped by the transport layer, it will get retried by raft
// (i.e. it will appear in a future Ready.Messages), but MsgProp will only be sent once. During
// leadership transitions, proposals may get dropped even if the network is reliable.
//
// We can't do a select default here. The messages must be sent to the channel, otherwise we
// should block until the channel can accept these messages. BatchAndSendMessages would take
// care of dropping messages which can't be sent due to network issues to the corresponding
// node. But, we shouldn't take the liberty to do that here. It would take us more time to
// repropose these dropped messages anyway, than to block here a bit waiting for the messages
// channel to clear out.
n.messages <- sendmsg{to: m.To, data: data}
}

func (n *Node) Snapshot() (raftpb.Snapshot, error) {
Expand Down Expand Up @@ -270,7 +298,8 @@ func (n *Node) BatchAndSendMessages() {
if exists := failedConn[to]; !exists {
// So that we print error only the first time we are not able to connect.
// Otherwise, the log is polluted with multiple errors.
x.Printf("No healthy connection found to node Id: %d, err: %v\n", to, err)
x.Printf("No healthy connection found to node Id: %d addr: [%s], err: %v\n",
to, addr, err)
failedConn[to] = true
}
continue
Expand All @@ -286,7 +315,7 @@ func (n *Node) BatchAndSendMessages() {
}

func (n *Node) doSendMessage(pool *Pool, data []byte) {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

client := pool.Get()
Expand Down
11 changes: 7 additions & 4 deletions contrib/integration/acctupsert/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ func main() {
flag.Parse()
c := newClient()
setup(c)
fmt.Println("Doing upserts")
doUpserts(c)
fmt.Println("Checking integrity")
checkIntegrity(c)
}

Expand Down Expand Up @@ -111,17 +113,18 @@ var (
func upsert(c *dgo.Dgraph, acc account) {
for {
if time.Since(lastStatus) > 100*time.Millisecond {
fmt.Printf("Success: %d Retries: %d\n",
fmt.Printf("[%s] Success: %d Retries: %d\n", time.Now().Format(time.Stamp),
atomic.LoadUint64(&successCount), atomic.LoadUint64(&retryCount))
lastStatus = time.Now()
}
err := tryUpsert(c, acc)
if err == nil {
atomic.AddUint64(&successCount, 1)
return
}
if err != y.ErrAborted {
x.Check(err)
} else if err == y.ErrAborted {
// pass
} else {
fmt.Errorf("ERROR: %v", err)
}
atomic.AddUint64(&retryCount, 1)
}
Expand Down
5 changes: 4 additions & 1 deletion contrib/scripts/functions.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ sleepTime=11
function runCluster {
basedir=$GOPATH/src/github.com/dgraph-io/dgraph
pushd $basedir/dgraph
sudo rm -Rf /tmp/dg
sudo mkdir /tmp/dg
go build . && go install . && md5sum dgraph $GOPATH/bin/dgraph
docker-compose up --force-recreate --remove-orphans --detach
DATA="/tmp/dg" docker-compose up --force-recreate --remove-orphans --detach
popd
$basedir/contrib/wait-for-it.sh localhost:6080
$basedir/contrib/wait-for-it.sh localhost:9180
sleep 10 # Sleep 10 seconds to get things ready.
}
4 changes: 4 additions & 0 deletions dgraph/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func init() {
RootCmd.PersistentFlags().String("config", "",
"Configuration file. Takes precedence over default values, but is "+
"overridden to values set with environment variables and flags.")
RootCmd.PersistentFlags().Bool("bindall", true,
"Use 0.0.0.0 instead of localhost to bind to all addresses on local machine.")
RootCmd.PersistentFlags().Bool("expose_trace", false,
"Allow trace endpoint to be accessible from remote")
rootConf.BindPFlags(RootCmd.PersistentFlags())

var subcommands = []*x.SubCommand{
Expand Down
97 changes: 46 additions & 51 deletions dgraph/cmd/server/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"sort"
Expand All @@ -28,31 +29,24 @@ type res struct {
Extensions *query.Extensions `json:"extensions,omitempty"`
}

var addr = "http://localhost:8180"

func queryWithTs(q string, ts uint64) (string, uint64, error) {
url := "/query"
url := addr + "/query"
if ts != 0 {
url += "/" + strconv.FormatUint(ts, 10)
}
req, err := http.NewRequest("POST", url, bytes.NewBufferString(q))
if err != nil {
return "", 0, err
}
rr := httptest.NewRecorder()
handler := http.HandlerFunc(queryHandler)
handler.ServeHTTP(rr, req)

if status := rr.Code; status != http.StatusOK {
return "", 0, fmt.Errorf("Unexpected status code: %v", status)
}

var qr x.QueryResWithData
json.Unmarshal(rr.Body.Bytes(), &qr)
if len(qr.Errors) > 0 {
return "", 0, errors.New(qr.Errors[0].Message)
_, body, err := runRequest(req)
if err != nil {
return "", 0, err
}

var r res
x.Check(json.Unmarshal(rr.Body.Bytes(), &r))
x.Check(json.Unmarshal(body, &r))
startTs := r.Extensions.Txn.StartTs

// Remove the extensions.
Expand All @@ -66,7 +60,7 @@ func queryWithTs(q string, ts uint64) (string, uint64, error) {

func mutationWithTs(m string, isJson bool, commitNow bool, ignoreIndexConflict bool,
ts uint64) ([]string, uint64, error) {
url := "/mutate"
url := addr + "/mutate"
if ts != 0 {
url += "/" + strconv.FormatUint(ts, 10)
}
Expand All @@ -82,28 +76,44 @@ func mutationWithTs(m string, isJson bool, commitNow bool, ignoreIndexConflict b
if commitNow {
req.Header.Set("X-Dgraph-CommitNow", "true")
}
rr := httptest.NewRecorder()
handler := http.HandlerFunc(mutationHandler)
handler.ServeHTTP(rr, req)

if status := rr.Code; status != http.StatusOK {
return keys, 0, fmt.Errorf("Unexpected status code: %v", status)
}
var qr x.QueryResWithData
json.Unmarshal(rr.Body.Bytes(), &qr)
if len(qr.Errors) > 0 {
return keys, 0, errors.New(qr.Errors[0].Message)
_, body, err := runRequest(req)
if err != nil {
return keys, 0, err
}

var r res
x.Check(json.Unmarshal(rr.Body.Bytes(), &r))
x.Check(json.Unmarshal(body, &r))
startTs := r.Extensions.Txn.StartTs

return r.Extensions.Txn.Keys, startTs, nil
}

func runRequest(req *http.Request) (*x.QueryResWithData, []byte, error) {
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, nil, err
}
if status := resp.StatusCode; status != http.StatusOK {
return nil, nil, fmt.Errorf("Unexpected status code: %v", status)
}

defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, nil, err
}

qr := new(x.QueryResWithData)
json.Unmarshal(body, qr) // Don't check error.
if len(qr.Errors) > 0 {
return nil, nil, errors.New(qr.Errors[0].Message)
}
return qr, body, nil
}

func commitWithTs(keys []string, ts uint64) error {
url := "/commit"
url := addr + "/commit"
if ts != 0 {
url += "/" + strconv.FormatUint(ts, 10)
}
Expand All @@ -116,22 +126,8 @@ func commitWithTs(keys []string, ts uint64) error {
if err != nil {
return err
}

rr := httptest.NewRecorder()
handler := http.HandlerFunc(commitHandler)
handler.ServeHTTP(rr, req)

if status := rr.Code; status != http.StatusOK {
return fmt.Errorf("Unexpected status code: %v", status)
}

var qr x.QueryResWithData
json.Unmarshal(rr.Body.Bytes(), &qr)
if len(qr.Errors) > 0 {
return errors.New(qr.Errors[0].Message)
}

return nil
_, _, err = runRequest(req)
return err
}

func TestTransactionBasic(t *testing.T) {
Expand All @@ -141,7 +137,6 @@ func TestTransactionBasic(t *testing.T) {
q1 := `
{
balances(func: anyofterms(name, "Alice Bob")) {
uid
name
balance
}
Expand All @@ -153,10 +148,10 @@ func TestTransactionBasic(t *testing.T) {
m1 := `
{
set {
<0x1> <name> "Alice" .
<0x1> <name> "Bob" .
<0x1> <balance> "110" .
<0x2> <balance> "60" .
_:alice <name> "Alice" .
_:alice <name> "Bob" .
_:alice <balance> "110" .
_:bob <balance> "60" .
}
}
`
Expand All @@ -174,13 +169,13 @@ func TestTransactionBasic(t *testing.T) {
// Query with same timestamp.
data, _, err = queryWithTs(q1, ts)
require.NoError(t, err)
require.Equal(t, `{"data":{"balances":[{"uid":"0x1","name":"Bob","balance":"110"}]}}`, data)
require.Equal(t, `{"data":{"balances":[{"name":"Bob","balance":"110"}]}}`, data)

// Commit and query.
require.NoError(t, commitWithTs(keys, ts))
data, _, err = queryWithTs(q1, 0)
require.NoError(t, err)
require.Equal(t, `{"data":{"balances":[{"uid":"0x1","name":"Bob","balance":"110"}]}}`, data)
require.Equal(t, `{"data":{"balances":[{"name":"Bob","balance":"110"}]}}`, data)
}

func TestAlterAllFieldsShouldBeSet(t *testing.T) {
Expand Down
5 changes: 0 additions & 5 deletions dgraph/cmd/server/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,6 @@ func init() {
flag.IntP("port_offset", "o", 0,
"Value added to all listening port numbers. [Internal=7080, HTTP=8080, Grpc=9080]")

flag.Bool("bindall", true,
"Use 0.0.0.0 instead of localhost to bind to all addresses on local machine.")
flag.Bool("expose_trace", false,
"Allow trace endpoint to be accessible from remote")

flag.Uint64("query_edge_limit", 1e6,
"Limit for the maximum number of edges that can be returned in a query."+
" This is only useful for shortest path queries.")
Expand Down
Loading