Skip to main content

lib/ferric_store/flow.ex

defmodule FerricStore.Flow do
  @moduledoc """
  High-level FerricFlow command helpers.

  Functions build the same native command arguments as the Python SDK while
  keeping defaults simple: create returns an ack, claim returns compact jobs with
  attributes, and terminal commands return an ack unless `return_record: true`.
  """

  alias FerricStore.Client
  alias FerricStore.Codec.Raw
  alias FerricStore.Protocol

  def create(client, id, opts),
    do:
      Client.native(
        client,
        Protocol.opcode(:flow_create),
        create_payload(id, opts),
        client_opts(opts)
      )

  def enqueue(client, id, opts), do: create(client, id, Keyword.put_new(opts, :state, "queued"))

  def create_many(client, items, opts) do
    payload = create_many_payload(items, opts)

    Client.native(
      client,
      Protocol.opcode(:flow_create_many),
      compact_or_typed(payload, &Protocol.compact_flow_create_many_payload/1),
      client_opts(opts)
    )
  end

  def get(client, id, opts \\ []) do
    args = [id]
    args = append(args, "PARTITION", Keyword.get(opts, :partition_key))

    args =
      append_read_payload(
        args,
        Keyword.get(opts, :payload),
        Keyword.get(opts, :payload_max_bytes)
      )

    args = append_values(args, Keyword.get(opts, :values), Keyword.get(opts, :value_max_bytes))
    Client.command(client, "FLOW.GET", args)
  end

  def list(client, opts \\ []) do
    Client.command(client, "FLOW.LIST", [Keyword.fetch!(opts, :type) | list_args(opts)])
  end

  def history(client, id, opts \\ []) do
    args = [id]
    args = append(args, "PARTITION", Keyword.get(opts, :partition_key))
    args = append_bool(args, "VALUES", Keyword.get(opts, :values))
    Client.command(client, "FLOW.HISTORY", args)
  end

  def claim_due(client, type, opts) do
    case Client.native(
           client,
           Protocol.opcode(:flow_claim_due),
           claim_due_payload(type, opts),
           client_opts(opts)
         ) do
      {:error, _error} = error -> error
      jobs when is_list(jobs) -> Enum.map(jobs, &normalize_claim_job/1)
      other -> other
    end
  end

  def transition(client, id, opts),
    do: Client.command(client, "FLOW.TRANSITION", transition_args(id, opts))

  def complete(client, id, opts),
    do:
      Client.native(
        client,
        Protocol.opcode(:flow_complete),
        complete_payload(id, opts),
        client_opts(opts)
      )

  def complete_many(client, jobs, opts \\ []) do
    payload = complete_many_payload(jobs, opts)

    Client.native(
      client,
      Protocol.opcode(:flow_complete_many),
      compact_or_typed(payload, &Protocol.compact_flow_complete_many_payload/1),
      client_opts(opts)
    )
  end

  def retry(client, id, opts), do: Client.command(client, "FLOW.RETRY", retry_args(id, opts))
  def fail(client, id, opts), do: Client.command(client, "FLOW.FAIL", fail_args(id, opts))
  def cancel(client, id, opts), do: Client.command(client, "FLOW.CANCEL", cancel_args(id, opts))

  def value_put(client, value, opts \\ []) do
    codec = Keyword.get(opts, :codec, Raw)

    payload =
      %{"value" => codec.encode(value), "now_ms" => Keyword.get(opts, :now_ms, now_ms())}
      |> put_if_present("partition_key", Keyword.get(opts, :partition_key))
      |> put_if_present("owner_flow_id", Keyword.get(opts, :owner_flow_id))
      |> put_if_present("name", Keyword.get(opts, :name))
      |> put_if_present("override", Keyword.get(opts, :override))
      |> put_if_present("ttl_ms", Keyword.get(opts, :ttl_ms))
      |> put_if_present("local_cache", Keyword.get(opts, :local_cache))

    Client.native(client, Protocol.opcode(:flow_value_put), payload, client_opts(opts))
  end

  def value_mget(client, refs, opts \\ []) when is_list(refs) do
    payload =
      %{"refs" => refs}
      |> put_if_present("max_bytes", Keyword.get(opts, :max_bytes))
      |> put_if_present("value_max_bytes", Keyword.get(opts, :value_max_bytes))
      |> put_if_present("payload_max_bytes", Keyword.get(opts, :payload_max_bytes))

    Client.native(client, Protocol.opcode(:flow_value_mget), payload, client_opts(opts))
  end

  def signal(client, id, opts) do
    args = [id, "SIGNAL", Keyword.fetch!(opts, :signal)]
    args = append(args, "PARTITION", Keyword.get(opts, :partition_key))
    args = append(args, "IDEMPOTENCY", Keyword.get(opts, :idempotency_key))
    args = append_many(args, "IF_STATE", Keyword.get(opts, :if_state))
    args = append(args, "TRANSITION_TO", Keyword.get(opts, :transition_to))
    args = append(args, "RUN_AT", Keyword.get(opts, :run_at_ms))
    args = append(args, "NOW", Keyword.get(opts, :now_ms, now_ms()))
    args = append(args, "PRIORITY", Keyword.get(opts, :priority))
    args = append_named_values(args, Keyword.get(opts, :codec, Raw), opts)
    Client.command(client, "FLOW.SIGNAL", args)
  end

  def create_args(id, opts) do
    codec = Keyword.get(opts, :codec, Raw)
    now = Keyword.get(opts, :now_ms, now_ms())

    args = [
      id,
      "TYPE",
      Keyword.fetch!(opts, :type),
      "STATE",
      Keyword.get(opts, :state, "queued"),
      "NOW",
      now
    ]

    args = append(args, "PARTITION", Keyword.get(opts, :partition_key))
    args = append_encoded(args, "PAYLOAD", codec, Keyword.get(opts, :payload))
    args = append(args, "PARENT_FLOW_ID", Keyword.get(opts, :parent_flow_id))
    args = append(args, "ROOT_FLOW_ID", Keyword.get(opts, :root_flow_id))
    args = append(args, "CORRELATION_ID", Keyword.get(opts, :correlation_id))
    args = append(args, "RUN_AT", Keyword.get(opts, :run_at_ms, now))
    args = append(args, "PRIORITY", Keyword.get(opts, :priority))
    args = append_bool(args, "IDEMPOTENT", Keyword.get(opts, :idempotent))
    args = append(args, "RETENTION_TTL_MS", Keyword.get(opts, :retention_ttl_ms))
    args = append_attributes(args, opts)
    append_named_values(args, codec, opts)
  end

  def create_payload(id, opts) do
    codec = Keyword.get(opts, :codec, Raw)
    now = Keyword.get(opts, :now_ms, now_ms())

    %{
      "id" => id,
      "type" => Keyword.fetch!(opts, :type),
      "state" => Keyword.get(opts, :state, "queued"),
      "now_ms" => now,
      "run_at_ms" => Keyword.get(opts, :run_at_ms, now)
    }
    |> put_if_present("partition_key", Keyword.get(opts, :partition_key))
    |> put_if_present("payload", encoded_or_nil(codec, Keyword.get(opts, :payload)))
    |> put_if_present("payload_ref", Keyword.get(opts, :payload_ref))
    |> put_if_present(
      "parent_id",
      Keyword.get(opts, :parent_flow_id) || Keyword.get(opts, :parent_id)
    )
    |> put_if_present("root_id", Keyword.get(opts, :root_flow_id) || Keyword.get(opts, :root_id))
    |> put_if_present("correlation_id", Keyword.get(opts, :correlation_id))
    |> put_if_present("priority", Keyword.get(opts, :priority))
    |> put_if_present("retention_ttl_ms", Keyword.get(opts, :retention_ttl_ms))
    |> put_if_present("attributes", stringify_map(Keyword.get(opts, :attributes)))
    |> put_if_present("values", encode_value_map(codec, Keyword.get(opts, :values)))
    |> put_if_present("value_refs", stringify_map(Keyword.get(opts, :value_refs)))
  end

  def create_many_payload(items, opts) when is_list(items) do
    codec = Keyword.get(opts, :codec, Raw)
    now = Keyword.get(opts, :now_ms, now_ms())

    %{
      "items" => Enum.map(items, &create_many_item(&1, codec)),
      "type" => Keyword.fetch!(opts, :type),
      "state" => Keyword.get(opts, :state, "queued"),
      "now_ms" => now,
      "run_at_ms" => Keyword.get(opts, :run_at_ms, now)
    }
    |> put_if_present("partition_key", Keyword.get(opts, :partition_key))
    |> put_if_present("independent", Keyword.get(opts, :independent))
    |> put_if_present("return", many_return_mode(opts))
    |> put_if_present("priority", Keyword.get(opts, :priority))
    |> put_if_present("retention_ttl_ms", Keyword.get(opts, :retention_ttl_ms))
    |> put_if_present("attributes", stringify_map(Keyword.get(opts, :attributes)))
    |> put_if_present("values", encode_value_map(codec, Keyword.get(opts, :values)))
    |> put_if_present("value_refs", stringify_map(Keyword.get(opts, :value_refs)))
  end

  def claim_due_args(type, opts) do
    args = [type]
    args = append_many(args, "STATE", Keyword.get(opts, :states) || Keyword.get(opts, :state))

    args =
      args ++
        [
          "WORKER",
          Keyword.fetch!(opts, :worker),
          "LEASE_MS",
          Keyword.get(opts, :lease_ms, 30_000),
          "LIMIT",
          Keyword.get(opts, :limit, 1)
        ]

    args = append(args, "NOW", Keyword.get(opts, :now_ms))
    args = append(args, "PARTITION", Keyword.get(opts, :partition_key))

    args =
      case Keyword.get(opts, :partition_keys) do
        nil -> args
        keys -> args ++ ["PARTITIONS", length(keys)] ++ keys
      end

    args = append(args, "PRIORITY", Keyword.get(opts, :priority))

    args =
      if Keyword.get(opts, :include_record, false) do
        args
      else
        append(
          args,
          "RETURN",
          claim_return_mode(
            Keyword.get(opts, :include_state, false),
            Keyword.get(opts, :include_attributes, true)
          )
        )
      end

    args = append(args, "BLOCK", Keyword.get(opts, :block_ms))

    args =
      append_read_payload(
        args,
        Keyword.get(opts, :payload),
        Keyword.get(opts, :payload_max_bytes)
      )

    args = append_values(args, Keyword.get(opts, :values), Keyword.get(opts, :value_max_bytes))
    args = append_bool(args, "RECLAIM_EXPIRED", Keyword.get(opts, :reclaim_expired))
    append(args, "RECLAIM_RATIO", Keyword.get(opts, :reclaim_ratio))
  end

  def claim_due_payload(type, opts) do
    %{
      "type" => type,
      "worker" => Keyword.fetch!(opts, :worker),
      "lease_ms" => Keyword.get(opts, :lease_ms, 30_000),
      "limit" => Keyword.get(opts, :limit, 1),
      "return" =>
        claim_return_mode(
          Keyword.get(opts, :include_state, false),
          Keyword.get(opts, :include_attributes, true)
        )
    }
    |> put_if_present("now_ms", Keyword.get(opts, :now_ms))
    |> put_claim_state(opts)
    |> put_if_present("partition_key", Keyword.get(opts, :partition_key))
    |> put_if_present("partition_keys", Keyword.get(opts, :partition_keys))
    |> put_if_present("priority", Keyword.get(opts, :priority))
    |> put_if_present("block_ms", Keyword.get(opts, :block_ms))
    |> put_if_present("payload", Keyword.get(opts, :payload))
    |> put_if_present("payload_max_bytes", Keyword.get(opts, :payload_max_bytes))
    |> put_if_present("values", Keyword.get(opts, :values))
    |> put_if_present("value_max_bytes", Keyword.get(opts, :value_max_bytes))
    |> put_if_present("reclaim_expired", Keyword.get(opts, :reclaim_expired))
    |> put_if_present("reclaim_ratio", Keyword.get(opts, :reclaim_ratio))
  end

  def transition_args(id, opts) do
    codec = Keyword.get(opts, :codec, Raw)
    now = Keyword.get(opts, :now_ms, now_ms())

    args = [
      id,
      Keyword.fetch!(opts, :from_state),
      Keyword.fetch!(opts, :to_state),
      "LEASE_TOKEN",
      Keyword.fetch!(opts, :lease_token),
      "FENCING",
      Keyword.fetch!(opts, :fencing_token),
      "NOW",
      now
    ]

    args = append(args, "PARTITION", Keyword.get(opts, :partition_key))
    args = append_encoded(args, "PAYLOAD", codec, Keyword.get(opts, :payload))
    args = append(args, "RUN_AT", Keyword.get(opts, :run_at_ms, now))
    args = append(args, "PRIORITY", Keyword.get(opts, :priority))
    args = append_attributes(args, opts)
    append_named_values(args, codec, opts)
  end

  def complete_args(id, opts), do: terminal_args(id, opts, "RESULT")
  def complete_payload(id, opts), do: terminal_payload(id, opts, "result")

  def complete_many_payload(jobs, opts \\ []) when is_list(jobs) do
    codec = Keyword.get(opts, :codec, Raw)

    %{
      "items" => Enum.map(jobs, &complete_many_item/1),
      "now_ms" => Keyword.get(opts, :now_ms, now_ms())
    }
    |> put_if_present("partition_key", Keyword.get(opts, :partition_key))
    |> put_if_present("independent", Keyword.get(opts, :independent))
    |> put_if_present("return", many_return_mode(opts))
    |> put_if_present("result", encoded_or_nil(codec, Keyword.get(opts, :result)))
    |> put_if_present("payload", encoded_or_nil(codec, Keyword.get(opts, :payload)))
    |> put_if_present("ttl_ms", Keyword.get(opts, :ttl_ms))
    |> put_if_present("attributes_merge", stringify_map(Keyword.get(opts, :attributes_merge)))
    |> put_if_present("attributes_delete", Keyword.get(opts, :attributes_delete))
    |> put_if_present("values", encode_value_map(codec, Keyword.get(opts, :values)))
    |> put_if_present("value_refs", stringify_map(Keyword.get(opts, :value_refs)))
    |> put_if_present("drop_values", Keyword.get(opts, :drop_values))
    |> put_if_present("override_values", Keyword.get(opts, :override_values))
  end

  def retry_args(id, opts) do
    id
    |> terminal_args(opts, "ERROR")
    |> append("RUN_AT", Keyword.get(opts, :run_at_ms))
  end

  def fail_args(id, opts), do: terminal_args(id, opts, "ERROR")

  def cancel_args(id, opts) do
    args = [
      id,
      "FENCING",
      Keyword.fetch!(opts, :fencing_token),
      "NOW",
      Keyword.get(opts, :now_ms, now_ms())
    ]

    args = append(args, "PARTITION", Keyword.get(opts, :partition_key))
    args = append(args, "REASON", Keyword.get(opts, :reason) || Keyword.get(opts, :error))
    args = append(args, "TTL", Keyword.get(opts, :ttl_ms))
    args = append_attributes(args, opts)
    append_named_values(args, Keyword.get(opts, :codec, Raw), opts)
  end

  defp terminal_args(id, opts, value_name) do
    codec = Keyword.get(opts, :codec, Raw)

    args = [
      id,
      Keyword.fetch!(opts, :lease_token),
      "FENCING",
      Keyword.fetch!(opts, :fencing_token),
      "NOW",
      Keyword.get(opts, :now_ms, now_ms())
    ]

    args = append(args, "PARTITION", Keyword.get(opts, :partition_key))

    args =
      append_encoded(
        args,
        value_name,
        codec,
        Keyword.get(opts, :result) || Keyword.get(opts, :error)
      )

    args = append_encoded(args, "PAYLOAD", codec, Keyword.get(opts, :payload))
    args = append(args, "TTL", Keyword.get(opts, :ttl_ms))
    args = append_attributes(args, opts)
    append_named_values(args, codec, opts)
  end

  defp terminal_payload(id, opts, value_name) do
    codec = Keyword.get(opts, :codec, Raw)

    %{
      "id" => id,
      "lease_token" => Keyword.fetch!(opts, :lease_token),
      "now_ms" => Keyword.get(opts, :now_ms, now_ms())
    }
    |> put_if_present("fencing_token", Keyword.get(opts, :fencing_token))
    |> put_if_present("partition_key", Keyword.get(opts, :partition_key))
    |> put_if_present(
      value_name,
      encoded_or_nil(codec, Keyword.get(opts, :result) || Keyword.get(opts, :error))
    )
    |> put_if_present("payload", encoded_or_nil(codec, Keyword.get(opts, :payload)))
    |> put_if_present("ttl_ms", Keyword.get(opts, :ttl_ms))
    |> put_if_present("attributes", stringify_map(Keyword.get(opts, :attributes)))
    |> put_if_present("attributes_merge", stringify_map(Keyword.get(opts, :attributes_merge)))
    |> put_if_present("attributes_delete", Keyword.get(opts, :attributes_delete))
    |> put_if_present("values", encode_value_map(codec, Keyword.get(opts, :values)))
    |> put_if_present("value_refs", stringify_map(Keyword.get(opts, :value_refs)))
    |> put_if_present("drop_values", Keyword.get(opts, :drop_values))
    |> put_if_present("override_values", Keyword.get(opts, :override_values))
  end

  defp list_args(opts) do
    []
    |> append("STATE", Keyword.get(opts, :state))
    |> append("PARTITION", Keyword.get(opts, :partition_key))
    |> append("COUNT", Keyword.get(opts, :count))
    |> append("FROM", Keyword.get(opts, :from_ms))
    |> append("TO", Keyword.get(opts, :to_ms))
    |> append_bool("REV", Keyword.get(opts, :rev))
    |> append_attribute_filters(Keyword.get(opts, :attributes))
  end

  defp append(args, _name, nil), do: args
  defp append(args, name, value), do: args ++ [name, value]

  defp put_if_present(map, _key, nil), do: map
  defp put_if_present(map, key, value), do: Map.put(map, key, value)

  defp compact_or_typed(payload, compact_fun) do
    case compact_fun.(payload) do
      {:ok, compact_payload} -> Protocol.custom_payload(compact_payload)
      :error -> payload
    end
  end

  defp client_opts(opts), do: Keyword.take(opts, [:timeout, :lane_id])

  defp put_claim_state(map, opts) do
    case {Keyword.get(opts, :states), Keyword.get(opts, :state)} do
      {states, _state} when is_list(states) -> Map.put(map, "states", states)
      {nil, nil} -> map
      {nil, state} -> Map.put(map, "state", state)
      {state, _} -> Map.put(map, "state", state)
    end
  end

  defp append_bool(args, _name, nil), do: args
  defp append_bool(args, name, true), do: args ++ [name, true]
  defp append_bool(args, name, false), do: args ++ [name, false]

  defp append_read_payload(args, false, _max_bytes), do: args ++ ["NOPAYLOAD"]

  defp append_read_payload(args, _payload, max_bytes) when is_integer(max_bytes),
    do: args ++ ["PAYLOAD", "MAXBYTES", max_bytes]

  defp append_read_payload(args, true, _max_bytes), do: args ++ ["PAYLOAD"]
  defp append_read_payload(args, _payload, _max_bytes), do: args

  defp append_encoded(args, _name, _codec, nil), do: args
  defp append_encoded(args, name, codec, value), do: args ++ [name, codec.encode(value)]

  defp encoded_or_nil(_codec, nil), do: nil
  defp encoded_or_nil(codec, value), do: codec.encode(value)

  defp stringify_map(nil), do: nil

  defp stringify_map(map) when is_map(map),
    do: Map.new(map, fn {key, value} -> {to_string(key), value} end)

  defp encode_value_map(_codec, nil), do: nil

  defp encode_value_map(codec, map) when is_map(map) do
    Map.new(map, fn {key, value} -> {to_string(key), codec.encode(value)} end)
  end

  defp many_return_mode(opts) do
    if Keyword.get(opts, :return_ok_on_success, false), do: "OK_ON_SUCCESS"
  end

  defp create_many_item(id, _codec) when is_binary(id), do: [id, ""]
  defp create_many_item({id, payload}, codec), do: [id, codec.encode(payload)]

  defp create_many_item(%{} = item, codec) do
    id = Map.get(item, "id") || Map.get(item, :id)
    partition_key = Map.get(item, "partition_key") || Map.get(item, :partition_key)
    payload = encoded_or_nil(codec, Map.get(item, "payload") || Map.get(item, :payload)) || ""

    if partition_key, do: [id, partition_key, payload], else: [id, payload]
  end

  defp complete_many_item(%{"partition_key" => partition_key} = job)
       when not is_nil(partition_key) do
    [
      Map.fetch!(job, "id"),
      partition_key,
      Map.fetch!(job, "lease_token"),
      Map.fetch!(job, "fencing_token")
    ]
  end

  defp complete_many_item(%{
         "id" => id,
         "lease_token" => lease_token,
         "fencing_token" => fencing_token
       }) do
    [id, lease_token, fencing_token]
  end

  defp complete_many_item({id, lease_token, fencing_token}), do: [id, lease_token, fencing_token]

  defp complete_many_item({id, partition_key, lease_token, fencing_token}),
    do: [id, partition_key, lease_token, fencing_token]

  defp append_many(args, _name, nil), do: args

  defp append_many(args, name, values) when is_list(values),
    do: Enum.reduce(values, args, &append(&2, name, &1))

  defp append_many(args, name, value), do: append(args, name, value)

  defp append_values(args, nil, nil), do: args

  defp append_values(args, values, max_bytes) do
    args = append_many(args, "VALUE", values)
    append(args, "VALUE_MAX_BYTES", max_bytes)
  end

  defp append_attributes(args, opts) do
    args = append_attribute_filters(args, Keyword.get(opts, :attributes), "ATTRIBUTE")
    args = append_attribute_filters(args, Keyword.get(opts, :attributes_merge), "ATTRIBUTE_MERGE")
    append_many(args, "ATTRIBUTE_DELETE", Keyword.get(opts, :attributes_delete))
  end

  defp append_attribute_filters(args, attributes, prefix \\ "ATTRIBUTE")
  defp append_attribute_filters(args, nil, _prefix), do: args

  defp append_attribute_filters(args, attributes, prefix) when is_map(attributes) do
    Enum.reduce(attributes, args, fn {name, value}, acc ->
      acc ++ [prefix, to_string(name), value]
    end)
  end

  defp append_named_values(args, codec, opts) do
    args =
      Enum.reduce(Keyword.get(opts, :values, %{}) || %{}, args, fn {name, value}, acc ->
        acc ++ ["VALUE", to_string(name), codec.encode(value)]
      end)

    args =
      Enum.reduce(Keyword.get(opts, :value_refs, %{}) || %{}, args, fn {name, ref}, acc ->
        acc ++ ["VALUE_REF", to_string(name), ref]
      end)

    args = append_many(args, "DROP_VALUE", Keyword.get(opts, :drop_values))
    append_many(args, "OVERRIDE_VALUE", Keyword.get(opts, :override_values))
  end

  defp claim_return_mode(false, false), do: "JOBS_COMPACT"
  defp claim_return_mode(true, false), do: "JOBS_COMPACT_STATE"
  defp claim_return_mode(false, true), do: "JOBS_COMPACT_ATTRS"
  defp claim_return_mode(true, true), do: "JOBS_COMPACT_STATE_ATTRS"

  defp normalize_claim_job([id, partition_key, lease_token, fencing_token, state, attributes]) do
    %{
      "id" => id,
      "partition_key" => partition_key,
      "lease_token" => lease_token,
      "fencing_token" => fencing_token,
      "state" => state,
      "attributes" => attributes
    }
  end

  defp normalize_claim_job([id, partition_key, lease_token, fencing_token, attributes]) do
    %{
      "id" => id,
      "partition_key" => partition_key,
      "lease_token" => lease_token,
      "fencing_token" => fencing_token,
      "attributes" => attributes
    }
  end

  defp normalize_claim_job([id, partition_key, lease_token, fencing_token]) do
    %{
      "id" => id,
      "partition_key" => partition_key,
      "lease_token" => lease_token,
      "fencing_token" => fencing_token
    }
  end

  defp normalize_claim_job([id, lease_token, fencing_token]) do
    %{"id" => id, "lease_token" => lease_token, "fencing_token" => fencing_token}
  end

  defp normalize_claim_job(job), do: job

  defp now_ms, do: System.system_time(:millisecond)
end