defmodule Couchx.Adapter do
@moduledoc """
Adapter to get basic query functionality into `Ecto` with `CouchDB`.
## Configuration
It uses the same as Ecto pattern to config the Dbs with this format:
```
config :my_app, MyRepo,
username: "username",
password: "password",
database: "db_name",
hostname: "localhost",
protocol: "http",
port: 5984
```
## Usage
Couchx supports 1 main repo and many dynamic supervised repos.
A dynamic repo will allow you to have multiple db connections in your application.
To achieve this, you will need to setup a `DynamicSupervisor` and a `Registry` in the application like:
```
def start(_type, _args) do
children = [
{DynamicSupervisor, strategy: :one_for_one, name: CouchxSupervisor}
{Registry, keys: :unique, name: CouchxRegistry},
...
]
...
end
```
The Restry name is tied up to the code so it must be called `CouchxRegistry`.
The main Repo is configured as any other Ecto Repo, so you can start it in the application just adding it to the children list.
```
def start(_type, _args) do
children = [
MyDb.Repo
]
...
end
```
### Dynamic Repo queries
The dynamic repos are implemente with a Macro that you can get into your repo as:
```
use CouchxDyncamicTepo, otp_app: :my_app, name: :my_repo
```
This is used to setup a `run` function, with a callback as argument.
To execute actions in a dynamic repo we follow this pattern:
```
MyDynamicRepo.run( ->
MyDynamicRepo.get(MyStruct, doc_id)
end)
```
Any Repo call inside the callback function will be run in a dynamically supervised connection.
"""
import Couchx.CouchId
@behaviour Ecto.Adapter
@behaviour Ecto.Adapter.Schema
@behaviour Ecto.Adapter.Queryable
@query_map [
==: "$eq",
or: "$or",
and: "$and",
in: "$in"
]
defmacro __before_compile__(_env), do: :ok
def init(config) do
config = put_conn_id(config)
log = Keyword.get(config, :log, :debug)
telemetry_prefix = Keyword.fetch!(config, :telemetry_prefix)
telemetry = {config[:repo], log, telemetry_prefix ++ [:query]}
spec = couchdb_supervisor_spec(config)
{:ok, spec, %{telemetry: telemetry, opts: [returning: true], config: config}}
end
def ensure_all_started(_repo, _type), do: HTTPoison.start
def checkout(_adapter, _config, result), do: result
def dumpers({:map, _}, type), do: [&Ecto.Type.embedded_dump(type, &1, :json)]
def dumpers(_primitive, type), do: [type]
def loaders({:map, _}, type), do: [&Ecto.Type.embedded_load(type, &1, :json)]
def loaders(_primitive, type), do: [type]
def autogenerate(:id), do: nil
def autogenerate(:binary_id) do
Ecto.UUID.cast!(Ecto.UUID.bingenerate)
end
def checked_out?(arg), do: arg
def insert_all(_, _, _, _, _, _, _, _), do: nil
def prepare(:all, query) do
%{wheres: wheres} = query
keys = Enum.map(wheres, &parse_where/1)
{:nocache, {System.unique_integer([:positive]), keys}}
end
def prepare(:delete_all, _query) do
{:nocache, {System.unique_integer([:positive]), [:delete]}}
end
def insert(meta, repo, fields, _on_conflict, returning, _options) do
data = Enum.into(fields, %{})
|> build_id(repo)
url = URI.encode_www_form(data._id)
body = Jason.encode!(data)
{:ok, response} = Couchx.DbConnection.insert(meta[:pid], url, body)
response = Map.merge(data, %{_id: response["id"], _rev: response["rev"]})
values = Enum.map(returning, fn(k)-> Map.get(response, k) end)
{:ok, Enum.zip(returning, values)}
end
def execute(:view, meta, design, view, key, query_opts) do
opts = query_opts
|> Enum.into(%{})
|> Map.merge(%{key: key})
Couchx.DbConnection.get(meta[:pid], "_design/#{design}/_view/#{view}", opts)
|> parse_view_response(opts[:include_docs])
end
def execute(meta, query_meta, query_cache, params, _opts) do
{_, {_, keys}} = query_cache
%{select: select} = query_meta
{all_fields, module} = fetch_fields(query_meta.sources)
namespace = build_namespace(module)
fields_meta = fields_meta(select[:from])
fields = case select[:postprocess] do
{:map, keyfields} ->
Keyword.keys(keyfields)
|> Enum.map(&Atom.to_string/1)
_-> all_fields
end
case do_query(meta[:pid], keys, namespace, params) do
{:ok, %{"rows" => rows}} ->
Enum.map(rows, fn(row)->
row
|> Map.get("doc")
|> Map.take(fields)
|> Map.values
end) |> execute_response
{:ok, response} ->
process_docs(response, fields, fields_meta)
|> execute_response
[] ->
{0, []}
{:error, reason} ->
raise Couchx.DbError, message: "#{reason}"
end
end
def create_admin(server, name, password) do
Couchx.DbConnection.create_admin(server, name, password)
end
def create_db(server, name) do
Couchx.DbConnection.create_db(server, name)
end
def delete_admin(server, name) do
Couchx.DbConnection.delete_admin(server, name)
end
def delete_db(server, name) do
Couchx.DbConnection.delete_db(server, name)
end
defp parse_where([]), do: []
defp parse_where(%Ecto.Query.BooleanExpr{expr: expr}) do
{condition, _, fields} = expr
build_query_condition(condition, fields)
end
defp build_query_condition(_, [{{_, [], [{_, [], [_]}, key]}, [], []}, value]) do
%{ key => value }
end
defp build_query_condition(condition, fields) do
%{ @query_map[condition] => build_query(fields) }
end
defp build_query(fields) do
Enum.map(fields, &build_field_condition/1)
end
defp build_field_condition({:^, [], [0]}), do: :primary_key
defp build_field_condition({{_, _, [{_, _, [0]}, key]}, _, _}), do: %{key => :empty}
defp build_field_condition({expr, _, [{{_, _, [_, key]}, _, _}, value]}) do
%{ key => %{ @query_map[expr] => value } }
end
defp execute_response([]), do: []
defp execute_response(values) when is_list(values) do
[item | _] = values
if is_list(item) do
{length(values), values}
else
{1, [values]}
end
end
defp fetch_fields({{resource, nil, _}}) do
module = ["Elixir", ".", resource]
|> Enum.map(&Inflex.singularize/1)
|> Enum.map(&String.capitalize/1)
|> Enum.join
|> String.to_existing_atom
fetch_fields({{resource, module, nil}})
end
defp fetch_fields({{_resource, module, _}}) do
fields = module.__struct__
|> Map.keys
|> Kernel.--([:__struct__, :__meta__])
|> Enum.map(&Atom.to_string/1)
{fields, module}
end
defp do_query(server, [%{_id: {:^, [], [0, _total]}}], namespace, ids) when is_list(ids) do
do_query(server, [%{_id: ids}], namespace, [])
end
defp do_query(server, [%{_id: ids}], namespace, []) when is_list(ids) do
doc_ids = Enum.map(ids, &namespace_id(namespace, &1))
|> Enum.map(&URI.decode_www_form/1)
Couchx.DbConnection.get(server, "_all_docs", [keys: Jason.encode!(doc_ids), include_docs: true])
end
defp do_query(server, [%{"$eq" => [%{_id: :empty}, :primary_key]}], namespace, [id | _]) do
Couchx.DbConnection.get(server, namespace_id(namespace, id))
end
defp do_query(server, [%{_id: {:^, [], [0]}}], namespace, [id | _]) do
Couchx.DbConnection.get(server, namespace_id(namespace, id))
end
defp do_query(server, [:delete], namespace, []) do
{:ok, %{"rows" => rows}} = Couchx.DbConnection.get(server, "_all_docs", [limit: 100, include_docs: true, startkey: Jason.encode!(namespace), endkey: Jason.encode!("#{namespace}/{}")])
docs = Enum.map(rows, fn(%{"doc" => doc})-> %{_id: doc["_id"], _rev: doc["_rev"], _deleted: true} end)
Couchx.DbConnection.bulk_docs(server, docs)
end
defp do_query(server, [], namespace, []) do
{:ok, %{"rows" => rows}} = Couchx.DbConnection.get(server, "_all_docs", [include_docs: true, limit: 100, include_docs: true, startkey: Jason.encode!(namespace), endkey: Jason.encode!("#{namespace}/{}")])
Enum.map(rows, &Map.get(&1, "doc"))
end
defp do_query(_, _, _, _), do: {:error, :not_implemented}
defp build_namespace(module) do
module
|> to_string
|> String.split(".")
|> List.last
|> Macro.underscore
end
defp namespace_id(namespace, id) do
if Regex.match?(~r{^#{namespace}/}, id) do
URI.encode_www_form(id)
else
namespace
|> Kernel.<>("/#{id}")
|> URI.encode_www_form
end
end
defp build_id(data, %{schema: resource}) do
resource
|> to_string
|> String.split(".")
|> List.last
|> Macro.underscore
|> Kernel.<>("/#{base_id(data._id)}")
|> update_data_id(data)
end
defp update_data_id(id, data) do
Map.put(data, :_id, id)
end
defp parse_view_response({:ok, %{"rows" => rows}}, true) do
rows
|> Enum.map(&Map.get(&1, "doc"))
|> Enum.map(&build_structs/1)
end
defp parse_view_response({:ok, %{"rows" => rows}}, _), do: rows
defp build_structs(map) do
doc_type = Map.get(map, "_id")
|> String.replace(~r{(/.+)}, "")
|> Macro.camelize
module = :"Elixir.SDB.#{doc_type}" # TODO: pass module name in view execute
doc = Enum.reduce(map, %{}, &keys_to_atoms/2)
struct(module, doc)
end
defp keys_to_atoms({key, value}, acc) do
Map.put(acc, String.to_atom(key), value)
end
defp put_conn_id(config), do: config ++ [id: config[:name]]
defp couchdb_supervisor_spec(config) do
{
config[:id],
{
DynamicSupervisor,
:start_child,
[
CouchxSupervisor,
{Couchx.DbConnection, config}
]
},
:permanent,
:infinity,
:worker,
[config[:id]]
}
end
defp fields_meta({_, {_, _, _, fields_meta}}), do: fields_meta
defp fields_meta(_), do: nil
defp process_docs(rows, fields, meta) when is_list(rows) do
Enum.map(rows, &process_docs(&1, fields, meta))
end
defp process_docs(doc, fields, nil) do
doc
|> Map.take(fields)
|> Map.values
end
# TODO: move to process docs module to be imported
defp process_docs(doc, fields, meta) do
template = doc_template(meta)
doc = Map.take(doc, fields)
template
|> Map.merge(doc)
|> Map.values
end
defp doc_template(fields) do
fields
|> Enum.reduce(%{}, fn({key, type}, acc)->
Map.put(acc, "#{key}", default_value(type))
end)
end
defp default_value(:string), do: ""
defp default_value(:integer), do: 0
defp default_value(:boolean), do: false
defp default_value({:array, _}), do: []
defp default_value(:map), do: %{}
defp default_value({:map, _}), do: %{}
defp default_value(:binary_id), do: ""
# Pending implementation
def delete(meta, _meta_schema, params, _opts) do
doc_id = URI.encode_www_form(params[:_id])
Couchx.DbConnection.get(meta[:pid], doc_id)
|> find_to_delete(meta[:pid], doc_id)
end
def insert_all(_a, _b, _c, _d, _e, _f, _g) do
end
def update(meta, _repo, fields, identity, returning, _) do
data = for {key, val} <- fields, into: %{}, do: {Atom.to_string(key), val}
doc_id = URI.encode_www_form(identity[:_id])
{:ok, response} = Couchx.DbConnection.get(meta[:pid], doc_id)
values = Map.merge(response, data)
body = Jason.encode!(values)
{:ok, response} = Couchx.DbConnection.insert(meta[:pid], doc_id, body)
values = fetch_insert_values(response, values, returning)
{:ok, Enum.zip(returning, values)}
end
def update!(meta, repo, fields, identity, returning, a) do
{:ok, values} = update(meta, repo, fields, identity, returning, a)
values
end
def stream(_a, _b, _c, _d, _e) do
end
defp fetch_insert_values(%{"ok" => true}, response, returning) do
data = for {key, val} <- response, into: %{}, do: {String.to_atom(key), val}
Enum.map(returning, fn(k)->
Map.get(data, k)
end)
end
defp fetch_insert_values(_, _, _) do
raise "Fail to save document"
end
defp find_to_delete({:ok, doc}, pid, doc_id) do
Couchx.DbConnection.delete(pid, doc_id, doc["_rev"])
|> handle_delete_response
end
defp find_to_delete({:error, error}, _, _) do
raise error
end
defp handle_delete_response({:ok, _}) do
{:ok, []}
end
defp handle_delete_response({:error, error}) do
raise error
end
end