lib/ash_csv/data_layer.ex

defmodule AshCsv.DataLayer do
  @behaviour Ash.DataLayer

  alias Ash.Actions.Sort

  @impl true
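  # Capabilities this data layer advertises to Ash; anything not listed falls through to false.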
  def can?(_, :read), do: true
  def can?(_, :create), do: true
  def can?(_, :update), do: true
  def can?(_, :upsert), do: true
  def can?(_, :destroy), do: true
  def can?(_, :sort), do: true
  def can?(_, :filter), do: true
  def can?(_, :limit), do: true
  def can?(_, :offset), do: true
  def can?(_, :boolean_filter), do: true
  def can?(_, :transact), do: true
  def can?(_, {:filter_expr, _}), do: true
  def can?(_, :nested_expressions), do: true
  def can?(_, :expression_calculation_sort), do: true
  def can?(_, {:sort, _}), do: true
  def can?(_, _), do: false

  @csv %Spark.Dsl.Section{
    name: :csv,
    examples: [
      """
      csv do
        file "priv/data/tags.csv"
        create? true
        header? true
        separator ?-
        columns [:id, :name]
      end
      """
    ],
    schema: [
      file: [
        type: :string,
        doc: "The path of the CSV file to read from and write to",
        required: true
      ],
      create?: [
        type: :boolean,
        doc:
          "Whether or not the file should be created if it does not exist (this will only happen on writes)",
        default: false
      ],
      header?: [
        type: :boolean,
        default: false,
        doc: "Whether the CSV file has a header row. It is skipped on reads and written back on writes."
      ],
      separator: [
        type: {:custom, __MODULE__, :separator_opt, []},
        default: ?,,
        doc: "The separator to use. Defaults to a comma. Pass a character codepoint (for example `?;`), not a string."
      ],
      columns: [
        type: {:custom, __MODULE__, :columns_opt, []},
        doc: "The attributes, in the order their values appear as columns in the CSV. Use `:_` for a column that should be ignored."
      ]
    ]
  }

  @deprecated "See `AshCsv.DataLayer.Info.file/1`"
  defdelegate file(resource), to: AshCsv.DataLayer.Info

  @deprecated "See `AshCsv.DataLayer.Info.columns/1`"
  defdelegate columns(resource), to: AshCsv.DataLayer.Info

  @deprecated "See `AshCsv.DataLayer.Info.separator/1`"
  defdelegate separator(resource), to: AshCsv.DataLayer.Info

  @deprecated "See `AshCsv.DataLayer.Info.header?/1`"
  defdelegate header?(resource), to: AshCsv.DataLayer.Info

  @deprecated "See `AshCsv.DataLayer.Info.create?/1`"
  defdelegate create?(resource), to: AshCsv.DataLayer.Info

  @impl true
  def limit(query, limit, _), do: {:ok, %{query | limit: limit}}

  @impl true
  def offset(query, offset, _), do: {:ok, %{query | offset: offset}}

  @impl true
  def filter(query, filter, _resource) do
    {:ok, %{query | filter: filter}}
  end

  @impl true
  def sort(query, sort, _resource) do
    {:ok, %{query | sort: sort}}
  end

  @doc false
  def columns_opt(columns) do
    if Enum.all?(columns, &is_atom/1) do
      {:ok, columns}
    else
      {:error, "Expected all columns to be atoms"}
    end
  end

  @doc false
  def separator_opt(val) when is_integer(val) do
    {:ok, val}
  end

  def separator_opt(val) do
    {:error, "Expected a character for separator, got: #{inspect(val)}"}
  end

  @sections [@csv]

  @moduledoc """
  The data layer implementation for AshCsv
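
  A minimal, hypothetical resource using this data layer (the module and
  attribute names below are illustrative only):

      defmodule MyApp.Tag do
        use Ash.Resource, data_layer: AshCsv.DataLayer

        csv do
          file "priv/data/tags.csv"
          create? true
          header? true
          columns [:id, :name]
        end

        attributes do
          uuid_primary_key :id
          attribute :name, :string
        end
      end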
  """
  use Spark.Dsl.Extension, sections: @sections

  defmodule Query do
    @moduledoc false
    defstruct [:resource, :sort, :filter, :limit, :offset, :api]
  end

  @impl true
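  # Reads and decodes the whole file, then applies filter, sort, offset, and limit in memory.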
  def run_query(query, resource) do
    case read_file(resource) do
      {:ok, results} ->
        offset_records =
          results
          |> filter_matches(query.filter, query.api)
          |> Sort.runtime_sort(query.sort, api: query.api)
          |> Enum.drop(query.offset || 0)

        if query.limit do
          {:ok, Enum.take(offset_records, query.limit)}
        else
          {:ok, offset_records}
        end

      {:error, error} ->
        {:error, error}
    end
  rescue
    e in File.Error ->
      if create?(resource) do
        {:ok, []}
      else
        {:error, e}
      end
  end

  @impl true
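  # Creates a record by appending a row to the file, after checking primary key uniqueness
  # against the existing rows.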
  def create(resource, changeset) do
    case run_query(%Query{resource: resource}, resource) do
      {:ok, records} ->
        create_from_records(records, resource, changeset, false)

      {:error, error} ->
        {:error, error}
    end
  end

  @impl true
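  # Looks the record up by the upsert keys (defaulting to the primary key): creates when any key
  # is nil or no row matches, updates a single match, and errors on multiple matches.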
  def upsert(resource, changeset, keys) do
    pkey = Ash.Resource.Info.primary_key(resource)
    keys = keys || pkey

    if Enum.any?(keys, &is_nil(Ash.Changeset.get_attribute(changeset, &1))) do
      create(resource, changeset)
    else
      key_filters =
        Enum.map(keys, fn key ->
          {key,
           Ash.Changeset.get_attribute(changeset, key) || Map.get(changeset.params, key) ||
             Map.get(changeset.params, to_string(key))}
        end)

      query = Ash.Query.do_filter(resource, and: [key_filters])

      resource
      |> resource_to_query(changeset.api)
      |> Map.put(:filter, query.filter)
      |> Map.put(:tenant, changeset.tenant)
      |> run_query(resource)
      |> case do
        {:ok, []} ->
          create(resource, changeset)

        {:ok, [result]} ->
          to_set = Ash.Changeset.set_on_upsert(changeset, keys)

          changeset =
            changeset
            |> Map.put(:attributes, %{})
            |> Map.put(:data, result)
            |> Ash.Changeset.force_change_attributes(to_set)

          update(resource, changeset)

        {:ok, _} ->
          {:error, "Multiple records matching keys"}
      end
    end
  end

  @impl true
  def update(resource, changeset) do
    resource
    |> do_read_file()
    |> do_update(resource, changeset)
  end

  @impl true
  def destroy(resource, %{data: record}) do
    resource
    |> do_read_file()
    |> do_destroy(resource, record)
  end

  defp cast_stored(resource, keys) do
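    # Cast each raw CSV string into the attribute's Ash type, treating empty values as nil where appropriate.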
    Enum.reduce_while(keys, {:ok, resource.__struct__}, fn {key, value}, {:ok, record} ->
      with attribute when not is_nil(attribute) <- Ash.Resource.Info.attribute(resource, key),
           {:value, value} when not is_nil(value) <- {:value, stored_value(value, attribute)},
           {:ok, loaded} <- Ash.Type.cast_stored(attribute.type, value) do
        {:cont, {:ok, struct(record, [{key, loaded}])}}
      else
        {:value, nil} ->
          {:cont, {:ok, struct(record, [{key, nil}])}}

        nil ->
          {:halt, {:error, "#{key} is not an attribute"}}

        :error ->
          {:halt, {:error, "#{key} could not be loaded"}}
      end
    end)
  end

  defp stored_value(value, attribute) do
    if value == "" and Ash.Type.ecto_type(attribute.type) not in [:string, :uuid, :binary_id] do
      nil
    else
      value
    end
  end

  @impl true
  def resource_to_query(resource, api) do
    %Query{resource: resource, api: api}
  end

  @impl true
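  # Serializes access to the file with a cluster-wide :global lock and marks the process
  # as being in a transaction.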
  def transaction(resource, fun, _timeout, _) do
    file = file(resource)

    :global.trans(
      {{:csv, file}, System.unique_integer()},
      fn ->
        try do
          Process.put({:csv_in_transaction, file(resource)}, true)
          {:res, fun.()}
        catch
          {{:csv_rollback, ^file}, value} ->
            {:error, value}
        after
          # Clear the flag so in_transaction?/1 is accurate once the lock is released.
          Process.delete({:csv_in_transaction, file(resource)})
        end
      end,
      [node() | :erlang.nodes()],
      0
    )
    |> case do
      {:res, result} -> {:ok, result}
      {:error, error} -> {:error, error}
      :aborted -> {:error, "transaction failed"}
    end
  end

  @impl true
  def rollback(resource, error) do
    throw({{:csv_rollback, file(resource)}, error})
  end

  @impl true
  def in_transaction?(resource) do
    Process.get({:csv_in_transaction, file(resource)}, false) == true
  end

  def filter_matches(records, nil, _api), do: records

  def filter_matches(records, filter, api) do
    {:ok, records} = Ash.Filter.Runtime.filter_matches(api, records, filter)
    records
  end

  # sobelow_skip ["Traversal.FileModule"]
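  # Rewrites the file, keeping every row except the one whose primary key matches the destroyed record.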
  defp do_destroy({:ok, results}, resource, record) do
    columns = columns(resource)

    pkey = Ash.Resource.Info.primary_key(resource)

    changeset_pkey = Map.take(record, pkey)

    results
    |> Enum.reduce_while({:ok, []}, fn result, {:ok, results} ->
      key_vals =
        columns
        |> Enum.zip(result)
        |> Enum.reject(fn {key, _value} ->
          key == :_
        end)

      cast(resource, key_vals, pkey, changeset_pkey, result, results)
    end)
    |> case do
      {:ok, rows} ->
        lines =
          rows
          |> CSV.encode(separator: separator(resource))
          |> Enum.to_list()

        lines =
          if header?(resource) do
            [header(resource) | lines]
          else
            lines
          end

        resource
        |> file()
        |> File.write(lines, [:write])
        |> case do
          :ok ->
            :ok

          {:error, error} ->
            {:error, "Error while writing to CSV: #{inspect(error)}"}
        end

      {:error, error} ->
        {:error, error}
    end
  end

  defp do_destroy({:error, error}, _, _), do: {:error, error}

  defp cast(resource, key_vals, pkey, changeset_pkey, result, results) do
    case cast_stored(resource, key_vals) do
      {:ok, casted} ->
        if Map.take(casted, pkey) == changeset_pkey do
          {:cont, {:ok, results}}
        else
          {:cont, {:ok, [result | results]}}
        end

      {:error, error} ->
        {:halt, {:error, error}}
    end
  end

  defp do_update({:error, error}, _, _) do
    {:error, error}
  end

  # sobelow_skip ["Traversal.FileModule"]
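  # Rewrites the file, replacing the row whose primary key matches the changeset with the updated values.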
  defp do_update({:ok, results}, resource, changeset) do
    columns = columns(resource)

    pkey = Ash.Resource.Info.primary_key(resource)

    changeset_pkey =
      Enum.into(pkey, %{}, fn key ->
        {key, Ash.Changeset.get_attribute(changeset, key)}
      end)

    results
    |> Enum.reduce_while({:ok, []}, fn result, {:ok, results} ->
      key_vals =
        columns
        |> Enum.zip(result)
        |> Enum.reject(fn {key, _value} ->
          key == :_
        end)

      dump(resource, changeset, results, result, key_vals, pkey, changeset_pkey)
    end)
    |> case do
      {:ok, rows} ->
        lines =
          rows
          |> CSV.encode(separator: separator(resource))
          |> Enum.to_list()

        # Make sure the file (and its directory) exists before rewriting it,
        # surfacing an error instead of silently discarding it.
        result =
          if File.exists?(file(resource)) do
            :ok
          else
            if create?(resource) do
              File.mkdir_p!(Path.dirname(file(resource)))
              File.write!(file(resource), header(resource))
              :ok
            else
              {:error, "Error while writing to CSV: #{inspect(:enoent)}"}
            end
          end

        lines =
          if header?(resource) do
            [header(resource) | lines]
          else
            lines
          end

        case result do
          {:error, error} ->
            {:error, error}

          :ok ->
            resource
            |> file()
            |> File.write(lines, [:write])
            |> case do
              :ok ->
                {:ok, struct(changeset.data, changeset.attributes)}

              {:error, error} ->
                {:error, "Error while writing to CSV: #{inspect(error)}"}
            end
        end

      {:error, error} ->
        {:error, error}
    end
  end

  defp dump(resource, changeset, results, result, key_vals, pkey, changeset_pkey) do
    case cast_stored(resource, key_vals) do
      {:ok, casted} ->
        if Map.take(casted, pkey) == changeset_pkey do
          dump_row(resource, changeset, results)
        else
          {:cont, {:ok, [result | results]}}
        end

      {:error, error} ->
        {:halt, {:error, error}}
    end
  end

  defp dump_row(resource, changeset, results) do
    Enum.reduce_while(Enum.reverse(columns(resource)), {:ok, []}, fn key, {:ok, row} ->
      value = Ash.Changeset.get_attribute(changeset, key)

      {:cont, {:ok, [to_string(value) | row]}}
    end)
    |> case do
      {:ok, new_row} ->
        {:cont, {:ok, [new_row | results]}}

      {:error, error} ->
        {:halt, {:error, error}}
    end
  end

  defp read_file(resource) do
    columns = columns(resource)

    resource
    |> do_read_file()
    |> case do
      {:ok, results} ->
        do_cast_stored(results, columns, resource)

      {:error, error} ->
        {:error, error}
    end
  end

  defp do_cast_stored(results, columns, resource) do
    results
    |> Enum.reduce_while({:ok, []}, fn result, {:ok, results} ->
      key_vals =
        columns
        |> Enum.zip(result)
        |> Enum.reject(fn {key, _value} ->
          key == :_
        end)

      case cast_stored(resource, key_vals) do
        {:ok, casted} -> {:cont, {:ok, [casted | results]}}
        {:error, error} -> {:halt, {:error, error}}
      end
    end)
  end

  # sobelow_skip ["Traversal.FileModule"]
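  # Streams and decodes the CSV, skipping the header row when configured. If the file is missing,
  # it is created (with a header) and the read is retried once.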
  defp do_read_file(resource, retry? \\ false) do
    amount_to_drop =
      if header?(resource) do
        1
      else
        0
      end

    resource
    |> file()
    |> File.stream!()
    |> Stream.drop(amount_to_drop)
    |> CSV.decode(separator: separator(resource))
    |> Enum.reduce_while({:ok, []}, fn
      {:ok, result}, {:ok, results} ->
        {:cont, {:ok, [result | results]}}

      {:error, error}, _ ->
        {:halt, {:error, error}}
    end)
  rescue
    e in File.Error ->
      if e.reason == :enoent && !retry? do
        file = file(resource)
        File.mkdir_p!(Path.dirname(file))
        File.write!(file(resource), header(resource))
        do_read_file(resource, true)
      else
        reraise e, __STACKTRACE__
      end
  end

  # sobelow_skip ["Traversal.FileModule"]
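  # Checks primary key uniqueness against the existing rows, then appends the new row,
  # creating the file (and its directory) first when create? is enabled.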
  defp create_from_records(records, resource, changeset, retry?) do
    pkey = Ash.Resource.Info.primary_key(resource)
    pkey_value = Map.take(changeset.attributes, pkey)

    if Enum.find(records, fn record -> Map.take(record, pkey) == pkey_value end) do
      {:error, "Record is not unique"}
    else
      row =
        Enum.reduce_while(columns(resource), {:ok, []}, fn key, {:ok, row} ->
          value = Map.get(changeset.attributes, key)

          {:cont, {:ok, [to_string(value) | row]}}
        end)

      case row do
        {:ok, row} ->
          lines =
            [Enum.reverse(row)]
            |> CSV.encode(separator: separator(resource))
            |> Enum.to_list()

          result =
            if File.exists?(file(resource)) do
              :ok
            else
              if create?(resource) do
                File.mkdir_p!(Path.dirname(file(resource)))
                File.write!(file(resource), header(resource))
                :ok
              else
                {:error, "Error while writing to CSV: #{inspect(:enoent)}"}
              end
            end

          case result do
            {:error, error} ->
              {:error, error}

            :ok ->
              resource
              |> file()
              |> File.write(lines, [:append])
              |> case do
                :ok ->
                  {:ok, struct(resource, changeset.attributes)}

                {:error, :enoent} when retry? ->
                  {:error, "Error while writing to CSV: #{inspect(:enoent)}"}

                {:error, :enoent} ->
                  if create?(resource) do
                    create_from_records(records, resource, changeset, true)
                  else
                    {:error, "Error while writing to CSV: #{inspect(:enoent)}"}
                  end

                {:error, error} ->
                  {:error, "Error while writing to CSV: #{inspect(error)}"}
              end
          end

        {:error, error} ->
          {:error, error}
      end
    end
  end

  defp header(resource) do
    if header?(resource) do
      separator =
        case separator(resource) do
          sep when is_integer(sep) ->
            <<sep>>

          sep ->
            to_string(sep)
        end

      resource |> columns() |> Enum.join(separator) |> Kernel.<>("\n")
    else
      ""
    end
  end
end