Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(GraphQL): This PR fixes issue of multiple responses in a subscription for an update. #6868

Merged
merged 4 commits into from
Nov 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions graphql/e2e/subscription/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,3 +950,125 @@ func TestSubscriptionAuthHeaderCaseInsensitive(t *testing.T) {
require.JSONEq(t, `{"queryTodo":[{"owner":"jatin","text":"GraphQL is exciting!!"}]}`,
string(resp.Data))
}

func TestSubscriptionAuth_MultiSubscriptionResponses(t *testing.T) {
dg, err := testutil.DgraphClient(groupOnegRPC)
require.NoError(t, err)
testutil.DropAll(t, dg)

// Upload schema
add := &common.GraphQLParams{
Query: `mutation updateGQLSchema($sch: String!) {
updateGQLSchema(input: { set: { schema: $sch }}) {
gqlSchema {
schema
}
}
}`,
Variables: map[string]interface{}{"sch": schAuth},
}
addResult := add.ExecuteAsPost(t, adminEndpoint)
require.Nil(t, addResult.Errors)
time.Sleep(time.Second * 2)

metaInfo := &testutil.AuthMeta{
PublicKey: "secret",
Namespace: "https://dgraph.io",
Algo: "HS256",
Header: "Authorization",
}
metaInfo.AuthVars = map[string]interface{}{
"USER": "jatin",
"ROLE": "USER",
}

jwtToken, err := metaInfo.GetSignedToken("secret", 5*time.Second)
require.NoError(t, err)

// first subscription
payload := fmt.Sprintf(`{"Authorization": "%s"}`, jwtToken)
subscriptionClient, err := common.NewGraphQLSubscription(subscriptionEndpoint, &schema.Request{
Query: `subscription{
queryTodo{
owner
text
}
}`,
}, payload)
require.Nil(t, err)

res, err := subscriptionClient.RecvMsg()
require.NoError(t, err)

var resp common.GraphQLResponse
err = json.Unmarshal(res, &resp)
require.NoError(t, err)

require.Nil(t, resp.Errors)
require.JSONEq(t, `{"queryTodo":[]}`,
string(resp.Data))
// Terminate subscription and wait for 1 second before starting new subscription
subscriptionClient.Terminate()
time.Sleep(time.Second)

jwtToken1, err := metaInfo.GetSignedToken("secret", 5*time.Second)
require.NoError(t, err)

// Second subscription
payload = fmt.Sprintf(`{"Authorization": "%s"}`, jwtToken1)
subscriptionClient1, err := common.NewGraphQLSubscription(subscriptionEndpoint, &schema.Request{
Query: `subscription{
queryTodo{
owner
text
}
}`,
}, payload)
require.Nil(t, err)

res, err = subscriptionClient1.RecvMsg()
require.NoError(t, err)

err = json.Unmarshal(res, &resp)
require.NoError(t, err)

require.Nil(t, resp.Errors)
require.JSONEq(t, `{"queryTodo":[]}`,
string(resp.Data))

// for user jatin
add = &common.GraphQLParams{
Query: `mutation{
addTodo(input: [
{text : "GraphQL is exciting!!",
owner : "jatin"}
])
{
todo{
text
owner
}
}
}`,
}

addResult = add.ExecuteAsPost(t, graphQLEndpoint)
require.Nil(t, addResult.Errors)
time.Sleep(time.Second)

// 1st response
res, err = subscriptionClient1.RecvMsg()
require.NoError(t, err)
err = json.Unmarshal(res, &resp)
require.NoError(t, err)

require.Nil(t, resp.Errors)
require.JSONEq(t, `{"queryTodo":[{"owner":"jatin","text":"GraphQL is exciting!!"}]}`,
string(resp.Data))

// second response should be nil
res, err = subscriptionClient1.RecvMsg()
require.NoError(t, err)
require.Nil(t, res)
subscriptionClient1.Terminate()
}
13 changes: 7 additions & 6 deletions graphql/subscription/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,9 @@ func (p *Poller) AddSubscriber(
expiry: customClaims.StandardClaims.ExpiresAt.Time, updateCh: updateCh}
p.pollRegistry[bucketID] = subscriptions

if len(subscriptions) != 1 {
// Already there is subscription for this bucket. So,no need to poll the server. We can
// use the existing polling routine to publish the update.

if ok {
// Already there is a running go routine for this bucket. So,no need to poll the server.
// We can use the existing polling routine to publish the update.
return &SubscriberResponse{
BucketID: bucketID,
SubscriptionID: subscriptionID,
Expand Down Expand Up @@ -176,11 +175,12 @@ func (p *Poller) poll(req *pollRequest) {
// Don't update if there is no change in response.
continue
}
// Every thirty poll. We'll check there is any active subscription for the
// current poll. If not we'll terminate this poll.
// Every second poll, we'll check if there is any active subscription for the
// current goroutine. If not we'll terminate this poll.
p.Lock()
subscribers, ok := p.pollRegistry[req.bucketID]
if !ok || len(subscribers) == 0 {
delete(p.pollRegistry, req.bucketID)
p.Unlock()
return
}
Expand All @@ -200,6 +200,7 @@ func (p *Poller) poll(req *pollRequest) {
if !ok || len(subscribers) == 0 {
// There is no subscribers to push the update. So, kill the current polling
// go routine.
delete(p.pollRegistry, req.bucketID)
p.Unlock()
return
}
Expand Down