lib/tai/orders/worker.ex

defmodule Tai.Orders.Worker do
  use GenServer

  alias Tai.Venues.Adapter

  alias Tai.Orders.{
    OrderTransitionWorker,
    Services,
    SubmissionFactory,
    Responses,
    Transition
  }

  defmodule State do
    defstruct ~w[tasks]a
  end

  @type submission :: SubmissionFactory.submission()
  @type order :: Tai.Orders.Order.t()
  @type status :: atom
  @type transition :: Transition.t()
  @type invalid_status_error_reason :: {:invalid_status, was :: status, transition}
  @type create_result :: {:ok, order} | {:error, Adapter.create_order_error_reason()}
  @type cancel_result ::
          {:ok, order}
          | {:error, invalid_status_error_reason | Adapter.cancel_order_error_reason()}
  @type amend_attrs :: %{
          optional(:price) => Decimal.t(),
          optional(:qty) => Decimal.t()
        }
  @type amend_result ::
          {:ok, updated :: order}
          | {:error, invalid_status_error_reason | Adapter.amend_order_error_reason()}
  @type amend_bulk_reject_reason :: invalid_status_error_reason
  @type amend_bulk_result :: [{:ok, updated :: order} | {:error, amend_bulk_reject_reason}]

  def start_link(_) do
    state = %State{tasks: %{}}
    GenServer.start_link(__MODULE__, state)
  end

  @spec create(pid, submission) :: create_result
  def create(pid, submission) do
    GenServer.call(pid, {:create, submission})
  end

  @spec cancel(pid, order) :: cancel_result
  def cancel(pid, order) do
    GenServer.call(pid, {:cancel, order})
  end

  @spec amend(pid, order, amend_attrs) :: amend_result
  def amend(pid, order, attrs) do
    GenServer.call(pid, {:amend, order, attrs})
  end

  @spec amend_bulk(pid, [{order, amend_attrs}]) :: amend_bulk_result()
  def amend_bulk(pid, amend_set) do
    GenServer.call(pid, {:amend_bulk, amend_set})
  end

  @impl true
  def init(state) do
    {:ok, state}
  end

  @impl true
  def handle_call({:create, submission}, _from, state) do
    submission
    |> enqueue()
    |> case do
      {:ok, order} ->
        state =
          with_task(
            fn ->
              if Tai.Settings.send_orders?() do
                try do
                  order
                  |> send_create_to_venue()
                  |> parse_create_response()
                rescue
                  e ->
                    {e, __STACKTRACE__}
                    |> rescue_create_venue_adapter_error(order)
                end
              else
                order.client_id
                |> skip()
              end
            end,
            state
          )

        {:reply, {:ok, order}, state}

      {:error, _} = error ->
        {:reply, error, state}
    end
  end

  @impl true
  def handle_call({:cancel, order}, _from, state) do
    order
    |> pend_cancel()
    |> case do
      {:ok, order_pending_cancel} ->
        state =
          with_task(
            fn ->
              try do
                order_pending_cancel
                |> send_cancel_to_venue()
                |> parse_cancel_response()
              rescue
                e ->
                  {e, __STACKTRACE__}
                  |> rescue_cancel_venue_adapter_error(order_pending_cancel)
              end
            end,
            state
          )

        {:reply, {:ok, order_pending_cancel}, state}

      {:error, {:invalid_status, _, _}} = error ->
        {:reply, error, state}
    end
  end

  @impl true
  def handle_call({:amend, order, attrs}, _from, state) do
    order
    |> pend_amend()
    |> case do
      {:ok, order_pending_amend} ->
        state =
          with_task(
            fn ->
              try do
                order_pending_amend
                |> send_amend_to_venue(attrs)
                |> parse_amend_response()
              rescue
                e ->
                  {e, __STACKTRACE__}
                  |> rescue_amend_venue_adapter_error(order_pending_amend)
              end
            end,
            state
          )

        {:reply, {:ok, order_pending_amend}, state}

      {:error, {:invalid_status, _, _}} = error ->
        {:reply, error, state}
    end
  end

  @impl true
  def handle_call({:amend_bulk, amend_set}, _from, state) do
    pending_orders = pend_amend_bulk(amend_set)

    state =
      with_task(
        fn ->
          try do
            pending_orders
            |> Enum.reduce([], fn
              {:ok, pending_order}, acc ->
                {_, order_attributes} =
                  Enum.find(amend_set, fn {order, _} ->
                    order.client_id == pending_order.client_id
                  end)

                [{pending_order, order_attributes} | acc]

              {:error, _}, acc ->
                acc
            end)
            |> send_amend_bulk_to_venue()
            |> parse_amend_bulk_response(amend_set)
          rescue
            e ->
              {e, __STACKTRACE__}
              |> rescue_amend_bulk_venue_adapter_error(amend_set)
          end
        end,
        state
      )

    {:reply, pending_orders, state}
  end

  defp with_task(callback, state) do
    task = Task.async(callback)
    tasks = Map.put(state.tasks, task.ref, task)
    %{state | tasks: tasks}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    tasks = Map.delete(state.tasks, ref)
    state = %{state | tasks: tasks}
    {:noreply, state}
  end

  @impl true
  def handle_info({_ref, _result}, state) do
    {:noreply, state}
  end

  ###################
  # create order
  ###################
  defp enqueue(submission) do
    Services.EnqueueOrder.call(submission)
  end

  defp send_create_to_venue(order) do
    result = Tai.Venues.Client.create_order(order)
    {result, order}
  end

  defp parse_create_response({{:ok, %Responses.CreateAccepted{} = response}, order}) do
    OrderTransitionWorker.apply(order.client_id, %{
      venue_order_id: response.id,
      last_received_at: Tai.Time.monotonic_to_date_time!(response.received_at),
      last_venue_timestamp: response.venue_timestamp,
      __type__: :accept_create
    })
  end

  defp parse_create_response({{:error, reason}, order}) do
    OrderTransitionWorker.apply(order.client_id, %{
      reason: reason,
      __type__: :venue_create_error
    })
  end

  defp rescue_create_venue_adapter_error({error, stacktrace}, order) do
    OrderTransitionWorker.apply(order.client_id, %{
      error: error,
      stacktrace: stacktrace,
      __type__: :rescue_create_error
    })
  end

  defp skip(client_id) do
    OrderTransitionWorker.apply(client_id, %{
      __type__: :skip
    })
  end

  ###################
  # cancel order
  ###################
  defp pend_cancel(order) do
    OrderTransitionWorker.apply(order.client_id, %{
      __type__: :pend_cancel
    })
  end

  defp send_cancel_to_venue(order) do
    {order, Tai.Venues.Client.cancel_order(order)}
  end

  defp parse_cancel_response({order, {:ok, %Responses.CancelAccepted{} = response}}) do
    OrderTransitionWorker.apply(order.client_id, %{
      last_received_at: Tai.Time.monotonic_to_date_time!(response.received_at),
      last_venue_timestamp: response.venue_timestamp,
      __type__: :accept_cancel
    })
  end

  defp parse_cancel_response({order, {:error, reason}}) do
    OrderTransitionWorker.apply(order.client_id, %{
      reason: reason,
      __type__: :venue_cancel_error
    })
  end

  defp rescue_cancel_venue_adapter_error({error, stacktrace}, order) do
    OrderTransitionWorker.apply(order.client_id, %{
      error: error,
      stacktrace: stacktrace,
      __type__: :rescue_cancel_error
    })
  end

  ###################
  # amend order
  ###################
  defp pend_amend(order) do
    OrderTransitionWorker.apply(order.client_id, %{
      __type__: :pend_amend
    })
  end

  def send_amend_to_venue(order, attrs) do
    {order, Tai.Venues.Client.amend_order(order, attrs)}
  end

  defp parse_amend_response({order, {:ok, %Responses.AmendAccepted{} = response}}) do
    OrderTransitionWorker.apply(order.client_id, %{
      last_received_at: response.received_at,
      last_venue_timestamp: response.venue_timestamp,
      __type__: :accept_amend
    })
  end

  defp parse_amend_response({order, {:error, reason}}) do
    OrderTransitionWorker.apply(order.client_id, %{
      reason: reason,
      __type__: :venue_amend_error
    })
  end

  defp rescue_amend_venue_adapter_error({error, stacktrace}, order) do
    OrderTransitionWorker.apply(order.client_id, %{
      error: error,
      stacktrace: stacktrace,
      __type__: :rescue_amend_error
    })
  end

  ###################
  # amend bulk order
  ###################
  defp pend_amend_bulk(amend_set) do
    amend_set
    |> Enum.map(fn {order, _} ->
      pend_amend(order)
    end)
  end

  defp send_amend_bulk_to_venue(orders) do
    Tai.Venues.Client.amend_bulk_orders(orders)
  end

  defp parse_amend_bulk_response(
         {:ok, %Responses.AmendBulk{orders: amend_responses}},
         amend_set
       ) do
    amend_responses
    |> Enum.map(fn amend_response ->
      order =
        amend_set
        |> Enum.find(fn {o, _} -> o.venue_order_id == amend_response.id end)
        |> elem(0)

      OrderTransitionWorker.apply(order.client_id, %{
        last_received_at: amend_response.received_at,
        last_venue_timestamp: amend_response.venue_timestamp,
        __type__: :accept_amend
      })
    end)
  end

  defp parse_amend_bulk_response({:error, reason}, amend_set) do
    amend_set
    |> Enum.map(fn {order, _} ->
      OrderTransitionWorker.apply(order.client_id, %{
        reason: reason,
        __type__: :venue_amend_error
      })
    end)
  end

  defp rescue_amend_bulk_venue_adapter_error({error, stacktrace}, amend_set) do
    amend_set
    |> Enum.map(fn {order, _} ->
      OrderTransitionWorker.apply(order.client_id, %{
        error: error,
        stacktrace: stacktrace,
        __type__: :rescue_amend_error
      })
    end)
  end
end