Tools and abstractions to interact with AMQP servers.
Define a publisher:
defmodule MyPublisher do
use Hare.Publisher
@config [exchange: [name: "foo"]]
def start_link(conn) do
Hare.Publisher.start_link(__MODULE__, conn, @config, [])
end
def publish(publisher, payload) do
Hare.Publisher.publish(publisher, payload)
end
end
Define a consumer:
defmodule MyConsumer do
use Hare.Consumer
@config [exchange: [name: "foo"],
queue: [name: "bar"]]
def start_link(conn, config) do
Hare.Consumer.start_link(__MODULE__, conn, config, [])
end
def handle_message(payload, _meta, state) do
IO.puts(payload)
{:reply, :ack, state}
end
end
Publish and consume a message:
{:ok, conn} = Hare.Conn.start_link(adapter: Hare.Adapter.AMQP)
{:ok, publisher} = MyPublisher.start_link(conn)
{:ok, _consumer} = MyConsumer.start_link(conn)
MyPublisher.publish(publisher, "hello world!")
# => Prints "hello world!\n"
If available in Hex, the package can be installed as:
-
Add
hare
to your list of dependencies inmix.exs
:def deps do [{:amqp, "~> 0.1.4", hex: :amqp19}, {:hare, "~> 0.1.9"}] end
-
Ensure
hare
is started before your application:def application do [applications: [:hare]] end
The first step to interact with an AMQP server is to establish a connection.
The Hare.Conn
starts a process that wraps a real connection. Starts a connection,
monitors it, and reconnects on failure.
{:ok, conn} = Hare.Conn.start_link(adapter: Hare.Adapter.AMQP,
backoff: [0, 100, 1000],
config: [host: "localhost", port: 1234])
# => {:ok, %Hare.Core.Conn{...}}
The example above shows all available options when starting a connection.
The :adapter
option is mandatory and it specifies what adapter to use.
An adapter must implement the Hare.Adapter
behaviour which includes functions
like connecting to the server, opening channels, declaring queues, publishing
messages, etc.
The only adapter provided is Hare.Adapter.AMQP
that uses the AMQP
library.
Note that in order to use the Hare.Adapter.AMQP
, the :amqp
must be explicitly
included as a dependency.
The :backoff
option specifies a set of intervals to wait before retrying connection
in case of failure. It defaults to [0, 10, 100, 1000, 5000]
.
On successive connection failures it waits successive intervals before retrying and then keeps waiting for the last interval to retry forever.
The :config
option will be given without modification to the adapter
open_connection/1
function. It defaults to []
.
The simplest abstraction provided is the Hare.Publisher
. It allows to publish
messages to a particular exchange.
It expects a connection and the exchange configuration in the following format:
config = [exchange: [name: "the.exchange.name",
type: :direct,
opts: [durable: true]]]
A module implementing a Hare.Publisher
must implement some callbacks (a default
implementation is provided with use Hare.Publisher
)
defmodule MyPublisher do
use Hare.Publisher
@config Application.get_env(:my_app, :my_publisher)
def start_link(conn, blacklist) do
Hare.Publisher.start_link(__MODULE__, conn, @config, blacklist)
end
defdelegate publish(pid, payload), to: Hare.Publisher
#
# Callbacks
#
def init(blacklist) do
{:ok, %{blacklist: blacklist}}
end
def before_publication(payload, _routing_key, _opts, state) do
case Enum.member?(state.blacklist, payload) do
true -> {:ok, state}
false -> {:ignore, state}
end
end
end
The above snippet implements a publisher. It receives a payloads blacklist when starts and keeps it in its internal state.
Before sending a message to the exchange it runs the before_publication/4
callback
that allows the publisher to ignore that message if it is blacklisted instead of
publishing it.
For more information about all available callbacks and its possible return values
check Hare.Publisher
documentation.
Another provided is the Hare.Consumer
. It allows to consume messages from
a particular queue.
It expects a connection and the exchange, queue, and binding configuration in the following format:
config = [exchange: [name: "the.exchange.name",
type: :direct,
opts: [durable: true]],
queue: [name: "the.queue.name",
opts: [durable: true]],
bind: [routing_key: "the.routing.key"]]
A module implementing a Hare.Consumer
must implement some callbacks (a default
implementation is provided with use Hare.Consumer
)
defmodule MyConsumer do
use Hare.Consumer
@config Application.get_env(:my_app, :my_consumer)
def start_link(conn, handler) do
Hare.Consumer.start_link(__MODULE__, conn, @config, handler)
end
#
# Callbacks
#
def init(handler) do
{:ok, %{handler: handler}}
end
def handle_message(payload, meta, %{handler: handler} = state) do
on_success = fn -> Hare.Consumer.ack(meta) end
on_failure = fn -> Hare.Consumer.reject(meta) end
handler.(payload, on_success, on_failure)
{:noreply, state}
end
end
The above snippet implements a consumer. It receives a handler function when starts and keeps it in its internal state.
When a message is received from the queue, the handle_message/3
callback
is called with the payload of the message, some metadata, and the internal
state of the consumer.
On the example above the handle_message/3
callback builds 2 anonymous functions,
one for a successfull scenario and another for an unexpected failure scenario,
and calls the handler with the payload and both functions.
This handler call may start another process, that will be able to ack or reject the message depending on the success of the message handling.
For more information about all available callbacks and its possible return values
check Hare.Consumer
documentation.
The abstraction Hare.RPC.Client
represents the client of a RPC communication
through the AMQP server. It allows to publish messages and wait for its responses.
It expects a connection and the exchange configuration in the following format:
config = [exchange: [name: "the.exchange.name",
type: :direct,
opts: [durable: true]]]
A module implementing a Hare.RPC.Client
must implement some callbacks (a default
implementation is provided with use Hare.RPC.Client
)
defmodule MyClient do
use Hare.RPC.Client
@config Application.get_env(:my_app, :my_client)
def start_link(conn) do
Hare.RPC.Client.start_link(__MODULE__, conn, @config, :ok)
end
#
# Callbacks
#
def init(:ok) do
{:ok, %{pending: %{}, cache: %{}}}
end
def before_request(payload, _routing_key, _opts, from, state) do
case Map.fetch(state.cache, payload) do
{:ok, response} ->
{:reply, {:ok, response}, state}
:error ->
new_pending = Map.put(state.pending, from, payload)
{:ok, %{state | pending: new_pending}}
end
end
def on_response(response, from, state) do
{payload, new_pending} = Map.pop(state.pending, from)
new_cache = Map.put(state.cache, payload, response)
{:reply, {:ok, response}, %{state | pending: new_pending, cache: new_cache}}
end
end
The above snippet implements a cached RPC client. It keeps pending requests data and a cache in its internal state.
Before performing the request it checks if the current payload is already cached, if so it replies to the caller without performing the request.
If the current payload is not cached it stores it in relation with a term representing the caller as a pending request.
When the response arrives it gets the payload of that call from the pending requests, stores the payload-response relation in cache, and responds to the caller.
For more information about all available callbacks and its possible return values
check Hare.RPC.Client
documentation.
The other side of the RPC Client is represented by the Hare.RPC.Server
abstraction.
It allows to consume requests from a particular queue and reply to the
specified queue through the default exchange.
It expects a connection and the exchange, queue, and binding configuration in the following format:
config = [exchange: [name: "the.exchange.name",
type: :direct,
opts: [durable: true]],
queue: [name: "the.queue.name",
opts: [durable: true]],
bind: [routing_key: "the.routing.key"]]
A module implementing a Hare.RPC.Server
must implement some callbacks (a default
implementation is provided with use Hare.RPC.Server
)
defmodule MyServer do
use Hare.RPC.Server
@config Application.get_env(:my_app, :my_server)
def start_link(conn, handler) do
Hare.RPC.Server.start_link(__MODULE__, conn, @config, handler)
end
#
# Callbacks
#
def init(handler) do
{:ok, %{handler: handler}}
end
def handle_message(payload, meta, %{handler: handler} = state) do
callback = &Hare.RPC.Server.reply(meta, &1)
handler.(payload, callback)
{:noreply, state}
end
end
The above snippet implements a rpc server. It receives a handler function when starts and keeps it in its internal state.
When a message is received from the queue, the handle_message/3
callback
is called with the payload of the message, some metadata, and the internal
state of the rpc server.
On the example above the handle_message/3
builds a callback to be used
to send a response to the client, and calls the handler with the payload
and the callback.
This handler call may start another process, that will be able to respond to the client when the response is built.
For more information about all available callbacks and its possible return values
check Hare.RPC.Server
documentation.