-
Notifications
You must be signed in to change notification settings - Fork 501
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added Stream support to ChangeFeedProcessor #888
Conversation
private ChangeFeedLeaseOptions changeFeedLeaseOptions; | ||
private ChangeFeedProcessorOptions changeFeedProcessorOptions; | ||
private DocumentServiceLeaseStoreManager documentServiceLeaseStoreManager; | ||
private bool initialized = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please avoid a double initialization. opt for a private instance constructor and a public static CreateAsync
method. The member can then be made readonly
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not catching the intent. The code is a copy of ChangeFeedProcessorCore with only the delegate changing. The upstream call to the constructor is made in ContainerCore.GetChangeFeedProcessorBuilder (which is not async) and only started and stopped through the abstract base class. My reading of the original code was that the initialization was lazy.
this.observerFactory = observerFactory; | ||
} | ||
|
||
public void ApplyBuildConfiguration( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add these to the static constructor to avoid extra state changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (lease == null) | ||
throw new ArgumentNullException(nameof(lease)); | ||
|
||
var changeFeedObserver = this.observerFactory.CreateObserver(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please avoid var
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
private readonly CheckpointFrequency checkpointFrequency; | ||
private readonly ChangeFeedObserver observer; | ||
private int processedDocCount; | ||
private DateTime lastCheckpointTime = DateTime.UtcNow; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please set this in the instance constructor instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
{ | ||
private readonly CheckpointFrequency checkpointFrequency; | ||
private readonly ChangeFeedObserver observer; | ||
private int processedDocCount; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe a long
will be more suitable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed...
{ | ||
private readonly PartitionCheckpointer checkpointer; | ||
|
||
internal ChangeFeedObserverContextCore(string leaseToken) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
avoid multiple instance constructors and instead have this one call the larger one with default values.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
this.LeaseToken = leaseToken; | ||
} | ||
|
||
internal ChangeFeedObserverContextCore(string leaseToken, ResponseMessage feedResponse, PartitionCheckpointer checkpointer) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are these all allowed to be null?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question. This is a copy of this, but it sure seems like CheckpointAsync
can result in NRE
PartitionCheckpointer checkpointer, | ||
CosmosSerializer cosmosJsonSerializer) | ||
{ | ||
this.observer = observer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
null checks?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done. but you may want to have a look at the original FeedProcessorCore
{ | ||
string lastContinuation = this.options.StartContinuation; | ||
|
||
while (!cancellationToken.IsCancellationRequested) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If a cancellation is requested won't it immediately leave this method instead of throwing the exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure w hat the original intent was/is. This is a copy of FeedProcessorCore intended for use with a stream delegate rather than a typed one
This PR is duplicating a lot of the Change Feed Processor files, which makes maintaining this code in the future very hard. A better approach would probably be:
That would reduce the amount of files changed greatly. |
I was looking at that possibility, but was trying not to be too intrusive on the existing code. |
… and ChangeFeedObserverFactoryCore provides the item deserialization.
@@ -28,9 +28,9 @@ public override Task OpenAsync(ChangeFeedObserverContext context) | |||
return Task.CompletedTask; | |||
} | |||
|
|||
public override Task ProcessChangesAsync(ChangeFeedObserverContext context, IReadOnlyCollection<T> docs, CancellationToken cancellationToken) | |||
public async override Task ProcessChangesAsync(ChangeFeedObserverContext context, Stream stream, CancellationToken cancellationToken) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can still return the task here:
return this.onChanges(stream, cancellation);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the call is using .ConfigureAwait(false)
which is not directly returnable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is my point, it was initially not using ConfigureAwait. Since this is something that will be executed in a thread that is not the main thread, which would be the benefit of ConfigureAwait(false)
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverFactoryCore.cs
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like the refactoring. @kirankumarkolli can you please take a look? Since this involves public API changes
/azp run |
Azure Pipelines successfully started running 2 pipeline(s). |
Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserver.cs
Outdated
Show resolved
Hide resolved
...t.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/ChangeFeedProcessorCoreTests.cs
Outdated
Show resolved
Hide resolved
Microsoft.Azure.Cosmos/tests/Microsoft.Azure.Cosmos.Tests/ChangeFeed/FeedProcessorCoreTests.cs
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, left some comments regarding some test changes but overall looks fine. Please address them and let's see if the tests pass and we can get it in.
Microsoft.Azure.Cosmos/src/ChangeFeedProcessor/FeedProcessing/ChangeFeedObserverFactoryCore.cs
Outdated
Show resolved
Hide resolved
@@ -1038,7 +1069,7 @@ | |||
}, | |||
"NestedTypes": {} | |||
}, | |||
"Microsoft.Azure.Cosmos.ContainerProperties;System.Object;IsAbstract:False;IsSealed:False;IsInterface:False;IsEnum:False;IsClass:True;IsValueType:False;IsNested:False;IsGenericType:False;IsSerializable:False": { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These changes are weird and unrelated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's verify the contract changes, it seems there are changes in the files that do not align with the expected API changes. @j82w do you know what could be causing it?
/azp run |
Azure Pipelines successfully started running 2 pipeline(s). |
@fuocor There seems to be some failures on the Unit tests (besides the contract change one), could you take a look? |
Tests have been fixed |
/azp run |
Azure Pipelines successfully started running 2 pipeline(s). |
/asp run |
/azp run |
Azure Pipelines successfully started running 2 pipeline(s). |
Closing as the changes were ported to #2331 |
Pull Request Template
Description
Added the ability to use a Stream delegate rather than a typed collection in order to provide support for custom serialization and use patterns where deserialization of the items is not required.
Type of change
Added set of classes needed to support the Stream delegate.
Please delete options that are not relevant.
Closing issues
#865
Assignee
Richard Fuoco