lib/kayrock/member_assignment.ex

defmodule Kayrock.MemberAssignment do
  @moduledoc """
  Code to serialize/deserialize Kafka consumer group member assignments

  See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-SyncGroupRequest
  """

  defstruct version: 0, partition_assignments: [], user_data: ""

  defmodule PartitionAssignment do
    @moduledoc "Represents partition assignments for a specific topic"
    defstruct topic: nil, partitions: []

    @type t :: %__MODULE__{
            topic: binary,
            partitions: [integer]
          }
  end

  @type t :: %__MODULE__{
          version: integer,
          partition_assignments: [PartitionAssignment.t()],
          user_data: binary
        }

  @spec serialize(t) :: iodata
  def serialize(%__MODULE__{
        version: version,
        partition_assignments: partition_assignments,
        user_data: user_data
      }) do
    [
      <<version::16-signed, length(partition_assignments)::32-signed>>,
      Enum.map(partition_assignments, &serialize_partition_assignment/1),
      Kayrock.Serialize.serialize(:bytes, user_data)
    ]
  end

  @spec deserialize(binary) :: {t, binary}
  def deserialize(<<>>), do: {%__MODULE__{}, <<>>}

  def deserialize(<<0::32-signed>>), do: {%__MODULE__{}, <<>>}

  def deserialize(<<0::32-signed, rest::bits>>), do: {%__MODULE__{}, rest}

  def deserialize(<<data_size::32-signed, data::size(data_size)-binary, rest::bits>>) do
    {deserialize_member_assignments(data), rest}
  end

  defp deserialize_member_assignments(
         <<version::16-signed, assignments_size::32-signed, rest::binary>>
       ) do
    {partition_assignments, user_data} = parse_assignments(assignments_size, rest, [])

    %__MODULE__{
      version: version,
      partition_assignments: partition_assignments,
      user_data: user_data
    }
  end

  defp parse_assignments(0, rest, assignments), do: {assignments, rest}

  defp parse_assignments(
         size,
         <<topic_len::16-signed, topic::size(topic_len)-binary, partition_len::32-signed,
           rest::binary>>,
         assignments
       ) do
    {partitions, rest} = parse_partitions(partition_len, rest, [])

    parse_assignments(size - 1, rest, [
      %PartitionAssignment{topic: topic, partitions: partitions} | assignments
    ])
  end

  defp parse_partitions(0, rest, partitions), do: {partitions, rest}

  defp parse_partitions(
         size,
         <<partition::32-signed, rest::binary>>,
         partitions
       ) do
    parse_partitions(size - 1, rest, [partition | partitions])
  end

  defp serialize_partition_assignment(%PartitionAssignment{topic: topic, partitions: partitions}) do
    [
      Kayrock.Serialize.serialize(:string, topic),
      Kayrock.Serialize.serialize_array(:int32, partitions)
    ]
  end
end