Memory leak using OnBasicAuthWrapper Hook #177

Closed
xorduna opened this issue Oct 20, 2022 · 7 comments · Fixed by #186

xorduna commented Oct 20, 2022

Hi,

We are currently experiencing a memory leak using gmqtt. The broker has around 20 clients that are publishing messages and none are consuming. We are inserting data into the database using the OnMsgArrived hook and authenticating clients using the OnBasicAuthWrapper hook.

After further investigation, I found the problem is in the OnBasicAuthWrapper hook. Apparently, when a client sends an invalid password, this error is logged:

2022-10-21T10:38:28.636+0200    ERROR   server/client.go:273    connection lost {"client_id": "", "remote_addr": "[::1]:52040", "error": "operation error: Code = 5, reasonString: "}
github.com/DrmagicE/gmqtt/server.(*client).setError.func1
        /Users/xorduna/devel/myc-cloud/vendor/github.com/DrmagicE/gmqtt/server/client.go:273
sync.(*Once).doSlow
        /Users/xorduna/go/go1.19.1/src/sync/once.go:74
sync.(*Once).Do
        /Users/xorduna/go/go1.19.1/src/sync/once.go:65
github.com/DrmagicE/gmqtt/server.(*client).setError
        /Users/xorduna/devel/myc-cloud/vendor/github.com/DrmagicE/gmqtt/server/client.go:271
github.com/DrmagicE/gmqtt/server.(*client).connectWithTimeOut.func1
        /Users/xorduna/devel/myc-cloud/vendor/github.com/DrmagicE/gmqtt/server/client.go:523
github.com/DrmagicE/gmqtt/server.(*client).connectWithTimeOut
        /Users/xorduna/devel/myc-cloud/vendor/github.com/DrmagicE/gmqtt/server/client.go:599
github.com/DrmagicE/gmqtt/server.(*client).serve
        /Users/xorduna/devel/myc-cloud/vendor/github.com/DrmagicE/gmqtt/server/client.go:1461

If we look at the serve() method in client.go and follow the execution with the debugger, we see that after sending the authentication error, the goroutine calls client.wg.Wait(). That causes the goroutine to stay in the waiting state and never end.

Is this normal behaviour?

I followed the code in https://github.com/DrmagicE/gmqtt/blob/master/plugin/auth/hooks.go. Is something wrong with my code?

This is the plugin Load function:

func (m *Myc) Load(service server.Server) error {
	log = server.LoggerWithField(zap.String("plugin", Name))

	enforce := casbin.Enforcer{}

	// panic("implement me")
	log.Info("Loading myc plugin")
	log.Info("DB URI", zap.String("dbUri:", m.dbUri))

	dataBase, err := database.Open(database.Config{DBPostgres: m.dbUri, MaxIdleConns: 0, MaxOpenConns: 10, ShowSQL: true}, 5)
	if err != nil {
		return fmt.Errorf("connecting to db: %w", err)
	}
	m.store = store.NewStore(log.Sugar(), dataBase)
	log.Info("Loading ready")
	m.core = core.NewCore(log.Sugar(), app_config.Config{}, m.store, enforce, nil, new(session.Session))

	return nil
}

And this is the OnBasicAuthWrapper hook:

func (m *Myc) OnBasicAuthWrapper(pre server.OnBasicAuth) server.OnBasicAuth {
	return func(ctx context.Context, client server.Client, req *server.ConnectRequest) (err error) {
		err = pre(ctx, client, req)
		if err != nil {
			return err
		}
		// fmt.Println("Authentication with:", string(req.Connect.Username), string(req.Connect.Password))
		log.Info("Authentication with:", zap.String("username", string(req.Connect.Username)),
			zap.String("password", string(req.Connect.Password)))

		valid := m.core.Gateway.ValidateToken(string(req.Connect.Username))
		if string(req.Connect.Password) != "" || !valid {
			log.Info("authentication failed", zap.String("username", string(req.Connect.Username)))
			switch client.Version() {
			case packets.Version5:
				return codes.NewError(codes.BadUserNameOrPassword)
			case packets.Version311, packets.Version31:
				return codes.NewError(codes.V3BadUsernameorPassword)
			default:
				return codes.NewError(codes.UnsupportedProtocolVersion)
			}
		}
		return nil
	}
}

I enabled pprof on the broker, and from what I see there are thousands of goroutines blocked and waiting. Is that normal?

goroutine-stack.txt

Here you can also see the inuse_space from the heap profile:

heap-broker-6h

Thank you very much!

Xavi

xorduna changed the title from "Memory leak using OnMsgArrived Hook" to "Memory leak using OnBasicAuthWrapper Hook" on Oct 21, 2022

xorduna commented Oct 26, 2022

Hi,

I dug deeper into the problem, and it seems that there is a memory leak when the username/password is incorrect.

From what I have seen, there are two goroutine stacks that never end (128 goroutines of each in this profile):

128 @ 0x10096a524 0x10097ae18 0x100e93ebc 0x100e9bbf4 0x10099b744
#	0x100e93ebb	github.com/DrmagicE/gmqtt/server.(*client).writeLoop+0xab	/Users/xorduna/devel/myc-cloud/vendor/github.com/DrmagicE/gmqtt/server/client.go:308
#	0x100e9bbf3	github.com/DrmagicE/gmqtt/server.(*client).serve.func2+0x23	/Users/xorduna/devel/myc-cloud/vendor/github.com/DrmagicE/gmqtt/server/client.go:1458

128 @ 0x10096a524 0x10097bd2c 0x10097bd09 0x100996f08 0x1009b2500 0x100e9bad4 0x10099b744
#	0x100996f07	sync.runtime_Semacquire+0x27				/Users/xorduna/go/go1.19.1/src/runtime/sema.go:62
#	0x1009b24ff	sync.(*WaitGroup).Wait+0x7f				/Users/xorduna/go/go1.19.1/src/sync/waitgroup.go:139
#	0x100e9bad3	github.com/DrmagicE/gmqtt/server.(*client).serve+0x363	/Users/xorduna/devel/myc-cloud/vendor/github.com/DrmagicE/gmqtt/server/client.go:1494

When such an error occurs, the serve() goroutine in server/client.go never ends; in fact, it gets blocked by client.wg.Wait() at the end.

I tried manually calling client.Close() when there is an error, with the following code:

	if client.err != nil {
		client.Close()
		return
	}

But then this goroutine

go func() { // write
	client.writeLoop()
	client.wg.Done()
}()

gets blocked in client.writeLoop().

From what I understand, the deferred client.internalClose() should close the channel that writeLoop() is listening on, and writeLoop() should also return when an error is sent, but somehow it keeps running.

Can somebody shed some light on this problem?

Thank you very much!

Xavi


xorduna commented Oct 27, 2022

Hi,

It seems I got it fixed by adding this piece of code

	if client.err != nil {
		client.Close()
		close(client.close)
	}

just before client.wg.Wait(). This closes the client.close channel, which causes writeLoop() to end and serve() to finally return.
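One thing to watch when calling close(client.close) directly: in Go, closing an already-closed channel panics ("close of closed channel"), so if any other path, such as the deferred client.internalClose(), also closes that channel, the close needs a guard. Whether gmqtt's internalClose() does so is not shown in this thread. A minimal sketch of the usual sync.Once guard, using a hypothetical closer type:

```go
package main

import (
	"fmt"
	"sync"
)

// closer is a hypothetical helper: every path that may close done goes
// through Close, and sync.Once makes the second and later calls no-ops.
type closer struct {
	once sync.Once
	done chan struct{}
}

func newCloser() *closer { return &closer{done: make(chan struct{})} }

func (c *closer) Close() { c.once.Do(func() { close(c.done) }) }

// isClosed reports whether done has been closed, without blocking.
func (c *closer) isClosed() bool {
	select {
	case <-c.done:
		return true
	default:
		return false
	}
}

func main() {
	c := newCloser()
	c.Close()                 // e.g. the explicit close before wg.Wait()
	c.Close()                 // e.g. a deferred cleanup; a plain close() here would panic
	fmt.Println(c.isClosed()) // true
}
```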

I will deploy it in production and create a pull request, but it would be nice to get the opinion of the maintainer.

Thanks!

Xavi


xorduna commented Oct 27, 2022

Opened a pull request: #178

DrmagicE (Owner) commented

Oh, that is an impressive deep dive. I can reproduce your problem, and you are right. Give me some time; I am looking into it.

DrmagicE (Owner) commented

@xorduna After some digging, I think the root cause is that the server does not close the underlying TCP connection when the client gives an invalid username/password, or more generally, in any situation where authentication fails.

client.serve() is blocked by:

readWg.Wait()

When a client provides an invalid username/password:

  • If the client closes the TCP connection after receiving the authentication-failed CONNACK packet, everything works fine.
  • If the client does not close the connection either, the readLoop and writeLoop goroutines stay blocked.

These two goroutines are only released when the underlying TCP connection gets closed.

So I think this PR #178 may not solve the problem.

I think always closing the underlying TCP connection would be the solution, like this:

func (client *client) setError(err error) {
	client.errOnce.Do(func() {
		if err != nil && err != io.EOF {
			zaplog.Error("connection lost",
				zap.String("client_id", client.opts.ClientID),
				zap.String("remote_addr", client.rwc.RemoteAddr().String()),
				zap.Error(err))
			client.err = err
			if client.version == packets.Version5 {
				if code, ok := err.(*codes.Error); ok {
					if client.IsConnected() {
						// send Disconnect
						client.write(&packets.Disconnect{
							Version: packets.Version5,
							Code:    code.Code,
							Properties: &packets.Properties{
								ReasonString: code.ReasonString,
								User:         kvsToProperties(code.UserProperties),
							},
						})
					}
				}
			}
			_ = client.rwc.Close() // add this line
		}
	})
}

Sorry, I don't have much time to test it carefully. Would you please verify this and raise another PR if it works? Many thanks!

@xorduna
Copy link
Author

xorduna commented Oct 29, 2022

Hi @DrmagicE

Thanks for answering!

I tried your fix, but when executing _ = client.rwc.Close(), the connection is closed before the error is sent to the client, so the client gets an EOF error instead of "Bad username or password".

What about executing close(client.close) instead?

Thanks!

Xavi

DrmagicE (Owner) commented

@xorduna Hi, sorry for the late reply. I have submitted a PR which should fix the problem.
