lib/smart_city/ingestion.ex

defmodule SmartCity.Ingestion do
  require Logger

  alias SmartCity.Helpers
  alias SmartCity.Ingestion.Transformation

  @moduledoc """
  Struct defining an ingestion update event.

  ```javascript
  const Ingestion = {
    "id": "",
    "name", "",
    "allow_duplicates": boolean,
    "cadence": "",
    "extractSteps": [],
    "schema": [],
    "targetDatasets": ["", ""],
    "sourceFormat": "",
    "topLevelSelector": "",
    "transformations": [],
  }
  ```
  """
  @type not_required(type) :: type | nil

  @type t :: %SmartCity.Ingestion{
          id: String.t(),
          name: String.t(),
          allow_duplicates: not_required(boolean()),
          cadence: not_required(String.t()),
          extractSteps: list(map()),
          schema: list(map()),
          sourceFormat: String.t(),
          targetDatasets: list(String.t()),
          topLevelSelector: not_required(String.t()),
          transformations: list(Transformation.t())
        }

  @derive Jason.Encoder
  defstruct id: nil,
            name: nil,
            allow_duplicates: true,
            cadence: "never",
            extractSteps: [],
            schema: [],
            targetDatasets: [],
            sourceFormat: nil,
            topLevelSelector: nil,
            transformations: []

  use Accessible

  @doc """
  Returns a new `SmartCity.Ingestion`.
  Can be created from `Map` with string or atom keys.
  Raises an `ArgumentError` when passed invalid input

  ## Parameters

  - msg: Map with string or atom keys that defines the ingestion metadata

  Required Keys:
      - targetDatasets
      - sourceFormat
      - name

  - cadence will default to "never"
  - allow_duplicates will default to true
  """
  @spec new(map()) :: SmartCity.Ingestion.t()
  def new(%{"targetDatasets" => _} = msg) do
    msg
    |> Helpers.to_atom_keys()
    |> new()
  end

  def new(
        %{
          id: _,
          name: _,
          targetDatasets: _,
          sourceFormat: type,
          schema: schema,
          extractSteps: extract_steps,
          transformations: transformations
        } = msg
      ) do
    msg
    |> Map.put(:schema, Helpers.to_atom_keys(schema))
    |> Map.put(:extractSteps, Helpers.to_atom_keys(extract_steps))
    |> Map.put(:transformations, Enum.map(transformations, &Transformation.new/1))
    |> Map.replace!(:sourceFormat, Helpers.mime_type(type))
    |> create()
  end

  def new(%{targetDataset: target_dataset} = msg) do
    Logger.error(
      "Legacy ingestion detected. targetDataset field was used instead of targetDatasets. This field is deprecated to be removed after June 2023."
    )

    msg
    |> Map.put(:targetDatasets, [target_dataset])
    |> Map.delete(:targetDataset)
    |> new()
  end

  def new(msg) do
    raise ArgumentError, "Invalid ingestion metadata: #{inspect(msg)}"
  end

  defp create(%__MODULE__{} = struct) do
    struct |> Map.from_struct() |> create()
  end

  defp create(map), do: struct(%__MODULE__{}, map)
end