lib/etcd_ex.ex

defmodule EtcdEx do
  @moduledoc """
  An Elixir client for Etcd.

  This is an interface to a background process that represents a single connection
  to an Etcd cluster node.

  ## Usage

  To establish a connection with a single Etcd node, use `start_link/1`. The
  `endpoint` argument passed to it accepts the same options as
  `Mint.HTTP2.connect/4`.

  Upon start, the background process already connects to Etcd, and refuses start
  otherwise.

  Use the functions `get/4`, `put/5`, `delete/4` to read and modify Etcd keys.

  Leases are operated through `grant/3`, `revoke/3` and `keep_alive/3`. You can
  check the validity of  lease using `ttl/4` and list all leases at Etcd with
  `leases/2`.

  Watch for key changes from a watching process using `watch/5`. They can be
  canceled at any moment by using `cancel_watch/3`. Upon watching process
  termination, all watches assigned to a process are automatically canceled,
  all watch references invalidated and the gRPC watch stream is also closed.
  Case the connection to Etcd is interrupted, the background process will
  reconnect all watches, maintaining consistency of revision numbers in the
  transition.

  It is possible to list all keys being watched by a process with
  `list_watches/3`.
  """

  alias EtcdEx.Types

  @type start_opt ::
          {:options, [option]}
          | {:transport, transport}
          | {:transport_opts, transport_opts}

  @type option ::
          {:mode, :connect_all | :random}
          | {:name, String.t()}
          | {:password, String.t()}
          | {:retry, non_neg_integer}
          | {:retry_timeout, pos_integer}
          | {:connect_timeout, timeout}
          | {:auto_sync_interval_ms, timeout}
          | extended_options

  @type transport :: :tcp | :tls | :ssl

  @type transport_opts :: [:gen_tcp.connect_option()] | [:ssl.connect_option()]

  @type extended_options :: {atom, any}

  @type conn :: EtcdEx.Connection.t()

  @type watch_ref :: reference
  @type watching_process :: pid

  @default_timeout :timer.seconds(5)

  @doc """
  Returns a specification to start `EtcdEx` under a supervisor.
  """
  @spec child_spec([start_opt]) :: Supervisor.child_spec()
  defdelegate child_spec(init_opts), to: EtcdEx.Connection

  @doc """
  Starts the `EtcdEx` client as a worker process.

  Manually it can be started as:

      EtcdEx.start_link(name: MyApp.Etcd)

  In your supervisor tree, you would write:

      Supervisor.start_link([
        {EtcdEx, name: MyApp.Etcd}
      ], strategy: :one_for_one)

  ## Options

  All the following keys are optional:

    * `:name` - the process name of the client connection. If you don't use a process
      name, then you should pass the process `pid` at every `EtcdEx` function instead
      of the name.
    * `:endpoint` - a tuple consisting of `{scheme, address, port, connect_opts}`.
      The `scheme` is either `:http` or `:https`, `address` and `port` the Etcd
      node address/port, and `connect_opts` follw the same options as supported
      by `Mint.HTTP2.connect/4`. Defaults to `{:http, "localhost", 2379, []}`.
    * `:keep_alive_interval` - the period to send keep-alive pings to the
      Etcd node. Set `:infinity` to disable keep-alive checks. Should be any
      integer value `>= 10_000`. This option can be used in conjunction with
      `:keep_alive_timeout` to properly disconnect if the Etcd node is not
      responding to network traffic. Defaults to 10 seconds.
    * `:keep_alive_timeout` - the time after sending a keep-alive ping when the
       ping will be considered unacknowledged. Used in conjunction with
       `:keep_alive_interval`. Set to `:infinity` to disable keep-alive checks.
       Should be any integer value `>= 10_000`. Defaults to 10 seconds.
  """
  @spec start_link([start_opt]) :: GenServer.on_start()
  defdelegate start_link(start_opts), to: EtcdEx.Connection

  @doc """
  Gets one or a range of key-value pairs from Etcd.

  The etcd3 data model indexes all keys over a flat binary key space. This
  differs from other key-value store systems that use a hierarchical system of
  organizing keys into directories. Instead of listing keys by directory, keys
  are listed by key intervals `[a, b)`.

  These intervals are often referred to as "ranges" in etcd3. Operations over
  ranges are more powerful than operations on directories. Like a hierarchical
  store, intervals support single key lookups via `[a, a+1)` (e.g.,
  `["a", "a\\x00")` looks up `"a"`) and directory lookups by encoding keys by
  directory depth. In addition to those operations, intervals can also encode
  prefixes; for example the interval `["a", "b")` looks up all keys prefixed by
  the string `"a"`.

  By convention, ranges for a request are denoted by the `key` and `:range_end`
  option. The `key` argument is the first key of the range and should be
  non-empty.  The `:range_end` is the key following the last key of the range.
  If `:range_end` is not given or empty, the range is defined to contain only
  the `key` argument.  If `:range_end` is key plus one (e.g., `"aa"+1 == "ab"`,
  `"a\\xff"+1 == "b"`), then the range represents all keys prefixed with key.
  If both `key` and `:range_end` are `"\\x00"`, then range represents all keys.
  If `:range_end` is `"\\x00"`, the range is all keys greater than or equal to
  the `key` argument.

  ## Options

  The following keys are optional:

    * `:range_end` - the key range to fetch.
    * `:prefix` - if true, sets up `:range_end` as `a+1`.
    * `:from_key` - if true, sets up `:range_end` as `key` to represent all
      keys after `key` argument.
    * `:limit` - the maximum number of keys returned for the request. When
      limit is set to 0 (the default), it is treated as no limit.
    * `:revision` - the point-in-time of the key-value store to use for the
      range. If `:revision` is less or equal to zero, the range is over the
      latest key-value store.  If the revision is compacted, ErrCompacted is
      returned as a response.
    * `:sort` - the key-value field to sort and the ordering for the returned
      response.
    * `:serializable` - sets the range request to use serializable member-local
      reads. By default, Range is linearizable; it reflects the current
      consensus of the cluster. For better performance and availability, in
      exchange for possible stale reads, a serializable range request is served
      locally without needing to reach consensus with other nodes in the
      cluster.
    * `:keys_only` - return only the keys and not the values.
    * `:count_only` - return only the count of the keys in the range.
    * `:min_mod_revision` - the lower bound for key mod revisions; filters out
      lesser mod revisions.
    * `:max_mod_revision` - the upper bound for key mod revisions; filters out
      greater mod revisions.
    * `:min_create_revision` - the lower bound for key create revisions;
      filters out lesser create revisions.
    * `:max_create_revision` - the upper bound for key create revisions;
      filters out greater create revisions.
    * `:timeout` - indicates max time to wait for a response. Defaults to
      `:infinity`.

  ## Response

  The response is formed by a map such as:

      %{
        header: %{
          cluster_id: 16182920199522267672,
          member_id: 5619527364212512701,
          revision: 47077139,
          raft_term: 13
        },
        kvs [
          ...
        ],
        more: false,
        count: 1024
      }

  Details:

    * `:header` - all responses from etcd have an attached response header
      which includes cluster metadata for the response:
      * `:cluster_id` - the ID of the cluster generating the response.
      * `:member_id` - the ID of the member generating the response.
      * `:revision` - the revision of the key-value store when generating the
        response.
    * `:kvs` - the list of key-value pairs matched by the range request. When
      `:count_only` is `true`, `:kvs` is empty.
    * `:more` - indicates if there are more keys to return in the requested
      range if `:limit` is set.
    * `:count` - the total number of keys satisfying the range request.

  ## Key-Value pair

  The key-value pairs under `:kvs` are in the form of:

      %{
        create_revision: 46752355,
        key: "/foo",
        lease: 0,
        mod_revision: 46752355,
        value: "",
        version: 1
      }

    * `:key` - key bytes in binary. An empty key is not allowed.
    * `:value` - value bytes in binary.
    * `:version` - version is the version of the key. A deletion resets the
      version to zero and any modification of the key increases its version.
    * `:create_revision` - revision of the last creation on the key.
    * `:mod_revision` - revision of the last modification on the key.
    * `:lease` - the ID of the lease attached to the key. If lease is 0, then
      no lease is attached to the key.
  """
  @spec get(conn, Types.key(), [Types.get_opt()], timeout) ::
          {:ok, any} | {:error, Mint.Types.error()}
  def get(conn, key, opts \\ [], timeout \\ @default_timeout)
      when is_binary(key) and is_list(opts) do
    EtcdEx.Connection.unary(conn, :get, [key, opts], timeout)
  end

  @doc """
  Puts a key-value pair to Etcd.

  Both arguments `key` and `value` must be binaries.

  ## Options

  `put` accepts the following options:

    * `:lease` - the lease ID to associate with the key in the key-value store.
      A lease value of 0 indicates no lease.
    * `:prev_kv` - when set, responds with the key-value pair data before the
      update from this put call.
    * `:ignore_value` - when set, update the key without changing its current
      value. Returns an error if the key does not exist.
    * `:ignore_lease` - when set, update the key without changing its current
      lease. Returns an error if the key does not exist.
    * `:timeout` - indicates max time to wait for a response. Defaults to
      `:infinity`.
  """
  @spec put(conn, Types.key(), Types.value(), [Types.put_opt()], timeout) ::
          {:ok, any} | {:error, Mint.Types.error()}
  def put(conn, key, value, opts \\ [], timeout \\ @default_timeout)
      when is_binary(key) and is_binary(value) and is_list(opts) do
    EtcdEx.Connection.unary(conn, :put, [key, value, opts], timeout)
  end

  @doc """
  Deletes key-value pair(s) from Etcd.

  ## Options

  `delete` accepts the following options:

    * `:range_end` - the key range to delete.
    * `:prefix` - if true, sets up `:range_end` as `a+1`.
    * `:from_key` - if true, sets up `:range_end` as `key` to represent all
      keys after `key` argument.
    * `:prev_kv` - when set, return the contents of the deleted key-value
      pairs.
    * `:timeout` - indicates max time to wait for a response. Defaults to
      `:infinity`.
  """
  @spec delete(conn, Types.key(), [Types.delete_opt()], timeout) ::
          {:ok, any} | {:error, Mint.Types.error()}
  def delete(conn, key, opts \\ [], timeout \\ @default_timeout) do
    EtcdEx.Connection.unary(conn, :delete, [key, opts], timeout)
  end

  @doc """
  Obtain a lease.

  The argument `ttl` is the advisory time-to-live, in seconds.

  The only option available is `:timeout` which indicates how long to wait for
  a response from the Etcd cluster.

  Leases are a mechanism for detecting client liveness. The cluster grants
  leases with a time-to-live. A lease expires if the etcd cluster does not
  receive a keepAlive within a given TTL period.

  To tie leases into the key-value store, each key may be attached to at most
  one lease. When a lease expires or is revoked, all keys attached to that
  lease will be deleted. Each expired key generates a delete event in the event
  history.

  ## Response

  The response looks like:

      %{
        ID: 4658271320501810998,
        TTL: 300,
        error: "",
        header: %{
          cluster_id: 16182920199522267672,
          member_id: 6198688855164797093,
          raft_term: 13,
          revision: 47427980
        }
      }

    * `:ID` - the lease ID for the granted lease.
    * `:TTL` - is the server selected time-to-live, in seconds, for the lease.
  """
  @spec grant(conn, Types.ttl(), timeout) :: {:ok, any} | {:error, Mint.Types.error()}
  def grant(conn, ttl, timeout \\ @default_timeout) when is_integer(ttl) and ttl >= 0 do
    EtcdEx.Connection.unary(conn, :grant, [ttl], timeout)
  end

  @doc """
  Revokes a previously granted lease.

  The response is of the form:

      %{
        header: %{
          cluster_id: 16182920199522267672,
          member_id: 6198688855164797093,
          raft_term: 13,
          revision: 47452709
        }
      }
  """
  @spec revoke(conn, Types.lease_id(), timeout) ::
          {:ok, any} | {:error, Mint.Types.error()}
  def revoke(conn, lease_id, timeout \\ @default_timeout) do
    EtcdEx.Connection.unary(conn, :revoke, [lease_id], timeout)
  end

  @doc """
  Refreshes an existing lease.

  The `lease_id` should have been obtained from a previous call to `grant/3`.

  ## Response

  The response looks like:

      %{
        ID: 4658271320501810998,
        TTL: 300,
        header: %{
          cluster_id: 16182920199522267672,
          member_id: 6198688855164797093,
          raft_term: 13,
          revision: 47428392
        }
      }

    * `:ID` - the lease that was refreshed with a new TTL.
    * `:TTL` - the new time-to-live, in seconds, that the lease has remaining.
  """
  @spec keep_alive(conn, Types.lease_id(), timeout) ::
          {:ok, any} | {:error, Mint.Types.error()}
  def keep_alive(conn, lease_id, timeout \\ @default_timeout) do
    EtcdEx.Connection.unary(conn, :keep_alive, [lease_id], timeout)
  end

  @doc """
  Checks the advisory time-to-live from a lease.

  The `lease_id` should have been obtained from a previous call to `grant/3`.

  If `with_keys` argument is `true`, the response will have the `:keys` field
  containing a list of all keys associated with the `lease_id`.

  ## Response

  The response looks like:

      %{
        ID: 4658271320501810998,
        TTL: 231,
        grantedTTL: 300,
        header: %{
          cluster_id: 16182920199522267672,
          member_id: 6198688855164797093,
          raft_term: 13,
          revision: 47429656
        },
        keys: []
      }

    * `:ID` - the queried lease ID.
    * `:TTL` - the time-to-live, in seconds, that the lease has remaining.
    * `:grantedTTL` - the time, in seconds, of the time-to-live requested when
      the lease was granted.
    * `:keys` - if `keys` is `true`, it will contain a list of keys that have
      been assigned to the lease.
  """
  @spec ttl(conn, Types.lease_id(), [Types.ttl_opt()], timeout) ::
          {:ok, any} | {:error, Mint.Types.error()}
  def ttl(conn, lease_id, opts \\ [], timeout \\ @default_timeout) do
    EtcdEx.Connection.unary(conn, :ttl, [lease_id, opts], timeout)
  end

  @doc """
  List all leases from the Etcd cluster.

  The response is of the form:

      %{
        header: %{
          cluster_id: 16182920199522267672,
          member_id: 6198688855164797093,
          raft_term: 13,
          revision: 47430034
        },
        leases: [
          %{ID: 6322351403492000182},
          %{ID: 4658271320501761384},
          %{ID: 4658271320501772555}
        ]
      }

    * `:leases` - list of leases.
  """
  @spec leases(conn, timeout) :: {:ok, any} | {:error, Mint.Types.error()}
  def leases(conn, timeout \\ @default_timeout) do
    EtcdEx.Connection.unary(conn, :leases, [], timeout)
  end

  @doc """
  Opens a watch stream and watches for changes on keys.

  An etcd3 watch waits for changes to keys by continuously watching from a
  given revision, either current or historical, and streams key updates back to
  the client.

  On success, the return value has the form `{:ok, watch_ref}`. The `watch_ref`
  is a reference to the created stream, that can be used to correlate watching
  keys with the notification messages at the watching process inbox.

  In order to cancel the watch, use `cancel_watch/3`.

  Watch streams are recreated on etcd reconnections, and automatically closed
  upon watch process termination.

  The same watching process can watch multiple keys by simply calling `watch/5`
  multiple times passing the same `watching_process` argument.

  ## Watch streams

  According to the [Etcd documentation](https://etcd.io/docs/v3.5/learning/api/#watch-api),
  watches are long-running requests and use gRPC streams to stream event data.
  A watch stream is bi-directional; the client writes to the stream to
  establish watches and reads to receive watch events. A single watch stream
  can multiplex many distinct watches by tagging events with per-watch
  identifiers. This multiplexing helps reducing the memory footprint and
  connection overhead on the core etcd cluster.

  Watches make three guarantees about events:

    * Ordered - events are ordered by revision; an event will never appear on a
      watch if it precedes an event in time that has already been posted.
    * Reliable - a sequence of events will never drop any subsequence of
      events; if there are events ordered in time as `a` < `b` < `c`, then if
      the watch receives events `a` and `c`, it is guaranteed to receive `b`.
    * Atomic - a list of events is guaranteed to encompass complete revisions;
      updates in the same revision over multiple keys will not be split over
      several lists of events.

  ## Options

  `watch_params` accepts the following options:

    * `:range_end` - the key range to watch.
    * `:prefix` - if true, sets up `:range_end` as `a+1`.
    * `:from_key` - if true, sets up `:range_end` as `key` to represent all
      keys after `key` argument.
    * `:start_revision` - an optional revision for where to inclusively begin
      watching.  If not given, it will stream events following the revision of
      the watch creation response header revision. The entire available event
      history can be watched starting from the last compaction revision.
    * `:filters` - A list of event types to filter away at server side.
    * `:prev_kv` - when set, the watch receives the key-value data from before
      the event happens. This is useful for knowing what data has been
      overwritten.
    * `:timeout` - indicates max time to wait for a response. Defaults to
      `:infinity`.

  ## Watch events

  In response to a `watch`, the client process receives:

    * `{:etcd_watch_created, watch_ref}` - when the watch stream is created by
      the etcd server.
    * `{:etcd_watch_notify, watch_ref, response}` - for every etcd watch reply.
    * `{:etcd_watch_notify_progress, response}` - case the option
      `:progress_notify` is `true`.
    * `{:etcd_watch_canceled, watch_ref, reason}` - case etcd server cancels
      the watch stream for any reason. One important case to consider is when
      etcd compacts revisions, this is an irrecoverable error.

  Responses look like:

      %{
        header: %{
          cluster_id: 16182920199522267672,
          member_id: 9975446501980398855,
          raft_term: 13,
          revision: 47068291
        },
        watch_id: 1,
        created: false,
        canceled: false,
        compact_revision: 0,
        events: [
          ...
        ]
      }

    * `:watch_id` - the ID of the watch that corresponds to the response. As
      watch streams are recreated during reconnections, the watch ID is exposed
      only for informative purposes.
    * `:created` - set to `true` if the response is for a create watch request.
      `EtcdEx` transforms `created: true` responses into `:etcd_watch_created`
      messages.
    * `:canceled` - set to `true` if the response is for a cancel watch
      request. No further events will be sent to the canceled watcher. `EtcdEx`
      also transforms `canceled: true` responses into `:etcd_watch_canceled`
      messages.
    * `:compact_revision` - set to the minimum historical revision available to
      etcd if a watcher tries watching at a compacted revision. This happens
      when creating a watcher at a compacted revision or the watcher cannot
      catch up with the progress of the key-value store. The watcher will be
      canceled; creating new watches with the same start_revision will fail.
      `EtcdEx` transforms `canceled: true, compact_revision: n` responses into
      `:etcd_watch_canceled` messages, with `{:compated, n}` as reason.
    * `:events` - a list of new events in sequence corresponding to watch ID.

  ## Events

  Every change to every key is represented with event messages. An event
  message provides both the data and the type of update:

      %{
        kv: %{
          create_revision: 45178185,
          key: "foo",
          lease: 4658271320497843197,
          mod_revision: 47068291,
          value: "bar",
          version: 112815
        },
        type: :PUT
      }

    * `:type` - the kind of event. A `:PUT` type indicates new data has been
      stored to the key. A `:DELETE` indicates the key was deleted.
    * `:kv` - the KeyValue associated with the event. A `:PUT` event contains
      current key-value pair. A `:PUT` event with `:version` 1 indicates the
      creation of a key.  A `:DELETE` event contains the deleted key with its
      modification revision set to the revision of deletion.
    * `:prev_kv` - the key-value pair for the key from the revision immediately
      before the event. To save bandwidth, it is only filled out if the watch
      has explicitly enabled it.
  """
  @spec watch(conn, watching_process, Types.key(), [Types.watch_opt()], timeout) ::
          {:ok, watch_ref} | {:error, Mint.Types.error()}
  def watch(conn, watching_process, key, opts \\ [], timeout \\ @default_timeout) do
    EtcdEx.Connection.watch(conn, watching_process, key, opts, timeout)
  end

  @doc """
  Cancels all change events being watched by a process.

  It is safe to call `cancel_watch/3` for a process that isn't currently
  watching any key range.
  """
  @spec cancel_watch(conn, watching_process, timeout) ::
          :ok | {:error, Mint.Types.error()}
  def cancel_watch(conn, watching_process, timeout \\ @default_timeout) do
    EtcdEx.Connection.cancel_watch(conn, watching_process, timeout)
  end

  @doc """
  List watches started by the watching process.
  """
  @spec list_watches(conn, watching_process, timeout) ::
          [{watch_ref, Types.key(), [Types.watch_opt()]}]
  def list_watches(conn, watching_process, timeout \\ @default_timeout) do
    EtcdEx.Connection.list_watches(conn, watching_process, timeout)
  end

  @doc """
  Compacts the event history in the etcd key-value store.

  The key-value store should be periodically compacted or the event history
  will continue to grow indefinitely.
  """
  @spec compact(conn, Types.revision(), physical? :: boolean, timeout) ::
          {:ok, map} | {:error, any}
  def compact(conn, revision, physical?, timeout \\ @default_timeout) do
    EtcdEx.Connection.unary(conn, :compact, [revision, physical?], timeout)
  end

  @doc """
  Acquires a distributed shared lock on a given named lock.

  On success, it will return a unique key that exists so long as the lock is
  held by the caller. This key can be used in conjunction with transactions to
  safely ensure updates to etcd only occur while holding lock ownership. The
  lock is held until `unlock` is called on the `key` or the associated lease
  expires.
  """
  @spec lock(conn, Types.name(), Types.lease_id(), timeout) :: {:ok, map} | {:error, any}
  def lock(conn, name, lease_id, timeout \\ @default_timeout) do
    EtcdEx.Connection.unary(conn, :lock, [name, lease_id], timeout)
  end

  @doc """
  Takes a key returned by `lock/4` and releases the hold on lock.

  The next `lock/4` caller waiting for the lock will then be woken up and given
  ownership of the lock.
  """
  @spec unlock(conn, Types.key(), timeout) :: {:ok, map} | {:error, any}
  def unlock(conn, key, timeout \\ @default_timeout) do
    EtcdEx.Connection.unary(conn, :unlock, [key], timeout)
  end

  @doc """
  Adds a member into the cluster.
  """
  @spec add_member(conn, [Types.peer_url()], learner? :: boolean, timeout) ::
          {:ok, map} | {:error, any}
  def add_member(conn, peer_urls, learner?, timeout \\ @default_timeout) do
    EtcdEx.Connection.unary(conn, :add_member, [peer_urls, learner?], timeout)
  end

  @doc """
  Removes a member from the cluster.
  """
  @spec remove_member(conn, Types.member_id(), timeout) ::
          {:ok, map} | {:error, any}
  def remove_member(conn, member_id, timeout \\ @default_timeout) do
    EtcdEx.Connection.unary(conn, :remove_member, [member_id], timeout)
  end

  @doc """
  Updates a cluster member.
  """
  @spec update_member(conn, Types.member_id(), [Types.peer_url()], timeout) ::
          {:ok, map} | {:error, any}
  def update_member(conn, member_id, peer_urls, timeout \\ @default_timeout) do
    EtcdEx.Connection.unary(conn, :update_member, [member_id, peer_urls], timeout)
  end

  @doc """
  List all members in the cluster.
  """
  @spec list_members(conn, timeout) ::
          {:ok, map} | {:error, any}
  def list_members(conn, timeout \\ @default_timeout) do
    EtcdEx.Connection.unary(conn, :list_members, [], timeout)
  end

  @doc """
  Promotes a member from raft learner (non-voting) to raft voting member.
  """
  @spec promote_member(conn, Types.member_id(), timeout) ::
          {:ok, map} | {:error, any}
  def promote_member(conn, member_id, timeout \\ @default_timeout) do
    EtcdEx.Connection.unary(conn, :promote_member, [member_id], timeout)
  end
end