diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index ad2b9647668..bedc5fcb24d 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -43,3 +43,4 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - Use the go-lookslike library for testing in heartbeat. Eventually the mapval package will be replaced with it. {pull}12540[12540] - New ReporterV2 interfaces that can receive a context on `Fetch(ctx, reporter)`, or `Run(ctx, reporter)`. {pull}11981[11981] - Generate configuration from `mage` for all Beats. {pull}12618[12618] +- Add ClientFactory to TCP input source to add SplitFunc/NetworkFuncs per client. {pull}8543[8543] diff --git a/filebeat/input/syslog/config.go b/filebeat/input/syslog/config.go index 76c4480c12c..4e8eecc74b4 100644 --- a/filebeat/input/syslog/config.go +++ b/filebeat/input/syslog/config.go @@ -60,7 +60,7 @@ var defaultUDP = udp.Config{ } func factory( - cb inputsource.NetworkFunc, + nf inputsource.NetworkFunc, config common.ConfigNamespace, ) (inputsource.Network, error) { n, cfg := config.Name(), config.Config() @@ -77,13 +77,15 @@ func factory( return nil, fmt.Errorf("error creating splitFunc from delimiter %s", config.LineDelimiter) } - return tcp.New(&config.Config, splitFunc, cb) + factory := tcp.SplitHandlerFactory(nf, splitFunc) + + return tcp.New(&config.Config, factory) case udp.Name: config := defaultUDP if err := cfg.Unpack(&config); err != nil { return nil, err } - return udp.New(&config, cb), nil + return udp.New(&config, nf), nil default: return nil, fmt.Errorf("you must choose between TCP or UDP") } diff --git a/filebeat/input/tcp/input.go b/filebeat/input/tcp/input.go index 90f0cf4f691..3ebf90657ae 100644 --- a/filebeat/input/tcp/input.go +++ b/filebeat/input/tcp/input.go @@ -80,7 +80,9 @@ func NewInput( return nil, fmt.Errorf("unable to create splitFunc for delimiter %s", config.LineDelimiter) } - server, err := tcp.New(&config.Config, splitFunc, cb) + factory := tcp.SplitHandlerFactory(cb, splitFunc) + + server, err := tcp.New(&config.Config, factory) if err != nil { return nil, err } diff --git a/filebeat/inputsource/tcp/client.go b/filebeat/inputsource/tcp/client.go index e6fc3b1aabd..1e5769a797a 100644 --- a/filebeat/inputsource/tcp/client.go +++ b/filebeat/inputsource/tcp/client.go @@ -31,10 +31,9 @@ import ( "github.com/elastic/beats/libbeat/logp" ) -// Client is a remote client. -type client struct { +// splitHandler is a TCP client that has splitting capabilities. +type splitHandler struct { conn net.Conn - log *logp.Logger callback inputsource.NetworkFunc done chan struct{} metadata inputsource.NetworkMetadata @@ -43,33 +42,50 @@ type client struct { timeout time.Duration } -func newClient( - conn net.Conn, - log *logp.Logger, +// ClientFactory returns a ConnectionHandler func +type ClientFactory func(config Config) ConnectionHandler + +// ConnectionHandler interface provides mechanisms for handling of incoming TCP connections +type ConnectionHandler interface { + Handle(conn net.Conn) error + Close() +} + +// SplitHandlerFactory allows creation of a ConnectionHandler that can do splitting of messages received on a TCP connection. +func SplitHandlerFactory(callback inputsource.NetworkFunc, splitFunc bufio.SplitFunc) ClientFactory { + return func(config Config) ConnectionHandler { + return newSplitHandler(callback, splitFunc, uint64(config.MaxMessageSize), config.Timeout) + } +} + +// newSplitHandler allows creation of a TCP client that has splitting capabilities. +func newSplitHandler( callback inputsource.NetworkFunc, splitFunc bufio.SplitFunc, maxReadMessage uint64, timeout time.Duration, -) *client { - client := &client{ - conn: conn, - log: log.With("remote_address", conn.RemoteAddr()), +) ConnectionHandler { + client := &splitHandler{ callback: callback, done: make(chan struct{}), splitFunc: splitFunc, maxMessageSize: maxReadMessage, timeout: timeout, - metadata: inputsource.NetworkMetadata{ - RemoteAddr: conn.RemoteAddr(), - TLS: extractSSLInformation(conn), - }, } - extractSSLInformation(conn) return client } -func (c *client) handle() error { - r := NewResetableLimitedReader(NewDeadlineReader(c.conn, c.timeout), c.maxMessageSize) +// Handle takes a connection as input and processes data received on it. +func (c *splitHandler) Handle(conn net.Conn) error { + c.conn = conn + c.metadata = inputsource.NetworkMetadata{ + RemoteAddr: conn.RemoteAddr(), + TLS: extractSSLInformation(conn), + } + + log := logp.NewLogger("split_client").With("remote_addr", conn.RemoteAddr().String()) + + r := NewResetableLimitedReader(NewDeadlineReader(conn, c.timeout), c.maxMessageSize) buf := bufio.NewReader(r) scanner := bufio.NewScanner(buf) scanner.Split(c.splitFunc) @@ -79,7 +95,7 @@ func (c *client) handle() error { for scanner.Scan() { err := scanner.Err() if err != nil { - // we are forcing a close on the socket, lets ignore any error that could happen. + // we are forcing a Close on the socket, lets ignore any error that could happen. select { case <-c.done: break @@ -87,16 +103,16 @@ func (c *client) handle() error { } // This is a user defined limit and we should notify the user. if IsMaxReadBufferErr(err) { - c.log.Errorw("client error", "error", err) + log.Errorw("split_client error", "error", err) } - return errors.Wrap(err, "tcp client error") + return errors.Wrap(err, "tcp split_client error") } r.Reset() c.callback(scanner.Bytes(), c.metadata) } // We are out of the scanner, either we reached EOF or another fatal error occurred. - // like we failed to complete the TLS handshake or we are missing the client certificate when + // like we failed to complete the TLS handshake or we are missing the splitHandler certificate when // mutual auth is on, which is the default. if err := scanner.Err(); err != nil { return err @@ -105,7 +121,8 @@ func (c *client) handle() error { return nil } -func (c *client) close() { +// Close is used to perform clean up before the client is released. +func (c *splitHandler) Close() { close(c.done) c.conn.Close() } diff --git a/filebeat/inputsource/tcp/server.go b/filebeat/inputsource/tcp/server.go index 69d8ec10f93..e1e6225cb38 100644 --- a/filebeat/inputsource/tcp/server.go +++ b/filebeat/inputsource/tcp/server.go @@ -27,7 +27,6 @@ import ( "golang.org/x/net/netutil" - "github.com/elastic/beats/filebeat/inputsource" "github.com/elastic/beats/libbeat/common/transport/tlscommon" "github.com/elastic/beats/libbeat/logp" "github.com/elastic/beats/libbeat/outputs/transport" @@ -36,13 +35,12 @@ import ( // Server represent a TCP server type Server struct { sync.RWMutex - callback inputsource.NetworkFunc config *Config Listener net.Listener - clients map[*client]struct{} + clients map[ConnectionHandler]struct{} wg sync.WaitGroup done chan struct{} - splitFunc bufio.SplitFunc + factory ClientFactory log *logp.Logger tlsConfig *transport.TLSConfig } @@ -50,24 +48,22 @@ type Server struct { // New creates a new tcp server func New( config *Config, - splitFunc bufio.SplitFunc, - callback inputsource.NetworkFunc, + factory ClientFactory, ) (*Server, error) { tlsConfig, err := tlscommon.LoadTLSServerConfig(config.TLS) if err != nil { return nil, err } - if splitFunc == nil { - return nil, fmt.Errorf("SplitFunc can't be empty") + if factory == nil { + return nil, fmt.Errorf("ClientFactory can't be empty") } return &Server{ config: config, - callback: callback, - clients: make(map[*client]struct{}, 0), + clients: make(map[ConnectionHandler]struct{}, 0), done: make(chan struct{}), - splitFunc: splitFunc, + factory: factory, log: logp.NewLogger("tcp").With("address", config.Host), tlsConfig: tlsConfig, }, nil @@ -91,7 +87,11 @@ func (s *Server) Start() error { return nil } -// Run start and run a new TCP listener to receive new data +// Run start and run a new TCP listener to receive new data. When a new connection is accepted, the factory is used +// to create a ConnectionHandler. The ConnectionHandler takes the connection as input and handles the data that is +// being received via tha io.Reader. Most clients use the splitHandler which can take a bufio.SplitFunc and parse +// out each message into an appropriate event. The Close() of the ConnectionHandler can be used to clean up the +// connection either by client or server based on need. func (s *Server) run() { for { conn, err := s.Listener.Accept() @@ -105,14 +105,7 @@ func (s *Server) run() { } } - client := newClient( - conn, - s.log, - s.callback, - s.splitFunc, - uint64(s.config.MaxMessageSize), - s.config.Timeout, - ) + client := s.factory(*s.config) s.wg.Add(1) go func() { @@ -124,13 +117,13 @@ func (s *Server) run() { defer s.unregisterClient(client) s.log.Debugw("New client", "remote_address", conn.RemoteAddr(), "total", s.clientsCount()) - err := client.handle() + err := client.Handle(conn) if err != nil { - s.log.Debugw("Client error", "error", err) + s.log.Debugw("client error", "error", err) } defer s.log.Debugw( - "Client disconnected", + "client disconnected", "remote_address", conn.RemoteAddr(), "total", @@ -140,34 +133,34 @@ func (s *Server) run() { } } -// Stop stops accepting new incoming TCP connection and close any active clients +// Stop stops accepting new incoming TCP connection and Close any active clients func (s *Server) Stop() { s.log.Info("Stopping TCP server") close(s.done) s.Listener.Close() for _, client := range s.allClients() { - client.close() + client.Close() } s.wg.Wait() s.log.Info("TCP server stopped") } -func (s *Server) registerClient(client *client) { +func (s *Server) registerClient(client ConnectionHandler) { s.Lock() defer s.Unlock() s.clients[client] = struct{}{} } -func (s *Server) unregisterClient(client *client) { +func (s *Server) unregisterClient(client ConnectionHandler) { s.Lock() defer s.Unlock() delete(s.clients, client) } -func (s *Server) allClients() []*client { +func (s *Server) allClients() []ConnectionHandler { s.RLock() defer s.RUnlock() - currentClients := make([]*client, len(s.clients)) + currentClients := make([]ConnectionHandler, len(s.clients)) idx := 0 for client := range s.clients { currentClients[idx] = client diff --git a/filebeat/inputsource/tcp/server_test.go b/filebeat/inputsource/tcp/server_test.go index 160f99b2435..df63417a841 100644 --- a/filebeat/inputsource/tcp/server_test.go +++ b/filebeat/inputsource/tcp/server_test.go @@ -167,7 +167,9 @@ func TestReceiveEventsAndMetadata(t *testing.T) { if !assert.NoError(t, err) { return } - server, err := New(&config, test.splitFunc, to) + + factory := SplitHandlerFactory(to, test.splitFunc) + server, err := New(&config, factory) if !assert.NoError(t, err) { return } @@ -217,7 +219,10 @@ func TestReceiveNewEventsConcurrently(t *testing.T) { if !assert.NoError(t, err) { return } - server, err := New(&config, bufio.ScanLines, to) + + factory := SplitHandlerFactory(to, bufio.ScanLines) + + server, err := New(&config, factory) if !assert.NoError(t, err) { return }