-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
server.rb
820 lines (691 loc) · 30.7 KB
/
server.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'fluent/plugin_helper/event_loop'
require 'serverengine'
require 'cool.io'
require 'socket'
require 'ipaddr'
require 'fcntl'
require 'openssl'
require_relative 'socket_option'
require_relative 'cert_option'
module Fluent
module PluginHelper
module Server
include Fluent::PluginHelper::EventLoop
include Fluent::PluginHelper::SocketOption
include Fluent::PluginHelper::CertOption
# This plugin helper doesn't support these things for now:
# * TCP/TLS keepalive
# * TLS session cache/tickets
# * unix domain sockets
# stop : [-]
# shutdown : detach server event handler from event loop (event_loop)
# close : close listening sockets
# terminate: remote all server instances
attr_reader :_servers # for tests
def server_wait_until_start
# event_loop_wait_until_start works well for this
end
def server_wait_until_stop
sleep 0.1 while @_servers.any?{|si| si.server.attached? }
@_servers.each{|si| si.server.close rescue nil }
end
PROTOCOLS = [:tcp, :udp, :tls, :unix]
CONNECTION_PROTOCOLS = [:tcp, :tls, :unix]
# server_create_connection(:title, @port) do |conn|
# # on connection
# source_addr = conn.remote_host
# source_port = conn.remote_port
# conn.data do |data|
# # on data
# conn.write resp # ...
# conn.close
# end
# end
def server_create_connection(title, port, proto: nil, bind: '0.0.0.0', shared: true, backlog: nil, tls_options: nil, **socket_options, &block)
proto ||= (@transport_config && @transport_config.protocol == :tls) ? :tls : :tcp
raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol)
raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer)
raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto)
raise ArgumentError, "BUG: cannot create connection for UDP" unless CONNECTION_PROTOCOLS.include?(proto)
raise ArgumentError, "BUG: tls_options is available only for tls" if tls_options && proto != :tls
raise ArgumentError, "BUG: block not specified which handles connection" unless block_given?
raise ArgumentError, "BUG: block must have just one argument" unless block.arity == 1
if proto == :tcp || proto == :tls
socket_options[:linger_timeout] ||= @transport_config&.linger_timeout || 0
end
socket_option_validate!(proto, **socket_options)
socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) }
case proto
when :tcp
server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block)
when :tls
transport_config = if tls_options
server_create_transport_section_object(tls_options)
elsif @transport_config && @transport_config.protocol == :tls
@transport_config
else
raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified"
end
server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter, &block)
when :unix
raise "not implemented yet"
else
raise "unknown protocol #{proto}"
end
server_attach(title, proto, port, bind, shared, server)
end
# server_create(:title, @port) do |data|
# # ...
# end
# server_create(:title, @port) do |data, conn|
# # ...
# end
# server_create(:title, @port, proto: :udp, max_bytes: 2048) do |data, sock|
# sock.remote_host
# sock.remote_port
# # ...
# end
def server_create(title, port, proto: nil, bind: '0.0.0.0', shared: true, socket: nil, backlog: nil, tls_options: nil, max_bytes: nil, flags: 0, **socket_options, &callback)
proto ||= (@transport_config && @transport_config.protocol == :tls) ? :tls : :tcp
raise ArgumentError, "BUG: title must be a symbol" unless title && title.is_a?(Symbol)
raise ArgumentError, "BUG: port must be an integer" unless port && port.is_a?(Integer)
raise ArgumentError, "BUG: invalid protocol name" unless PROTOCOLS.include?(proto)
raise ArgumentError, "BUG: socket option is available only for udp" if socket && proto != :udp
raise ArgumentError, "BUG: tls_options is available only for tls" if tls_options && proto != :tls
raise ArgumentError, "BUG: block not specified which handles received data" unless block_given?
raise ArgumentError, "BUG: block must have 1 or 2 arguments" unless callback.arity == 1 || callback.arity == 2
if proto == :tcp || proto == :tls
socket_options[:linger_timeout] ||= @transport_config&.linger_timeout || 0
end
unless socket
socket_option_validate!(proto, **socket_options)
socket_option_setter = ->(sock){ socket_option_set(sock, **socket_options) }
end
if proto != :tcp && proto != :tls && proto != :unix # options to listen/accept connections
raise ArgumentError, "BUG: backlog is available for tcp/tls" if backlog
end
if proto != :udp # UDP options
raise ArgumentError, "BUG: max_bytes is available only for udp" if max_bytes
raise ArgumentError, "BUG: flags is available only for udp" if flags != 0
end
case proto
when :tcp
server = server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter) do |conn|
conn.data(&callback)
end
when :tls
transport_config = if tls_options
server_create_transport_section_object(tls_options)
elsif @transport_config && @transport_config.protocol == :tls
@transport_config
else
raise ArgumentError, "BUG: TLS transport specified, but certification options are not specified"
end
server = server_create_for_tls_connection(shared, bind, port, transport_config, backlog, socket_option_setter) do |conn|
conn.data(&callback)
end
when :udp
raise ArgumentError, "BUG: max_bytes must be specified for UDP" unless max_bytes
if socket
sock = socket
close_socket = false
else
sock = server_create_udp_socket(shared, bind, port)
socket_option_setter.call(sock)
close_socket = true
end
server = EventHandler::UDPServer.new(sock, max_bytes, flags, close_socket, @log, @under_plugin_development, &callback)
when :unix
raise "not implemented yet"
else
raise "BUG: unknown protocol #{proto}"
end
server_attach(title, proto, port, bind, shared, server)
end
def server_create_tcp(title, port, **kwargs, &callback)
server_create(title, port, proto: :tcp, **kwargs, &callback)
end
def server_create_udp(title, port, **kwargs, &callback)
server_create(title, port, proto: :udp, **kwargs, &callback)
end
def server_create_tls(title, port, **kwargs, &callback)
server_create(title, port, proto: :tls, **kwargs, &callback)
end
def server_create_unix(title, port, **kwargs, &callback)
server_create(title, port, proto: :unix, **kwargs, &callback)
end
ServerInfo = Struct.new(:title, :proto, :port, :bind, :shared, :server)
def server_attach(title, proto, port, bind, shared, server)
@_servers << ServerInfo.new(title, proto, port, bind, shared, server)
event_loop_attach(server)
end
def server_create_for_tcp_connection(shared, bind, port, backlog, socket_option_setter, &block)
sock = server_create_tcp_socket(shared, bind, port)
socket_option_setter.call(sock)
close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } }
server = Coolio::TCPServer.new(sock, nil, EventHandler::TCPServer, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn|
unless conn.closing
@_server_mutex.synchronize do
@_server_connections << conn
end
end
end
server.listen(backlog) if backlog
server
end
def server_create_for_tls_connection(shared, bind, port, conf, backlog, socket_option_setter, &block)
context = cert_option_create_context(conf.version, conf.insecure, conf.ciphers, conf)
sock = server_create_tcp_socket(shared, bind, port)
socket_option_setter.call(sock)
close_callback = ->(conn){ @_server_mutex.synchronize{ @_server_connections.delete(conn) } }
server = Coolio::TCPServer.new(sock, nil, EventHandler::TLSServer, context, socket_option_setter, close_callback, @log, @under_plugin_development, block) do |conn|
unless conn.closing
@_server_mutex.synchronize do
@_server_connections << conn
end
end
end
server.listen(backlog) if backlog
server
end
SERVER_TRANSPORT_PARAMS = [
:protocol, :version, :min_version, :max_version, :ciphers, :insecure,
:ca_path, :cert_path, :private_key_path, :private_key_passphrase, :client_cert_auth,
:ca_cert_path, :ca_private_key_path, :ca_private_key_passphrase,
:cert_verifier, :generate_private_key_length,
:generate_cert_country, :generate_cert_state, :generate_cert_state,
:generate_cert_locality, :generate_cert_common_name,
:generate_cert_expiration, :generate_cert_digest,
]
def server_create_transport_section_object(opts)
transport_section = configured_section_create(:transport)
SERVER_TRANSPORT_PARAMS.each do |param|
if opts.has_key?(param)
transport_section[param] = opts[param]
end
end
transport_section
end
module ServerTransportParams
include Fluent::Configurable
config_section :transport, required: false, multi: false, init: true, param_name: :transport_config do
config_argument :protocol, :enum, list: [:tcp, :tls], default: :tcp
### Socket Params ###
# SO_LINGER 0 to send RST rather than FIN to avoid lots of connections sitting in TIME_WAIT at src.
# Set positive value if needing to send FIN on closing on non-Windows.
# (On Windows, Fluentd can send FIN with zero `linger_timeout` since Fluentd doesn't set 0 to SO_LINGER on Windows.
# See `socket_option.rb`.)
# NOTE:
# Socket-options can be specified from each plugin as needed, so most of them is not defined here for now.
# This is because there is no positive reason to do so.
# `linger_timeout` option in particular needs to be defined here
# although it can be specified from each plugin as well.
# This is because this helper fixes the default value to `0` for its own reason
# and it has a critical effect on the behavior.
desc 'The timeout time used to set linger option.'
config_param :linger_timeout, :integer, default: 0
### TLS Params ###
config_param :version, :enum, list: Fluent::TLS::SUPPORTED_VERSIONS, default: Fluent::TLS::DEFAULT_VERSION
config_param :min_version, :enum, list: Fluent::TLS::SUPPORTED_VERSIONS, default: nil
config_param :max_version, :enum, list: Fluent::TLS::SUPPORTED_VERSIONS, default: nil
config_param :ciphers, :string, default: Fluent::TLS::CIPHERS_DEFAULT
config_param :insecure, :bool, default: false
# Cert signed by public CA
config_param :ca_path, :string, default: nil
config_param :cert_path, :string, default: nil
config_param :private_key_path, :string, default: nil
config_param :private_key_passphrase, :string, default: nil, secret: true
config_param :client_cert_auth, :bool, default: false
# Cert generated and signed by private CA Certificate
config_param :ca_cert_path, :string, default: nil
config_param :ca_private_key_path, :string, default: nil
config_param :ca_private_key_passphrase, :string, default: nil, secret: true
config_param :cert_verifier, :string, default: nil
# Options for generating certs by private CA certs or self-signed
config_param :generate_private_key_length, :integer, default: 2048
config_param :generate_cert_country, :string, default: 'US'
config_param :generate_cert_state, :string, default: 'CA'
config_param :generate_cert_locality, :string, default: 'Mountain View'
config_param :generate_cert_common_name, :string, default: nil
config_param :generate_cert_expiration, :time, default: 10 * 365 * 86400 # 10years later
config_param :generate_cert_digest, :enum, list: [:sha1, :sha256, :sha384, :sha512], default: :sha256
end
end
def self.included(mod)
mod.include ServerTransportParams
end
def initialize
super
@_servers = []
@_server_connections = []
@_server_mutex = Mutex.new
end
def configure(conf)
super
if @transport_config
if @transport_config.protocol == :tls
cert_option_server_validate!(@transport_config)
end
end
end
def stop
@_server_mutex.synchronize do
@_servers.each do |si|
si.server.detach if si.server.attached?
# to refuse more connections: (connected sockets are still alive here)
si.server.close rescue nil
end
end
super
end
def shutdown
@_server_connections.each do |conn|
conn.close rescue nil
end
super
end
def terminate
@_servers = []
super
end
def server_socket_manager_client
socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
socket_manager_path = socket_manager_path.to_i
end
ServerEngine::SocketManager::Client.new(socket_manager_path)
end
def server_create_tcp_socket(shared, bind, port)
sock = if shared
server_socket_manager_client.listen_tcp(bind, port)
else
# TCPServer.new doesn't set IPV6_V6ONLY flag, so use Addrinfo class instead.
# backlog will be set by the caller, we don't need to set backlog here
tsock = Addrinfo.tcp(bind, port).listen
tsock.autoclose = false
TCPServer.for_fd(tsock.fileno)
end
# close-on-exec is set by default in Ruby 2.0 or later (, and it's unavailable on Windows)
sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock
sock
end
def server_create_udp_socket(shared, bind, port)
sock = if shared
server_socket_manager_client.listen_udp(bind, port)
else
# UDPSocket.new doesn't set IPV6_V6ONLY flag, so use Addrinfo class instead.
usock = Addrinfo.udp(bind, port).bind
usock.autoclose = false
UDPSocket.for_fd(usock.fileno)
end
# close-on-exec is set by default in Ruby 2.0 or later (, and it's unavailable on Windows)
sock.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) # nonblock
sock
end
# Use string "?" for port, not integer or nil. "?" is clear than -1 or nil in the log.
PEERADDR_FAILED = ["?", "?", "name resolution failed", "?"]
class CallbackSocket
def initialize(server_type, sock, enabled_events = [], close_socket: true)
@server_type = server_type
@sock = sock
@peeraddr = nil
@enabled_events = enabled_events
@close_socket = close_socket
end
def remote_addr
@peeraddr[3]
end
def remote_host
@peeraddr[2]
end
def remote_port
@peeraddr[1]
end
def send(data, flags = 0)
@sock.send(data, flags)
end
def write(data)
raise "not implemented here"
end
def close_after_write_complete
@sock.close_after_write_complete = true
end
def close
@sock.close if @close_socket
end
def data(&callback)
on(:data, &callback)
end
def on(event, &callback)
raise "BUG: this event is disabled for #{@server_type}: #{event}" unless @enabled_events.include?(event)
case event
when :data
@sock.data(&callback)
when :write_complete
cb = ->(){ callback.call(self) }
@sock.on_write_complete(&cb)
when :close
cb = ->(){ callback.call(self) }
@sock.on_close(&cb)
else
raise "BUG: unknown event: #{event}"
end
end
end
class TCPCallbackSocket < CallbackSocket
ENABLED_EVENTS = [:data, :write_complete, :close]
attr_accessor :buffer
def initialize(sock)
super("tcp", sock, ENABLED_EVENTS)
@peeraddr = (@sock.peeraddr rescue PEERADDR_FAILED)
@buffer = ''
end
def write(data)
@sock.write(data)
end
end
class TLSCallbackSocket < CallbackSocket
ENABLED_EVENTS = [:data, :write_complete, :close]
attr_accessor :buffer
def initialize(sock)
super("tls", sock, ENABLED_EVENTS)
@peeraddr = (@sock.to_io.peeraddr rescue PEERADDR_FAILED)
@buffer = ''
end
def write(data)
@sock.write(data)
end
end
class UDPCallbackSocket < CallbackSocket
ENABLED_EVENTS = []
def initialize(sock, peeraddr, **kwargs)
super("udp", sock, ENABLED_EVENTS, **kwargs)
@peeraddr = peeraddr
end
def remote_addr
@peeraddr[3]
end
def remote_host
@peeraddr[2]
end
def remote_port
@peeraddr[1]
end
def write(data)
@sock.send(data, 0, @peeraddr[3], @peeraddr[1])
end
end
module EventHandler
class UDPServer < Coolio::IO
attr_writer :close_after_write_complete # dummy for consistent method call in callbacks
def initialize(sock, max_bytes, flags, close_socket, log, under_plugin_development, &callback)
raise ArgumentError, "socket must be a UDPSocket: sock = #{sock}" unless sock.is_a?(UDPSocket)
super(sock)
@sock = sock
@max_bytes = max_bytes
@flags = flags
@close_socket = close_socket
@log = log
@under_plugin_development = under_plugin_development
@callback = callback
on_readable_impl = case @callback.arity
when 1 then :on_readable_without_sock
when 2 then :on_readable_with_sock
else
raise "BUG: callback block must have 1 or 2 arguments"
end
self.define_singleton_method(:on_readable, method(on_readable_impl))
end
def on_readable_without_sock
begin
data = @sock.recv(@max_bytes, @flags)
rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNRESET, IOError, Errno::EBADF
return
end
@callback.call(data)
rescue => e
@log.error "unexpected error in processing UDP data", error: e
@log.error_backtrace
raise if @under_plugin_development
end
def on_readable_with_sock
begin
data, addr = @sock.recvfrom(@max_bytes)
rescue Errno::EAGAIN, Errno::EWOULDBLOCK, Errno::EINTR, Errno::ECONNRESET, IOError, Errno::EBADF
return
end
@callback.call(data, UDPCallbackSocket.new(@sock, addr, close_socket: @close_socket))
rescue => e
@log.error "unexpected error in processing UDP data", error: e
@log.error_backtrace
raise if @under_plugin_development
end
end
class TCPServer < Coolio::TCPSocket
attr_reader :closing
attr_writer :close_after_write_complete
def initialize(sock, socket_option_setter, close_callback, log, under_plugin_development, connect_callback)
raise ArgumentError, "socket must be a TCPSocket: sock=#{sock}" unless sock.is_a?(TCPSocket)
socket_option_setter.call(sock)
@_handler_socket = sock
super(sock)
@log = log
@under_plugin_development = under_plugin_development
@connect_callback = connect_callback
@data_callback = nil
@close_callback = close_callback
@callback_connection = nil
@close_after_write_complete = false
@closing = false
@mutex = Mutex.new # to serialize #write and #close
end
def to_io
@_handler_socket
end
def data(&callback)
raise "data callback can be registered just once, but registered twice" if self.singleton_methods.include?(:on_read)
@data_callback = callback
on_read_impl = case callback.arity
when 1 then :on_read_without_connection
when 2 then :on_read_with_connection
else
raise "BUG: callback block must have 1 or 2 arguments"
end
self.define_singleton_method(:on_read, method(on_read_impl))
end
def write(data)
@mutex.synchronize do
super
end
end
def on_writable
super
close if @close_after_write_complete
end
def on_connect
@callback_connection = TCPCallbackSocket.new(self)
@connect_callback.call(@callback_connection)
unless @data_callback
raise "connection callback must call #data to set data callback"
end
end
def on_read_without_connection(data)
@data_callback.call(data)
rescue => e
@log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
@log.error_backtrace
close rescue nil
raise if @under_plugin_development
end
def on_read_with_connection(data)
@data_callback.call(data, @callback_connection)
rescue => e
@log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
@log.error_backtrace
close rescue nil
raise if @under_plugin_development
end
def close
@mutex.synchronize do
return if @closing
@closing = true
@close_callback.call(self)
super
end
end
end
class TLSServer < Coolio::Socket
attr_reader :closing
attr_writer :close_after_write_complete
# It can't use Coolio::TCPSocket, because Coolio::TCPSocket checks that underlying socket (1st argument of super) is TCPSocket.
def initialize(sock, context, socket_option_setter, close_callback, log, under_plugin_development, connect_callback)
raise ArgumentError, "socket must be a TCPSocket: sock=#{sock}" unless sock.is_a?(TCPSocket)
socket_option_setter.call(sock)
@_handler_socket = OpenSSL::SSL::SSLSocket.new(sock, context)
@_handler_socket.sync_close = true
@_handler_write_buffer = ''.force_encoding('ascii-8bit')
@_handler_accepted = false
super(@_handler_socket)
@log = log
@under_plugin_development = under_plugin_development
@connect_callback = connect_callback
@data_callback = nil
@close_callback = close_callback
@callback_connection = nil
@close_after_write_complete = false
@closing = false
@mutex = Mutex.new # to serialize #write and #close
end
def to_io
@_handler_socket.to_io
end
def data(&callback)
raise "data callback can be registered just once, but registered twice" if self.singleton_methods.include?(:on_read)
@data_callback = callback
on_read_impl = case callback.arity
when 1 then :on_read_without_connection
when 2 then :on_read_with_connection
else
raise "BUG: callback block must have 1 or 2 arguments"
end
self.define_singleton_method(:on_read, method(on_read_impl))
end
def write(data)
@mutex.synchronize do
@_handler_write_buffer << data
schedule_write
data.bytesize
end
end
def try_tls_accept
return true if @_handler_accepted
begin
result = @_handler_socket.accept_nonblock(exception: false) # this method call actually try to do handshake via TLS
if result == :wait_readable || result == :wait_writable
# retry accept_nonblock: there aren't enough data in underlying socket buffer
else
@_handler_accepted = true
@callback_connection = TLSCallbackSocket.new(self)
@connect_callback.call(@callback_connection)
unless @data_callback
raise "connection callback must call #data to set data callback"
end
return true
end
rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ETIMEDOUT, Errno::ECONNREFUSED, Errno::EHOSTUNREACH => e
peeraddr = (@_handler_socket.peeraddr rescue PEERADDR_FAILED)
@log.trace "unexpected error before accepting TLS connection",
addr: peeraddr[3], host: peeraddr[2], port: peeraddr[1], error: e
close rescue nil
rescue OpenSSL::SSL::SSLError => e
peeraddr = (@_handler_socket.peeraddr rescue PEERADDR_FAILED)
# Use same log level as on_readable
@log.warn "unexpected error before accepting TLS connection by OpenSSL",
addr: peeraddr[3], host: peeraddr[2], port: peeraddr[1], error: e
close rescue nil
end
false
end
def on_connect
try_tls_accept
end
def on_readable
if try_tls_accept
super
end
rescue IO::WaitReadable, IO::WaitWritable
# ignore and return with doing nothing
rescue OpenSSL::SSL::SSLError => e
@log.warn "close socket due to unexpected ssl error: #{e}"
close rescue nil
end
def on_writable
begin
@mutex.synchronize do
# Consider write_nonblock with {exception: false} when IO::WaitWritable error happens frequently.
written_bytes = @_handler_socket.write_nonblock(@_handler_write_buffer)
@_handler_write_buffer.slice!(0, written_bytes)
end
# No need to call `super` in a synchronized context because TLSServer doesn't use the inner buffer(::IO::Buffer) of Coolio::IO.
# Instead of using Coolio::IO's inner buffer, TLSServer has own buffer(`@_handler_write_buffer`). See also TLSServer#write.
# Actually, the only reason calling `super` here is call Coolio::IO#disable_write_watcher.
# If `super` is called in a synchronized context, it could cause a mutex recursive locking since Coolio::IO#on_write_complete
# eventually calls TLSServer#close which try to get a lock.
super
close if @close_after_write_complete
rescue IO::WaitWritable, IO::WaitReadable
return
rescue Errno::EINTR
return
rescue SystemCallError, IOError, SocketError
# SystemCallError catches Errno::EPIPE & Errno::ECONNRESET amongst others.
close rescue nil
return
rescue OpenSSL::SSL::SSLError => e
@log.debug "unexpected SSLError while writing data into socket connected via TLS", error: e
end
end
def on_read_without_connection(data)
@data_callback.call(data)
rescue => e
@log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
@log.error_backtrace
close rescue nil
raise if @under_plugin_development
end
def on_read_with_connection(data)
@data_callback.call(data, @callback_connection)
rescue => e
@log.error "unexpected error on reading data", host: @callback_connection.remote_host, port: @callback_connection.remote_port, error: e
@log.error_backtrace
close rescue nil
raise if @under_plugin_development
end
def close
@mutex.synchronize do
return if @closing
@closing = true
@close_callback.call(self)
super
end
end
end
end
end
end
end