At the beginning we were sending records to kinesis one by one. This led to some errors and lost of events.
But Amazon Kinesis can also send events in batch which is more efficient.
Also Elixir has a GenStage which is appropriate in our case. Moreover if each analytic type handle the sending of there own batch this will avoid that the all application crash and restart in case of an error and increase the throughput.
D = Dynamic Supervisor GS = GenStage component
                                              +---------------------+
                                              |                     |
          +-----------------------------------+     Application     +----------------------------------------------+
          |                    +--------------+                     +------------------+                           |
          |                    |              +----------+----------+                  |                           |
          |                    |                         |                             |                           |
          |                    |                         |                             |                           |
          v                    v                         v                             v                           v
+---------+---------+  +-------+------+  +---------------+---------------+  +----------+------------+  +-----------+----------+
|                   |  |              |  |                               |  |                       |  |                      |
| ServerSupervisor  |  |   Registry   |  | ConsumerDynamicSupervisor(D)  |  | ProducerSupervisor(D) |  | RecordsSupervisor(D) |
|                   |  |              |  |                               |  |                       |  |                      |
+---------+---------+  +--------------+  +---------------+---------------+  +----------+------------+  +-----------+----------+
          |                                              |                             |                           |
          |                         +--------------------|-----------------------------|---------------------------|-----------------+
          v                         |                    v                             v                           v                 |
 +--------+-------+                 |        +-----------+------------+         +------+-------+              +----+---+             |
 |                |                 |        |                        |         |              |              |        |             |
 | DispatchServer |                 |        | ConsumerSupervisor(GS) |         | Producer(GS) |              | Record |             |
 |                |                 |        |                        |         |              |              |        |             |
 +----------------+                 |        +-----------+------------+         +--------------+              +--------+             |
                                    |                    |                                                                           |
                                    |                    |                                                                           |
                                    |                    v                                                                           |
                                    |              +-----+----+                                                                      |
                                    |              |          |                                                                      |
                                    |              | +--------+-+                                                                    |
                                    |              | |          |                                                                    |
                                    |              +-+ +--------+-+                                                                  |
                                    |                | |          |                                                                  |
                                    |                +-+ Consumer |                                                                  |
                                    |                  |          |                                                                  |
                                    |                  +----------+                                                  Per record type |
                                    |                                                                                                |
                                    +------------------------------------------------------------------------------------------------+
This is an exemple with a MyEvent analytic module
When a module wants to record an event, it calls MyEvent.record(event).
This function will first determine if it needs to start a GenServer.
In the case is required to start one, MyEvent.record/1 will call the start_child of the RecordsSupervisor.
RecordsSupervisor is a dynamic supervisor meaning that it can start a GenServer and supervise it dynamically (for more information on DynamicSupervisor)
Because MyEvent use the Records module, the handling of starting the GenServer is transparent to you, but this is what is happening behind the scene.
On the initialisation of the GenServer it will:
- Create an ETStable tunned for concurrent reading and writting to store the incomming event to be streamed.
- Call ProducerSupervisor.start_child/1with his own module name as parameter. As for theRecordsSupervisorProduceSupervisoris also aDynamicSupervisor. It will start dynamically aProducer, which in turn will do those actions:- Get is own PID
- Call ConsumerDynanicSupervisor.start_child/2module with the record module name and thePID. As before, this is aDynamicSupervisorwhich will start aConsumerSupervisor. TheConsumerSupervisormodule is actually a ConsumerSupervisor fronGenStagewhich will subscribed to thePIDreceived as argument and can handle amax_demanddefined in configuration and start a process callingConsumer.start_link/2. The extra parameter passed toConsumer.start_link/2is the record module name.
- Start the internal timer to call himself every X seconds (defined in the configuration).
- Create or get the current queue of the records calling record_module.create_or_get_link/0.
 
- Get is own 
- Add the event into the queue.
As for the timer of the Producer it will:
- Call record_module.create_batches_and_flush/0which will:- Get all the current events in the queue
- Create chunks of the same size (defined in the configuration)
- Flush the current queue
- Returns those chunks
 
- Add up the new chunks with the current internal batch queue.
- If the number of batches is enough, it will dispatch them through the GenStagemechanism. Otherwise nothing else is done.
When a new batch is received in the GenStage pipe, the ConsumerSupervisor will create the correct amount of Consumer processes.
The Consumer will start a task and if the batch is not empty will call the record_module.handle_event/1 with the current batch as parameter.
The handle_event function will send the batch of events to Kinesis with only one request.
It is allowed to send to several streams if required.
defmodule MyApp.MyEvent do
  use Analytics.Records
  alias Analytics.Adapter.Kinesis.Message
  def record(my_data), do: send_record(my_data)
  @impl true
  def handle_event(events) do
    stream  =
      events
      |> Enum.map(fn my_data ->
        event_id = generate_event_id()
        %Message{
           partition_key: to_string(my_data.id),
           data: ~s/{"event_id": "#{event_id}", "data": "#{my_data}"}/
         }
      end)
    # `send_data/3`, `Analytics.Records`
    send_data(__MODULE__, "my_stream", stream)
  rescue
    exception ->
      reraise exception, __STACKTRACE__
  end
  defp event_json(event_id) do
    timestamp = Timex.now() |> Timex.to_unix()
    [
      "{",
      [~s("event_id": "), event_id, ~s(")],
      "}"
    ]
    |> List.to_string()
  end
end