lib/couchx/adapter.ex

defmodule Couchx.Adapter do
  alias Couchx.PrepareQuery
  alias Couchx.QueryHandler
  alias Couchx.Constraint
  alias Couchx.DocumentState

  @moduledoc """
  Adapter to get basic query functionality into `Ecto` with `CouchDB`.

  ## Configuration

  It uses the same configuration pattern as other Ecto adapters to configure the databases, in this format:

  ```
  config :my_app, MyRepo,
    username: "username",
    password: "password",
    database: "db_name",
    hostname: "localhost",
    protocol: "http",
    port: 5984
  ```

  ## Usage

  Couchx supports one main repo and many dynamically supervised repos.
  A dynamic repo allows you to have multiple DB connections in your application.
  To achieve this, you need to set up a `DynamicSupervisor` and a `Registry` in the application, like:

  ```
    def start(_type, _args) do
      children = [
        {DynamicSupervisor, strategy: :one_for_one, name: CouchxSupervisor},
        {Registry, keys: :unique, name: CouchxRegistry},
        ...
      ]
      ...
    end
  ```

  The `Registry` name is hard-coded, so it must be called `CouchxRegistry`.

  The main repo is configured like any other Ecto repo, so you can start it in the application by simply adding it to the children list.

  ```
    def start(_type, _args) do
      children = [
        MyDb.Repo
      ]
      ...
    end
  ```

  ### Dynamic Repo queries

  Dynamic repos are implemented with a macro that you can pull into your repo module as:

  ```
    use CouchxDynamicRepo, otp_app: :my_app, name: :my_repo
  ```

  This sets up a `run` function that takes a callback as its argument.
  To execute actions in a dynamic repo, we follow this pattern:

  ```
    MyDynamicRepo.run(fn ->
      MyDynamicRepo.get(MyStruct, doc_id)
    end)
  ```

  Any Repo call inside the callback function will be run in a dynamically supervised connection.
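
  Putting the pieces together, a dynamic repo module and a call through it might look
  like the following minimal sketch (`MyApp.MyDynamicRepo`, `MyStruct`, and `doc_id`
  are illustrative names):

  ```
    # Illustrative module: pulls in the dynamic repo macro shown above
    defmodule MyApp.MyDynamicRepo do
      use CouchxDynamicRepo, otp_app: :my_app, name: :my_dynamic_repo
    end

    # Calls inside the callback run against the dynamically supervised connection
    MyApp.MyDynamicRepo.run(fn ->
      MyApp.MyDynamicRepo.get(MyStruct, doc_id)
    end)
  ```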


  ## Migrations

  Couchx comes with a Mango index generator.

  ### Example

      $ mix couchx.gen.mango_index -r MyApp.Repo -n my-mango-index -f username,email

  This will create a file under `priv/my_app/repo/index/my-mango-index.exs` with contents like:

  ```
  defmodule MyApp.Repo.Index.MyMangoIndex do
    use Couchx.MangoIndex, repo_name: MyApp.Repo

    def up do
      create_index "my-mango-index" do
        %{fields: ["username", "email"]}
      end
    end

    def down do
      drop_index "my-mango-index"
    end
  end
  ```

  The map inside the `create_index` block is added to the `index` JSON object, so any structure that CouchDB accepts there can be added here.
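
  For example, CouchDB accepts a `partial_filter_selector` inside the `index`
  object (CouchDB 2.2+), so a migration could, as a hypothetical sketch, index
  only documents whose `status` is `"active"`:

  ```
  def up do
    # hypothetical partial index; the name, field, and status value are illustrative
    create_index "active-users-by-email" do
      %{
        fields: ["email"],
        partial_filter_selector: %{status: %{"$eq" => "active"}}
      }
    end
  end
  ```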

  Currently, the only supported operations are:

  ### create_index(String.t(), (-> Map.t()))

    - name: Name of the index to be created in CouchDB; it is also used as the `id` of the persisted index document.
    - fun: A block that returns a well-formed map for the index to be created; it is encoded as JSON into the body of the index document.

  ### drop_index(String.t())

    - name: Name (and `id`) of the index document to be deleted.

  ### Examples

      $ mix couchx.mango_index

      Will create all indexes stored under the `priv/my_app/repo/index/` path

      $ mix couchx.mango_index.down -r MyApp.Repo -n my-mango-index,my-other-index

      It will call the `down` function of the migration files

      ```
        priv/my_app/repo/index/my-mango-index.exs
        priv/my_app/repo/index/my-other-index.exs
      ```

      This removes the corresponding index documents from the database.

  """
  import Couchx.CouchId

  @behaviour Ecto.Adapter
  @behaviour Ecto.Adapter.Schema
  @behaviour Ecto.Adapter.Queryable

  @encodable_keys ~w[key keys startkey endkey start_key end_key]a

  defmacro __before_compile__(_env), do: :ok

  def init(config) do
    config           = put_conn_id(config)
    log              = Keyword.get(config, :log, :debug)
    telemetry_prefix = Keyword.fetch!(config, :telemetry_prefix)
    telemetry        = {config[:repo], log, telemetry_prefix ++ [:query]}
    spec             = couchdb_supervisor_spec(config)

    {:ok, spec, %{telemetry: telemetry, opts: [returning: true], config: config}}
  end

  def ensure_all_started(_repo, _type), do: HTTPoison.start
  def checkout(_adapter, _config, result), do: result

  def dumpers({:map, _}, type), do: [&Ecto.Type.embedded_dump(type, &1, :json)]
  def dumpers(_primitive, type), do: [type]

  def loaders({:map, _}, type), do: [&Ecto.Type.embedded_load(type, &1, :json)]
  def loaders(_primitive, type), do: [type]

  def autogenerate(:id), do: nil
  def autogenerate(:binary_id) do
    Ecto.UUID.cast!(Ecto.UUID.bingenerate)
  end

  def checked_out?(arg), do: arg

  def prepare(:all, query) do
    prepared_query = PrepareQuery.call(query)
    {:nocache, {System.unique_integer([:positive]), prepared_query}}
  end

  def prepare(:delete_all, _query) do
    {:nocache, {System.unique_integer([:positive]), [:delete]}}
  end

  def insert(meta, repo, fields, _on_conflict, returning, _options) do
    constraints = Constraint.call(meta[:pid], repo, fields)

    constraints
    |> DocumentState.merge_constraints
    |> do_insert(repo, constraints, fields, returning, meta)
  end

  def insert_all(meta, _repo, _fields, data, _on_conflict, schema, _returning, _opts) do
    docs = Enum.map(data, &Enum.into(&1, %{}))

    {:ok, res} = Couchx.DbConnection.bulk_docs(meta[:pid], docs)
    {:ok, Enum.map(res, &parse_bulk_response(&1, data, schema))}
  end

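  # Maps a _bulk_docs response row back onto the requested schema fields,
  # returning values in schema order; rows that errored come back with a nil _rev.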
  def parse_bulk_response(%{"error" => _error, "id" => doc_id}, data, schema) do
    parse_bulk_response(%{"rev" => nil, "id" => doc_id}, data, schema)
  end

  def parse_bulk_response(%{"rev" => rev, "id" => doc_id}, data, schema) do
    fillers = Enum.map(schema, fn(_)-> nil end)
    doc_template = Enum.zip(schema, fillers)
    response_data = Enum.find(data, fn([{:_id, id} | _]) -> id == doc_id end) ++ [_rev: rev]
    doc = Keyword.merge(doc_template, response_data)

    Enum.map(schema, fn(key)-> Keyword.get(doc, key) end)
  end

  def execute(:view, meta, design, view, key, query_opts) do
    query_opts = query_opts ++ [key: Jason.encode!(key)]
    execute(:view, meta, design, view, query_opts)
  end

  def execute(:view, meta, design, view, query_opts) do
    opts = prepare_view_options(query_opts)
    Couchx.DbConnection.get(meta[:pid], "_design/#{design}/_view/#{view}", opts)
    |> parse_view_response(opts[:include_docs], query_opts[:module])
  end

  def execute(:find, meta, selector, fields, opts) do
    query = %{selector: selector, fields: fields}

    Couchx.DbConnection.find(meta[:pid], query, opts)
    |> parse_view_response(opts[:include_docs], opts[:module])
  end

  def execute(:request, meta, method, path, opts) do
    Couchx.DbConnection.raw_request(meta[:pid], method, path, opts)
    |> parse_view_response(opts[:include_docs], opts[:module])
  end

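  # Ecto.Adapter.Queryable callback: unpacks the prepared query and requested
  # fields, runs it through do_query/5, and normalizes the results.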
  def execute(meta, query_meta, query_cache, params, _opts) do
    {_, {_, query}}        = query_cache
    %{select: select}      = query_meta
    keys                   = query[:keys]
    query_options          = query[:options]
    {all_fields, module}   = fetch_fields(query_meta.sources)
    namespace              = build_namespace(module)

    fields_meta = fields_meta(select[:from])

    fields = case select[:postprocess] do
      {:map, keyfields} ->
        Keyword.keys(keyfields)
        |> Enum.map(&Atom.to_string/1)
      _ -> all_fields
    end

    query = if select[:take] do
      %{fields: select[:take]}
    else
      %{}
    end

    do_query(meta[:pid], keys, namespace, params, Map.merge(query, query_options))
    |> QueryHandler.query_results(fields, fields_meta)
  end

  def create_admin(server, name, password) do
    Couchx.DbConnection.create_admin(server, name, password)
  end

  def create_db(server, name) do
    Couchx.DbConnection.create_db(server, name)
  end

  def delete_admin(server, name) do
    Couchx.DbConnection.delete_admin(server, name)
  end

  def delete_db(server, name) do
    Couchx.DbConnection.delete_db(server, name)
  end

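  # Resolves the schema module from the query sources (deriving it from the
  # source name when Ecto does not provide one) and lists its fields as strings.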
  defp fetch_fields({{resource, nil, _}}) do
    module = ["Elixir", ".", resource]
               |> Enum.map(&Inflex.singularize/1)
               |> Enum.map(&String.capitalize/1)
               |> Enum.join
               |> String.to_existing_atom

    fetch_fields({{resource, module, nil}})
  end

  defp fetch_fields({{_resource, module, _}}) do
    fields = module.__struct__
               |> Map.keys
               |> Kernel.--([:__struct__, :__meta__])
               |> Enum.map(&Atom.to_string/1)

    {fields, module}
  end

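  # do_query/5 pattern-matches on the prepared query shape: id lookups hit the
  # document or _all_docs endpoints, [:delete] bulk-deletes the namespace, and
  # property filters are turned into a Mango _find selector.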
  defp do_query(server, [%{_id: {:^, [], [0, _total]}}], namespace, ids, select) when is_list(ids) do
    do_query(server, [%{_id: ids}], namespace, [], select)
  end

  defp do_query(server, [%{_id: ids}], namespace, [], _select) when is_list(ids) do
    doc_ids = Enum.map(ids, &namespace_id(namespace, &1))
              |> Enum.map(&URI.decode_www_form/1)

    Couchx.DbConnection.all_docs(server, doc_ids, include_docs: true)
    |> sanitize_collection
  end

  defp do_query(server, [%{"$eq" => [%{_id: :empty}, :primary_key]}], namespace, [id | _], select) do
    do_query(server, [%{_id: id}], namespace, [], select)
  end

  defp do_query(server, [%{_id: {:^, [], [0]}}], namespace, [id | _], _select) do
    do_query(server, [%{_id: id}], namespace, [], [])
  end

  defp do_query(server, [%{_id: id}], namespace, [], []) do
    Couchx.DbConnection.get(server, namespace_id(namespace, id))
  end

  defp do_query(server, [%{_id: id}], namespace, [], select) when is_list(select) do
    namespaced_id = unencoded_namespace_id(namespace, id)
    query = select_query(%{_id: namespaced_id}, select)

    Couchx.DbConnection.find(server, query)
  end

  defp do_query(server, [%{_id: id}], namespace, [], _) do
    Couchx.DbConnection.get(server, namespace_id(namespace, id))
  end

  defp do_query(server, [:delete], namespace, [], _select) do
    {:ok, %{"rows" => rows}} = Couchx.DbConnection.get(server, "_all_docs", [limit: 100, include_docs: true, startkey: Jason.encode!(namespace), endkey: Jason.encode!("#{namespace}/{}")])
    docs = Enum.map(rows, fn(%{"doc" => doc})-> %{_id: doc["_id"], _rev: doc["_rev"], _deleted: true} end)
    Couchx.DbConnection.bulk_docs(server, docs)
  end

  defp do_query(server, [], namespace, [], query_options) do
    limit = query_options[:limit] || 100
    orders = query_options[:sort]

    opts = [
      include_docs: true,
      limit: limit,
      startkey: Jason.encode!(namespace),
      endkey: Jason.encode!("#{namespace}/{}")
    ]

    descending = if orders do
      [default_order | _] = orders

      default_order
      |> Map.values
      |> List.flatten
      |> List.first
      |> Kernel.==(:desc)
    end

    opts = if descending do
      startkey = opts[:startkey]
      endkey = opts[:endkey]

      Keyword.replace(opts, :startkey, endkey)
      |> Keyword.replace(:endkey, startkey)
      |> Kernel.++([descending: true])
    else
      opts
    end

    {:ok, %{"rows" => rows}} = Couchx.DbConnection.get(server, "_all_docs", opts)
    Enum.map(rows, &Map.get(&1, "doc"))
  end

  defp do_query(server, properties, namespace, values, query_options) when is_list(properties) do
    selector = Enum.reduce(properties, %{type: namespace}, &process_property(&1, &2, values))
    query = select_query(selector, query_options)
    Couchx.DbConnection.find(server, query)
  end

  defp select_query(selector, options) do
    %{selector: selector}
    |> Map.merge(options)
  end

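  # Builds Mango selector entries, resolving `^` pin expressions against the
  # bound query values.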
  defp process_property({key, {:^, [], [value_index]}}, acc, values) do
    value = Enum.fetch!(values, value_index)
    Map.put(acc, key, value)
  end

  defp process_property({key, value}, acc, _values) do
    Map.put(acc, key, value)
  end

  defp process_property(property, acc, values) do
    Enum.reduce(property, %{}, &process_property(&1, &2, values))
    |> Map.merge(acc)
  end

  #defp do_query(_, _, _, _), do: {:error, :not_implemented}

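  # "MyApp.UserProfile" -> "user_profile"; used to namespace document ids and
  # as the document `type` field.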
  defp build_namespace(module) do
    module
      |> to_string
      |> String.split(".")
      |> List.last
      |> Macro.underscore
  end

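  # Prefixes the id with its namespace ("namespace/id") unless it is already
  # prefixed, then URL-encodes it for use in a request path.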
  defp namespace_id(namespace, id) do
    if Regex.match?(~r{^#{namespace}/}, id) do
      URI.encode_www_form(id)
    else
      namespace
        |> Kernel.<>("/#{id}")
        |> URI.encode_www_form
    end
  end

  defp typed_document(data, %{schema: resource}) do
    Map.put(data, :type, build_namespace(resource))
  end

  defp build_id(data, %{schema: resource}) do
    resource
      |> to_string
      |> String.split(".")
      |> List.last
      |> Macro.underscore
      |> Kernel.<>("/#{base_id(data._id)}")
      |> update_data_id(data)
  end

  defp update_data_id(id, data) do
    Map.put(data, :_id, id)
  end

  defp parse_view_response({:ok, %{"rows" => rows}}, true, module_name) do
    rows
    |> Enum.map(&Map.get(&1, "doc"))
    |> Enum.map(&build_structs(&1, module_name))
  end

  defp parse_view_response({:ok, %{"rows" => rows}}, _, _), do: rows
  defp parse_view_response({:ok, %{"bookmark" => _, "docs" => docs}}, _, _), do: docs
  defp parse_view_response({:ok, raw_response}, _, _), do: raw_response

  defp parse_view_response({:error, _} = error, _, _), do: error

  defp build_structs(map, module_name) do
    doc = Enum.reduce(map, %{}, &keys_to_atoms/2)
    module_name = fetch_module_name(map, module_name)
    struct(module_name, doc)
  end

  defp fetch_module_name(map, nil) do
    doc_type = Map.get(map, "_id")
               |> String.replace(~r{(/.+)}, "")
               |> Macro.camelize

    :"Elixir.SDB.#{doc_type}"
  end

  defp fetch_module_name(_map, name), do: name

  defp keys_to_atoms({key, value}, acc) do
    Map.put(acc, String.to_atom(key), value)
  end

  defp put_conn_id(config), do: config ++ [id: config[:name]]

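  # Old-style child spec tuple that starts a Couchx.DbConnection for this
  # repo's config under the CouchxSupervisor DynamicSupervisor.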
  defp couchdb_supervisor_spec(config) do
    {
      config[:id],
      {
        DynamicSupervisor,
        :start_child,
        [
          CouchxSupervisor,
          {Couchx.DbConnection, config}
        ]
      },
      :permanent,
      :infinity,
      :worker,
      [config[:id]]
    }
  end

  defp fields_meta({_, {_, _, _, fields_meta}}), do: fields_meta
  defp fields_meta(_), do: nil

  # Pending implementation

  def delete(meta, _meta_schema, params, _opts) do
    doc_id = URI.encode_www_form(params[:_id])
    Couchx.DbConnection.get(meta[:pid], doc_id)
    |> find_to_delete(meta[:pid], doc_id)
  end

  def insert_all(_a, _b, _c, _d, _e, _f, _g) do
  end

  def update(meta, repo, fields, identity, returning, _opts) do
    data            = for {key, val} <- fields, into: %{}, do: {Atom.to_string(key), val}
    doc_id          = URI.encode_www_form(identity[:_id])
    {:ok, response} = Couchx.DbConnection.get(meta[:pid], doc_id)

    prev_fields = for {key, val} <- response, do: {String.to_atom(key), val}
    constraints = Constraint.call(meta[:pid], repo, fields, prev_fields)

    constraints
    |> DocumentState.merge_constraints
    |> do_update(constraints, doc_id, response, data, returning, meta[:pid])
  end


  def update!(meta, repo, fields, identity, returning, a) do
    {:ok, values} = update(meta, repo, fields, identity, returning, a)
    values
  end

  def stream(_a, _b, _c, _d, _e) do
  end

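  # Constraint errors gathered before persisting short-circuit the insert with
  # {:invalid, errors}; otherwise the document gets a namespaced id and a type
  # field before being written.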
  def do_insert(errors, _, _, _, _, _)
    when length(errors) > 0 do
    {:invalid, errors}
  end

  def do_insert(_errors, repo, constraints, fields, returning, meta) do
    data = Enum.into(fields, %{})
           |> build_id(repo)
           |> typed_document(repo)

    url  = URI.encode_www_form(data._id)
    body = Jason.encode!(data)

    constraints
    |> DocumentState.process_constraints(meta[:pid])
    |> try_to_persist_insert(data, returning, meta, url, body)
  end

  defp try_to_persist_insert({:invalid, constraints}, _, _, _, _, _) do
    {:invalid, constraints[:invalid]}
  end

  defp try_to_persist_insert(%{invalid: constraints}, _, _, _, _, _) do
    {:invalid, constraints}
  end

  defp try_to_persist_insert(%{error: errors}, _data, _returning, _meta, _url, _body) do
    {:error, errors}
  end

  defp try_to_persist_insert(%{ok: _}, data, returning, meta, url, body) do
    {:ok, response} = Couchx.DbConnection.insert(meta[:pid], url, body)
    response = Map.merge(data, %{_id: response["id"], _rev: response["rev"]})
    values = Enum.map(returning, fn(k)-> Map.get(response, k) end)
    {:ok, Enum.zip(returning, values)}
  end

  defp do_update(errors, _constraints, _id, _response, _data, _returning, _server)
    when length(errors) > 0 do
      {:invalid, errors}
  end

  defp do_update(_errors, constraints, doc_id, response, data, returning, server) do
    constraints
    |> DocumentState.process_constraints(server)
    |> try_to_persist_update(doc_id, response, returning, data, server)
  end

  defp try_to_persist_update({:invalid, constraints}, _, _, _, _, _) do
    {:invalid, constraints[:invalid]}
  end

  defp try_to_persist_update(%{invalid: constraints}, _, _, _, _, _) do
    {:invalid, constraints}
  end

  defp try_to_persist_update(%{error: errors}, _doc_id, _response, _returning, _data, _server) do
    {:error, errors}
  end

  defp try_to_persist_update(%{ok: _}, doc_id, response, returning, data, server) do
    values = Map.merge(response, data)
    body   = Jason.encode!(values)

    case Couchx.DbConnection.insert(server, doc_id, body) do
      {:ok, response} ->
        values = fetch_insert_values(response, values, returning)
        {:ok, Enum.zip(returning, values)}
      {:error, error} ->
        {:error, error}
    end
  end

  defp fetch_insert_values(%{"ok" => true}, response, returning) do
    data = for {key, val} <- response, into: %{}, do: {String.to_atom(key), val}

    Enum.map(returning, fn(k)->
      Map.get(data, k)
    end)
  end

  defp fetch_insert_values(_, _, _) do
    raise "Failed to save document"
  end

  defp find_to_delete({:ok, doc}, pid, doc_id) do
    Couchx.DbConnection.delete(pid, doc_id, doc["_rev"])
    |> handle_delete_response
  end

  defp find_to_delete({:error, error}, _, _) do
    raise error
  end

  defp handle_delete_response({:ok, _}) do
    {:ok, []}
  end

  defp handle_delete_response({:error, error}) do
    raise error
  end

  defp sanitize_collection({:ok, %{"rows" => rows}}) do
    rows = Enum.filter(rows, &Map.get(&1, "doc"))
    {:ok, %{"rows" => rows}}
  end

  defp sanitize_collection({:error, error}) do
    {:error, error}
  end

  defp unencoded_namespace_id(namespace, id) do
    namespace
    |> namespace_id(id)
    |> URI.decode_www_form
  end

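  # JSON-encodes key-related view options (key, keys, startkey, ...) so CouchDB
  # receives valid JSON query parameters, and returns the options as a map.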
  defp prepare_view_options(options) do
    @encodable_keys
    |> Enum.reduce(options, fn key, acc ->
       Keyword.replace(acc, key, Jason.encode!(options[key]))
    end)
    |> Enum.into(%{})
  end
end