defmodule Csv.Schema do
@moduledoc """
  Csv.Schema is a library that helps you build Ecto-like schema modules backed by a csv file.
  The idea behind this library is to generate, at compile time, getter functions
  for a CSV file directly in your codebase.
  The APIs exposed by this macro are similar to Ecto.Schema; e.g.
      defmodule Person do
        use Csv.Schema, headers: true, separator: ?,
        alias Csv.Schema.Parser

        schema path: "path/to/person.csv" do
          field :id, "id", key: true
          field :first_name, "first_name", filter_by: true
          field :last_name, "last_name", sort: :asc
          field :identifier, ["first_name", "last_name"], unique: true, join: " "
          field :email, "email", unique: true
          field :gender, "gender", filter_by: true, sort: :desc
          field :ip_address, "ip_address"
          field :date_of_birth, "date_of_birth", parser: &Parser.date!(&1, "{0M}/{0D}/{0YYYY}")
        end
      end
  It is also possible to define the schema with the `data:` param in order to generate the module content directly from a string:
      ...
      @content \"\"\"
      id,first_name,last_name,email,gender,ip_address,date_of_birth
      1,Ivory,Overstreet,ioverstreet0@businessweek.com,Female,30.138.91.62,10/22/2018
      2,Ulick,Vasnev,uvasnev1@vkontakte.ru,Male,35.15.164.70,01/19/2018
      3,Chloe,Freemantle,cfreemantle2@parallels.com,Female,133.133.113.255,08/13/2018
      \"\"\"

      schema data: @content do
        ...
      end
  At the end of compilation your module becomes a struct and exposes three kinds of getters:
  - `by_{key_field_name}` returns a single record struct or nil.
  - `filter_by_{field_name}` returns the list of records matching the provided value.
  - `get_all` returns all records (`get_all/0` as a lazy stream, `get_all(:materialized)` as a list).
  Back to the example, the following functions will be created in the module:
  - `__MODULE__.by_id/1`
  - `__MODULE__.by_identifier/1`
  - `__MODULE__.by_email/1`
  - `__MODULE__.filter_by_first_name/1`
  - `__MODULE__.filter_by_gender/1`
  - `__MODULE__.get_all/0` and `__MODULE__.get_all/1`
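
  For instance, given the `Person` module above (a sketch; actual arguments and
  return values depend on your csv content and parsers):

      Person.by_id(1)                   #=> %Person{...} or nil
      Person.filter_by_gender("Female") #=> [%Person{...}, ...]
      Person.get_all()                  #=> lazy stream of all records
      Person.get_all(:materialized)     #=> [%Person{...}, ...]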
  Some example definitions can be found [here](https://github.com/primait/csv_schema/tree/master/examples)
"""
alias Csv.Schema
alias Csv.Schema.{Field, Parser}
@type name :: String.t() | atom
@type row :: map | list
@type order :: :asc | :desc
@doc """
  Options:
  - `separator` - the character used by the macro to split each csv row (defaults to `?,`)
  - `headers` - set to `false` if your csv file has no header row; fields are then configured by index (1..N). Defaults to `true`
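
  For example, a headerless csv mapped by column index (a sketch; the `City`
  module and csv path are hypothetical):

      defmodule City do
        use Csv.Schema, headers: false, separator: ?;

        schema path: "path/to/cities.csv" do
          field :id, 1, key: true
          field :name, 2, filter_by: true
        end
      end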
"""
defmacro __using__(opts) do
quote do
import unquote(__MODULE__)
@type t :: %__MODULE__{}
@headers Keyword.get(unquote(opts), :headers, true)
@separator Keyword.get(unquote(opts), :separator, ?,)
end
end
@doc """
  The schema macro helps you build a block of fields. The first parameter is either
  `path:` (path to a csv file in your project) or `data:` (a string containing csv
  content). The second parameter is a `field` list enclosed in a `do`-`end` block.
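
  A minimal sketch (the csv path is hypothetical):

      schema path: "priv/data/people.csv" do
        field :id, "id", key: true
      end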
"""
defmacro schema([data: string], do: block) do
quote do
to_stream = fn -> unquote(string) |> String.split("\n") |> Enum.reject(&(&1 == "")) |> Stream.map(& &1) end
Module.put_attribute(__MODULE__, :to_stream, to_stream)
unquote(__register__(block))
unquote(__explode__())
end
end
defmacro schema([path: file_path], do: block) do
quote do
Module.put_attribute(__MODULE__, :external_resource, unquote(file_path))
Module.put_attribute(__MODULE__, :to_stream, fn -> File.stream!(unquote(file_path)) end)
unquote(__register__(block))
unquote(__explode__())
end
end
@doc false
  @spec __register__(Macro.t()) :: {:__block__, [], list}
def __register__(block) do
quote do
Module.put_attribute(__MODULE__, :in_, true)
Module.register_attribute(__MODULE__, :fields, accumulate: true)
Module.register_attribute(__MODULE__, :struct_fields, accumulate: true)
Module.register_attribute(__MODULE__, :keys, accumulate: true)
unquote(block)
Module.delete_attribute(__MODULE__, :in_)
end
end
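
  # Generates, in the destination module, the struct definition, the csv
  # validation pass and all public getter functions.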
@doc false
@spec __explode__() :: {:__block__, [], list}
def __explode__ do
quote unquote: false do
defstruct Module.get_attribute(__MODULE__, :struct_fields)
fields = Module.get_attribute(__MODULE__, :fields)
headers? = Module.get_attribute(__MODULE__, :headers)
separator = Module.get_attribute(__MODULE__, :separator)
to_stream = Module.get_attribute(__MODULE__, :to_stream)
content = Parser.csv!(to_stream.(), headers?, separator)
#
## Validation
#
Schema.__validate__(content, fields, headers?)
#
## Destination module function generators
#
@spec __id__(non_neg_integer) :: t | nil
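      # Each generator closure defines one public function on the destination
      # module; Schema.__gen__/3 below invokes them with the data each function
      # needs (changesets, lookup maps, row count).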
generators = %{
internal_id: fn id, changeset ->
def __id__(unquote(id)), do: struct!(__MODULE__, unquote(Macro.escape(changeset)))
end,
default_internal_id: fn ->
def __id__(_), do: nil
end,
by: fn translation_map, name ->
@spec unquote(:"by_#{name}")(any) :: t | nil
def unquote(:"by_#{name}")(value) do
unquote(Macro.escape(translation_map)) |> Map.get(value) |> __MODULE__.__id__()
end
end,
filter_by: fn translation_map, name ->
@spec unquote(:"filter_by_#{name}")(any) :: [t]
def unquote(:"filter_by_#{name}")(value) do
unquote(Macro.escape(translation_map)) |> Map.get(value, []) |> Enum.map(&__MODULE__.__id__/1)
end
end,
get_all: fn num_of_rows ->
@spec get_all :: %Stream{}
def get_all, do: Stream.map(1..unquote(num_of_rows), &__MODULE__.__id__/1)
@spec get_all(:materialized) :: list(t)
def get_all(:materialized), do: Enum.map(1..unquote(num_of_rows), &__MODULE__.__id__/1)
end
}
#
## Generation
#
Schema.__gen__(content, fields, generators)
#
## Cleanup
#
Module.delete_attribute(__MODULE__, :separator)
Module.delete_attribute(__MODULE__, :struct_fields)
Module.delete_attribute(__MODULE__, :fields)
Module.delete_attribute(__MODULE__, :headers)
Module.delete_attribute(__MODULE__, :to_stream)
end
end
@doc """
  Configures a new field (csv column). Parameters are:
  - `name` - the struct field name
  - `column` - the csv header name, or the column index (1..N) if `headers: false`
  - `opts` - a keyword list of configuration values:
    - `:key` - boolean; at most one field can be flagged as key. Behaves like a primary key
    - `:unique` - boolean; creates a `by_{name}` function for this field
    - `:filter_by` - boolean; creates a `filter_by_{name}` function for this field
    - `:parser` - function; used to cast the raw string value to a custom type
    - `:sort` - `:asc` or `:desc`; sorts rows by this field using Erlang's term ordering, treating `nil` as greater than any other value
    - `:join` - string; when `column` is a list of columns, their values are joined into a single binary using this string as separator
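
  For example, a hypothetical numeric column could be cast at compile time:

      field :age, "age", parser: &String.to_integer/1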
"""
defmacro field(name, col, opts \\ []) do
quote do
if Module.get_attribute(__MODULE__, :in_, false) do
@fields Schema.__field__(__MODULE__, unquote(name), unquote(col), unquote(opts))
else
raise "Using 'field' macro outside 'schema' macro"
end
end
end
@doc false
  @spec __field__(module, term, term, keyword) :: Field.t()
def __field__(module, name, column, options) do
name = Parser.atom!(name)
if module |> Module.get_attribute(:struct_fields) |> List.keyfind(name, 0) do
raise ArgumentError, "Field #{inspect(name)} already set in schema"
end
Module.put_attribute(module, :struct_fields, {name, nil})
Field.new(name, column, options)
end
#
## Entrypoint for function generation
#
@doc false
@spec __gen__(%Stream{}, list(Field.t()), %{atom => function}) :: :ok
def __gen__(csv_stream, fields, generators) do
changesets = get_changesets(csv_stream, fields)
gen_internal_id(changesets, Map.fetch!(generators, :internal_id), Map.fetch!(generators, :default_internal_id))
gen_by(changesets, fields, Map.fetch!(generators, :by))
gen_filter_by(changesets, fields, Map.fetch!(generators, :filter_by))
gen_get_all(changesets, Map.fetch!(generators, :get_all))
end
@spec gen_internal_id(list(map), function, function) :: :ok
defp gen_internal_id(changesets, internal_id, default_internal_id) do
Enum.each(changesets, &internal_id.(get_id(&1), Map.delete(&1, :__id__)))
default_internal_id.()
end
@spec gen_by(list(map), list(Field.t()), function) :: :ok
defp gen_by(changesets, fields, by) do
Enum.each(fields, fn
%Field{name: name, key: key, unique: unique} when key or unique ->
changesets
|> Enum.reduce(%{}, fn changeset, acc ->
case Map.get(changeset, name) do
nil -> acc
val -> Map.put(acc, val, Map.get(changeset, :__id__))
end
end)
|> by.(name)
_ ->
:ok
end)
end
@spec gen_filter_by(list(map), list(Field.t()), function) :: :ok
defp gen_filter_by(changesets, fields, filter_by) do
Enum.each(fields, fn
%Field{name: name, filter_by: true} ->
changesets
|> Enum.group_by(fn changeset -> Map.get(changeset, name) end)
|> Enum.reduce(%{}, fn {key, values}, acc -> Map.put(acc, key, Enum.map(values, &Map.get(&1, :__id__))) end)
|> filter_by.(name)
_ ->
:ok
end)
end
@spec gen_get_all(list(map), function) :: :ok
defp gen_get_all(changesets, get_all), do: changesets |> Enum.count() |> get_all.()
#
## Changeset
#
  @spec get_changesets(%Stream{}, list(Field.t())) :: list(map)
defp get_changesets(content, fields) do
content |> Enum.map(&to_changeset(&1, fields)) |> sort_changeset(fields) |> set_id()
end
  @spec to_changeset(row, list(Field.t())) :: map
defp to_changeset(row, fields) do
Enum.reduce(fields, %{}, fn %Field{name: name, column: column, parser: parser, join: join}, acc ->
Map.put(acc, name, parser.(get_value(row, column, join)))
end)
end
@spec set_id(list(map)) :: list(map)
  defp set_id(changesets), do: changesets |> Enum.with_index(1) |> Enum.map(&set_index/1)
@spec set_index({row, non_neg_integer}) :: row
defp set_index({row, index}) when is_map(row), do: Map.put(row, :__id__, index)
defp set_index({row, index}) when is_list(row), do: [index | row]
@spec get_id(row) :: term
defp get_id(collection), do: get_value(collection, :__id__)
  @spec get_value(row, String.t() | atom | number | list, String.t()) :: term
defp get_value(collection, columns, join \\ "")
defp get_value(collection, columns, join) when is_list(columns) do
columns |> Enum.map(&get_value(collection, &1, join)) |> Enum.join(join)
end
defp get_value(collection, elem, _) when is_map(collection), do: Map.get(collection, elem)
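  # The internal :__id__ of a list row lives at position 0 (see set_index/1);
  # numeric columns are 1-based csv indexes looked up on raw rows, hence `elem - 1`.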
defp get_value(collection, :__id__, _) when is_list(collection), do: Enum.at(collection, 0)
defp get_value(collection, elem, _) when is_list(collection), do: Enum.at(collection, elem - 1)
@spec sort_changeset(list(map), list(Field.t())) :: list(map)
defp sort_changeset(changesets, fields) do
Enum.reduce(fields, changesets, fn
%Field{sort: nil}, cs -> cs
%Field{name: name, sort: sort}, cs -> Enum.sort_by(cs, &Map.get(&1, name), &sorter(&1, &2, sort))
end)
end
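
  # Comparator treating `nil` as greater than any other value: nils sort last
  # with :asc and first with :desc.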
@spec sorter(any, any, order) :: boolean
defp sorter(nil, _, :asc), do: false
defp sorter(nil, _, :desc), do: true
defp sorter(_, nil, :asc), do: true
defp sorter(_, nil, :desc), do: false
defp sorter(value1, value2, :asc), do: value1 <= value2
defp sorter(value1, value2, :desc), do: value1 > value2
#
## Validations
#
@doc false
@spec __validate__(%Stream{}, list(Field.t()), boolean) :: :ok | no_return
def __validate__(content, fields, headers) do
validate_csv_not_empty(content)
validate_csv_has_fields(content, fields, headers)
validate_key(content, fields)
validate_unique(content, fields)
end
@spec validate_csv_not_empty(%Stream{}) :: :ok | no_return
defp validate_csv_not_empty(content) do
if content |> Stream.take(1) |> Enum.count() > 0, do: :ok, else: raise("Provided csv is empty")
end
@spec validate_csv_has_fields(%Stream{}, list(Field.t()), boolean) :: :ok | no_return
defp validate_csv_has_fields(content, fields, true) do
content
|> Stream.take(1)
|> Enum.each(fn row ->
if match_all_fields?(row, fields), do: :ok, else: raise("Not all fields are mapped to csv")
end)
end
defp validate_csv_has_fields(content, fields, _) do
content
|> Stream.take(1)
|> Enum.each(fn row ->
fields
|> Enum.map(fn %Field{column: column} -> column end)
|> Enum.reject(fn
      column when is_list(column) -> Enum.all?(column, fn col -> col in 1..length(row) end)
      column -> column in 1..length(row)
end)
|> case do
[] -> :ok
cl -> raise "Indexes #{inspect(cl)} should be between 1 and #{length(row)}"
end
end)
end
@spec match_all_fields?(row, list(Field.t())) :: boolean
defp match_all_fields?(row, fields) do
Enum.all?(fields, fn
%Field{column: column} when is_list(column) -> Enum.all?(column, &Map.has_key?(row, &1))
%Field{column: column} -> Map.has_key?(row, column)
end)
end
@spec validate_key(%Stream{}, list(Field.t())) :: :ok | no_return
defp validate_key(content, fields) do
case Enum.filter(fields, & &1.key) do
[] -> :ok
[field] -> valid_key_field?(content, field)
fields -> raise "Multiple keys defined (#{fields |> Enum.map(& &1.column) |> Enum.join(", ")})"
end
end
  @spec valid_key_field?(%Stream{}, Field.t()) :: :ok | no_return
defp valid_key_field?(content, field) do
values = Enum.map(content, &get_value(&1, field.column, field.join))
unique = values |> Enum.uniq() |> Enum.reject(fn value -> is_nil(value) || value == "" end) |> Enum.count()
if Enum.count(values) != unique do
raise "Key field #{field.column} contains empty, nil or not unique values: #{inspect(get_duplicates(values))}"
else
:ok
end
end
@spec validate_unique(%Stream{}, list(Field.t())) :: :ok | no_return
defp validate_unique(content, fields) do
Enum.each(fields, fn
%Field{column: column, key: true, join: join} -> unique_or_raise(content, column, join)
%Field{column: column, unique: true, join: join} -> unique_or_raise(content, column, join)
_ -> :ok
end)
end
@spec unique_or_raise(%Stream{}, term, String.t()) :: :ok | no_return
defp unique_or_raise(content, column, join) do
values = content |> Stream.map(&get_value(&1, column, join)) |> Enum.reject(&(is_nil(&1) || &1 == ""))
if Enum.count(values) != values |> Enum.uniq() |> Enum.count() do
raise "Field #{column}, set as unique, contains duplicates: #{inspect(get_duplicates(values))}"
else
:ok
end
end
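
  # Single pass over the list tracking already-seen values; returns each
  # duplicated value exactly once.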
@spec get_duplicates(list) :: list
defp get_duplicates(list) do
list
|> Enum.reduce({%{}, %{}}, fn x, {elems, dupes} ->
if Map.has_key?(elems, x), do: {elems, Map.put(dupes, x, nil)}, else: {Map.put(elems, x, nil), dupes}
end)
|> elem(1)
|> Map.keys()
end
end