Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
cdddb18
IBiDi
nvborisenko Feb 21, 2026
40a888e
Interface in event args
nvborisenko Feb 21, 2026
7a33df8
Session
nvborisenko Feb 21, 2026
0e21970
EventDispatcher
nvborisenko Feb 22, 2026
fd4a896
Expose interface
nvborisenko Feb 22, 2026
5f0d776
BrowsingContext interfaces
nvborisenko Feb 22, 2026
c697469
Hide implementation
nvborisenko Feb 22, 2026
ed0a256
License headers
nvborisenko Feb 22, 2026
46912b3
Return BiDi.ConnectAsync() as public
nvborisenko Feb 22, 2026
dc9068d
Try to resolve circular deps
nvborisenko Feb 22, 2026
64222d0
Session end result
nvborisenko Feb 22, 2026
be4c11f
Unsubscribe twice issue, delegate event deserialization
nvborisenko Feb 22, 2026
915eab4
It is not JsonException, it is validation errors
nvborisenko Feb 22, 2026
3eb4844
Avoid massive usings
nvborisenko Feb 22, 2026
a0a6cbd
Remove unnecessary TryGetEventTypeInfo
nvborisenko Feb 22, 2026
5d32fec
Unsubscribe
nvborisenko Feb 22, 2026
87170d2
Deferred events deserialization
nvborisenko Feb 22, 2026
4524428
Ignore OperationCanceledException on dispose
nvborisenko Feb 22, 2026
97251c7
Only when expected
nvborisenko Feb 22, 2026
0091636
BiDi private ctor
nvborisenko Feb 22, 2026
07901f4
Cancel/await/dispose token source
nvborisenko Feb 22, 2026
8c9a519
Clean paramsReader
nvborisenko Feb 22, 2026
b5b5935
Dispose in ctor
nvborisenko Feb 22, 2026
6f844c7
Hide AsModule IDE
nvborisenko Feb 22, 2026
4be64ca
Merge remote-tracking branch 'upstream/trunk' into bidi-interface
nvborisenko Feb 22, 2026
a47cc97
Format
nvborisenko Feb 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 26 additions & 29 deletions dotnet/src/webdriver/BiDi/BiDi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,65 +20,62 @@
using System.Collections.Concurrent;
using System.Text.Json;
using System.Text.Json.Serialization;
using OpenQA.Selenium.BiDi.Json.Converters;
using OpenQA.Selenium.BiDi.Session;

namespace OpenQA.Selenium.BiDi;

public sealed class BiDi : IAsyncDisposable
public sealed class BiDi : IBiDi
{
private readonly ConcurrentDictionary<Type, Module> _modules = new();

private BiDi(string url)
{
var uri = new Uri(url);

Broker = new Broker(this, uri);
}
private Broker Broker { get; set; } = null!;

Comment thread
nvborisenko marked this conversation as resolved.
private Broker Broker { get; }
internal ISessionModule Session => AsModule<SessionModule>();

internal Session.SessionModule SessionModule => AsModule<Session.SessionModule>();
private BiDi() { }

public BrowsingContext.BrowsingContextModule BrowsingContext => AsModule<BrowsingContext.BrowsingContextModule>();
public BrowsingContext.IBrowsingContextModule BrowsingContext => AsModule<BrowsingContext.BrowsingContextModule>();

public Browser.BrowserModule Browser => AsModule<Browser.BrowserModule>();
public Browser.IBrowserModule Browser => AsModule<Browser.BrowserModule>();

public Network.NetworkModule Network => AsModule<Network.NetworkModule>();
public Network.INetworkModule Network => AsModule<Network.NetworkModule>();

public Input.InputModule Input => AsModule<Input.InputModule>();
public Input.IInputModule Input => AsModule<Input.InputModule>();

public Script.ScriptModule Script => AsModule<Script.ScriptModule>();
public Script.IScriptModule Script => AsModule<Script.ScriptModule>();

public Log.LogModule Log => AsModule<Log.LogModule>();
public Log.ILogModule Log => AsModule<Log.LogModule>();

public Storage.StorageModule Storage => AsModule<Storage.StorageModule>();
public Storage.IStorageModule Storage => AsModule<Storage.StorageModule>();

public WebExtension.WebExtensionModule WebExtension => AsModule<WebExtension.WebExtensionModule>();
public WebExtension.IWebExtensionModule WebExtension => AsModule<WebExtension.WebExtensionModule>();

public Emulation.EmulationModule Emulation => AsModule<Emulation.EmulationModule>();
public Emulation.IEmulationModule Emulation => AsModule<Emulation.EmulationModule>();

public static async Task<BiDi> ConnectAsync(string url, BiDiOptions? options = null, CancellationToken cancellationToken = default)
public static async Task<IBiDi> ConnectAsync(string url, BiDiOptions? options = null, CancellationToken cancellationToken = default)
Comment thread
nvborisenko marked this conversation as resolved.
{
Comment thread
nvborisenko marked this conversation as resolved.
var bidi = new BiDi(url);
var transport = await WebSocketTransport.ConnectAsync(new Uri(url), cancellationToken).ConfigureAwait(false);

BiDi bidi = new();

await bidi.Broker.ConnectAsync(cancellationToken).ConfigureAwait(false);
bidi.Broker = new Broker(transport, bidi, () => bidi.Session);
Comment thread
nvborisenko marked this conversation as resolved.

Comment thread
nvborisenko marked this conversation as resolved.
return bidi;
}

public Task<Session.StatusResult> StatusAsync(Session.StatusOptions? options = null, CancellationToken cancellationToken = default)
public Task<StatusResult> StatusAsync(StatusOptions? options = null, CancellationToken cancellationToken = default)
{
return SessionModule.StatusAsync(options, cancellationToken);
return Session.StatusAsync(options, cancellationToken);
}

public Task<Session.NewResult> NewAsync(Session.CapabilitiesRequest capabilities, Session.NewOptions? options = null, CancellationToken cancellationToken = default)
public Task<NewResult> NewAsync(CapabilitiesRequest capabilities, NewOptions? options = null, CancellationToken cancellationToken = default)
{
return SessionModule.NewAsync(capabilities, options, cancellationToken);
return Session.NewAsync(capabilities, options, cancellationToken);
}

public Task EndAsync(Session.EndOptions? options = null, CancellationToken cancellationToken = default)
public Task<EndResult> EndAsync(EndOptions? options = null, CancellationToken cancellationToken = default)
{
return SessionModule.EndAsync(options, cancellationToken);
return Session.EndAsync(options, cancellationToken);
}

public async ValueTask DisposeAsync()
Expand All @@ -101,7 +98,7 @@ private static JsonSerializerOptions CreateDefaultJsonOptions()
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
Converters =
{
new DateTimeOffsetConverter(),
new Json.Converters.DateTimeOffsetConverter(),
}
};
}
Expand Down
206 changes: 71 additions & 135 deletions dotnet/src/webdriver/BiDi/Broker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
using System.Collections.Concurrent;
using System.Text.Json;
using System.Text.Json.Serialization.Metadata;
using System.Threading.Channels;
using OpenQA.Selenium.BiDi.Session;
using OpenQA.Selenium.Internal.Logging;

namespace OpenQA.Selenium.BiDi;
Expand All @@ -29,107 +29,33 @@ internal sealed class Broker : IAsyncDisposable
{
private readonly ILogger _logger = Internal.Logging.Log.GetLogger<Broker>();

private readonly BiDi _bidi;
private readonly ITransport _transport;
private readonly EventDispatcher _eventDispatcher;
private readonly IBiDi _bidi;

private readonly ConcurrentDictionary<long, CommandInfo> _pendingCommands = new();
private readonly Channel<EventInfo> _pendingEvents = Channel.CreateUnbounded<EventInfo>(new()
{
SingleReader = true,
SingleWriter = true
});
private readonly Dictionary<string, JsonTypeInfo> _eventTypesMap = [];

private readonly ConcurrentDictionary<string, List<EventHandler>> _eventHandlers = new();

private long _currentCommandId;

private static readonly TaskFactory _myTaskFactory = new(CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskContinuationOptions.None, TaskScheduler.Default);

private Task? _receivingMessageTask;
private Task? _eventEmitterTask;
private CancellationTokenSource? _receiveMessagesCancellationTokenSource;
private readonly Task _receivingMessageTask;
private readonly CancellationTokenSource _receiveMessagesCancellationTokenSource;

internal Broker(BiDi bidi, Uri url)
public Broker(ITransport transport, IBiDi bidi, Func<ISessionModule> sessionProvider)
{
_transport = transport;
_bidi = bidi;
_transport = new WebSocketTransport(url);
}

public async Task ConnectAsync(CancellationToken cancellationToken)
{
await _transport.ConnectAsync(cancellationToken).ConfigureAwait(false);
_eventDispatcher = new EventDispatcher(sessionProvider);

_receiveMessagesCancellationTokenSource = new CancellationTokenSource();
_receivingMessageTask = _myTaskFactory.StartNew(async () => await ReceiveMessagesAsync(_receiveMessagesCancellationTokenSource.Token), TaskCreationOptions.LongRunning).Unwrap();
_eventEmitterTask = _myTaskFactory.StartNew(ProcessEventsAwaiterAsync).Unwrap();
}

private async Task ReceiveMessagesAsync(CancellationToken cancellationToken)
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
var data = await _transport.ReceiveAsync(cancellationToken).ConfigureAwait(false);

try
{
ProcessReceivedMessage(data);
}
catch (Exception ex)
{
if (_logger.IsEnabled(LogEventLevel.Error))
{
_logger.Error($"Unhandled error occurred while processing remote message: {ex}");
}
}
}
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
if (_logger.IsEnabled(LogEventLevel.Error))
{
_logger.Error($"Unhandled error occurred while receiving remote messages: {ex}");
}

throw;
}
}

private async Task ProcessEventsAwaiterAsync()
public Task<Subscription> SubscribeAsync<TEventArgs>(string eventName, EventHandler eventHandler, SubscriptionOptions? options, JsonTypeInfo<TEventArgs> jsonTypeInfo, CancellationToken cancellationToken)
where TEventArgs : EventArgs
{
var reader = _pendingEvents.Reader;
while (await reader.WaitToReadAsync().ConfigureAwait(false))
{
while (reader.TryRead(out var result))
{
try
{
if (_eventHandlers.TryGetValue(result.Method, out var eventHandlers))
{
if (eventHandlers is not null)
{
foreach (var handler in eventHandlers.ToArray()) // copy handlers avoiding modified collection while iterating
{
var args = result.Params;

args.BiDi = _bidi;

await handler.InvokeAsync(args).ConfigureAwait(false);
}
}
}
}
catch (Exception ex)
{
if (_logger.IsEnabled(LogEventLevel.Error))
{
_logger.Error($"Unhandled error processing BiDi event handler: {ex}");
}
}
}
}
return _eventDispatcher.SubscribeAsync(eventName, eventHandler, options, jsonTypeInfo, cancellationToken);
}

public async Task<TResult> ExecuteCommandAsync<TCommand, TResult>(TCommand command, CommandOptions? options, JsonTypeInfo<TCommand> jsonCommandTypeInfo, JsonTypeInfo<TResult> jsonResultTypeInfo, CancellationToken cancellationToken)
Expand Down Expand Up @@ -157,40 +83,23 @@ public async Task<TResult> ExecuteCommandAsync<TCommand, TResult>(TCommand comma
return (TResult)await tcs.Task.ConfigureAwait(false);
}

public async Task<Subscription> SubscribeAsync<TEventArgs>(string eventName, EventHandler eventHandler, SubscriptionOptions? options, JsonTypeInfo<TEventArgs> jsonTypeInfo, CancellationToken cancellationToken)
where TEventArgs : EventArgs
{
_eventTypesMap[eventName] = jsonTypeInfo;

var handlers = _eventHandlers.GetOrAdd(eventName, (a) => []);

var subscribeResult = await _bidi.SessionModule.SubscribeAsync([eventName], new() { Contexts = options?.Contexts, UserContexts = options?.UserContexts }, cancellationToken).ConfigureAwait(false);

handlers.Add(eventHandler);

return new Subscription(subscribeResult.Subscription, this, eventHandler);
}

public async Task UnsubscribeAsync(Subscription subscription, CancellationToken cancellationToken)
{
var eventHandlers = _eventHandlers[subscription.EventHandler.EventName];

eventHandlers.Remove(subscription.EventHandler);

await _bidi.SessionModule.UnsubscribeAsync([subscription.SubscriptionId], null, cancellationToken).ConfigureAwait(false);
}

public async ValueTask DisposeAsync()
{
_pendingEvents.Writer.Complete();
_receiveMessagesCancellationTokenSource.Cancel();

Comment thread
nvborisenko marked this conversation as resolved.
_receiveMessagesCancellationTokenSource?.Cancel();
await _eventDispatcher.DisposeAsync().ConfigureAwait(false);

if (_eventEmitterTask is not null)
try
{
await _receivingMessageTask.ConfigureAwait(false);
}
catch (OperationCanceledException) when (_receiveMessagesCancellationTokenSource.IsCancellationRequested)
{
await _eventEmitterTask.ConfigureAwait(false);
// Expected when cancellation is requested, ignore.
}

_receiveMessagesCancellationTokenSource.Dispose();

_transport.Dispose();
Comment thread
nvborisenko marked this conversation as resolved.

GC.SuppressFinalize(this);
Expand All @@ -204,7 +113,8 @@ private void ProcessReceivedMessage(byte[]? data)
string? error = default;
string? message = default;
Utf8JsonReader resultReader = default;
Utf8JsonReader paramsReader = default;
long paramsStartIndex = 0;
long paramsEndIndex = 0;

Utf8JsonReader reader = new(new ReadOnlySpan<byte>(data));
Comment thread
nvborisenko marked this conversation as resolved.
reader.Read();
Expand Down Expand Up @@ -235,7 +145,7 @@ private void ProcessReceivedMessage(byte[]? data)
break;

case "params":
paramsReader = reader; // snapshot
paramsStartIndex = reader.TokenStartIndex;
break;

case "error":
Expand All @@ -247,21 +157,29 @@ private void ProcessReceivedMessage(byte[]? data)
break;
}

reader.Skip();
if (propertyName == "params")
{
reader.Skip();
paramsEndIndex = reader.BytesConsumed;
}
else
{
reader.Skip();
}
reader.Read();
}

switch (type)
{
case "success":
if (id is null) throw new JsonException("The remote end responded with 'success' message type, but missed required 'id' property.");
if (id is null) throw new BiDiException("The remote end responded with 'success' message type, but missed required 'id' property.");

if (_pendingCommands.TryGetValue(id.Value, out var command))
{
try
{
var commandResult = JsonSerializer.Deserialize(ref resultReader, command.JsonResultTypeInfo)
?? throw new JsonException("Remote end returned null command result in the 'result' property.");
?? throw new BiDiException("Remote end returned null command result in the 'result' property.");

command.TaskCompletionSource.SetResult((EmptyResult)commandResult);
}
Expand All @@ -282,25 +200,13 @@ private void ProcessReceivedMessage(byte[]? data)
break;

case "event":
if (method is null) throw new JsonException("The remote end responded with 'event' message type, but missed required 'method' property.");

if (_eventTypesMap.TryGetValue(method, out var eventInfo))
{
var eventArgs = (EventArgs)JsonSerializer.Deserialize(ref paramsReader, eventInfo)!;

eventArgs.BiDi = _bidi;

_pendingEvents.Writer.TryWrite(new EventInfo(method, eventArgs));
}
else
{
throw new BiDiException($"The remote end responded with 'event' message type, but no event type mapping for method '{method}' was found.");
}

if (method is null) throw new BiDiException("The remote end responded with 'event' message type, but missed required 'method' property.");
var paramsJsonData = new ReadOnlyMemory<byte>(data, (int)paramsStartIndex, (int)(paramsEndIndex - paramsStartIndex));
_eventDispatcher.EnqueueEvent(method, paramsJsonData, _bidi);
Comment thread
nvborisenko marked this conversation as resolved.
break;

case "error":
if (id is null) throw new JsonException("The remote end responded with 'error' message type, but missed required 'id' property.");
if (id is null) throw new BiDiException("The remote end responded with 'error' message type, but missed required 'id' property.");

if (_pendingCommands.TryGetValue(id.Value, out var errorCommand))
{
Expand All @@ -316,7 +222,37 @@ private void ProcessReceivedMessage(byte[]? data)
}
}

private readonly record struct CommandInfo(TaskCompletionSource<EmptyResult> TaskCompletionSource, JsonTypeInfo JsonResultTypeInfo);
private async Task ReceiveMessagesAsync(CancellationToken cancellationToken)
{
try
{
while (!cancellationToken.IsCancellationRequested)
{
var data = await _transport.ReceiveAsync(cancellationToken).ConfigureAwait(false);

private readonly record struct EventInfo(string Method, EventArgs Params);
try
{
ProcessReceivedMessage(data);
}
catch (Exception ex)
{
if (_logger.IsEnabled(LogEventLevel.Error))
{
_logger.Error($"Unhandled error occurred while processing remote message: {ex}");
}
}
}
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
if (_logger.IsEnabled(LogEventLevel.Error))
{
_logger.Error($"Unhandled error occurred while receiving remote messages: {ex}");
}

throw;
}
}

private readonly record struct CommandInfo(TaskCompletionSource<EmptyResult> TaskCompletionSource, JsonTypeInfo JsonResultTypeInfo);
}
Loading