defmodule ReqBigQuery do
@moduledoc """
`Req` plugin for [Google BigQuery](https://cloud.google.com/bigquery/docs/reference/rest).
ReqBigQuery makes it easy to run BigQuery queries. It uses `Goth` for authentication.
Query results are decoded into the `ReqBigQuery.Result` struct.
The struct implements the `Table.Reader` protocol and thus can be efficiently traversed by rows or columns.
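For example, assuming `res` holds a `ReqBigQuery.Result` such as the ones built in the
`attach/2` examples below, it can be converted with the helpers from the `table` package:

    Table.to_columns(res)
    #=> %{"title" => [...], "views" => [...]}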
"""
alias Req.Request
alias ReqBigQuery.Result
@allowed_options ~w(goth default_dataset_id project_id bigquery max_results use_legacy_sql timeout_ms dry_run)a
@base_url "https://bigquery.googleapis.com/bigquery/v2"
@max_results 10_000
@use_legacy_sql false
@timeout_ms 10_000
@doc """
Attaches this plugin to a `Req` request.
## Request Options
* `:goth` - Required. The name of the `Goth` server used to fetch the authentication token.
* `:project_id` - Required. The GCP project id.
* `:bigquery` - Required. The query to execute. It can be a plain SQL string or
a `{query, params}` tuple, where `query` can contain `?` placeholders and `params`
is a list of corresponding values.
* `:default_dataset_id` - Optional. If set, the dataset to assume for any unqualified table
names in the query. If not set, all table names in the query string must be qualified in the
format `datasetId.tableId`.
* `:max_results` - Optional. Number of rows to be returned by BigQuery in each request (paging).
The rows stream makes additional requests if the returned `num_rows` is greater than `:max_results`.
Defaults to 10000.
* `:use_legacy_sql` - Optional. Specifies whether to use BigQuery's legacy SQL dialect for this query.
If set to false, the query uses BigQuery's GoogleSQL dialect: https://cloud.google.com/bigquery/sql-reference/
Defaults to false.
* `:timeout_ms` - Optional. How long to wait for the query to complete, in milliseconds,
before the request times out and returns. Note: the call is not guaranteed to wait for
the specified timeout. If the query takes longer to run than the timeout value, the call
returns without any results and with the `jobComplete` flag set to false. Defaults to
10000 milliseconds (10 seconds).
* `:dry_run` - Optional. Specifies whether to run the given query in dry run mode. Defaults to false.
If you want to set any of these options when attaching the plugin, pass them as the second argument.
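For example, to override the paging and timeout defaults when attaching (the option values
below are only illustrative):

    req =
      Req.new()
      |> ReqBigQuery.attach(
        goth: MyGoth,
        project_id: "my-project",
        max_results: 500,
        timeout_ms: 30_000
      )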
## Examples
With plain query string:
iex> credentials = File.read!("credentials.json") |> Jason.decode!()
iex> source = {:service_account, credentials, []}
iex> {:ok, _} = Goth.start_link(name: MyGoth, source: source, http_client: &Req.request/1)
iex> project_id = System.fetch_env!("PROJECT_ID")
iex> query = \"""
...> SELECT title, SUM(views) AS views
...> FROM `bigquery-public-data.wikipedia.table_bands`
...> WHERE EXTRACT(YEAR FROM datehour) <= 2021
...> GROUP BY title
...> ORDER BY views DESC
...> LIMIT 10
...> \"""
iex> req = Req.new() |> ReqBigQuery.attach(goth: MyGoth, project_id: project_id)
iex> res = Req.post!(req, bigquery: query).body
iex> res
%ReqBigQuery.Result{
columns: ["title", "views"],
job_id: "job_JDDZKquJWkY7x0LlDcmZ4nMQqshb",
num_rows: 10,
total_bytes_processed: 18161868216,
rows: %Stream{}
}
iex> Enum.to_list(res.rows)
[
["The_Beatles", 13758950],
["Queen_(band)", 12019563],
["Pink_Floyd", 9522503],
["AC/DC", 8972364],
["Led_Zeppelin", 8294994],
["Linkin_Park", 8242802],
["The_Rolling_Stones", 7825952],
["Red_Hot_Chili_Peppers", 7302904],
["Fleetwood_Mac", 7199563],
["Twenty_One_Pilots", 6970692]
]
With parameterized query:
iex> credentials = File.read!("credentials.json") |> Jason.decode!()
iex> source = {:service_account, credentials, []}
iex> {:ok, _} = Goth.start_link(name: MyGoth, source: source, http_client: &Req.request/1)
iex> project_id = System.fetch_env!("PROJECT_ID")
iex> query = \"""
...> SELECT EXTRACT(YEAR FROM datehour) AS year, SUM(views) AS views
...> FROM `bigquery-public-data.wikipedia.table_bands`
...> WHERE EXTRACT(YEAR FROM datehour) <= 2021
...> AND title = ?
...> GROUP BY 1
...> ORDER BY views DESC
...> \"""
iex> req = Req.new() |> ReqBigQuery.attach(goth: MyGoth, project_id: project_id)
iex> res = Req.post!(req, bigquery: {query, ["Linkin_Park"]}).body
%ReqBigQuery.Result{
columns: ["year", "views"],
job_id: "job_GXiJvALNsTAoAOJ39Eg3Mw94XMUQ",
num_rows: 7,
total_bytes_processed: 15686357820,
rows: %Stream{}
}
iex> Enum.to_list(res.rows)
[[2017, 2895889], [2016, 1173359], [2018, 1133770], [2020, 906538], [2015, 860899], [2019, 790747], [2021, 481600]]
"""
@spec attach(Request.t(), keyword()) :: Request.t()
def attach(%Request{} = request, options \\ []) do
checked_options =
options
|> Keyword.put_new(:base_url, @base_url)
|> Keyword.put_new(:max_results, @max_results)
|> Keyword.put_new(:use_legacy_sql, @use_legacy_sql)
|> Keyword.put_new(:timeout_ms, @timeout_ms)
request
|> Request.prepend_request_steps(bigquery_run: &run/1)
|> Request.register_options(@allowed_options)
|> Request.merge_options(checked_options)
end
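# Request step: when the :bigquery option is present, builds the BigQuery
# jobs.query request (URL, bearer token from Goth, JSON body) and registers
# the response step that decodes the result. Other requests pass through.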
defp run(%Request{options: options} = request) do
if query = options[:bigquery] do
goth = options[:goth] || raise ":goth is missing"
project_id = options[:project_id] || raise ":project_id is missing"
base_url = options[:base_url]
token = Goth.fetch!(goth).token
uri = URI.parse("#{base_url}/projects/#{project_id}/queries")
json =
query
|> build_request_body(options[:default_dataset_id])
|> Map.put(:maxResults, options[:max_results])
|> Map.put(:useLegacySql, options[:use_legacy_sql])
|> Map.put(:timeoutMs, options[:timeout_ms])
|> Map.put(:dryRun, options[:dry_run] || false)
%{request | url: uri}
|> Request.merge_options(auth: {:bearer, token}, json: json)
|> Request.append_response_steps(bigquery_decode: &decode/1)
else
request
end
end
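# Builds the jobs.query request body. Accepts a plain SQL string or a
# {query, params} tuple; positional parameters are encoded via encode_value/1.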
defp build_request_body({query, []}, dataset) when is_binary(query) do
build_request_body(query, dataset)
end
defp build_request_body({query, params}, dataset) when is_binary(query) do
map = build_request_body(query, dataset)
query_params =
for value <- params do
{value, type} = encode_value(value)
%{parameterType: %{type: type}, parameterValue: %{value: value}}
end
Map.merge(map, %{
queryParameters: query_params,
useLegacySql: false,
parameterMode: "POSITIONAL"
})
end
defp build_request_body(query, dataset) when dataset in ["", nil] do
%{query: query}
end
defp build_request_body(query, dataset) when is_binary(query) do
%{defaultDataset: %{datasetId: dataset}, query: query}
end
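# Response step: decodes a successful (200) response body into a
# ReqBigQuery.Result; other responses pass through unchanged.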
defp decode({request, %{status: 200} = response}) do
{request, update_in(response.body, &decode_body(&1, request.options))}
end
defp decode(any), do: any
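# decode_body/2 clauses, in order: a finished query with rows, a finished
# query without rows, and a dry-run response (no job id and no "totalRows").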
defp decode_body(
%{
"jobReference" => %{"jobId" => job_id},
"kind" => "bigquery#queryResponse",
"rows" => _rows,
"schema" => %{"fields" => fields},
"totalRows" => num_rows,
"totalBytesProcessed" => total_bytes
} = initial_response,
request_options
) do
%Result{
job_id: job_id,
num_rows: String.to_integer(num_rows),
total_bytes_processed: String.to_integer(total_bytes),
rows: initial_response |> rows_stream(request_options) |> decode_rows(fields),
columns: decode_columns(fields)
}
end
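# Query completed but returned no rows.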
defp decode_body(
%{
"jobReference" => %{"jobId" => job_id},
"kind" => "bigquery#queryResponse",
"schema" => %{"fields" => fields},
"totalRows" => num_rows,
"totalBytesProcessed" => total_bytes
},
_request_options
) do
%Result{
job_id: job_id,
num_rows: String.to_integer(num_rows),
total_bytes_processed: String.to_integer(total_bytes),
rows: [],
columns: decode_columns(fields)
}
end
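# Dry-run response: reports bytes processed but carries no job id or rows.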
defp decode_body(
%{
"jobReference" => %{},
"kind" => "bigquery#queryResponse",
"schema" => %{"fields" => fields},
"totalBytesProcessed" => total_bytes
},
_request_options
) do
%Result{
job_id: nil,
num_rows: 0,
total_bytes_processed: String.to_integer(total_bytes),
rows: [],
columns: decode_columns(fields)
}
end
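# Lazily streams all result rows, following "pageToken" with extra
# jobs.getQueryResults requests whenever the response is paginated.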
defp rows_stream(initial_response, request_options) do
Stream.unfold({:initial, initial_response}, fn
{:initial, %{"rows" => rows} = initial_body} ->
{rows, initial_body}
%{
"pageToken" => page_token,
"jobReference" => job_reference
} ->
%{"jobId" => job_id, "projectId" => project_id} = job_reference
job_location = Map.get(job_reference, "location")
resp = page_request(request_options, project_id, job_id, job_location, page_token)
{resp.body["rows"], resp.body}
_end ->
# the previous page had no "pageToken", so all rows have been emitted
nil
end)
|> Stream.flat_map(& &1)
end
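# Fetches a single page of results from the jobs.getQueryResults endpoint.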
defp page_request(options, project_id, job_id, job_location, page_token) do
uri =
URI.parse(
"#{@base_url}/projects/#{project_id}/queries/#{job_id}?maxResults=#{options[:max_results]}&pageToken=#{page_token}"
)
uri =
case job_location do
nil -> uri
job_location -> URI.append_query(uri, "location=#{job_location}")
end
token = Goth.fetch!(options[:goth]).token
Req.new(url: uri)
|> Request.merge_options(auth: {:bearer, token})
|> Req.get!()
end
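# Lazily decodes each row's cells (the "f"/"v" wrappers) using the schema fields.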
defp decode_rows(rows, fields) do
Stream.map(rows, fn %{"f" => columns} ->
Enum.with_index(columns, fn %{"v" => value}, index ->
field = Enum.at(fields, index)
decode_value(value, field)
end)
end)
end
defp decode_columns(fields) do
Enum.map(fields, & &1["name"])
end
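# Converts a BigQuery JSON value into the corresponding Elixir term based on
# the schema field type.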
@decimal_types ~w(NUMERIC BIGNUMERIC)
defp decode_value(nil, _), do: nil
defp decode_value(%{"v" => value}, field), do: decode_value(value, field)
@invalid_float_values ["-Infinity", "Infinity", "NaN"]
defp decode_value(value, %{"type" => "FLOAT"}) when value in @invalid_float_values do
raise "float value #{inspect(value)} is not supported"
end
defp decode_value(values, %{"mode" => "REPEATED"} = field) do
Enum.map(values, &decode_value(&1, Map.delete(field, "mode")))
end
defp decode_value(value, %{"type" => "FLOAT"}), do: String.to_float(value)
defp decode_value(value, %{"type" => "INTEGER"}), do: String.to_integer(value)
defp decode_value(value, %{"type" => type}) when type in @decimal_types,
do: Decimal.new(value)
defp decode_value("true", %{"type" => "BOOLEAN"}), do: true
defp decode_value("false", %{"type" => "BOOLEAN"}), do: false
defp decode_value(value, %{"fields" => fields, "type" => "RECORD"}) do
decode_record(value, fields)
end
defp decode_value(value, %{"type" => "DATE"}), do: Date.from_iso8601!(value)
defp decode_value(value, %{"type" => "DATETIME"}),
do: NaiveDateTime.from_iso8601!(value)
defp decode_value(value, %{"type" => "TIME"}), do: Time.from_iso8601!(value)
defp decode_value(value, %{"type" => "TIMESTAMP"}) do
float = String.to_float(value)
DateTime.from_unix!(round(float * 1_000_000), :microsecond)
end
defp decode_value(value, _), do: value
defp decode_record(values, fields) when is_list(values) do
Enum.map(values, &decode_record(&1, fields))
end
defp decode_record(%{"f" => columns}, fields) do
for {%{"v" => value}, index} <- Enum.with_index(columns), into: %{} do
field = Enum.at(fields, index)
{field["name"], decode_value(value, field)}
end
end
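# Maps an Elixir term to a BigQuery query parameter value and its type
# (only UTC datetimes are encoded as TIMESTAMP).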
defp encode_value(%DateTime{time_zone: "Etc/UTC"} = datetime) do
naive_datetime = DateTime.to_naive(datetime)
{to_string(naive_datetime), "TIMESTAMP"}
end
defp encode_value(%Date{} = date), do: {to_string(date), "DATE"}
defp encode_value(%Time{} = time), do: {to_string(time), "TIME"}
defp encode_value(%NaiveDateTime{} = timestamp), do: {to_string(timestamp), "DATETIME"}
defp encode_value(%Decimal{} = decimal), do: {to_string(decimal), "BIGNUMERIC"}
defp encode_value(value) when is_boolean(value), do: {value, "BOOL"}
defp encode_value(value) when is_float(value), do: {value, "FLOAT"}
defp encode_value(value) when is_integer(value), do: {value, "INTEGER"}
defp encode_value(value), do: {value, "STRING"}
end