lib/grizzly/zwave_firmware.ex

defmodule Grizzly.ZWaveFirmware do
  @moduledoc """
  Z-Wave module firmware upgrade support.

  ## Note on `zw_programmer`

  This module includes functionality to attempt to recover from a failed
  firmware upgrade that has left the Z-Wave module stuck at the bootloader. This
  functionality relies on `zw_programmer`'s `-a` flag, which (as of the 7.18.03
  release) does not work with 700- or 800-series modules.

  If you are using a 700- or 800-series module, you will need to implement this
  functionality yourself. This module invokes `zw_programmer` with a `-7` flag
  along with the `-a` flag when a 700/800-series module is specified.

  Because Z/IP Gateway is not licensed for distribution, we cannot provide a
  patch file. The modification is fairly straightforward, but if you need
  assistance, please open an issue on GitHub.
  """

  alias Grizzly.{FirmwareError, Options}

  require Logger

  @typedoc """
  The various status of a firmware update of the Z-Wave module

  * `:started` - the firmware update has been initiated and all validation of
    the firmware update is complete.
  * `{:done, :success}` - the firmware update of the Z-Wave module is successful.
  * `{:done, :skipped}` - no firmware update was applied.
  * `{:error, reason}` - A firmware update of the Z-Wave module was attempted
    but failed for some `reason`.
  """
  @type update_status ::
          :started | {:done, :success | :skipped} | {:error, FirmwareError.t()}

  @typedoc """
  Chip series (e.g. 500, 700, 800).
  """
  @type chip_series :: pos_integer()

  @typedoc """
  A function that performs a hard reset of the Z-Wave module. Used to detect
  if the module is stuck at the bootloader due to a previous failed upgrade.
  """
  @type module_reset_fun :: (-> :ok)

  defmodule UpgradeSpec do
    @moduledoc """
    A firmware upgrade specification used to determine if a firmware image can
    be applied given the running firmware version.
    """

    @typedoc """
    Firmware upgrade specification.

    * `version` - the version of the firmware image
    * `path` - the path to the firmware image
    * `applies_to` - the version requirement for the running firmware version
      that must be met for the upgrade to be applied
    """
    @type t :: %__MODULE__{
            version: Version.t(),
            path: Path.t(),
            applies_to: Version.requirement()
          }

    @enforce_keys [:version, :path, :applies_to]
    defstruct [:version, :path, :applies_to]

    @spec new(map() | keyword()) :: t()
    def new(opts) do
      struct(__MODULE__, opts)
    end

    @doc "Whether the spec applies given the current version."
    @spec applies?(t(), Version.t()) :: boolean()
    def applies?(%__MODULE__{} = spec, current_version) do
      # Only upgrades apply -- with the default bootloader, it's not possible to
      # downgrade or re-apply the same version.
      Version.compare(current_version, spec.version) == :lt &&
        Version.match?(current_version, spec.applies_to)
    end
  end

  @doc """
  Update the firmware on the Z-Wave module if an update is available
  """
  @spec maybe_run_zwave_firmware_update(Grizzly.Options.t()) :: :ok
  def maybe_run_zwave_firmware_update(%Options{zwave_firmware: %{enabled: true}} = opts) do
    report(opts, :started)
    version = zwave_module_version(opts)

    case find_upgrade_spec(opts.zwave_firmware.specs, version) do
      nil ->
        Logger.info("[Grizzly] No matching firmware upgrade spec")
        report(opts, {:done, :skipped})

      %UpgradeSpec{version: target_version, path: path} ->
        Logger.info(
          "[Grizzly] Attempting to upgrade Z-Wave module to #{to_string(target_version)}"
        )

        apm_flag? = is_nil(version)
        upload_zwave_firmware_update(opts, path, apm_flag?)
        report(opts, {:done, :success})
    end

    :ok
  rescue
    e in FirmwareError ->
      error = %FirmwareError{e | stack_trace: Exception.format(:error, e, __STACKTRACE__)}
      report(opts, {:error, error})
      :ok

    # Firmware update not possible due to code error. Skip it.
    e in RuntimeError ->
      error = %FirmwareError{
        message: "Runtime error: #{e.message}",
        stack_trace: Exception.format(:error, e, __STACKTRACE__),
        fatal?: false
      }

      report(opts, {:error, error})
      :ok
  end

  def maybe_run_zwave_firmware_update(_), do: :ok

  @doc """
  Find an upgrade spec that applies to the current version. In case of multiple
  matches, returns the spec with the highest version. When the current version
  is nil, returns the spec with the highest version. If no specs match, returns
  nil.
  """
  @spec find_upgrade_spec([UpgradeSpec.t()], Version.t() | nil) ::
          UpgradeSpec.t() | nil
  def find_upgrade_spec(specs, nil = _current_version) do
    specs
    |> Enum.sort_by(& &1.version, {:desc, Version})
    |> List.first()
  end

  def find_upgrade_spec(specs, current_version) do
    specs
    |> Enum.sort_by(& &1.version, {:desc, Version})
    |> Enum.find(&UpgradeSpec.applies?(&1, current_version))
  end

  @sapi_version_regex ~r/Chip type: (?<chip_type>\d+).*SDK:\s*(?<major>\d+)\.(?<minor>\d+)\.(?<patch>\d+)/m

  @doc false
  @spec zwave_module_version(Options.t()) :: Version.t() | nil
  def zwave_module_version(opts) do
    reset_zwave_module(opts)

    {answer, _} = zw_programmer(opts, ["-t"])
    Logger.debug("[Grizzly] Extracting current Z-Wave firmware version from #{inspect(answer)}")

    cond do
      Regex.match?(@sapi_version_regex, answer) ->
        %{"major" => major_s, "minor" => minor_s, "patch" => patch_s} =
          Regex.named_captures(@sapi_version_regex, answer)

        Logger.info("[Grizzly] Current Z-Wave firmware version: #{major_s}.#{minor_s}.#{patch_s}")

        {major, ""} = Integer.parse(major_s)
        {minor, ""} = Integer.parse(minor_s)
        {patch, ""} = Integer.parse(patch_s)
        %Version{major: major, minor: minor, patch: patch}

      # Heuristic for detecting when the Z-Wave module is stuck at the bootloader since
      # the SAPI won't be available.
      String.contains?(answer, "SerialAPI: Retransmission 7") ->
        Logger.warning("[Grizzly] Z-Wave module appears to be stuck at the bootloader")
        nil

      # This happens when zw_programmer can't open the serial port
      String.contains?(answer, "Serial Init failed") ->
        raise FirmwareError, message: "zw_programmer said serial init failed", fatal?: true

      true ->
        raise FirmwareError, message: "unexpected zw_programmer output: #{answer}", fatal?: true
    end
  end

  @spec upload_zwave_firmware_update(Options.t(), Path.t(), boolean()) :: :ok
  defp upload_zwave_firmware_update(opts, firmware_path, apm_flag?) do
    Logger.info("[Grizzly] Uploading firmware image #{firmware_path} to #{opts.serial_port}}")

    args =
      cond do
        apm_flag? && opts.zwave_firmware.chip_series in [700, 800] ->
          ["-a", "-7"]

        apm_flag? ->
          ["-a"]

        true ->
          []
      end

    {result, code} = zw_programmer(opts, ["-p", firmware_path | args])

    if code == 0 do
      Logger.info("[Grizzly] Z-Wave firmware upgrade successful")
      :ok
    else
      reset_zwave_module(opts)
      raise FirmwareError, message: "zw_programmer exited with #{code}: #{result}", fatal?: true
    end
  end

  @spec zw_programmer(Options.t(), [binary()]) :: {binary(), non_neg_integer()}
  defp zw_programmer(opts, args) do
    args = ["-s", opts.serial_port | args]

    Logger.info("[Grizzly] Executing #{opts.zw_programmer_path} #{Enum.join(args, " ")}")

    MuonTrap.cmd(opts.zw_programmer_path, args)
  end

  @spec reset_zwave_module(Options.t()) :: :ok
  defp reset_zwave_module(%Options{zwave_firmware: opts}) do
    if is_function(opts[:module_reset_fun], 0), do: opts.module_reset_fun.()

    :ok
  end

  @spec report(Options.t(), update_status()) :: :ok
  defp report(opts, status) do
    _ = Process.spawn(fn -> opts.status_reporter.zwave_firmware_update_status(status) end, [])

    :ok
  end
end