lib/pid_control.ex

defmodule PIDControl do
  @moduledoc """
  A discrete implementation of a [PID controller](https://en.wikipedia.org/wiki/PID_controller) for building
  closed-loop control into embedded systems.

  ## Usage

  The `PIDControl.new/1` function returns an initialized `PIDControl` struct with the given configuration:

  ```elixir
  iex> pid = PIDControl.new(kp: 0.5, kd: 0.2, ki: 0.03)
  %PIDControl{...}
  ```

  Calling `PIDControl.step/3` performs one discrete cycle of the PID loop for the given `set_point` and `measurement`
  values.  The new PIDControl state is returned and the output can be accessed via the `output` key in the struct.

  ```elixir
  iex> pid = PIDControl.step(pid, 0.3, 0.312)
  %PIDControl{
  output: -0.11448221
  #...
  }
  ```

  The `step` function can be called in each successive input-output cycle of the underling sensor/actuator.

  ## Example 

  ```elixir
  defmodule Controller do
  use GenServer
  # ...

  def init(_) do
    Sensor.subcribe()
    pid = PIDControl.new(kp: 0.5, kd: 0.2, ki: 0.03)
    {:ok, pid}
  end

  def handle_info({:sensor, measurment}, pid) do
    pid = PIDControl.step(pid, @set_point, measurment)
    Actuator.command(pid.output)
    {:noreply, pid}
  end
  end
  ```

  ## Telemetry

  If `telemetry` is set to `true`, telemetry for the PID state will be emitted each time the `step` function is called.
  By default, the event name will be `[pid_control]` and will contain the following measurements.

  * `set_point` - Current set point
  * `measurement` - Most recent measurement
  * `error` - Current error that is being fed to the PID
  * `p` - Proportional component of the output
  * `i` - Integral component of the output
  * `d` - Derivative component of the output
  * `t` - The time value used for the step function.  This will always be the configured value `t` unless `use_system_t` is set to `true`.
  * `output` - Most recent output (which will be the sum of `p`, `i`, and `d` values)

  This is really useful for manual tuning when combined with something like `PheonixLiveDashboard`.

  """
  defstruct p: 0.0,
            i: 0.0,
            d: 0.0,
            t: 0.0,
            e0: nil,
            measurement_prev: 0.0,
            set_point_prev: 0.0,
            output: 0.0,
            time_prev: nil,
            config: %{}

  @default_config %{
    kp: 0.0,
    ki: 0.0,
    kd: 0.0,
    tau: 1.0,
    t: 1.0,
    output_min: -1.0,
    output_max: 1.0,
    use_system_t: false,
    zero_d_on_set_point_change: false,
    telemetry: false,
    telemetry_event_name: [:pid_control]
  }

  @type config() :: %{
          kp: float(),
          ki: float(),
          kd: float(),
          tau: float(),
          t: float(),
          output_min: float(),
          output_max: float(),
          use_system_t: boolean(),
          zero_d_on_set_point_change: boolean(),
          telemetry: boolean(),
          telemetry_event_name: list(atom())
        }
  @type t() :: %__MODULE__{
          p: float(),
          i: float(),
          d: float(),
          t: float(),
          e0: float() | nil,
          measurement_prev: float(),
          set_point_prev: float(),
          output: float(),
          time_prev: integer() | nil,
          config: config()
        }

  @doc """
  Creates a new PID controller.

  Config Options:

  * **`kp`, `ki`, `kp`** - Parameters for each term in the PID.  Any left blank will be set to `0`
  * **`tau`** - Low-pass filter parameter for the calculated `d` term.  A value of `1` will bypass the filtering.
    A value between `0` and `1` will filter out high-frequency noise in the derivative term.
  * **`t`** - Time factor for calculating the `i` and `d` term of the PID.  A value of `1` will ignore
    any time parameters and treat each `step` as a single time unit.  A value of `0.1`, for example,
    will treat each `step` as a tenth of a time unit.  If `use_system_t` is set to `true`, this value
    is ignored in favor of the system time (in seconds) between calls to `step`.
  * **`output_min` and `output_max`** - Minimum and maximum values that will be output from the PID. Any other
    values outside of this range will be clamped. These same values are used for anti-windup of the `i` term.
    Default is `-1` and `1`, respectively.
  * **`use_system_t`** - When set to `true`, the time used to calculate `d` and `i` terms of the PID will be
    derived from the measured time since the last call to `step` (using `System.monotonic_time/0`). When set
    to false, the configured value of `t` will be used. Defaults to `false`.
  * **`zero_d_on_set_point_change`** - When the set point changes rapidly, the d-term will suddenly spike, causing
    a sudden change in the PID output. Setting `tau` to a lower value will lessen this effect, but setting 
    `zero_d_on_set_point_change` will force the d-term to zero for the step when the set_point changes.  This is 
    useful for situations when the set point changes suddenly and occasionally, as opposed to situations where 
    the set_point is gradually changes (as when following a profile curve).  Defaults to `false`.
  * **`telemetry`** - When set to `true`, the PID will automatically emit its own telemetry after each `step`.
    Defaults to `false`.
  * **`telemetry_prefix`** - If `telemetry` is set to `true`, this value defines the name that the event is published
    under.  Defaults to `[:pid_control]`
  """
  @spec new(keyword()) :: t()
  def new(config \\ []) do
    %__MODULE__{config: @default_config} |> apply_config(config)
  end

  defp apply_config(%__MODULE__{} = pid, config) do
    pid |> Map.put(:config, Map.merge(pid.config, Enum.into(config, %{})))
  end

  @doc """
  Performs one step in the control loop.  Given the set point and measurement, the PID state
  is advanced and the output of the PID is set to the `output` field of the `PIDControl` struct.
  """
  @spec step(t(), float(), float()) :: t()
  def step(pid, set_point, measurement) do
    time = System.monotonic_time()

    t =
      if pid.config.use_system_t do
        t_actual(pid, time)
      else
        pid.config.t
      end

    e0 = set_point - measurement

    # P term is simply linear with the error
    p = pid.config.kp * e0

    # D term is linear on the change in error, passed through a low-pass filter
    last_error =
      cond do
        pid.e0 == nil -> e0
        set_point != pid.set_point_prev and pid.config.zero_d_on_set_point_change -> e0
        true -> pid.e0
      end

    d_raw = pid.config.kd * (e0 - last_error) / max(t, 0.001)
    d = pid.d + pid.config.tau * (d_raw - pid.d)

    # I term is linear to the sum of all previous errors, with anti-windup
    i_raw = pid.i + pid.config.ki * e0 * t
    i = clamp(i_raw, pid.config.output_min, pid.config.output_max)

    output = clamp(p + d + i, pid.config.output_min, pid.config.output_max)

    %{
      pid
      | p: p,
        i: i,
        d: d,
        t: t,
        e0: e0,
        measurement_prev: measurement,
        set_point_prev: set_point,
        output: output,
        time_prev: time
    }
    |> emit_telemetry()
  end

  defp t_actual(pid, time) do
    System.convert_time_unit(
      time - (Map.get(pid, :time_prev) || time),
      :native,
      :microsecond
    ) / 1_000_000.0
  end

  defp emit_telemetry(%__MODULE__{config: %{telemetry: true}} = pid) do
    :telemetry.execute(
      pid.config.telemetry_event_name,
      %{
        set_point: pid.set_point_prev,
        measurement: pid.measurement_prev,
        error: pid.e0,
        output: pid.output,
        p: pid.p,
        i: pid.i,
        d: pid.d,
        t: pid.t
      }
    )

    pid
  end

  defp emit_telemetry(pid), do: pid

  defp clamp(n, a, b) when a > b, do: clamp(n, b, a)

  defp clamp(n, a, b) when is_float(n) or is_float(a) or is_float(b),
    do: (n / 1) |> max(a / 1) |> min(b / 1)

  defp clamp(n, a, b), do: n |> max(a) |> min(b)
end