defmodule ExMidi.MidiMessage do
  @moduledoc """
  Frozen (immutable) MIDI message struct with rich API.

  Inspired by mido's `Message` class. Messages are immutable and provide
  `copy/2`, `to_bytes/1`, `to_hex/1`, `to_string/1`, `to_map/1`, and `from_map/1`.

  Channels are 1-based (1-16) in the public API (`new/2`, `copy/2`,
  `to_string/1`, `to_map/1`, `from_map/1`) and stored 0-based in the
  struct's `:channel` field.

  ## Examples

      iex> msg = ExMidi.MidiMessage.new(:note_on, channel: 1, note: 60, velocity: 100)
      iex> ExMidi.MidiMessage.to_bytes(msg)
      <<144, 60, 100>>
      iex> ExMidi.MidiMessage.to_hex(msg)
      "90 3C 64"
      iex> msg2 = ExMidi.MidiMessage.copy(msg, channel: 2)
      iex> ExMidi.MidiMessage.to_bytes(msg2)
      <<145, 60, 100>>
  """

  import Bitwise

  @midi :midi

  @message_specs %{
    note_off: %{status: 0x80, value_names: [:channel, :note, :velocity]},
    note_on: %{status: 0x90, value_names: [:channel, :note, :velocity]},
    poly_aftertouch: %{status: 0xA0, value_names: [:channel, :note, :value]},
    cc: %{status: 0xB0, value_names: [:channel, :control, :value]},
    program_change: %{status: 0xC0, value_names: [:channel, :program]},
    aftertouch: %{status: 0xD0, value_names: [:channel, :value]},
    pitch_bend: %{status: 0xE0, value_names: [:channel, :pitch]},
    sysex: %{status: 0xF0, value_names: [:data]},
    quarter_frame: %{status: 0xF1, value_names: [:frame_type, :frame_value]},
    songpos: %{status: 0xF2, value_names: [:pos]},
    song_select: %{status: 0xF3, value_names: [:song]},
    tune_request: %{status: 0xF6, value_names: []},
    clock: %{status: 0xF8, value_names: []},
    rt_start: %{status: 0xFA, value_names: []},
    continue: %{status: 0xFB, value_names: []},
    rt_stop: %{status: 0xFC, value_names: []},
    active_sensing: %{status: 0xFE, value_names: []},
    rt_reset: %{status: 0xFF, value_names: []}
  }

  # Types whose spec starts with :channel; only these carry a channel nibble
  # in the low four bits of the status byte.
  @channel_types for {type, %{value_names: [:channel | _]}} <- @message_specs, do: type

  # System realtime messages are the ones with status bytes 0xF8..0xFF.
  @realtime_types for {type, %{status: status}} <- @message_specs, status >= 0xF8, do: type

  # Lets from_map/1 resolve a string type name without creating atoms from input.
  @types_by_name Map.new(@message_specs, fn {type, _spec} -> {Atom.to_string(type), type} end)

  defstruct type: nil,
            time: 0,
            channel: 0,
            note: 0,
            velocity: 64,
            control: 0,
            value: 0,
            program: 0,
            pitch: 0,
            data: [],
            frame_type: 0,
            frame_value: 0,
            pos: 0,
            song: 0

  @type t :: %__MODULE__{
          type: atom(),
          time: number(),
          channel: integer(),
          note: integer(),
          velocity: integer(),
          control: integer(),
          value: integer(),
          program: integer(),
          pitch: integer(),
          data: [integer()],
          frame_type: integer(),
          frame_value: integer(),
          pos: integer(),
          song: integer()
        }

  @doc """
  Create a new MIDI message. Channel is 1-based (1-16).

  Only the options named in the spec of `type` (plus `:time`) are read; any
  other option is ignored and the corresponding field keeps its struct default.
  A positive integer `:channel` is stored 0-based.

  Raises `ArgumentError` when `type` is not a known message type.
  """
  @spec new(atom(), keyword()) :: t()
  def new(type, opts \\ []) do
    spec =
      case Map.fetch(@message_specs, type) do
        {:ok, spec} -> spec
        :error -> raise ArgumentError, "Unknown message type: #{inspect(type)}"
      end

    fields =
      for name <- spec.value_names do
        value = Keyword.get(opts, name, default_value(name))
        {name, if(name == :channel, do: to_internal_channel(value), else: value)}
      end

    struct!(%__MODULE__{type: type, time: Keyword.get(opts, :time, 0)}, fields)
  end

  defp default_value(:velocity), do: 64
  defp default_value(:channel), do: 0
  defp default_value(:data), do: []
  defp default_value(_), do: 0

  # Public API channels are 1-based; the struct stores them 0-based.
  # Non-positive or non-integer values are passed through untouched.
  defp to_internal_channel(channel) when is_integer(channel) and channel > 0, do: channel - 1
  defp to_internal_channel(channel), do: channel

  # Inverse of to_internal_channel/1 for values leaving the struct.
  defp to_public_channel(channel) when is_integer(channel), do: channel + 1
  defp to_public_channel(channel), do: channel

  @doc """
  Create a copy of the message with overridden attributes.

  A positive integer `:channel` override is 1-based, as in `new/2`.
  Keys that are not struct fields are ignored.
  """
  @spec copy(t(), keyword()) :: t()
  def copy(%__MODULE__{} = msg, overrides) do
    overrides =
      Enum.map(overrides, fn
        {:channel, channel} -> {:channel, to_internal_channel(channel)}
        other -> other
      end)

    struct(msg, overrides)
  end

  @doc """
  Encode message to a binary.

  Data values are masked to their MIDI width (7 bits, 14 bits for pitch bend
  and song position). The channel nibble is merged into the status byte only
  for channel messages; system messages always use their fixed status byte.

  Raises `ArgumentError` when the message type is unknown.
  """
  @spec to_bytes(t()) :: binary()
  def to_bytes(%__MODULE__{type: type} = msg) do
    case Map.fetch(@message_specs, type) do
      {:ok, %{status: status}} -> encode(type, msg, status_byte(type, status, msg))
      :error -> raise ArgumentError, "Unknown message type: #{inspect(type)}"
    end
  end

  defp status_byte(type, status, msg) when type in @channel_types,
    do: status ||| (msg.channel &&& 0x0F)

  defp status_byte(_type, status, _msg), do: status

  defp encode(type, msg, sb) when type in [:note_off, :note_on],
    do: <<sb, data7(msg.note), data7(msg.velocity)>>

  defp encode(:poly_aftertouch, msg, sb), do: <<sb, data7(msg.note), data7(msg.value)>>
  defp encode(:cc, msg, sb), do: <<sb, data7(msg.control), data7(msg.value)>>
  defp encode(:program_change, msg, sb), do: <<sb, data7(msg.program)>>
  defp encode(:aftertouch, msg, sb), do: <<sb, data7(msg.value)>>

  defp encode(:pitch_bend, msg, sb) do
    # Pitch is signed around zero; the wire format is offset by 8192.
    {lsb, msb} = split14(msg.pitch + 8192)
    <<sb, lsb, msb>>
  end

  defp encode(:sysex, msg, sb) do
    payload = for byte <- msg.data, into: <<>>, do: <<data7(byte)>>
    <<sb, payload::binary, 0xF7>>
  end

  defp encode(:quarter_frame, msg, sb) do
    # `<<<` binds tighter than `&&&`, so the mask must be parenthesized before
    # shifting. The frame type is 3 bits wide so the data byte stays below 0x80.
    payload = ((msg.frame_type &&& 0x07) <<< 4) ||| (msg.frame_value &&& 0x0F)
    <<sb, payload>>
  end

  defp encode(:songpos, msg, sb) do
    {lsb, msb} = split14(msg.pos)
    <<sb, lsb, msb>>
  end

  defp encode(:song_select, msg, sb), do: <<sb, data7(msg.song)>>

  # Remaining known types (tune request and realtime) are a bare status byte.
  defp encode(_type, _msg, sb), do: <<sb>>

  defp data7(value), do: value &&& 0x7F

  # Splits a value into the {lsb, msb} pair of 7-bit data bytes (14 bits total).
  defp split14(value), do: {value &&& 0x7F, (value >>> 7) &&& 0x7F}

  @doc """
  Encode message to a hex string.

  Each byte is rendered as two uppercase hex digits, joined with `sep`.
  """
  @spec to_hex(t(), String.t()) :: String.t()
  def to_hex(%__MODULE__{} = msg, sep \\ " ") do
    msg
    |> to_bytes()
    |> :binary.bin_to_list()
    |> Enum.map_join(sep, &hex_byte/1)
  end

  defp hex_byte(byte), do: byte |> Integer.to_string(16) |> String.pad_leading(2, "0")

  @doc """
  Convert message to a human-readable string.

  The channel is shown 1-based and only for channel messages on a channel
  other than 1; `time` is shown only when non-zero. Sysex data is rendered
  as space-separated hex bytes.
  """
  @spec to_string(t()) :: String.t()
  def to_string(%__MODULE__{type: type} = msg) do
    parts = [Atom.to_string(type)] ++ channel_attr(msg) ++ value_attrs(msg) ++ time_attr(msg)
    "MidiMessage(#{Enum.join(parts, " ")})"
  end

  defp channel_attr(%__MODULE__{type: type, channel: channel})
       when type in @channel_types and channel != 0,
       do: ["channel=#{channel + 1}"]

  defp channel_attr(_msg), do: []

  # One "name=value" entry per spec field, in spec order, channel excluded.
  defp value_attrs(%__MODULE__{type: type} = msg) do
    %{value_names: names} = Map.get(@message_specs, type, %{value_names: []})
    for name <- names, name != :channel, do: format_attr(name, Map.fetch!(msg, name))
  end

  defp format_attr(:data, data), do: "data=(#{Enum.map_join(data, " ", &hex_byte/1)})"
  defp format_attr(name, value), do: "#{name}=#{value}"

  defp time_attr(%__MODULE__{time: time}) when time != 0, do: ["time=#{time}"]
  defp time_attr(_msg), do: []

  @doc """
  Convert message to a map.

  The map holds every struct field. `:channel` is emitted 1-based so that
  `from_map/1` (which goes through `new/2`) restores the same message.
  """
  @spec to_map(t()) :: map()
  def to_map(%__MODULE__{} = msg) do
    msg
    |> Map.from_struct()
    |> Map.update!(:channel, &to_public_channel/1)
  end

  @doc """
  Create a message from a map.

  Accepts atom keys (`%{type: :cc, ...}`) or string keys
  (`%{"type" => "cc", ...}`). With string keys, the type may be given as a
  string and is resolved against the known message types. Values are
  interpreted as by `new/2`, so `channel` is 1-based.

  Raises `ArgumentError` for an unknown type, or for a string key that does
  not correspond to an existing atom.
  """
  @spec from_map(map()) :: t()
  def from_map(%{"type" => type} = map) do
    opts = for {key, value} <- map, key != "type", do: {String.to_existing_atom(key), value}
    new(resolve_type(type), opts)
  end

  def from_map(%{type: type} = map) do
    opts = for {key, value} <- map, key != :type, do: {key, value}
    new(type, opts)
  end

  # Unknown names are returned unchanged so new/2 reports them.
  defp resolve_type(type) when is_binary(type), do: Map.get(@types_by_name, type, type)
  defp resolve_type(type), do: type

  @doc """
  Create a message from a binary.

  The bytes are fed to `ExMidi.MidiParser` and the first parsed message is
  returned with the given `time`. Raises `ArgumentError` when the parser
  yields no message.
  """
  @spec from_bytes(binary(), integer()) :: t()
  def from_bytes(bytes, time \\ 0) when is_binary(bytes) do
    parser = ExMidi.MidiParser.new()
    parser = ExMidi.MidiParser.feed_bytes(parser, :binary.bin_to_list(bytes))

    case ExMidi.MidiParser.parse(parser) do
      {{@midi, {type, values}}, _parser} -> to_struct({type, values}, time)
      nil -> raise ArgumentError, "Could not parse MIDI bytes"
    end
  end

  @doc """
  Create a message from a hex string.

  Whitespace between digits is ignored and both upper- and lowercase digits
  are accepted. Raises `ArgumentError` when the remaining text is not valid
  hex or has an odd number of digits.
  """
  @spec from_hex(String.t(), integer()) :: t()
  def from_hex(hex_string, time \\ 0) do
    hex_string
    |> String.replace(~r/\s/, "")
    |> Base.decode16!(case: :mixed)
    |> from_bytes(time)
  end

  @doc """
  Check if message is a control change for the given control number.

  With `nil` (the default) any control change message matches.
  """
  @spec is_cc(t(), non_neg_integer() | nil) :: boolean()
  def is_cc(msg, control \\ nil)
  def is_cc(%__MODULE__{type: :cc}, nil), do: true
  def is_cc(%__MODULE__{type: :cc, control: ctrl}, ctrl), do: true
  def is_cc(_, _), do: false

  @doc """
  Check if message is a system realtime message.

  Realtime messages are those with a status byte of 0xF8 or above:
  clock, start, continue, stop, active sensing and reset.
  """
  @spec is_realtime(t()) :: boolean()
  def is_realtime(%__MODULE__{type: type}), do: type in @realtime_types

  defp to_struct({type, values}, time) do
    opts = Keyword.put(values, :time, time)
    new(type, opts)
  end
end