lib/smart_city/data.ex

defmodule SmartCity.Data do
  @moduledoc """
  Message struct shared amongst all SmartCity microservices.

  ```javascript
  const DataMessage = {
      "dataset_ids": "",        // list(UUID)
      "ingestion_id":"",       // UUID
      "extraction_start_time": "", // iso8601
      "payload": {},
      "_metadata": {           // cannot be used safely
          "orgName": "",       // ~r/^[a-zA-Z_]+$/
          "dataName": "",      // ~r/^[a-zA-Z_]+$/
          "stream": true
      },
      "operational": {
          "timing": [{
              "startTime": "", // iso8601
              "endTime": "",   // iso8601
              "app": "",       // microservice generating timing data
              "label": ""      // label for this particular timing data
          }]
      }
  }
  ```
  """

  alias SmartCity.Data
  alias SmartCity.Data.Timing
  alias SmartCity.Helpers

  @type t :: %SmartCity.Data{
          :dataset_ids => list(String.t()),
          :ingestion_id => String.t(),
          :extraction_start_time => DateTime.t(),
          :operational => %{
            :timing => list(SmartCity.Data.Timing.t())
          },
          :payload => String.t(),
          :_metadata => %{
            :org => String.t(),
            :name => String.t(),
            :stream => boolean()
          },
          :version => String.t()
        }
  @type payload :: String.t()

  @derive Jason.Encoder
  @enforce_keys [:dataset_ids, :ingestion_id, :extraction_start_time, :payload, :_metadata, :operational]
  defstruct version: "0.1",
            _metadata: %{org: nil, name: nil, stream: false},
            dataset_ids: [],
            ingestion_id: nil,
            extraction_start_time: nil,
            payload: nil,
            operational: %{timing: []}

  @doc """
  Returns a new `SmartCity.Data` struct. `SmartCity.Data.Timing`
    structs will be created along the way.

  Can be created from:
    - map with string keys
    - map with atom keys
    - JSON

  ## Examples

      iex> SmartCity.Data.new(%{dataset_ids: ["a_guid"], ingestion_id: "b_guid", extraction_start_time: "2019-05-06T19:51:41+00:00", payload: "the_data", _metadata: %{org: "scos", name: "example"}, operational: %{timing: [%{app: "app name", label: "function name", start_time: "2019-05-06T19:51:41+00:00", end_time: "2019-05-06T19:51:51+00:00"}]}})
      {:ok, %SmartCity.Data{
          dataset_ids: ["a_guid"],
          ingestion_id: "b_guid",
          extraction_start_time: "2019-05-06T19:51:41+00:00",
          payload: "the_data",
          _metadata: %{org: "scos", name: "example"},
          operational: %{
          timing: [%SmartCity.Data.Timing{ app: "app name", end_time: "2019-05-06T19:51:51+00:00", label: "function name", start_time: "2019-05-06T19:51:41+00:00"}]
        }
      }}
  """
  @spec new(map() | String.t()) :: {:ok, SmartCity.Data.t()} | {:error, String.t()}
  def new(msg) when is_binary(msg) do
    with {:ok, decoded} <- Jason.decode(msg) do
      new(decoded)
    end
  end

  def new(%{"dataset_ids" => _} = msg) do
    %{
      dataset_ids: msg["dataset_ids"],
      ingestion_id: msg["ingestion_id"],
      extraction_start_time: msg["extraction_start_time"],
      operational: Helpers.to_atom_keys(msg["operational"]),
      payload: msg["payload"],
      _metadata: Helpers.to_atom_keys(msg["_metadata"])
    }
    |> new()
  end

  def new(%{
        dataset_ids: dataset_ids,
        ingestion_id: ingestion_id,
        extraction_start_time: extraction_start_time,
        operational: operational,
        payload: payload,
        _metadata: metadata
      }) do
    timings = Map.get(operational, :timing, [])

    struct =
      struct(__MODULE__, %{
        dataset_ids: dataset_ids,
        ingestion_id: ingestion_id,
        extraction_start_time: extraction_start_time,
        payload: payload,
        _metadata: metadata,
        operational: %{operational | timing: Enum.map(timings, &Timing.new/1)}
      })

    {:ok, struct}
  rescue
    e -> {:error, e}
  end

  def new(msg) do
    {:error, "Invalid data message: #{inspect(msg)}"}
  end

  @doc """
  Defines the string that will be the payload of the last message in a dataset.
  """
  defmacro end_of_data(), do: quote(do: "END_OF_DATA")

  @doc """
  Encodes `SmartCity.Data` into JSON. Typically used right before sending as a Kafka message.
  """
  @spec encode(SmartCity.Data.t()) ::
          {:ok, String.t()} | {:error, Jason.EncodeError.t() | Exception.t()}
  def encode(%__MODULE__{} = message) do
    Jason.encode(message)
  end

  @doc """
  Encodes `SmartCity.Data` into JSON. Typically used right before sending as a Kafka message.

  Raises an error if it fails to convert to a JSON string.
  """
  @spec encode!(SmartCity.Data.t()) :: String.t()
  def encode!(%__MODULE__{} = message) do
    Jason.encode!(message)
  end

  @doc """
  Adds a `SmartCity.Data.Timing` to the list of timings in this `SmartCity.Data`. The timing will be validated to ensure both start and end times have been set.

  Returns a `SmartCity.Data` struct with `new_timing` prepended to existing timings list.

  ## Parameters
    - message: A `SmartCity.Data`
    - new_timing: A timing you want to add. Must have `start_time` and `end_time` set
  """
  @spec add_timing(
          SmartCity.Data.t(),
          SmartCity.Data.Timing.t()
        ) :: SmartCity.Data.t()
  def add_timing(
        %__MODULE__{operational: %{timing: timing}} = message,
        %Data.Timing{} = new_timing
      ) do
    case Timing.validate(new_timing) do
      {:ok, new_timing} -> put_in_operational(message, :timing, [new_timing | timing])
      {:error, errors} -> raise ArgumentError, "Invalid Timing: #{errors}"
    end
  end

  @doc """
  Creates a new `SmartCity.Data` struct using `new/1` and adds timing information to the message.

  Returns a `SmartCity.Data` struct with `new_timing` prepended to existing timings list.

  ## Parameters
    - message: A `SmartCity.Data`
    - app: The application that is asking to create the new `SmartCity.Data`. Ex. `reaper` or `voltron`
  """
  @spec timed_new(map(), String.t()) :: {:ok, SmartCity.Data.t()} | {:error, String.t()}
  def timed_new(msg, app) do
    label = inspect(&Data.new/1)

    case Timing.measure(app, label, fn -> new(msg) end) do
      {:ok, msg, timing} -> {:ok, msg |> add_timing(timing)}
      error -> error
    end
  end

  @doc """
  Transforms the `SmartCity.Data` `payload` field with the given unary function and replaces it in the message.

  Additionally, returns a `SmartCity.Data` struct with `new_timing` prepended to existing timings list.

  ## Parameters
    - message: A `SmartCity.Data`
    - app: The application that is asking to create the new `SmartCity.Data`. Ex. `reaper` or `voltron`
    - function: an arity 1 (/1) function that will transform the payload in the provided message
  """
  @spec timed_transform(
          SmartCity.Data.t(),
          String.t(),
          (payload() -> {:ok, term()} | {:error, term()})
        ) :: {:ok, SmartCity.Data.t()} | {:error, String.t()}
  def timed_transform(%Data{} = msg, app, function) when is_function(function, 1) do
    label = inspect(function)

    case Timing.measure(app, label, fn -> function.(msg.payload) end) do
      {:ok, result, timing} -> {:ok, msg |> add_timing(timing) |> Map.replace!(:payload, result)}
      error -> error
    end
  end

  @doc """
  Get all timings on this Data

  Returns a list of  `SmartCity.Data.Timing` structs or `[]`

  ## Parameters
    - data_message: The message to extract timings from
  """
  @spec get_all_timings(SmartCity.Data.t()) :: list(SmartCity.Data.Timing.t())
  def get_all_timings(%__MODULE__{operational: %{timing: timing}}), do: timing

  # Private functions
  defp put_in_operational(%__MODULE__{operational: operational} = message, key, value) do
    %{message | operational: Map.put(operational, key, value)}
  end
end