defmodule ReqBigQuery do
@moduledoc """
  `Req` plugin for [Google BigQuery](https://cloud.google.com/bigquery/docs/reference/rest).

  ReqBigQuery makes it easy to run BigQuery queries. It uses `Goth` for authentication.
  Query results are decoded into the `ReqBigQuery.Result` struct, which implements the
  `Table.Reader` protocol and can therefore be efficiently traversed by rows or columns.
"""
alias Req.Request
alias ReqBigQuery.Result
@allowed_options ~w(goth default_dataset_id project_id bigquery)a
@base_url "https://bigquery.googleapis.com/bigquery/v2"
@doc """
  Attaches this plugin to the given `Req` request.

  ## Request Options

    * `:goth` - Required. The name of the `Goth` server used to fetch the authentication token.

    * `:project_id` - Required. The Google Cloud project ID.

    * `:bigquery` - Required. The query to execute. It can be a plain SQL string or
      a `{query, params}` tuple, where `query` can contain `?` placeholders and `params`
      is a list of corresponding values.

    * `:default_dataset_id` - Optional. If set, the dataset to assume for any unqualified
      table names in the query. If not set, all table names in the query string must be
      qualified in the format `datasetId.tableId`.

  If you want to set any of these options when attaching the plugin, pass them as the
  second argument.

  ## Examples

  With plain query string:

iex> credentials = File.read!("credentials.json") |> Jason.decode!()
iex> source = {:service_account, credentials, []}
iex> {:ok, _} = Goth.start_link(name: MyGoth, source: source, http_client: &Req.request/1)
iex> project_id = System.fetch_env!("PROJECT_ID")
iex> query = \"""
...> SELECT title, SUM(views) AS views
...> FROM `bigquery-public-data.wikipedia.table_bands`
...> WHERE EXTRACT(YEAR FROM datehour) <= 2021
...> GROUP BY title
...> ORDER BY views DESC
...> LIMIT 10
...> \"""
iex> req = Req.new() |> ReqBigQuery.attach(goth: MyGoth, project_id: project_id)
iex> Req.post!(req, bigquery: query).body
%ReqBigQuery.Result{
columns: ["title", "views"],
job_id: "job_JDDZKquJWkY7x0LlDcmZ4nMQqshb",
num_rows: 10,
rows: [
["The_Beatles", 13758950],
["Queen_(band)", 12019563],
["Pink_Floyd", 9522503],
["AC/DC", 8972364],
["Led_Zeppelin", 8294994],
["Linkin_Park", 8242802],
["The_Rolling_Stones", 7825952],
["Red_Hot_Chili_Peppers", 7302904],
["Fleetwood_Mac", 7199563],
["Twenty_One_Pilots", 6970692]
]
}

  With parameterized query:

iex> credentials = File.read!("credentials.json") |> Jason.decode!()
iex> source = {:service_account, credentials, []}
iex> {:ok, _} = Goth.start_link(name: MyGoth, source: source, http_client: &Req.request/1)
iex> project_id = System.fetch_env!("PROJECT_ID")
iex> query = \"""
...> SELECT EXTRACT(YEAR FROM datehour) AS year, SUM(views) AS views
...> FROM `bigquery-public-data.wikipedia.table_bands`
...> WHERE EXTRACT(YEAR FROM datehour) <= 2021
...> AND title = ?
...> GROUP BY 1
...> ORDER BY views DESC
...> \"""
iex> req = Req.new() |> ReqBigQuery.attach(goth: MyGoth, project_id: project_id)
iex> Req.post!(req, bigquery: {query, ["Linkin_Park"]}).body
%ReqBigQuery.Result{
columns: ["year", "views"],
job_id: "job_GXiJvALNsTAoAOJ39Eg3Mw94XMUQ",
num_rows: 7,
        rows: [
          [2017, 2895889],
          [2016, 1173359],
          [2018, 1133770],
          [2020, 906538],
          [2015, 860899],
          [2019, 790747],
          [2021, 481600]
        ]
}
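
  With a default dataset (a minimal sketch, not a doctest; the dataset and table
  names are placeholders), unqualified table names resolve against
  `:default_dataset_id`:

      req =
        Req.new()
        |> ReqBigQuery.attach(goth: MyGoth, project_id: project_id, default_dataset_id: "my_dataset")

      Req.post!(req, bigquery: "SELECT * FROM my_table LIMIT 10")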
"""
@spec attach(Request.t(), keyword()) :: Request.t()
def attach(%Request{} = request, options \\ []) do
request
|> Request.prepend_request_steps(bigquery_run: &run/1)
|> Request.register_options(@allowed_options)
|> Request.merge_options(options)
end
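
  # Request step: when the `:bigquery` option is present, this points the request
  # at the BigQuery `jobs.query` endpoint of the configured project, attaches a
  # bearer token fetched from Goth, sets the JSON body, and registers the response
  # decoding step. Without `:bigquery`, the request passes through untouched.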
defp run(%Request{options: options} = request) do
if query = options[:bigquery] do
base_url = options[:base_url] || @base_url
token = Goth.fetch!(options.goth).token
uri = URI.parse("#{base_url}/projects/#{options.project_id}/queries")
json = build_request_body(query, options[:default_dataset_id])
%{request | url: uri}
|> Request.merge_options(auth: {:bearer, token}, json: json)
|> Request.append_response_steps(bigquery_decode: &decode/1)
else
request
end
end
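
  # Builds the `jobs.query` request body. A `{query, params}` tuple becomes a
  # positionally parameterized query; a plain string (optionally with a default
  # dataset) becomes an ordinary query. `useLegacySql: false` is set in every
  # case, since `?` placeholders are only supported by standard SQL.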
defp build_request_body({query, []}, dataset) when is_binary(query) do
build_request_body(query, dataset)
end
defp build_request_body({query, params}, dataset) when is_binary(query) do
map = build_request_body(query, dataset)
query_params =
for value <- params do
{value, type} = encode_value(value)
%{parameterType: %{type: type}, parameterValue: %{value: value}}
end
    Map.merge(map, %{queryParameters: query_params, parameterMode: "POSITIONAL"})
end
defp build_request_body(query, dataset) when dataset in ["", nil] do
%{query: query, useLegacySql: false}
end
  defp build_request_body(query, dataset) when is_binary(query) do
    %{defaultDataset: %{datasetId: dataset}, query: query, useLegacySql: false}
  end
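
  # Response step: decodes a successful `jobs.query` response body into a
  # `ReqBigQuery.Result`. Non-200 responses are passed through unchanged.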
defp decode({request, %{status: 200} = response}) do
{request, update_in(response.body, &decode_body/1)}
end
defp decode(any), do: any
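
  # The first clause handles responses that contain rows; the second handles
  # queries with an empty result set, where the "rows" key is absent.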
defp decode_body(%{
"jobReference" => %{"jobId" => job_id},
"kind" => "bigquery#queryResponse",
"rows" => rows,
"schema" => %{"fields" => fields},
"totalRows" => num_rows
}) do
%Result{
job_id: job_id,
num_rows: String.to_integer(num_rows),
rows: decode_rows(rows, fields),
columns: decode_columns(fields)
}
end
defp decode_body(%{
"jobReference" => %{"jobId" => job_id},
"kind" => "bigquery#queryResponse",
"schema" => %{"fields" => fields},
"totalRows" => num_rows
}) do
%Result{
job_id: job_id,
num_rows: String.to_integer(num_rows),
rows: [],
columns: decode_columns(fields)
}
end
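
  # Each row arrives as %{"f" => [%{"v" => value}, ...]}. Values are paired with
  # their schema fields by position and decoded into Elixir terms.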
  defp decode_rows(rows, fields) do
    Enum.map(rows, fn %{"f" => columns} ->
      Enum.zip_with(columns, fields, fn %{"v" => value}, field ->
        decode_value(value, field)
      end)
    end)
  end
defp decode_columns(fields) do
Enum.map(fields, & &1["name"])
end
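
  # Decodes a single BigQuery JSON value according to its schema field: numbers
  # and booleans are parsed from strings, NUMERIC/BIGNUMERIC become `Decimal`,
  # date and time types become the corresponding calendar structs, RECORDs become
  # maps, and REPEATED fields are decoded element by element.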
@decimal_types ~w(NUMERIC BIGNUMERIC)
defp decode_value(nil, _), do: nil
defp decode_value(%{"v" => value}, field), do: decode_value(value, field)
@invalid_float_values ["-Infinity", "Infinity", "NaN"]
defp decode_value(value, %{"type" => "FLOAT"}) when value in @invalid_float_values do
raise "float value #{inspect(value)} is not supported"
end
defp decode_value(values, %{"mode" => "REPEATED"} = field) do
Enum.map(values, &decode_value(&1, Map.delete(field, "mode")))
end
defp decode_value(value, %{"type" => "FLOAT"}), do: String.to_float(value)
defp decode_value(value, %{"type" => "INTEGER"}), do: String.to_integer(value)
defp decode_value(value, %{"type" => type}) when type in @decimal_types,
do: Decimal.new(value)
defp decode_value("true", %{"type" => "BOOLEAN"}), do: true
defp decode_value("false", %{"type" => "BOOLEAN"}), do: false
defp decode_value(value, %{"fields" => fields, "type" => "RECORD"}) do
decode_record(value, fields)
end
defp decode_value(value, %{"type" => "DATE"}), do: Date.from_iso8601!(value)
defp decode_value(value, %{"type" => "DATETIME"}),
do: NaiveDateTime.from_iso8601!(value)
defp decode_value(value, %{"type" => "TIME"}), do: Time.from_iso8601!(value)
defp decode_value(value, %{"type" => "TIMESTAMP"}) do
float = String.to_float(value)
DateTime.from_unix!(round(float * 1_000_000), :microsecond)
end
defp decode_value(value, _), do: value
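
  # A REPEATED RECORD arrives as a list of records; a single RECORD arrives as
  # %{"f" => columns} and is decoded into a map keyed by field name.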
defp decode_record(values, fields) when is_list(values) do
Enum.map(values, &decode_record(&1, fields))
end
  defp decode_record(%{"f" => columns}, fields) do
    for {%{"v" => value}, field} <- Enum.zip(columns, fields), into: %{} do
      {field["name"], decode_value(value, field)}
    end
  end
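
  # Encodes an Elixir term as a {value, type} pair for a BigQuery query
  # parameter. Only UTC datetimes are encoded as TIMESTAMP; any value without a
  # matching clause is sent as a STRING.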
defp encode_value(%DateTime{time_zone: "Etc/UTC"} = datetime) do
naive_datetime = DateTime.to_naive(datetime)
{to_string(naive_datetime), "TIMESTAMP"}
end
defp encode_value(%Date{} = date), do: {to_string(date), "DATE"}
defp encode_value(%Time{} = time), do: {to_string(time), "TIME"}
defp encode_value(%NaiveDateTime{} = timestamp), do: {to_string(timestamp), "DATETIME"}
defp encode_value(%Decimal{} = decimal), do: {to_string(decimal), "BIGNUMERIC"}
defp encode_value(value) when is_boolean(value), do: {value, "BOOL"}
defp encode_value(value) when is_float(value), do: {value, "FLOAT"}
defp encode_value(value) when is_integer(value), do: {value, "INTEGER"}
defp encode_value(value), do: {value, "STRING"}
end