Skip to content

Commit

Permalink
refactor(listener): refactor listenter started funcs
Browse files Browse the repository at this point in the history
HJianBo authored and Rory-Z committed Aug 28, 2020
1 parent 3e7a950 commit 2bddd9e
Showing 1 changed file with 82 additions and 17 deletions.
99 changes: 82 additions & 17 deletions src/emqx_stomp.erl
Original file line number Diff line number Diff line change
@@ -25,26 +25,32 @@
, stop/1
]).

-export([ start_listener/0
, stop_listener/0
-export([ start_listeners/0
, start_listener/1
, start_listener/3
, stop_listeners/0
, stop_listener/1
, stop_listener/3
]).

-export([init/1]).

-define(APP, ?MODULE).
-define(TCP_OPTS, [binary, {packet, raw}, {reuseaddr, true}, {nodelay, true}]).

-type(listener() :: {esockd:proto(), esockd:listen_on(), [esockd:option()]}).

%%--------------------------------------------------------------------
%% Application callbacks
%%--------------------------------------------------------------------

start(_StartType, _StartArgs) ->
{ok, Sup} = supervisor:start_link({local, emqx_stomp_sup}, ?MODULE, []),
start_listener(),
start_listeners(),
{ok, Sup}.

stop(_State) ->
stop_listener().
stop_listeners().

%%--------------------------------------------------------------------
%% Supervisor callbacks
@@ -57,18 +63,77 @@ init([]) ->
%% Start/Stop listeners
%%--------------------------------------------------------------------

start_listener() ->
{ok, {Port, Opts}} = application:get_env(?APP, listener),
{ok, Env} = application:get_env(?APP, frame),
MFArgs = {emqx_stomp_connection, start_link, [Env]},
esockd:open(stomp, Port, merge_opts(Opts), MFArgs).

merge_opts(Opts) ->
TcpOpts = emqx_misc:merge_opts(
?TCP_OPTS, proplists:get_value(tcp_options, Opts, [])),
emqx_misc:merge_opts(Opts, [{tcp_options, TcpOpts}]).
-spec(start_listeners() -> ok).
start_listeners() ->
lists:foreach(fun start_listener/1, listeners_confs()).

-spec(start_listener(listener()) -> ok).
start_listener({Proto, ListenOn, Options}) ->
case start_listener(Proto, ListenOn, Options) of
{ok, _} -> io:format("Start stomp:~s listener on ~s successfully.~n",
[Proto, format(ListenOn)]);
{error, Reason} ->
io:format(standard_error, "Failed to start stomp:~s listener on ~s - ~0p~n!",
[Proto, format(ListenOn), Reason]),
error(Reason)
end.

-spec(start_listener(esockd:proto(), esockd:listen_on(), [esockd:option()])
-> {ok, pid()} | {error, term()}).
start_listener(tcp, ListenOn, Options) ->
start_stomp_listener('stomp:tcp', ListenOn, Options);
start_listener(ssl, ListenOn, Options) ->
start_stomp_listener('stomp:ssl', ListenOn, Options).

%% @private
start_stomp_listener(Name, ListenOn, Options) ->
SockOpts = esockd:parse_opt(Options),
esockd:open(Name, ListenOn, merge_default(SockOpts),
{emqx_stomp_connection, start_link, [Options -- SockOpts]}).

-spec(stop_listeners() -> ok).
stop_listeners() ->
lists:foreach(fun stop_listener/1, listeners_confs()).

-spec(stop_listener(listener()) -> ok | {error, term()}).
stop_listener({Proto, ListenOn, Opts}) ->
StopRet = stop_listener(Proto, ListenOn, Opts),
case StopRet of
ok -> io:format("Stop stomp:~s listener on ~s successfully.~n",
[Proto, format(ListenOn)]);
{error, Reason} ->
io:format(standard_error, "Failed to stop stomp:~s listener on ~s - ~p~n.",
[Proto, format(ListenOn), Reason])
end,
StopRet.

-spec(stop_listener(esockd:proto(), esockd:listen_on(), [esockd:option()])
-> ok | {error, term()}).
stop_listener(tcp, ListenOn, _Opts) ->
esockd:close('stomp:tcp', ListenOn);
stop_listener(ssl, ListenOn, _Opts) ->
esockd:close('stomp:ssl', ListenOn).

stop_listener() ->
{ok, {Port, _Opts}} = application:get_env(?APP, listener),
esockd:close({stomp, Port}).
%%--------------------------------------------------------------------
%% Internal funcs
%%--------------------------------------------------------------------

listeners_confs() ->
{ok, {Port, Opts}} = application:get_env(?APP, listener),
Options = application:get_env(?APP, frame, []),
[{tcp, Port, Options ++ Opts}].

merge_default(Options) ->
case lists:keytake(tcp_options, 1, Options) of
{value, {tcp_options, TcpOpts}, Options1} ->
[{tcp_options, emqx_misc:merge_opts(?TCP_OPTS, TcpOpts)} | Options1];
false ->
[{tcp_options, ?TCP_OPTS} | Options]
end.

format(Port) when is_integer(Port) ->
io_lib:format("0.0.0.0:~w", [Port]);
format({Addr, Port}) when is_list(Addr) ->
io_lib:format("~s:~w", [Addr, Port]);
format({Addr, Port}) when is_tuple(Addr) ->
io_lib:format("~s:~w", [inet:ntoa(Addr), Port]).

0 comments on commit 2bddd9e

Please sign in to comment.