defmodule Segment.Http.Stub do
@moduledoc """
The `Segment.Http.Stub` is used to replace the Tesla adapter with something that logs and returns success. It is used if `send_to_http` has been set to false
"""
require Logger
def call(env, _opts) do
Logger.debug("[Segment] HTTP API called with #{inspect(env)}")
{:ok, %{env | status: 200, body: ""}}
end
end
defmodule Segment.Http do
@moduledoc """
`Segment.Http` is the underlying implementation for making calls to the Segment HTTP API.
The `send/2` and `batch/4` methods can be used for sending events or batches of events to the API. The sending can be configured with
```elixir
config :segment,
send_to_http: true
retry_attempts: 3,
retry_expiry: 10_000,
retry_start: 100
```
* `config :segment, :retry_attempts` The number of times to retry sending against the segment API. Default value is 3
* `config :segment, :retry_expiry` The maximum time (in ms) spent retrying. Default value is 10000 (10 seconds)
* `config :segment, :retry_start` The time (in ms) to start the first retry. Default value is 100
* `config :segment, :send_to_http` If set to `false`, the library will override the Tesla Adapter implementation to only log segment calls to `debug` but not make any actual API calls. This can be useful if you want to switch off Segment for test or dev. Default value is true
The retry uses a linear back-off strategy when retrying the Segment API.
Additionally a different Tesla Adapter can be used if you want to use something other than Hackney.
* `config :segment, :tesla, :adapter` This config option allows for overriding the HTTP Adapter for Tesla (which the library defaults to Hackney).This can be useful if you prefer something else, or want to mock the adapter for testing.
"""
@type client :: Tesla.Client.t()
@type adapter :: Tesla.Client.adapter()
require Logger
use Retry
@doc """
Create a Tesla client with the Segment Source Write API Key
"""
@spec client(String.t()) :: client()
def client(api_key) do
adapter =
case Segment.Config.send_to_http() do
true ->
Application.get_env(:segment, :tesla)[:adapter] ||
{Tesla.Adapter.Hackney, [recv_timeout: 30_000]}
false ->
{Segment.Http.Stub, []}
end
client(api_key, adapter)
end
@doc """
Create a Tesla client with the Segment Source Write API Key and the given Tesla adapter
"""
@spec client(String.t(), adapter()) :: client()
def client(api_key, adapter) do
middleware = [
{Tesla.Middleware.BaseUrl, Segment.Config.api_url()},
Tesla.Middleware.JSON,
{Tesla.Middleware.BasicAuth, %{username: api_key, password: ""}}
]
Tesla.client(middleware, adapter)
end
@doc """
Send a list of Segment events as a batch
"""
@spec send(client(), list(Segment.segment_event())) :: :ok | :error
def send(client, events) when is_list(events), do: batch(client, events)
@spec send(client(), Segment.segment_event()) :: :ok | :error
def send(client, event) do
:telemetry.span([:segment, :send], %{event: event}, fn ->
tesla_result =
make_request(client, event.type, prepare_events(event), Segment.Config.retry_attempts())
case process_send_post_result(tesla_result) do
:ok ->
{:ok, %{event: event, status: :ok, result: tesla_result}}
:error ->
{:error, %{event: event, status: :error, error: tesla_result, result: tesla_result}}
end
end)
end
defp process_send_post_result(tesla_result) do
case tesla_result do
{:ok, %{status: status}} when status == 200 ->
:ok
{:ok, %{status: status}} when status == 400 ->
Logger.error("[Segment] Call Failed. JSON too large or invalid")
:error
{:error, err} ->
Logger.error(
"[Segment] Call Failed after #{Segment.Config.retry_attempts()} retries. #{inspect(err)}"
)
:error
err ->
Logger.error("[Segment] Call Failed #{inspect(err)}")
:error
end
end
@doc """
Send a list of Segment events as a batch.
The `batch` function takes optional arguments for context and integrations which can
be applied to the entire batch of events. See [Segment's docs](https://segment.com/docs/sources/server/http/#batch)
"""
@spec batch(client(), list(Segment.segment_event()), map() | nil, map() | nil) :: :ok | :error
def batch(client, events, context \\ nil, integrations \\ nil) do
:telemetry.span([:segment, :batch], %{events: events}, fn ->
data =
%{batch: prepare_events(events)}
|> add_if(:context, context)
|> add_if(:integrations, integrations)
tesla_result = make_request(client, "batch", data, Segment.Config.retry_attempts())
case process_batch_post_result(tesla_result, events) do
:ok ->
{:ok, %{events: events, status: :ok, result: tesla_result}}
:error ->
{:error, %{events: events, status: :error, error: tesla_result, result: tesla_result}}
end
end)
end
defp process_batch_post_result(tesla_result, events) do
case tesla_result do
{:ok, %{status: status}} when status == 200 ->
:ok
{:ok, %{status: status}} when status == 400 ->
Logger.error(
"[Segment] Batch call of #{length(events)} events failed. JSON too large or invalid"
)
:error
{:error, err} ->
Logger.error(
"[Segment] Batch call of #{length(events)} events failed after #{Segment.Config.retry_attempts()} retries. #{inspect(err)}"
)
:error
err ->
Logger.error("[Segment] Batch callof #{length(events)} events failed #{inspect(err)}")
:error
end
end
defp make_request(client, url, data, retries) when retries > 0 do
retry with:
linear_backoff(Segment.Config.retry_start(), 2)
|> cap(Segment.Config.retry_expiry())
|> Stream.take(retries) do
Tesla.post(client, url, data)
after
result -> result
else
error -> error
end
end
defp make_request(client, url, data, _retries) do
Tesla.post(client, url, data)
end
defp prepare_events(items) when is_list(items), do: Enum.map(items, &prepare_events/1)
defp prepare_events(item) do
Map.from_struct(item)
|> prep_context()
|> add_sent_at()
|> drop_nils()
end
defp drop_nils(map) do
map
|> Enum.filter(fn
{_, %{} = item} when map_size(item) == 0 -> false
{_, nil} -> false
{_, _} -> true
end)
|> Enum.into(%{})
end
defp prep_context(%{context: nil} = map),
do: %{map | context: map_content(Segment.Analytics.Context.new())}
defp prep_context(%{context: context} = map), do: %{map | context: map_content(context)}
defp prep_context(map),
do: Map.put_new(map, :context, map_content(Segment.Analytics.Context.new()))
defp map_content(%Segment.Analytics.Context{} = context), do: Map.from_struct(context)
defp map_content(context) when is_map(context), do: context
defp add_sent_at(%{sentAt: nil} = map), do: Map.put(map, :sentAt, DateTime.utc_now())
defp add_sent_at(map), do: Map.put_new(map, :sentAt, DateTime.utc_now())
defp add_if(map, _key, nil), do: map
defp add_if(map, key, value), do: Map.put_new(map, key, value)
end