Skip to main content

lib/bb/sensor/ina219.ex

# SPDX-FileCopyrightText: 2026 James Harton
#
# SPDX-License-Identifier: Apache-2.0

defmodule BB.Sensor.INA219 do
  @moduledoc """
  A BB sensor that polls an INA219 voltage / current / power monitor
  over I2C and publishes `BB.Message.Sensor.PowerState` messages.

  The INA219 is a general-purpose power monitor. Use it to watch a battery,
  a motor's draw for stall detection, a logic rail, a solar input, or any
  other electrical bus you can run a shunt resistor on.

  ## Example DSL Usage

      topology do
        link :chassis do
          sensor :main_bus, {BB.Sensor.INA219,
            bus: "i2c-1",
            address: 0x40,
            calibration: :calibrate_32V_2A,
            publish_rate: ~u(10 hertz)
          }
        end
      end

  ## Options

  - `bus` — I2C bus name (e.g. `"i2c-1"`) — required.
  - `address` — I2C address (default `0x40`).
  - `calibration` — One of `:calibrate_32V_2A` (default), `:calibrate_32V_1A`,
    `:calibrate_16V_400mA`. These match the helper functions on the underlying
    `INA219` library and assume a 0.1Ω shunt resistor (Adafruit breakout).
    Each preset implies a fixed `current_divisor` and `power_divisor`.
  - `publish_rate` — How often to read + publish (default `~u(1 hertz)`).

  ## Published Messages

  `BB.Message.Sensor.PowerState` published to `[:sensor | path]` where `path`
  is the sensor's position in the topology. Fields are in SI units (Volts,
  Amperes, Watts).

  Read failures crash the process — going silent on the topic would hide
  a dead sensor from the supervisor and from downstream consumers. The
  supervisor restarts the process per its restart strategy; if the device
  is genuinely gone (e.g. a USB-attached bus disappeared), `init/1` will
  fail to reacquire and the restart intensity limit propagates the failure
  up the tree.
  """

  use BB.Sensor

  import BB.Unit
  import BB.Unit.Option

  alias BB.Message
  alias BB.Message.Sensor.PowerState
  alias BB.Robot.Units
  alias Localize.Unit
  alias Wafer.Driver.Circuits.I2C, as: CircuitsI2C

  @calibrations [:calibrate_32V_2A, :calibrate_32V_1A, :calibrate_16V_400mA]

  @impl BB.Sensor
  def options_schema do
    Spark.Options.new!(
      bus: [
        type: :string,
        required: true,
        doc: "I2C bus name (e.g. \"i2c-1\")"
      ],
      address: [
        type: :integer,
        default: 0x40,
        doc: "I2C address of the INA219"
      ],
      calibration: [
        type: {:in, @calibrations},
        default: :calibrate_32V_2A,
        doc: "Calibration preset (matches INA219.calibrate_*/1 helpers)"
      ],
      publish_rate: [
        type: unit_type(compatible: :hertz),
        default: ~u(1 hertz),
        doc: "Rate at which to read the sensor and publish PowerState"
      ]
    )
  end

  @impl BB.Sensor
  def init(opts) do
    opts = Map.new(opts)
    {current_divisor, power_divisor} = divisors_for(opts.calibration)

    with {:ok, conn} <-
           CircuitsI2C.acquire(bus_name: opts.bus, address: opts.address),
         {:ok, ina} <-
           INA219.acquire(
             conn: conn,
             current_divisor: current_divisor,
             power_divisor: power_divisor
           ),
         {:ok, ina} <- apply(INA219, opts.calibration, [ina]) do
      publish_interval_ms = hertz_to_ms(opts.publish_rate)

      state = %{
        bb: opts.bb,
        ina: ina,
        publish_interval_ms: publish_interval_ms
      }

      schedule_tick(publish_interval_ms)
      {:ok, state}
    else
      {:error, reason} -> {:stop, reason}
    end
  end

  @impl BB.Sensor
  def handle_info(:tick, state) do
    {:ok, fields} = read(state.ina)
    frame_id = List.last(state.bb.path)
    message = Message.new!(PowerState, frame_id, fields)
    BB.publish(state.bb.robot, [:sensor | state.bb.path], message)
    schedule_tick(state.publish_interval_ms)
    {:noreply, state}
  end

  @impl BB.Sensor
  def handle_options(new_opts, state) do
    new_opts = Map.new(new_opts)
    publish_interval_ms = hertz_to_ms(new_opts.publish_rate)
    {:ok, %{state | publish_interval_ms: publish_interval_ms}}
  end

  defp read(ina) do
    with {:ok, bus_v} <- INA219.bus_voltage(ina),
         {:ok, current_ma} <- INA219.current(ina),
         {:ok, power_mw} <- INA219.power(ina),
         {:ok, shunt_mv} <- INA219.shunt_voltage(ina) do
      {:ok,
       [
         voltage: bus_v,
         current: current_ma / 1000.0,
         power: power_mw / 1000.0,
         shunt_voltage: shunt_mv / 1000.0
       ]}
    end
  end

  defp divisors_for(:calibrate_32V_2A), do: {10, 2}
  defp divisors_for(:calibrate_32V_1A), do: {25, 1}
  defp divisors_for(:calibrate_16V_400mA), do: {20, 1}

  defp hertz_to_ms(rate) do
    rate
    |> Unit.convert!("hertz")
    |> Units.extract_float()
    |> then(&round(1000 / &1))
  end

  defp schedule_tick(ms) do
    Process.send_after(self(), :tick, ms)
  end
end