Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ECO-4687][Protocol-2] Fix duplicate outgoing ATTACH msg #416

Merged
4 changes: 2 additions & 2 deletions lib/ably/realtime/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ def __incoming_msgbus__
# @return [Ably::Models::ChannelOptions]
def set_options(channel_options)
@options = Ably::Models::ChannelOptions(channel_options)

manager.request_reattach if need_reattach?
# RTL4i
manager.request_reattach if (need_reattach? and connection.state?(:connected))
end
alias options= set_options

Expand Down
81 changes: 46 additions & 35 deletions lib/ably/realtime/channel/channel_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def initialize(channel, connection)
def attach
if can_transition_to?(:attached)
connect_if_connection_initialized
send_attach_protocol_message
send_attach_protocol_message if connection.state?(:connected) # RTL4i
end
end

Expand Down Expand Up @@ -49,14 +49,12 @@ def log_channel_error(error)
end

# Request channel to be reattached by sending an attach protocol message
# @param [Hash] options
# @option options [Ably::Models::ErrorInfo] :reason
def request_reattach(options = {})
reason = options[:reason]
send_attach_protocol_message
logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" }
# @param [Ably::Models::ErrorInfo] reason
def request_reattach(reason = nil)
channel.set_channel_error_reason(reason) if reason
channel.transition_state_machine! :attaching, reason: reason unless channel.attaching?
send_attach_protocol_message
logger.debug { "Explicit channel reattach request sent to Ably due to #{reason}" }
end

def duplicate_attached_received(protocol_message)
Expand Down Expand Up @@ -169,6 +167,12 @@ def start_attach_from_suspended_timer
end
end

# RTL13c
def notify_state_change
@pending_state_change_timer.cancel if @pending_state_change_timer
@pending_state_change_timer = nil
end

private
attr_reader :pending_state_change_timer

Expand Down Expand Up @@ -209,48 +213,55 @@ def send_attach_protocol_message
end

message_options[:channelSerial] = channel.properties.channel_serial # RTL4c1
send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Attach, :suspended, message_options
end

def send_detach_protocol_message(previous_state)
send_state_change_protocol_message Ably::Models::ProtocolMessage::ACTION.Detach, previous_state # return to previous state if failed
end

def send_state_change_protocol_message(new_state, state_if_failed, message_options = {})
state_at_time_of_request = channel.state
attach_action = Ably::Models::ProtocolMessage::ACTION.Attach
# RTL4f
@pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do
if channel.state == state_at_time_of_request
error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{new_state} operation failed (timed out)")
channel.transition_state_machine state_if_failed, reason: error
error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{attach_action} operation failed (timed out)")
channel.transition_state_machine :suspended, reason: error # return to suspended state if failed
end
end
# Shouldn't queue attach message as per RTL4i, so message is added top of the queue
# to be sent immediately while processing next message
connection.send_protocol_message_immediately(
action: attach_action.to_i,
channel: channel.name,
**message_options.to_h
)
end

channel.once_state_changed do
@pending_state_change_timer.cancel if @pending_state_change_timer
@pending_state_change_timer = nil
def send_detach_protocol_message(previous_state)
state_at_time_of_request = channel.state
detach_action = Ably::Models::ProtocolMessage::ACTION.Detach

@pending_state_change_timer = EventMachine::Timer.new(realtime_request_timeout) do
if channel.state == state_at_time_of_request
error = Ably::Models::ErrorInfo.new(code: Ably::Exceptions::Codes::CHANNEL_OPERATION_FAILED_NO_RESPONSE_FROM_SERVER, message: "Channel #{detach_action} operation failed (timed out)")
channel.transition_state_machine previous_state, reason: error # return to previous state if failed
end
end

resend_if_disconnected_and_connected = lambda do
on_disconnected_and_connected = lambda do
connection.unsafe_once(:disconnected) do
next unless pending_state_change_timer
connection.unsafe_once(:connected) do
next unless pending_state_change_timer
connection.send_protocol_message(
action: new_state.to_i,
channel: channel.name,
**message_options.to_h
)
resend_if_disconnected_and_connected.call
end
yield if pending_state_change_timer
end if pending_state_change_timer
end
end
resend_if_disconnected_and_connected.call

connection.send_protocol_message(
action: new_state.to_i,
channel: channel.name,
**message_options.to_h
)
send_detach_message = lambda do
on_disconnected_and_connected.call do
send_detach_message.call
end
connection.send_protocol_message(
action: detach_action.to_i,
channel: channel.name
)
end

send_detach_message.call
end

def logger
Expand Down
1 change: 1 addition & 0 deletions lib/ably/realtime/channel/channel_state_machine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class ChannelStateMachine
transition :from => :failed, :to => [:attaching, :initialized]

after_transition do |channel, transition|
channel.manager.notify_state_change # RTL13c
channel.synchronize_state_with_statemachine
end

Expand Down
25 changes: 18 additions & 7 deletions lib/ably/realtime/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -431,17 +431,28 @@ def logger
# @api private
def send_protocol_message(protocol_message)
add_message_serial_if_ack_required_to(protocol_message) do
Ably::Models::ProtocolMessage.new(protocol_message, logger: logger).tap do |message|
add_message_to_outgoing_queue message
notify_message_dispatcher_of_new_message message
logger.debug { "Connection: Prot msg queued =>: #{message.action} #{message}" }
end
message = Ably::Models::ProtocolMessage.new(protocol_message, logger: logger)
add_message_to_outgoing_queue(message)
notify_message_dispatcher_of_new_message message
end
end

def send_protocol_message_immediately(protocol_message)
message = Ably::Models::ProtocolMessage.new(protocol_message, logger: logger)
add_message_to_outgoing_queue(message, true)
notify_message_dispatcher_of_new_message message
end

# @api private
def add_message_to_outgoing_queue(protocol_message)
__outgoing_message_queue__ << protocol_message
def add_message_to_outgoing_queue(protocol_message, send_immediately = false)
if send_immediately
# Adding msg at the top of the queue to get processed immediately while connection is CONNECTED
__outgoing_message_queue__.prepend(protocol_message)
logger.debug { "Connection: protocol msg pushed at the top =>: #{message.action} #{message}" }
else
__outgoing_message_queue__ << protocol_message
logger.debug { "Connection: protocol msg queued =>: #{message.action} #{message}" }
end
end

# @api private
Expand Down
2 changes: 1 addition & 1 deletion lib/ably/realtime/connection/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -582,7 +582,7 @@ def force_reattach_on_channels(error)
channels.select do |channel|
channel.attached? || channel.attaching? || channel.suspended?
end.each do |channel|
channel.manager.request_reattach reason: error
channel.manager.request_reattach error
end
end

Expand Down
28 changes: 28 additions & 0 deletions spec/acceptance/realtime/channel_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -430,16 +430,43 @@ def disconnect_transport
end

context 'with connection state' do

sent_attach_messages = []
received_attached_messages = []
before(:each) do
sent_attach_messages = []
received_attached_messages = []
client.connection.__outgoing_protocol_msgbus__.subscribe do |message|
if message.action == :attach
sent_attach_messages << message
end
end
client.connection.__incoming_protocol_msgbus__.subscribe do |message|
if message.action == :attached
received_attached_messages << message
end
end
end

# Should send/receive attach/attached message only once
# No duplicates should be sent or received
let(:check_for_attach_messages) do
expect(sent_attach_messages.size).to eq(1)
expect(received_attached_messages.size).to eq(1)
end

it 'is initialized (#RTL4i)' do
expect(connection).to be_initialized
channel.attach do
check_for_attach_messages
stop_reactor
end
end

it 'is connecting (#RTL4i)' do
connection.once(:connecting) do
channel.attach do
check_for_attach_messages
stop_reactor
end
end
Expand All @@ -449,6 +476,7 @@ def disconnect_transport
connection.once(:connected) do
connection.once(:disconnected) do
channel.attach do
check_for_attach_messages
stop_reactor
end
end
Expand Down
Loading