# lib/tai/orders/order_transition_worker.ex

defmodule Tai.Orders.OrderTransitionWorker do
  @moduledoc """
  Serializes order transitions so that a single order is never updated
  concurrently.

  Each worker is a `GenServer` that handles one `apply/2` request at a time.
  This avoids race conditions between receiving the "accepted" response of a
  request and asynchronously receiving the result of that same request.

  Below is an example of the sequence that can occur when creating an order:

  ```
  send HTTP request to create order ->
                                      <- asynchronously receive on stream that order created successfully
                                       <- response from HTTP request that create order was accepted
  ```

  Multiple orders can be updated in parallel by using 2 or more workers. Orders
  are deterministically sent to the same worker using the following algorithm.

  ```
  hash(order.client_id) % order_transition_worker_count
  ```

  Because the index is the remainder of a division by the worker count, worker
  indexes are zero-based: `0..order_transition_worker_count - 1`.
  """

  use GenServer
  alias Tai.Orders.{Order, Services}

  @type order :: Order.t()
  @type client_id :: Order.client_id()
  @type attrs :: Services.ApplyOrderTransition.attrs()

  @doc """
  Starts a worker linked to the caller and registers it under
  `process_name(idx)`.

  `idx` is kept as the worker's state.
  """
  @spec start_link(non_neg_integer) :: GenServer.on_start()
  def start_link(idx) do
    name = process_name(idx)
    GenServer.start_link(__MODULE__, idx, name: name)
  end

  @doc """
  Returns the registered process name of the worker with index `idx`.

  The name is this module's name suffixed with `_<idx>`.
  """
  @spec process_name(non_neg_integer) :: atom
  def process_name(idx), do: :"#{__MODULE__}_#{idx}"

  @doc """
  Applies `transition_attrs` to the order identified by `client_id`.

  The request is routed to the worker selected by hashing `client_id`, so
  every transition for the same order goes through the same process. This is a
  synchronous `GenServer.call/2`, so it exits if the target worker is not
  running or does not reply within the default call timeout.
  """
  @spec apply(client_id, attrs) :: {:ok, order} | {:error, term}
  def apply(client_id, transition_attrs) do
    client_id
    |> worker_idx()
    |> process_name()
    |> GenServer.call({:apply, client_id, transition_attrs})
  end

  # The state is the worker index passed to start_link/1; it is never modified.
  @impl true
  def init(state) do
    {:ok, state}
  end

  # Runs the transition inside the worker process. Calls are handled one at a
  # time, which is what provides the per-order locking.
  @impl true
  def handle_call({:apply, client_id, transition_attrs}, _from, state) do
    result = Services.ApplyOrderTransition.call(client_id, transition_attrs)
    {:reply, result, state}
  end

  # Maps a client id to a worker index in 0..(worker_count - 1).
  #
  # NOTE(review): assumes Murmur.hash_x86_32/1 returns a non-negative integer,
  # so rem/2 never produces a negative index - confirm against the Murmur docs.
  # The config is parsed on every call; the worker count is not cached here.
  defp worker_idx(client_id) do
    config = Tai.Config.parse()
    hash = Murmur.hash_x86_32(client_id)
    rem(hash, config.order_transition_workers)
  end
end