Skip to main content

lib/ex_midi/midi_message.ex

defmodule ExMidi.MidiMessage do
  @moduledoc """
  Immutable MIDI message struct with a rich API.

  Inspired by mido's `Message` class. Messages are immutable and provide
  `copy/2`, `to_bytes/1`, `to_hex/1`, `to_string/1`, `to_map/1`, and `from_map/1`.

  Channels are 1-based (1-16) in `new/2`, `copy/2` and `to_string/1`, but are stored
  0-based in the struct's `:channel` field.

  ## Examples

      iex> msg = ExMidi.MidiMessage.new(:note_on, channel: 1, note: 60, velocity: 100)
      iex> ExMidi.MidiMessage.to_bytes(msg)
      <<144, 60, 100>>
      iex> ExMidi.MidiMessage.to_hex(msg)
      "90 3C 64"
      iex> msg2 = ExMidi.MidiMessage.copy(msg, channel: 2)
      iex> ExMidi.MidiMessage.to_bytes(msg2)
      <<145, 60, 100>>
  """

  import Bitwise

  # Tag of the tuples returned by ExMidi.MidiParser.parse/1 that `from_bytes/2` accepts.
  @midi :midi

  # Status byte and ordered value names for every supported message type.
  # Statuses below 0xF0 are channel messages (low nibble = channel).
  @message_specs %{
    note_off: %{status: 0x80, value_names: [:channel, :note, :velocity]},
    note_on: %{status: 0x90, value_names: [:channel, :note, :velocity]},
    poly_aftertouch: %{status: 0xA0, value_names: [:channel, :note, :value]},
    cc: %{status: 0xB0, value_names: [:channel, :control, :value]},
    program_change: %{status: 0xC0, value_names: [:channel, :program]},
    aftertouch: %{status: 0xD0, value_names: [:channel, :value]},
    pitch_bend: %{status: 0xE0, value_names: [:channel, :pitch]},
    sysex: %{status: 0xF0, value_names: [:data]},
    quarter_frame: %{status: 0xF1, value_names: [:frame_type, :frame_value]},
    songpos: %{status: 0xF2, value_names: [:pos]},
    song_select: %{status: 0xF3, value_names: [:song]},
    tune_request: %{status: 0xF6, value_names: []},
    clock: %{status: 0xF8, value_names: []},
    rt_start: %{status: 0xFA, value_names: []},
    continue: %{status: 0xFB, value_names: []},
    rt_stop: %{status: 0xFC, value_names: []},
    active_sensing: %{status: 0xFE, value_names: []},
    rt_reset: %{status: 0xFF, value_names: []}
  }

  # System realtime messages are the single-byte statuses 0xF8-0xFF.
  # Tune request (0xF6) is a system *common* message and is deliberately excluded.
  @realtime_types [:clock, :rt_start, :continue, :rt_stop, :active_sensing, :rt_reset]

  defstruct type: nil,
            time: 0,
            channel: 0,
            note: 0,
            velocity: 64,
            control: 0,
            value: 0,
            program: 0,
            pitch: 0,
            data: [],
            frame_type: 0,
            frame_value: 0,
            pos: 0,
            song: 0

  @type t :: %__MODULE__{}

  @doc """
  Create a new MIDI message. Channel is 1-based (1-16).

  Only the options named in the type's spec (plus `:time`) are used; any other
  option is ignored. Missing values fall back to their defaults.

  Raises `ArgumentError` when `type` is not a known message type.
  """
  @spec new(atom(), keyword()) :: t()
  def new(type, opts \\ []) do
    spec =
      case Map.fetch(@message_specs, type) do
        {:ok, spec} -> spec
        :error -> raise ArgumentError, "Unknown message type: #{inspect(type)}"
      end

    values =
      Map.new(spec.value_names, fn name ->
        {name, to_internal(name, Keyword.get(opts, name, default_value(name)))}
      end)

    struct!(%__MODULE__{type: type, time: Keyword.get(opts, :time, 0)}, values)
  end

  defp default_value(:velocity), do: 64
  defp default_value(:channel), do: 0
  defp default_value(:data), do: []
  defp default_value(_), do: 0

  # Converts a public (1-based) channel to the 0-based value stored in the struct.
  # Non-positive or non-integer channels, and all other fields, pass through unchanged.
  defp to_internal(:channel, value) when is_integer(value) and value > 0, do: value - 1
  defp to_internal(_name, value), do: value

  @doc """
  Create a copy of the message with overridden attributes.

  A `:channel` override is 1-based, exactly as in `new/2`. Keys that are not
  struct fields are ignored.
  """
  @spec copy(t(), keyword()) :: t()
  def copy(%__MODULE__{} = msg, overrides) do
    struct(msg, Enum.map(overrides, fn {name, value} -> {name, to_internal(name, value)} end))
  end

  @doc """
  Encode message to a binary.

  Data values are masked to 7 bits. The channel is merged into the status byte
  only for channel messages; system messages always use their fixed status.
  """
  @spec to_bytes(t()) :: binary()
  def to_bytes(%__MODULE__{type: type} = msg) do
    %{status: status} = Map.fetch!(@message_specs, type)
    <<status_byte(status, msg.channel), data_bytes(msg)::binary>>
  end

  # Channel messages carry the channel in the low nibble; system messages (>= 0xF0)
  # must keep their status untouched or they turn into a different message.
  defp status_byte(status, channel) when status < 0xF0, do: status ||| (channel &&& 0x0F)
  defp status_byte(status, _channel), do: status

  # Returns everything that follows the status byte for the given message.
  defp data_bytes(%__MODULE__{type: type, note: note, velocity: velocity})
       when type in [:note_off, :note_on],
       do: <<note &&& 0x7F, velocity &&& 0x7F>>

  defp data_bytes(%__MODULE__{type: :poly_aftertouch, note: note, value: value}),
    do: <<note &&& 0x7F, value &&& 0x7F>>

  defp data_bytes(%__MODULE__{type: :cc, control: control, value: value}),
    do: <<control &&& 0x7F, value &&& 0x7F>>

  defp data_bytes(%__MODULE__{type: :program_change, program: program}),
    do: <<program &&& 0x7F>>

  defp data_bytes(%__MODULE__{type: :aftertouch, value: value}), do: <<value &&& 0x7F>>

  # Pitch bend is transmitted as an unsigned 14-bit value centred on 8192.
  defp data_bytes(%__MODULE__{type: :pitch_bend, pitch: pitch}), do: encode_14bit(pitch + 8192)

  defp data_bytes(%__MODULE__{type: :sysex, data: data}) do
    payload = for byte <- data, into: <<>>, do: <<byte &&& 0x7F>>
    <<payload::binary, 0xF7>>
  end

  defp data_bytes(%__MODULE__{type: :quarter_frame, frame_type: frame_type, frame_value: value}) do
    # Layout is 0nnndddd. The explicit parentheses matter: `<<<` binds tighter
    # than `&&&`, so `a &&& 0x0F <<< 4` would mask with 0xF0 instead of shifting.
    packed = ((frame_type &&& 0x07) <<< 4) ||| (value &&& 0x0F)
    <<packed>>
  end

  defp data_bytes(%__MODULE__{type: :songpos, pos: pos}), do: encode_14bit(pos)
  defp data_bytes(%__MODULE__{type: :song_select, song: song}), do: <<song &&& 0x7F>>

  # Remaining types (tune request and the realtime messages) are status-only.
  defp data_bytes(%__MODULE__{}), do: <<>>

  # Splits a value into two 7-bit data bytes, least significant first.
  defp encode_14bit(value) do
    lsb = value &&& 0x7F
    msb = bsr(value, 7) &&& 0x7F
    <<lsb, msb>>
  end

  @doc """
  Encode message to a hex string.

  Bytes are rendered as two upper-case hex digits joined by `sep`.
  """
  @spec to_hex(t(), String.t()) :: String.t()
  def to_hex(%__MODULE__{} = msg, sep \\ " ") do
    msg
    |> to_bytes()
    |> :binary.bin_to_list()
    |> Enum.map_join(sep, &hex_byte/1)
  end

  defp hex_byte(byte), do: byte |> Integer.to_string(16) |> String.pad_leading(2, "0")

  @doc """
  Convert message to a human-readable string.

  The channel is shown 1-based and omitted for channel 1; `time` is omitted
  when it is zero. Sysex data is shown as space-separated hex bytes.
  """
  @spec to_string(t()) :: String.t()
  def to_string(%__MODULE__{} = msg) do
    attrs = channel_attr(msg) ++ value_attrs(msg) ++ time_attr(msg)
    # Joining the type together with the attributes avoids a dangling space
    # for messages that have no attributes at all.
    "MidiMessage(" <> Enum.join([Atom.to_string(msg.type) | attrs], " ") <> ")"
  end

  defp channel_attr(%__MODULE__{channel: 0}), do: []
  defp channel_attr(%__MODULE__{channel: channel}), do: ["channel=#{channel + 1}"]

  # Lists the type's own values in spec order; the channel is rendered separately.
  defp value_attrs(%__MODULE__{type: type} = msg) do
    case Map.fetch(@message_specs, type) do
      {:ok, %{value_names: names}} ->
        for name <- names, name != :channel, do: format_attr(name, Map.fetch!(msg, name))

      :error ->
        []
    end
  end

  defp format_attr(:data, data), do: "data=(" <> Enum.map_join(data, " ", &hex_byte/1) <> ")"
  defp format_attr(name, value), do: "#{name}=#{value}"

  defp time_attr(%__MODULE__{time: time}) when time != 0, do: ["time=#{time}"]
  defp time_attr(%__MODULE__{}), do: []

  @doc """
  Convert message to a map.

  The map contains every struct field as stored, so `:channel` is 0-based here.
  """
  @spec to_map(t()) :: map()
  def to_map(%__MODULE__{} = msg) do
    Map.from_struct(msg)
  end

  @doc """
  Create a message from a map.

  Accepts atom keys or string keys (for example a decoded JSON object). With
  string keys the type may be given as a string such as `"note_on"`. Values are
  passed to `new/2`, so `:channel` is interpreted as 1-based.

  NOTE(review): because `to_map/1` exposes the stored 0-based channel, feeding its
  output back into `from_map/1` shifts channels 2-16 down by one — confirm which
  convention callers expect before relying on a round trip.

  Raises `ArgumentError` for an unknown type or an unknown string key.
  """
  @spec from_map(map()) :: t()
  def from_map(%{"type" => type} = map) do
    opts = for {key, value} <- map, key != "type", do: {to_field(key), value}
    new(to_type(type), opts)
  end

  def from_map(%{type: type} = map) do
    opts = for {key, value} <- map, key != :type, do: {key, value}
    new(to_type(type), opts)
  end

  # Resolves a type given as a string against the known types without creating
  # atoms; unknown strings are returned unchanged so `new/2` reports them.
  defp to_type(type) when is_binary(type) do
    Enum.find(Map.keys(@message_specs), type, &(Atom.to_string(&1) == type))
  end

  defp to_type(type), do: type

  defp to_field(key) when is_binary(key), do: String.to_existing_atom(key)
  defp to_field(key), do: key

  @doc """
  Create a message from a binary.

  The bytes are fed to `ExMidi.MidiParser` and the first parsed message is
  returned with the given `time`.

  NOTE(review): the parsed values are passed through `new/2`, which treats
  `:channel` as 1-based — confirm the parser reports channels the same way.

  Raises `ArgumentError` when the parser yields no message.
  """
  @spec from_bytes(binary(), integer()) :: t()
  def from_bytes(bytes, time \\ 0) when is_binary(bytes) do
    parser =
      ExMidi.MidiParser.feed_bytes(ExMidi.MidiParser.new(), :binary.bin_to_list(bytes))

    case ExMidi.MidiParser.parse(parser) do
      {{@midi, {type, values}}, _parser} -> new(type, Keyword.put(values, :time, time))
      nil -> raise ArgumentError, "Could not parse MIDI bytes"
    end
  end

  @doc """
  Create a message from a hex string.

  Whitespace between digits is ignored. Raises `ArgumentError` when the string
  holds an odd number of hex digits or cannot be parsed into a message.
  """
  @spec from_hex(String.t(), integer()) :: t()
  def from_hex(hex_string, time \\ 0) do
    digits = String.replace(hex_string, ~r/\s/, "")

    # A dangling nibble would otherwise be dropped silently by the generator below.
    if rem(byte_size(digits), 2) != 0 do
      raise ArgumentError,
            "Hex string must contain an even number of digits: #{inspect(hex_string)}"
    end

    bytes = for <<pair::binary-size(2) <- digits>>, into: <<>>, do: <<String.to_integer(pair, 16)>>
    from_bytes(bytes, time)
  end

  @doc """
  Check if message is a control change for the given control number.

  Passing `nil` as the control number matches any control change.
  """
  @spec is_cc(t(), non_neg_integer() | nil) :: boolean()
  def is_cc(%__MODULE__{type: :cc}, nil), do: true
  def is_cc(%__MODULE__{type: :cc, control: ctrl}, ctrl), do: true
  def is_cc(_, _), do: false

  @doc """
  Check if message is a system realtime message.

  Realtime messages are clock, start, continue, stop, active sensing and reset.
  """
  @spec is_realtime(t()) :: boolean()
  def is_realtime(%__MODULE__{type: type}) do
    type in @realtime_types
  end
end