lib/nx/container.ex

defprotocol Nx.Container do
  @moduledoc """
  A protocol that teaches Nx how to traverse data structures
  non-recursively.

  `Nx` and `defn` expect the arguments to be numbers, tensors,
  or one of the following composite data types:

    1. tuples of numbers/tensors
    2. maps of any key with numbers/tensors as values
    3. any struct that implements `Nx.Container`

  If you need to pass additional values, you can implement
  or derive this protocol. For example:

      @derive {Nx.Container,
               containers: [:field_name, :other_field]}
      defstruct [:field_name, :other_fields, ...]

  The `:containers` option is required and it must specify a
  list of fields that contains tensors. Inside `defn`, the
  container fields will be automatically converted to tensor
  expressions. All other fields will be reset to their default
  value, unless you explicitly declare them to be kept:

      @derive {Nx.Container,
               containers: [:field_name, :other_field],
               keep: [:another_field]}
      defstruct [:field_name, :other_fields, ...]

  Note the functions in this module are not recursive.
  If you want to deeply traverse and reduce containers,
  use the functions in `Nx.Defn.Composite` instead.

  > **Careful!**: If you keep a field, its value will be part
  > of the `Nx.Defn` compiler cache key (i.e. therefore if you
  > give a struct with two different values for a kept field,
  > `Nx.Defn` will have to compile and cache it twice).
  > You must only keep fields that you are certain to be used
  > inside `defn` during compilation time.

  ## Serialization

  If you `@derive {Nx.Container, ...}`, it will automatically
  define a serialization function with the container and keep
  fields you declare. If you expect a struct to be serialized,
  then you must be careful to evolve its schema over time in
  a compatible way. In particular, removing fields will lead to
  crashes. If you change the type of a field value, previously
  serialized structs may still hold the old type. And if you add
  new fields, previously serialized structs won't have such fields
  and therefore be deserialized with its default value.
  """

  @doc """
  Traverses non-recursively tensors in a data structure with `acc` and `fun`.

  `fun` is invoked with each tensor or tensor container in the
  data structure plus an accumulator. It must return a two element
  tuple with the updated value and accumulator.

  This function returns the updated container and the accumulator.

  Given `fun` may receive containers, it is not recursive by default.
  See `Nx.Defn.Composite.traverse/3` for a recursive variant.
  """
  @spec traverse(t(), acc, (t(), acc -> {t(), acc})) :: {t(), acc} when acc: term()
  def traverse(data, acc, fun)

  @doc """
  Reduces non-recursively tensors in a data structure with `acc` and `fun`.

  `fun` is invoked with each tensor or tensor container in the
  data structure plus an accumulator. It must return the new
  accumulator.

  This function the final accumulator.

  Given `fun` may receive containers, it is not recursive by default.
  See `Nx.Defn.Composite.reduce/3` for a recursive variant.
  """
  @spec reduce(t(), acc, (t(), acc -> acc)) :: acc when acc: term()
  def reduce(data, acc, fun)

  @doc """
  Defines how this container must be serialized to disk.

  It receives the container and it must return a three element tuple
  of `{module, list_of_container_tuples, metadata}` where:

    * the `module` to deserialize the container
    * a list of tuples in the shape `{key, container}` with containers to be further serialized
    * additional metadata for serialization/deserialization

  On deserialization, `module.deserialize(list_of_container_tuples, metadata)`
  will be invoked.
  """
  @spec serialize(t()) :: {module(), [{term(), t()}], term()}
  def serialize(struct)
end

defimpl Nx.Container, for: Tuple do
  def traverse(tuple, acc, fun) do
    tuple
    |> Tuple.to_list()
    |> Enum.map_reduce(acc, fun)
    |> then(fn {list, acc} -> {List.to_tuple(list), acc} end)
  end

  def reduce(tuple, acc, fun) do
    tuple
    |> Tuple.to_list()
    |> Enum.reduce(acc, fun)
  end

  def serialize(tuple) do
    pairs = for v <- Tuple.to_list(tuple), do: {[], v}
    {__MODULE__, pairs, :ok}
  end

  def deserialize(pairs, :ok) do
    pairs |> Enum.map(&elem(&1, 1)) |> List.to_tuple()
  end
end

defimpl Nx.Container, for: Map do
  def traverse(map, acc, fun) do
    map
    |> Map.to_list()
    |> Enum.sort()
    |> Enum.map_reduce(acc, fn {k, v}, acc ->
      {v, acc} = fun.(v, acc)
      {{k, v}, acc}
    end)
    |> then(fn {list, acc} -> {Map.new(list), acc} end)
  end

  def reduce(map, acc, fun) do
    map
    |> Map.to_list()
    |> Enum.sort()
    |> Enum.reduce(acc, fn {_, v}, acc -> fun.(v, acc) end)
  end

  def serialize(map) do
    {__MODULE__, Map.to_list(map), :ok}
  end

  def deserialize(pairs, :ok) do
    Map.new(pairs)
  end
end

defimpl Nx.Container, for: [Integer, Float, Complex, Nx.Tensor] do
  def traverse(tensor, acc, fun), do: {tensor, fun.(tensor, acc)}
  def reduce(tensor, acc, fun), do: fun.(tensor, acc)
  def serialize(_), do: raise("cannot be serialized directly")
end

defimpl Nx.Container, for: Any do
  defmacro __deriving__(module, struct, options) do
    containers = Keyword.fetch!(options, :containers)
    keep = Keyword.get(options, :keep, [])

    container_pattern = Enum.map(containers, &field_var(struct, &1))
    keep_pattern = Enum.map(keep, &field_var(struct, &1))
    full_pattern = container_pattern ++ keep_pattern

    updates =
      for field <- containers do
        var = Macro.var(field, Nx.Container)

        quote do
          {unquote(var), acc} = fun.(unquote(var), acc)
        end
      end

    reduces =
      for field <- containers do
        var = Macro.var(field, Nx.Container)

        quote do
          acc = fun.(unquote(var), acc)
        end
      end

    return =
      struct
      |> Map.to_list()
      |> Keyword.drop(keep ++ containers)
      |> Macro.escape()
      |> Keyword.merge(full_pattern)

    quote do
      defimpl Nx.Container, for: unquote(module) do
        def traverse(%{unquote_splicing(full_pattern)} = struct, acc, fun) do
          unquote_splicing(updates)
          {%{unquote_splicing(return)}, acc}
        end

        def reduce(%{unquote_splicing(container_pattern)} = struct, acc, fun) do
          unquote_splicing(reduces)
          acc
        end

        def serialize(%{unquote_splicing(full_pattern)} = struct) do
          {__MODULE__, [unquote_splicing(container_pattern)], [unquote_splicing(keep_pattern)]}
        end

        def deserialize(containers, keep) do
          struct!(unquote(module), containers ++ keep)
        end
      end
    end
  end

  defp field_var(struct, field) do
    unless Map.has_key?(struct, field) do
      raise ArgumentError,
            "cannot derive Nx.Container for struct #{inspect(struct.__struct__)} " <>
              "because it does not have field #{inspect(field)}"
    end

    {field, Macro.var(field, Nx.Container)}
  end

  def traverse(data, _acc, _fun) do
    raise Protocol.UndefinedError, protocol: @protocol, value: data
  end

  def reduce(data, _acc, _fun) do
    raise Protocol.UndefinedError, protocol: @protocol, value: data
  end

  def serialize(data) do
    raise Protocol.UndefinedError, protocol: @protocol, value: data
  end
end