# lib/spear/event.ex

defmodule Spear.Event do
  @moduledoc """
  A simplified event struct

  This event struct is easier to work with than the protobuf definitions for
  AppendReq and ReadResp records
  """

  require Spear.Records.Streams, as: Streams
  require Spear.Records.Persistent, as: Persistent
  require Spear.Records.Shared, as: Shared

  # `:metadata` defaults to an empty map; every other field defaults to `nil`
  defstruct [:id, :type, :body, :link, metadata: %{}]

  @typedoc """
  A struct representing an EventStoreDB event

  `t:Spear.Event.t/0`s may be created to write to the EventStoreDB with
  `new/3`. `t:Spear.Event.t/0`s will be lazily mapped into
  gRPC-compatible structs before being written to the EventStoreDB with
  `to_proposed_message/2`.

  `t:Spear.Event.t/0`s typically look different between events which are
  written to- and events which are read from the EventStoreDB. Read events
  contain more metadata which pertains to EventStoreDB specifics like
  the creation timestamp of the event.

  ## Links

  The `:link` field of a `t:Spear.Event.t/0` can contain another
  `t:Spear.Event.t/0` struct. Link events are pointers to other events and are
  used by the EventStoreDB to provide the projections feature without having
  to duplicate events across streams. Links do not usually contain any
  information useful to consumers, but clients must keep track of links in
  order to keep an accurate position in a projected stream, or in order to
  `Spear.ack/3`/`Spear.nack/4` the projected events in a persistent
  subscription.

  The `:from` option in functions like `Spear.read_stream/3`, `Spear.stream!/3`
  or `Spear.subscribe/4` will use the link event's `.metadata.stream_revision`
  when reading projected streams. `Spear.ack/3` and `Spear.nack/4` take
  the link's `:id` field when passed a `t:Spear.Event.t/0`. When not passing
  `t:Spear.Event.t/0`s (for example, if curating the stream revisions or IDs
  in a database), the `revision/1` and `id/1` functions may be used to return
  the proper metadata for standard subscriptions and persistent subscriptions,
  respectively.

  ## Examples

      iex> Spear.stream!(conn, "es_supported_clients") |> Enum.take(1)
      [
        %Spear.Event{
          body: %{"languages" => ["typescript", "javascript"], "runtime" => "NodeJS"},
          id: "1fc908c1-af32-4d06-a9bd-3bf86a833fdf",
          metadata: %{
            commit_position: 18446744073709551615,
            content_type: "application/json",
            created: ~U[2021-04-01 21:11:38.196799Z],
            custom_metadata: "",
            prepare_position: 18446744073709551615,
            stream_name: "es_supported_clients",
            stream_revision: 0
          },
          type: "grpc-client",
          link: nil
        }
      ]
      iex> Spear.Event.new("grpc-client", %{"languages" => ["typescript", "javascript"], "runtime" => "NodeJS"})
      %Spear.Event{
        body: %{"languages" => ["typescript", "javascript"], "runtime" => "NodeJS"},
        id: "b952575a-1014-404d-ba20-f0904df7954e",
        metadata: %{content_type: "application/json", custom_metadata: ""},
        type: "grpc-client",
        link: nil
      }
  """
  @typedoc since: "0.1.0"
  @type t :: %__MODULE__{
          id: String.t(),
          type: String.t(),
          body: term(),
          # `nil` unless the event carries a link event (see "Links" in the typedoc)
          link: t() | nil,
          metadata: map()
        }

  @doc """
  Creates an event struct

  This function does not append the event to a stream on its own, but can
  provide events to `Spear.append/4` which will append events to a stream.

  `type` is any string used to declare how the event is typed. This is very
  arbitrary and may coincide with struct names or may be hard-coded per event.

  ## Options

  * `:id` - (default: `Spear.Event.uuid_v4()`) the event's ID. See the section
    on event IDs below.
  * `:content_type` - (default: `"application/json"`) the encoding used to
    turn the event's body into binary data. If the content-type is
    `"application/json"`, the EventStoreDB and Spear (in
    `Spear.Event.from_read_response/2`) will treat the event body as JSON.
  * `:custom_metadata` - (default: `""`) an event field outside the body
    meant as a bag for storing custom attributes about an event. Usage of this
    field is not obligatory: leaving it blank is perfectly normal.

  ## Event IDs

  EventStoreDB uses event IDs to provide an idempotency feature. Any event
  written to the EventStoreDB with an already existing ID will not be
  duplicated.

  ```elixir
  iex> event = Spear.Event.new("grpc-client", %{"languages" => ["typescript", "javascript"], "runtime" => "NodeJS"})
  %Spear.Event{
    body: %{"languages" => ["typescript", "javascript"], "runtime" => "NodeJS"},
    id: "1e654b2a-ff04-4af8-887f-052442edcd83",
    metadata: %{content_type: "application/json", custom_metadata: ""},
    type: "grpc-client"
  }
  iex> [event] |> Spear.append(conn, "idempotency_test")
  :ok
  iex> [event] |> Spear.append(conn, "idempotency_test")
  :ok
  iex> Spear.stream!(conn, "idempotency_test") |> Enum.to_list()
  [
    %Spear.Event{
      body: %{"languages" => ["typescript", "javascript"], "runtime" => "NodeJS"},
      id: "1e654b2a-ff04-4af8-887f-052442edcd83",
      metadata: %{
        commit_position: 18446744073709551615,
        content_type: "application/json",
        created: ~U[2021-04-07 21:53:40.395681Z],
        custom_metadata: "",
        prepare_position: 18446744073709551615,
        stream_name: "idempotency_test",
        stream_revision: 0
      },
      type: "grpc-client"
    }
  ]
  ```

  ## Event Store DB Event Metadata

  All names starting with $ are reserved space for internal use, the most commonly used are the following:

  - `$correlationId`:	The application level correlation ID associated with this message.
  - `$causationId`:	The application level causation ID associated with this message.

  In order to use these fields, you must pass them as custom metadata:

  ```elixir
  iex> custom_metadata =  Jason.encode!(%{"$correlationId" => "...", "$causationId" => "..."})
  ...> Spear.Event.new("my_event", %{"id" => 1}, custom_metadata: custom_metadata)
  %Spear.Event{
    id: "d77c1abc-0200-4804-81cd-eca726911166",
    type: "my_event",
    body: %{"id" => 1},
    link: nil,
    metadata: %{
      content_type: "application/json",
      custom_metadata: "{\"$causationId\":\"...\",\"$correlationId\":\"...\"}"
    }
  }
  ```

  > #### Custom Metadata Format {: .tip}
  >
  > In order to leverage the EventStoreDB System Projections such as `$by_correlation_id` or JS Projections; you must
  > pass the custom metadata as JSON.

  ## Examples

      File.stream!("data.csv")
      |> MyCsvParser.parse_stream()
      |> Stream.map(fn [id, type, amount] ->
        Spear.Event.new("ChargeDeclared",
          %{id: id, type: type, amount: amount}
        )
      end)
      |> Spear.append(conn, "ChargesFromCsvs", batch_size: 20)
  """
  @doc since: "0.1.0"
  @spec new(String.t(), term(), Keyword.t()) :: t()
  def new(type, body, opts \\ []) when is_binary(type) do
    # Resolve the ID lazily: generating a random UUID is wasted work whenever
    # the caller supplies an explicit `:id` (for example a deterministic ID
    # used for idempotent writes).
    id = Keyword.get_lazy(opts, :id, &uuid_v4/0)

    %__MODULE__{
      body: body,
      type: type,
      id: id,
      metadata: %{
        # the content-type selects the encoder in `to_proposed_message/3`
        content_type: Keyword.get(opts, :content_type, "application/json"),
        # an empty binary means "no custom metadata"
        custom_metadata: Keyword.get(opts, :custom_metadata, <<>>)
      }
    }
  end

  @doc """
  Converts a `Spear.Event` into an append-request record which proposes a new
  message

  Note that each event must be individually structured as an `AppendReq`
  record in order to be written to an EventStoreDB. The RPC definition for
  writing events specifies a stream input, though, so all `AppendReq` events
  passed to `Spear.append/4` will be batched into a single write operation.
  This write operation appears to be transactional: any events in a single
  call to `Spear.append/4` will only be appended if all events can be appended.

  ```protobuf
  rpc Append (stream AppendReq) returns (AppendResp);
  ```

  These messages are serialized to wire data before being sent to the
  EventStoreDB when using `Spear.append/4` to write events via protobuf encoding.

  `encoder_mapping` is a mapping of content-types to 1-arity encode functions.
  The default is

  ```elixir
  %{"application/json" => &Jason.encode!/1}
  ```

  The `t:Spear.Event.t/0`'s `.metadata.content_type` value will be searched
  in this map. If an encoder is found for that content-type, the event body
  will be encoded with the encoding function. If no encoder is found, the
  event body will be passed as-is.

  To set up an encoder for something like Erlang term format, an encoding
  map like the following could be used

  ```elixir
  %{"application/vnd.erlang-term-format" => &:erlang.term_to_binary/1}
  ```

  In order to disable JSON encoding, pass an empty map `%{}` as the
  `encoder_mapping`

  ## Examples

      iex> events
      [%Spear.Event{}, %Spear.Event{}, ..]
      iex> events |> Enum.map(&Spear.Event.to_proposed_message/1)
      [{:"event_store.client.streams.AppendReq", ..}, ..]
  """
  @doc since: "0.1.0"
  @spec to_proposed_message(
          t(),
          encoder_mapping :: %{optional(String.t()) => (term() -> term())},
          type :: :append | :batch_append
        ) :: tuple()
  def to_proposed_message(
        event,
        encoder_mapping \\ %{"application/json" => &Jason.encode!/1},
        # coveralls-ignore-start
        type \\ :append
        # coveralls-ignore-stop
      )

  # `:append` wraps the proposed message in an `AppendReq` record
  def to_proposed_message(event, encoder_mapping, :append) do
    data = encode_body(event, encoder_mapping)

    Streams.append_req(
      content:
        {:proposed_message,
         Streams.append_req_proposed_message(
           custom_metadata: event.metadata.custom_metadata,
           data: data,
           id: Shared.uuid(value: {:string, event.id}),
           metadata: %{"content-type" => event.metadata.content_type, "type" => event.type}
         )}
    )
  end

  # `:batch_append` returns the bare batch-append proposed-message record
  def to_proposed_message(event, encoder_mapping, :batch_append) do
    data = encode_body(event, encoder_mapping)

    Streams.batch_append_req_proposed_message(
      custom_metadata: event.metadata.custom_metadata,
      data: data,
      id: Shared.uuid(value: {:string, event.id}),
      metadata: %{"content-type" => event.metadata.content_type, "type" => event.type}
    )
  end

  # Encodes the event body with the encoder registered for the event's
  # content-type; the body is passed through untouched when the mapping has
  # no entry for that content-type.
  defp encode_body(event, encoder_mapping) do
    encoder = Map.get(encoder_mapping, event.metadata.content_type, & &1)
    encoder.(event.body)
  end

  @doc """
  Converts a read-response message to a `Spear.Event`

  This function is applied by `Stream.map/2` onto streams returned by
  reading operations such as `Spear.stream!/3`, `Spear.read_stream/3`, etc.
  by default. This can be turned off by passing the `raw?: true` opt to
  a reading function.

  This function follows links. For example, if a read event belongs to a
  projected stream such as an event type stream, this function will give the
  event body of the source event, not the link. Forcing the return of the
  link body can be accomplished with the `:link?` option set to `true` (it is
  `false` by default).

  ## Options

  * `:link?` - (default: `false`) forces returning the body of the link event
    for events read from projected streams. Has no effect on events from non-
    projected streams.
  * `:json_decoder` - (default: `Jason.decode!/2`) a 2-arity function to use
    for events with a `"content-type"` of `"application/json"`.

  All remaining options passed as `opts` other than `:link?` and
  `:json_decoder` are passed to the second argument of the `:json_decoder`
  2-arity function.

  ## JSON decoding

  Event bodies are commonly written to the EventStoreDB in JSON format as the
  format is human-readable and supported in nearly any language. Events
  carry a small piece of metadata in the `ReadResp.ReadEvent.RecordedEvent`'s
  `:metadata` map field which declares the content-type of the event body:
  `"content-type"`. This function will automatically attempt to decode any
  events which declare an `"application/json"` content-type as JSON using
  the `:json_decoder` 2-arity function option. Other content-types will not
  trigger any automatic behavior.

  `Spear` takes an optional dependency on the `Jason` library as it is
  currently the most popular JSON (en/de)coding library. If you add this
  project to the `deps/0` in a `mix.exs` file and wish to take advantage of
  the automatic JSON decoding functionality, you may also need to include
  `:jason`. As an optional dependency, `:jason` is not included in your
  dependencies just by depending on `:spear`.

  ```elixir
  # mix.exs
  def deps do
    [
      {:spear, ">= 0.0.0"},
      {:jason, ">= 0.0.0"},
      ..
    ]
  end
  ```

  Other JSON (en/de)coding libraries may be swapped in, such as with `Poison`

  ```elixir
  iex> Spear.stream!(conn, "es_supported_clients", raw?: true)
  ...> |> Stream.map(&Spear.Event.from_read_response(&1, json_decoder: &Poison.decode!/2, keys: :atoms))
  ```

  ## Examples

      Spear.stream!(conn, "es_supported_clients", raw?: true)
      |> Stream.map(&Spear.Event.from_read_response/1)
      |> Enum.to_list()
      # => [%Spear.Event{}, %Spear.Event{}, ..]
  """
  @doc since: "0.1.0"
  @spec from_read_response(tuple(), Keyword.t()) :: t()
  def from_read_response(read_response, opts \\ [])

  def from_read_response({_type, {:event, _event}} = read_response, opts) do
    # `:link?` is consumed here; every other option travels on to the
    # conversion step (and from there to the JSON decoder)
    {return_link?, conversion_opts} = Keyword.pop(opts, :link?, false)

    recorded = destructure_read_response(read_response, return_link?)
    from_recorded_event(record_to_map(recorded), conversion_opts)
  end

  @doc """
  Converts an event into a checkpoint

  This is useful when storing stream positions in `Spear.subscribe/4`
  subscriptions to the `:all` stream.
  """
  @doc since: "0.1.0"
  @spec to_checkpoint(t()) :: Spear.Filter.Checkpoint.t()
  # coveralls-ignore-start
  def to_checkpoint(%__MODULE__{metadata: metadata}) do
    # only these metadata keys are carried over into the checkpoint struct
    checkpoint_fields = Map.take(metadata, [:commit_position, :prepare_position, :subscription])

    struct(Spear.Filter.Checkpoint, checkpoint_fields)
  end

  # coveralls-ignore-stop

  @doc false
  # Builds a `Spear.Event` from a recorded event that has already been turned
  # into a map of its record fields, or from an `{event, link}` pair of them.
  def from_recorded_event(
        %{
          id: Shared.uuid() = proto_uuid,
          data: raw_body,
          metadata: metadata_pairs,
          custom_metadata: custom_metadata,
          commit_position: commit_position,
          prepare_position: prepare_position,
          stream_identifier: Shared.stream_identifier(stream_name: stream_name),
          stream_revision: stream_revision
        },
        opts
      ) do
    # the record stores its metadata as a list of {key, value} pairs
    recorded_metadata = Map.new(metadata_pairs)
    content_type = Map.get(recorded_metadata, "content-type")

    # NOTE(review): only `:json_decoder` is removed here, so a `:metadata`
    # option (merged into the event metadata below) is still present in the
    # options handed to the decoder
    {json_decoder, decoder_opts} = Keyword.pop(opts, :json_decoder, &Jason.decode!/2)

    # only bodies declared as JSON are decoded; anything else stays raw
    decoded_body =
      case content_type do
        "application/json" -> json_decoder.(raw_body, decoder_opts)
        _other -> raw_body
      end

    event_id = Spear.Uuid.from_proto(proto_uuid)
    event_type = Map.get(recorded_metadata, "type")
    created = parse_created_stamp(Map.get(recorded_metadata, "created"))

    read_metadata = %{
      content_type: content_type,
      created: created,
      prepare_position: prepare_position,
      commit_position: commit_position,
      custom_metadata: custom_metadata,
      stream_name: stream_name,
      stream_revision: stream_revision
    }

    %__MODULE__{
      id: event_id,
      type: event_type,
      body: decoded_body,
      # caller-supplied metadata wins over the values read from the record
      metadata: Map.merge(read_metadata, opts[:metadata] || %{})
    }
  end

  def from_recorded_event({event, link}, opts) do
    # convert both halves with the same options, then attach the link to the
    # resolved event
    resolved = from_recorded_event(event, opts)
    link_event = from_recorded_event(link, opts)

    %__MODULE__{resolved | link: link_event}
  end

  # Pulls the recorded-event record(s) out of a read-response record.
  #
  # Each case below is written twice: once for `Streams` records and once for
  # `Persistent` records. Clause order matters: the `:undefined` cases are
  # matched first, so the later clauses only see responses carrying a link.
  #
  # Returns either a single recorded-event record or an `{event, link}` tuple.

  # no link present: return the event record by itself
  defp destructure_read_response(
         Streams.read_resp(
           content:
             {:event,
              Streams.read_resp_read_event(
                link: :undefined,
                event: Streams.read_resp_read_event_recorded_event() = event
              )}
         ),
         _link?
       ) do
    event
  end

  defp destructure_read_response(
         Persistent.read_resp(
           content:
             {:event,
              Persistent.read_resp_read_event(
                link: :undefined,
                event: Persistent.read_resp_read_event_recorded_event() = event
              )}
         ),
         _link?
       ) do
    event
  end

  # no event present, only a link: return the link record by itself
  defp destructure_read_response(
         Streams.read_resp(
           content:
             {:event,
              Streams.read_resp_read_event(
                event: :undefined,
                link: Streams.read_resp_read_event_recorded_event() = event
              )}
         ),
         _link?
       ) do
    event
  end

  # coveralls-ignore-start
  defp destructure_read_response(
         Persistent.read_resp(
           content:
             {:event,
              Persistent.read_resp_read_event(
                event: :undefined,
                link: Persistent.read_resp_read_event_recorded_event() = event
              )}
         ),
         _link?
       ) do
    event
  end

  # coveralls-ignore-stop

  # link present and the caller asked for it (`true`): return only the link
  defp destructure_read_response(
         Streams.read_resp(
           content:
             {:event,
              Streams.read_resp_read_event(
                link: Streams.read_resp_read_event_recorded_event() = event
              )}
         ),
         true = _link?
       ) do
    event
  end

  # coveralls-ignore-start
  defp destructure_read_response(
         Persistent.read_resp(
           content:
             {:event,
              Persistent.read_resp_read_event(
                link: Persistent.read_resp_read_event_recorded_event() = event
              )}
         ),
         true = _link?
       ) do
    event
  end

  # coveralls-ignore-stop

  # both event and link present and the link was not requested (`false`):
  # return the pair
  defp destructure_read_response(
         Streams.read_resp(
           content:
             {:event,
              Streams.read_resp_read_event(
                event: Streams.read_resp_read_event_recorded_event() = event,
                link: Streams.read_resp_read_event_recorded_event() = link
              )}
         ),
         false = _link?
       ) do
    {event, link}
  end

  # coveralls-ignore-start
  defp destructure_read_response(
         Persistent.read_resp(
           content:
             {:event,
              Persistent.read_resp_read_event(
                event: Persistent.read_resp_read_event_recorded_event() = event,
                link: Persistent.read_resp_read_event_recorded_event() = link
              )}
         ),
         false = _link?
       ) do
    {event, link}
  end

  # coveralls-ignore-stop

  # Turns a recorded-event record (or an `{event, link}` pair of them) into a
  # map keyed by the record's field names.
  defp record_to_map({event, link}), do: {record_to_map(event), record_to_map(link)}

  defp record_to_map(Streams.read_resp_read_event_recorded_event() = recorded) do
    # calling the record macro on a record yields its fields as a keyword list
    fields = Streams.read_resp_read_event_recorded_event(recorded)
    Map.new(fields)
  end

  defp record_to_map(Persistent.read_resp_read_event_recorded_event() = recorded) do
    fields = Persistent.read_resp_read_event_recorded_event(recorded)
    Map.new(fields)
  end

  # Parses the "created" metadata value into a datetime, giving `nil` when the
  # value is missing, is not a plain integer string, or is rejected by
  # `Spear.parse_stamp/1`.
  defp parse_created_stamp(nil), do: nil

  defp parse_created_stamp(stamp) when is_binary(stamp) do
    case Integer.parse(stamp) do
      # the whole string must be consumed: trailing characters mean "not a stamp"
      {ticks_since_epoch, ""} ->
        case Spear.parse_stamp(ticks_since_epoch) do
          {:ok, datetime} -> datetime
          _failure -> nil
        end

      _not_an_integer ->
        nil
    end
  end

  # a note about this UUID.v4 generator
  #
  # there are a few Elixir UUID generator projects, the most popular
  # (according to hex.pm) being zyro/elixir-uuid (permalink https://github.com/zyro/elixir-uuid/tree/346581c7e89872e0e263e10f079e566cf1fc3a68),
  # which at the time of authoring this library appears to be abandoned
  #
  # that project's current implementation of UUID appears to have been
  # optimized in 2019 with help from `@whatyouhide` (https://github.com/zyro/elixir-uuid/commit/9c3fcd2e3090970fa209750cb2f6e102736f5fed)
  #
  # and that optimization appears to be related to a change to Ecto.UUID
  # by `@michalmuskala` in 2016 (https://github.com/elixir-ecto/ecto/commit/158854e588756092f4cac26fa771e219c95dba06#diff-ea16139510c45e1d438b33818ca96742e2f6ba875ed178d807de0436dc920ca9)
  #
  # it's not exactly clear to me how to license this as it appears duplicated
  # across `ecto` and `zyro/elixir-uuid`, with the origin of the material being
  # `ecto`
  #
  # I'll say "this code is not novel and probably belongs to Ecto" but I've
  # heard that lawyers generally don't like the term "probably."
  #
  # If a reader has advice on how to properly attribute this, please open an
  # issue :)

  @doc """
  Produces a random UUID v4 in human-readable format

  ## Examples

      iex> Spear.Event.uuid_v4
      "98d3a5e2-ceb4-4a78-8084-97edf9452823"
      iex> Spear.Event.uuid_v4
      "2629ea4b-d165-45c9-8a2f-92b5e20b894e"
  """
  @doc since: "0.1.0"
  @spec uuid_v4() :: binary()
  # thin delegation: the generator itself lives in `Spear.Uuid`
  defdelegate uuid_v4(), to: Spear.Uuid

  @doc """
  Produces a consistent UUID v4 in human-readable format given any input
  data structure

  This function can be used to generate a consistent UUID for a data structure
  of any shape. Under the hood it uses `:erlang.phash2/1` to hash the data
  structure, which should be portable across many environments.

  This function can be taken advantage of to generate consistent event
  IDs for the sake of idempotency (see the Event ID section in `new/3`
  for more information). Pass the `:id` option to `new/3` to override the
  default random UUID generation.

  Note that this implementation is naive and not easily portable across
  programming languages because of the reliance on `:erlang.phash2/1`.
  A v5 UUID can be used instead to the same effect with more portability,
  however a v5 UUID generator is not included in this library.

  ## Examples

      iex> Spear.Event.uuid_v4 %{"foo" => "bar"}
      "33323639-3934-4339-b332-363939343339"
      iex> Spear.Event.uuid_v4 %{"foo" => "bar"}
      "33323639-3934-4339-b332-363939343339"
  """
  @doc since: "0.1.0"
  @spec uuid_v4(term()) :: binary()
  # thin delegation: the hashing-based generator itself lives in `Spear.Uuid`
  defdelegate uuid_v4(term), to: Spear.Uuid

  @doc """
  Returns the revision of the event, following the event's link if provided

  ## Examples

      iex> Spear.Event.revision(%Spear.Event{link: nil, metadata: %{stream_revision: 1, ..}, ..})
      1
      iex> Spear.Event.revision(
      ...>   %Spear.Event{
      ...>     link: %Spear.Event{metadata: %{stream_revision: 1, ..}, ..},
      ...>     metadata: %{stream_revision: 0, ..},
      ...>     ..
      ...>   }
      ...> )
      1
  """
  @doc since: "0.9.0"
  @spec revision(t()) :: non_neg_integer()
  # a linked event reports the revision of its link
  def revision(%__MODULE__{link: %__MODULE__{} = link_event}) do
    revision(link_event)
  end

  # otherwise the event's own stream revision is used
  def revision(%__MODULE__{metadata: %{stream_revision: stream_revision}}) do
    stream_revision
  end

  @doc """
  Returns the ID of the event, following the event's link if provided

  ## Examples

      iex> Spear.Event.id(%Spear.Event{link: nil, id: "817cf20b-6791-4979-afdd-da4b03e02007", ..})
      "817cf20b-6791-4979-afdd-da4b03e02007"
      iex> Spear.Event.id(
      ...>   %Spear.Event{
      ...>     link: %Spear.Event{id: "976601b0-3775-442e-b98c-5f56af809402", ..},
      ...>     id: "817cf20b-6791-4979-afdd-da4b03e02007",
      ...>     ..
      ...>   }
      ...> )
      "976601b0-3775-442e-b98c-5f56af809402"
  """
  @doc since: "0.9.0"
  @spec id(t()) :: String.t()
  # a linked event reports the ID of its link
  def id(%__MODULE__{link: %__MODULE__{} = link_event}) do
    id(link_event)
  end

  # otherwise the event's own ID is used
  def id(%__MODULE__{id: event_id}) do
    event_id
  end

  @doc false
  # True for a `Streams` read-response carrying an event and for an
  # already-built `Spear.Event` struct; false for anything else.
  def event?(message) do
    case message do
      Streams.read_resp(content: {:event, _event}) -> true
      %__MODULE__{} -> true
      _other -> false
    end
  end
end