Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing hooks and tracing for GraphQL operations #79

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 103 additions & 39 deletions src/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,44 @@ module.exports = function(mixinOptions) {
};
},
},
context: {
visibility: "private",
tracing: false,
handler(ctx) {
const { req, connection } = ctx.params;
let context = {
dataLoaders: new Map(), // create an empty map to load DataLoader instances into
};

if (req) {
context = {
...context,
ctx: req.$ctx,
service: req.$service,
params: req.$params,
};
} else if (connection) {
context = {
...context,
ctx,
connectionCtx: connection.context.$ctx,
service: connection.context.$service,
params: connection.context.$params,
};
} else {
throw new Error("Unrecognized request type for context action");
}

return context;
},
},
actionOptions: {
visibility: "private",
tracing: false,
handler() {
return {};
},
},
},

events: {
Expand Down Expand Up @@ -95,7 +133,7 @@ module.exports = function(mixinOptions) {

if (service.version != null)
return (
(typeof service.version == "number"
(typeof service.version === "number"
? "v" + service.version
: service.version) +
"." +
Expand Down Expand Up @@ -155,10 +193,18 @@ module.exports = function(mixinOptions) {
params: staticParams = {},
rootParams = {},
fileUploadArg = null,
fieldName = "",
typeName = "",
} = def;
const rootKeys = Object.keys(rootParams);

return async (root, args, context) => {
// Record the span if possible
let operationSpan;
if (context.ctx)
operationSpan = context.ctx.startSpan(`GQL ${typeName} ${fieldName}`);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@icebob I am unfamiliar with how best to approach setting up spans for tracing, so I'm not sure if this is an appropriate pattern. Can you weigh in?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method is correct but it would be good to add the typeName and fieldName into tags, as well.

let result;

try {
if (useDataLoader) {
const dataLoaderMapKey = this.getDataLoaderMapKey(
Expand Down Expand Up @@ -205,33 +251,41 @@ module.exports = function(mixinOptions) {
}

if (dataLoaderKey == null) {
return null;
result = null;
} else {
result = Array.isArray(dataLoaderKey)
? await dataLoader.loadMany(dataLoaderKey)
: await dataLoader.load(dataLoaderKey);
}

return Array.isArray(dataLoaderKey)
? await dataLoader.loadMany(dataLoaderKey)
: await dataLoader.load(dataLoaderKey);
} else if (fileUploadArg != null && args[fileUploadArg] != null) {
if (Array.isArray(args[fileUploadArg])) {
return await Promise.all(
result = await Promise.all(
args[fileUploadArg].map(async uploadPromise => {
const {
createReadStream,
...$fileInfo
} = await uploadPromise;
const stream = createReadStream();
return context.ctx.call(actionName, stream, {
meta: { $fileInfo },
});
return context.ctx.call(
actionName,
stream,
{ meta: { $fileInfo } },
await this.actions.actionOptions(root, args, context)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this pattern be changed to only have the call to this.actions.actionOptions a single time and assign its result to a variable that is used for all of the calls?

One other note on these options is it seems like it would need to apply to the DataLoader callback as well, so the options should probably be passed to that function and added to the call there.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right.

As for DataLoader, you're probably correct there as well, I don't know much about DataLoader, so I'll have to take a look.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So... I'm trying to make the change mentioned... but for some reason, calling this.actions.actionOptions earlier is magically adding {timeout: 0} to any existing params. The only change is me moving the call earlier. Does this make sense to anyone?

);
})
);
} else {
const { createReadStream, ...$fileInfo } = await args[
fileUploadArg
];
const stream = createReadStream();
result = await context.ctx.call(
actionName,
stream,
{ meta: { $fileInfo } },
await this.actions.actionOptions(root, args, context)
);
}

const { createReadStream, ...$fileInfo } = await args[fileUploadArg];
const stream = createReadStream();
return await context.ctx.call(actionName, stream, {
meta: { $fileInfo },
});
} else {
const params = {};
if (root && rootKeys) {
Expand All @@ -240,12 +294,17 @@ module.exports = function(mixinOptions) {
});
}

return await context.ctx.call(
result = await context.ctx.call(
actionName,
_.defaultsDeep({}, args, params, staticParams)
_.defaultsDeep({}, args, params, staticParams),
{},
await this.actions.actionOptions(root, args, context)
);
}
if (operationSpan) operationSpan.finish();
return result;
} catch (err) {
if (operationSpan) operationSpan.finish();
if (nullIfError) {
return null;
}
Expand Down Expand Up @@ -324,17 +383,29 @@ module.exports = function(mixinOptions) {
subscribe: filter
? withFilter(
() => this.pubsub.asyncIterator(tags),
async (payload, params, { ctx }) =>
payload !== undefined
? ctx.call(filter, { ...params, payload })
: false
async (payload, params) => {
return payload !== undefined
? this.createAsyncIteratorContext().call(filter, {
...params,
payload,
})
: false;
}
)
: () => this.pubsub.asyncIterator(tags),
resolve: (payload, params, { ctx }) =>
ctx.call(actionName, { ...params, payload }),
resolve: (payload, params) => {
return this.createAsyncIteratorContext().call(actionName, {
...params,
payload,
});
},
};
},

createAsyncIteratorContext() {
return this.broker.ContextFactory.create(this.broker, null, {}, {});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change seems to imply that a fresh moleculer context is not being created earlier (previous comment), but is instead necessary to be created here. Is that correct?

If so, I have a few concerns. First, it does seem like the code in the server setup area is creating several instances of Context that it really cannot make use of. If they were being used then that would make sense, but if they're not it seems like a drag on connection/request time otherwise. If they are being used, then I'm not sure I'm understanding the reason to create a new Context here. Can you clarify for me?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My other concern here is that if we need to create a fresh moleculer context here, at action call time (and it can't be done in the server setup callback) then we probably have a (likely preexisting) issue with request to the DataLoader instances. If those instances are not being created for each new "request" (subscription, query, or mutation) then we're likely to run into caching issues on those instances. That is, repeated requests several hours later will still be using the same DataLoader cached results, which is not the prescribed "DataLoader per request" pattern.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason for the new context here is due to the fact that these are actions fired on subscription events. So not as a direct result of the Subscription operation, but created later as a result of a pubsub event. The problem I found with using the previous context (the one created with the initial the subscription operation) is particularly evident whenrequestTimeout is enabled (which I believe should almost always be the case). The way I was looking at this is that every event is a new action source, which should create a completely new context so that it could be handled by Moleculer as such. This way things like requestTimeout work intuitively.

I don't know that DataLoader should be involved at all here, am I mistaken?

},

/**
* Generate GraphQL Schema
*
Expand Down Expand Up @@ -442,7 +513,11 @@ module.exports = function(mixinOptions) {
const name = this.getFieldName(query);
queries.push(query);
resolver.Query[name] = this.createActionResolver(
action.name
action.name,
{
typeName: "query",
fieldName: name,
}
);
});
}
Expand All @@ -457,6 +532,8 @@ module.exports = function(mixinOptions) {
action.name,
{
fileUploadArg: def.fileUploadArg,
typeName: "mutation",
fieldName: name,
}
);
});
Expand Down Expand Up @@ -607,20 +684,7 @@ module.exports = function(mixinOptions) {
this.apolloServer = new ApolloServer({
schema,
..._.defaultsDeep({}, mixinOptions.serverOptions, {
context: ({ req, connection }) => ({
...(req
? {
ctx: req.$ctx,
service: req.$service,
params: req.$params,
}
: {
ctx: connection.context.$ctx,
service: connection.context.$service,
params: connection.context.$params,
}),
dataLoaders: new Map(), // create an empty map to load DataLoader instances into
}),
context: integrationContext => this.actions.context(integrationContext),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgive my lack of knowledge with subscription behaviors, so I'll probably ask questions along the way here.

Does this specific change imply that a fresh moleculer context will be created each time a subscription does to resolve, or is this the moleculer context for the entire lifespan of the connected subscription?

As far as I can tell, the primary reason for the ws action was to generate a moleculer context at connection start and bind various data to it. Presumably, that context was for the entire lifespan of the connected subscription. I'm trying to understand if the generation of moleculer context is redundant. That is, could one or the other of these calls to this.actions.xxx be handled instead with a plain old method call, or is there a benefit to establishing new moleculer Context for each of these independently?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem, it's taking me a bit to really wrap my head around all this too, and I may be wrong, so I'm more than glad to do some mental gymnastics to make sure we get this right.

If you haven't caught up or had the opportunity recently to read my original issue You may want to do that because I go over my reasoning for a lot of this and the problems I'm trying to solve.

Apollo will create a new context for for every GQL operation. So you can actually connect, and do nothing. So the onConnect hook is important for people who want to guard that entry point. Once an operation, of any kind, is sent via WebSocket, or HTTP, then it will get a new context for that operation. We use WebSockets as our main messaging transport, we open the connection and reuse it not only for subscriptions, but also for all queries and mutations.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, so if I am understanding this correctly, the lifecycle is as follows:

  • onConnect runs when the client connects. This runs a single time for the entire subscription lifecycle.
  • A new Apollo Context is created each time a subscription publishes a new event, or a query/mutation is performed. This is represented in the context: integrationContext => this.actions.context(integrationContext) line.

So in each of these scenarios, an action call is being used which is establishing a new moleculer Context. In addition, this will allow consumers to wrap those actions with various moleculer lifecycle handlers if they so desire.

What I think I am still missing then is the need for the createAsyncIteratorContext method getting called. If a fresh Apollo context and a fresh moleculer context are both getting created during each operation, I'm guessing I'm misinterpreting what is considered an operation as it pertains to subscriptions and that does not include the publish event.

Assuming that I am misinterpreting this and a fresh moleculer context is needed each time a publish event is fired, I'm concerned about a few things:

  1. The DataLoader cache. So I'll fill in a little more here about what I'm concerned about here. DataLoader instances are lazily loaded into the Apollo Context as they become needed. That is, every time a unique combination of action, args, and staticParams is detected, a new DataLoader instance is added to the Map inside the Apollo Context. This allows operations to be deferred and batched to limit the total number of calls during child resolution, provided that the action being called complies with the rules of DataLoader (must accept an array, must return results sequenced with that array, the output array must be the same length of the input array). One thing DataLoader does is cache results for a given "id". Therefore, if you request the same id it will return the Promise that was bound to the previous time that "id" was requested. This limits the total number of ids that need to be called. However, this cache is intended to be short lived and essentially last for a single request's lifespan. If it lasted longer than that, the cache would grow stale and potentially be out of date. There are cache invalidation mechanisms in DataLoader, but they generally just advise to create new instances on each request rather than trying to engage with the cache invalidation. If the Apollo Context is long-lived for a subscription, then that would also imply that DataLoader instances are long lived, which would result in stale caches. If the Apollo Context is short-lived then that isn't a concern, but its likely a mechanism to clear the DataLoader Map in some way (potentially at publish event time?) would be needed if the Apollo Context isn't refreshed.
  2. The use of this.broker.ContextFactory.create to generate a new moleculer Context is, AFAIK, going to miss out on some things like the merging of meta. If the context or ws actions are enriching the moleculer Context with meta that is necessary for upstream actions, then that would cause an issue with those upstream operations expecting that meta to be present. Again, assuming that we need a new moleculer Context for each publish event, it might be preferable to make use of an action handler (similar to the context and ws actions) to generate the context so that it facilitates this merging properly. Alternatively, it appears to me that the Context.create method accepts an option for parentCtx, which it would utilize to do the merging of things like meta.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@NickClark I sent you a message on Discord to see if we can schedule some time to discuss this. It would probably be more efficient if we can do some interactive back and forth rather than trying to hash things out with this delay. Let me know if that works for you.

subscriptions: {
onConnect: (connectionParams, socket) =>
this.actions.ws({ connectionParams, socket }),
Expand Down
Loading