defmodule Instream.Encoder.Line do
@moduledoc """
Encoder for the InfluxDB line protocol.
"""
alias Instream.Decoder.RFC3339
@type point ::
%{
required(:fields) => map,
required(:measurement) => binary,
optional(:tags) => map,
optional(:timestamp) => non_neg_integer | binary | nil
}
| %{
__struct__: module,
fields: map,
tags: map,
timestamp: non_neg_integer | binary | nil
}
@doc """
Creates protocol contents for a list of data points.
"""
@spec encode([point()]) :: binary
def encode(points), do: encode(points, [])
defp encode([point | points], lines) do
line = encode_point(point)
encode(points, ["\n", line | lines])
end
defp encode([], ["\n" | lines]) do
lines
|> Enum.reverse()
|> IO.iodata_to_binary()
end
defp encode([], []), do: ""
defp append_fields(line, %{fields: fields}) do
content =
fields
|> Enum.reduce([], fn
{_, nil}, acc -> acc
{field, value}, acc -> [[encode_property(field), "=", encode_value(value)], "," | acc]
end)
|> Enum.reverse()
case content do
[] -> line
["," | encoded_fields] -> [line, " " | encoded_fields]
end
end
defp append_tags(line, %{tags: tags}) do
content =
tags
|> Enum.reduce([], fn
{_, nil}, acc -> acc
{tag, value}, acc -> [[encode_property(tag), "=", encode_property(value)], "," | acc]
end)
|> Enum.reverse()
case content do
[] -> line
encoded_tags -> [line | encoded_tags]
end
end
defp append_tags(line, _), do: line
defp append_timestamp(line, %{timestamp: nil}), do: line
defp append_timestamp(line, %{timestamp: ts}) when is_integer(ts),
do: [line, " ", Integer.to_string(ts)]
defp append_timestamp(line, %{timestamp: ts}) when is_binary(ts),
do: [line, " ", ts |> RFC3339.to_nanosecond() |> Integer.to_string()]
defp append_timestamp(line, _), do: line
defp encode_point(%{__struct__: series, fields: fields, tags: tags, timestamp: timestamp}) do
encode_point(%{
measurement: series.__meta__(:measurement),
fields: Map.from_struct(fields),
tags: Map.from_struct(tags),
timestamp: timestamp
})
end
defp encode_point(%{measurement: measurement} = point) do
[encode_property(measurement)]
|> append_tags(point)
|> append_fields(point)
|> append_timestamp(point)
end
defp encode_property(s) when is_binary(s) do
s
|> :binary.replace(",", "\\,", [:global])
|> :binary.replace(" ", "\\ ", [:global])
|> :binary.replace("=", "\\=", [:global])
end
defp encode_property(s), do: Kernel.to_string(s)
defp encode_value(i) when is_integer(i), do: [Integer.to_string(i), "i"]
defp encode_value(s) when is_binary(s),
do: ["\"", :binary.replace(s, "\"", "\\\"", [:global]), "\""]
defp encode_value(true), do: "true"
defp encode_value(false), do: "false"
defp encode_value(other), do: inspect(other)
end