From 8e300facc0c2c5863925cdd892f60c65b8dd15af Mon Sep 17 00:00:00 2001 From: Seivan Heidari Date: Mon, 2 Dec 2019 10:02:37 +0100 Subject: [PATCH] Fixes: https://github.com/bitwalker/libcluster/pull/113 And merges https://github.com/bitwalker/libcluster/pull/36 & https://github.com/bitwalker/libcluster/pull/99 There are some issues related to error handling. Right now both implementations raise exceptions via `Keyword.fetch!/2` but also do conditional checks for logging, which seems weird. We should probably stick to one approach: either crash or log. --- lib/strategy/kubernetes_dns.ex | 142 +++++++++++++++-- lib/strategy/kubernetes_dns_srv.ex | 238 ----------------------------- test/kubernetes_dns_srv_test.exs | 15 +- 3 files changed, 139 insertions(+), 256 deletions(-) delete mode 100644 lib/strategy/kubernetes_dns_srv.ex diff --git a/lib/strategy/kubernetes_dns.ex b/lib/strategy/kubernetes_dns.ex index 9a4568a..137af69 100644 --- a/lib/strategy/kubernetes_dns.ex +++ b/lib/strategy/kubernetes_dns.ex @@ -1,17 +1,24 @@ defmodule Cluster.Strategy.Kubernetes.DNS do @moduledoc """ - This clustering strategy works by loading all your Erlang nodes (within Pods) in the current [Kubernetes - namespace](https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/). - It will fetch the addresses of all pods under a shared headless service and attempt to connect. - It will continually monitor and update its connections every 5s. + This clustering strategy works by loading all your Erlang nodes (within Pods) in the current + [Kubernetes namespace](https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/). + For more information, see the kubernetes stateful-application [documentation](https://kubernetes.io/docs/tutorials/stateful-application/basic-stateful-set/#using-stable-network-identities) + * It will fetch the FQDN of all pods under the headless service and attempt to connect. 
+ * It will continually monitor and update its connections according to the polling_interval (default 5s) + + The `application_name`, `namespace` and `method` is configurable (you may have launched erlang with a different configured name), + but will in most cases be the name of your application It assumes that all Erlang nodes were launched under a base name, are using longnames, and are unique based on their FQDN, rather than the base hostname. In other words, in the following longname, `@`, `basename` would be the value configured through `application_name`. - An example configuration is below: + It uses one of two methods for the lookups: + * `:srv` + * `:a` (default) + An example for using A query configuration is below: config :libcluster, topologies: [ @@ -20,8 +27,110 @@ defmodule Cluster.Strategy.Kubernetes.DNS do config: [ service: "myapp-headless", application_name: "myapp", - polling_interval: 10_000]]] + polling_interval: 10_000, + method: :a + ] + ] + ] + + + + + An example for SRV query configuration is below: + + config :libcluster, + topologies: [ + k8s_example: [ + strategy: #{__MODULE__}, + config: [ + service: "elixir-plug-poc", + application_name: "elixir_plug_poc", + polling_interval: 10_000, + namespace: "default", + method: :srv + ] + ] + ] + + An example of how this strategy extracts topology information from DNS follows: + + ``` + bash-5.0# hostname -f + elixir-plug-poc-1.elixir-plug-poc.default.svc.cluster.local + bash-5.0# dig SRV elixir-plug-poc.default.svc.cluster.local + + ; <<>> DiG 9.14.3 <<>> SRV elixir-plug-poc.default.svc.cluster.local + ;; global options: +cmd + ;; Got answer: + ;; WARNING: .local is reserved for Multicast DNS + ;; You are currently testing what happens when an mDNS query is leaked to DNS + ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 7169 + ;; flags: qr aa rd ra; QUERY: 1, ANSWER: 2, AUTHORITY: 0, ADDITIONAL: 2 + + ;; QUESTION SECTION: + ;elixir-plug-poc.default.svc.cluster.local. 
IN SRV + + ;; ANSWER SECTION: + elixir-plug-poc.default.svc.cluster.local. 30 IN SRV 10 50 0 elixir-plug-poc-0.elixir-plug-poc.default.svc.cluster.local. + elixir-plug-poc.default.svc.cluster.local. 30 IN SRV 10 50 0 elixir-plug-poc-1.elixir-plug-poc.default.svc.cluster.local. + + ;; ADDITIONAL SECTION: + elixir-plug-poc-0.elixir-plug-poc.default.svc.cluster.local. 30 IN A 10.1.0.95 + elixir-plug-poc-1.elixir-plug-poc.default.svc.cluster.local. 30 IN A 10.1.0.96 + + ;; Query time: 0 msec + ;; SERVER: 10.96.0.10#53(10.96.0.10) + ;; WHEN: Wed Jul 03 11:55:27 UTC 2019 + ;; MSG SIZE rcvd: 167 + ``` + + And here is an example of a corresponding kubernetes statefulset/service definition: + + ```yaml + apiVersion: v1 + kind: Service + metadata: + name: elixir-plug-poc + labels: + app: elixir-plug-poc + spec: + ports: + - port: 4000 + name: web + clusterIP: None + selector: + app: elixir-plug-poc + --- + apiVersion: apps/v1 + kind: StatefulSet + metadata: + name: elixir-plug-poc + spec: + serviceName: "elixir-plug-poc" + replicas: 2 + selector: + matchLabels: + app: elixir-plug-poc + template: + metadata: + labels: + app: elixir-plug-poc + spec: + containers: + - name: elixir-plug-poc + image: binarytemple/elixir_plug_poc + args: + - foreground + env: + - name: ERLANG_COOKIE + value: "cookie" + imagePullPolicy: Always + ports: + - containerPort: 4000 + name: http + protocol: TCP + ``` """ use GenServer use Cluster.Strategy @@ -106,17 +215,21 @@ defmodule Cluster.Strategy.Kubernetes.DNS do @spec get_nodes(State.t()) :: [atom()] defp get_nodes(%State{topology: topology, config: config}) do - app_name = Keyword.fetch!(config, :application_name) - service = Keyword.fetch!(config, :service) - resolver = Keyword.get(config, :resolver, &:inet_res.getbyname(&1, :a)) + method = Keyword.get(config, :method, :a) + app_name = Keyword.get(config, :application_name) + service_name = Keyword.get(config, :service) + namespace = Keyword.get(config, :namespace, "default") + service_k8s_path 
= "#{service_name}.#{namespace}.svc.cluster.local." + service = if method == :a, do: service_name, else: service_k8s_path + resolver = Keyword.get(config, :resolver, &:inet_res.getbyname(&1, method)) cond do app_name != nil and service != nil -> headless_service = to_charlist(service) case resolver.(headless_service) do - {:ok, {:hostent, _fqdn, [], :inet, _value, addresses}} -> - parse_response(addresses, app_name) + {:ok, {:hostent, _, _, addrtype, _count, addresses}} when addrtype in [:srv, :inet] -> + parse_response(addresses, {app_name, method}) {:error, reason} -> error(topology, "lookup against #{service} failed: #{inspect(reason)}") @@ -145,9 +258,12 @@ defmodule Cluster.Strategy.Kubernetes.DNS do Keyword.get(config, :polling_interval, @default_polling_interval) end - defp parse_response(addresses, app_name) do + defp parse_response(addresses, {app_name, method}) do + parser = + if method == :a, do: &:inet_parse.ntoa(&1), else: &:erlang.list_to_binary(elem(&1, 3)) + addresses - |> Enum.map(&:inet_parse.ntoa(&1)) + |> Enum.map(parser) |> Enum.map(&"#{app_name}@#{&1}") |> Enum.map(&String.to_atom(&1)) end diff --git a/lib/strategy/kubernetes_dns_srv.ex b/lib/strategy/kubernetes_dns_srv.ex deleted file mode 100644 index f636795..0000000 --- a/lib/strategy/kubernetes_dns_srv.ex +++ /dev/null @@ -1,238 +0,0 @@ -defmodule Cluster.Strategy.Kubernetes.DNSSRV do - @moduledoc """ - This clustering strategy works by issuing a SRV query for the kubernetes headless service - under which the stateful set containing your nodes is running. - - For more information, see the kubernetes stateful-application [documentation](https://kubernetes.io/docs/tutorials/stateful-application/basic-stateful-set/#using-stable-network-identities) - - * It will fetch the FQDN of all pods under the headless service and attempt to connect. 
- * It will continually monitor and update its connections according to the polling_interval (default 5s) - - The `application_name` is configurable (you may have launched erlang with a different configured name), - but will in most cases be the name of your application - - An example configuration is below: - - config :libcluster, - topologies: [ - k8s_example: [ - strategy: #{__MODULE__}, - config: [ - service: "elixir-plug-poc", - application_name: "elixir_plug_poc", - polling_interval: 10_000]]] - - An example of how this strategy extracts topology information from DNS follows: - - ``` - bash-5.0# hostname -f - elixir-plug-poc-1.elixir-plug-poc.default.svc.cluster.local - bash-5.0# dig SRV elixir-plug-poc.default.svc.cluster.local - - ; <<>> DiG 9.14.3 <<>> SRV elixir-plug-poc.default.svc.cluster.local - ;; global options: +cmd - ;; Got answer: - ;; WARNING: .local is reserved for Multicast DNS - ;; You are currently testing what happens when an mDNS query is leaked to DNS - ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 7169 - ;; flags: qr aa rd ra; QUERY: 1, ANSWER: 2, AUTHORITY: 0, ADDITIONAL: 2 - - ;; QUESTION SECTION: - ;elixir-plug-poc.default.svc.cluster.local. IN SRV - - ;; ANSWER SECTION: - elixir-plug-poc.default.svc.cluster.local. 30 IN SRV 10 50 0 elixir-plug-poc-0.elixir-plug-poc.default.svc.cluster.local. - elixir-plug-poc.default.svc.cluster.local. 30 IN SRV 10 50 0 elixir-plug-poc-1.elixir-plug-poc.default.svc.cluster.local. - - ;; ADDITIONAL SECTION: - elixir-plug-poc-0.elixir-plug-poc.default.svc.cluster.local. 30 IN A 10.1.0.95 - elixir-plug-poc-1.elixir-plug-poc.default.svc.cluster.local. 
30 IN A 10.1.0.96 - - ;; Query time: 0 msec - ;; SERVER: 10.96.0.10#53(10.96.0.10) - ;; WHEN: Wed Jul 03 11:55:27 UTC 2019 - ;; MSG SIZE rcvd: 167 - ``` - - And here is an example of a corresponding kubernetes statefulset/service definition: - - ```yaml - apiVersion: v1 - kind: Service - metadata: - name: elixir-plug-poc - labels: - app: elixir-plug-poc - spec: - ports: - - port: 4000 - name: web - clusterIP: None - selector: - app: elixir-plug-poc - --- - apiVersion: apps/v1 - kind: StatefulSet - metadata: - name: elixir-plug-poc - spec: - serviceName: "elixir-plug-poc" - replicas: 2 - selector: - matchLabels: - app: elixir-plug-poc - template: - metadata: - labels: - app: elixir-plug-poc - spec: - containers: - - name: elixir-plug-poc - image: binarytemple/elixir_plug_poc - args: - - foreground - env: - - name: ERLANG_COOKIE - value: "cookie" - imagePullPolicy: Always - ports: - - containerPort: 4000 - name: http - protocol: TCP - ``` - """ - use GenServer - use Cluster.Strategy - import Cluster.Logger - - alias Cluster.Strategy.State - - @default_polling_interval 5_000 - - @impl true - def start_link(args), do: GenServer.start_link(__MODULE__, args) - - @impl true - def init([%State{meta: nil} = state]) do - init([%State{state | :meta => MapSet.new()}]) - end - - def init([%State{} = state]) do - {:ok, load(state), 0} - end - - @impl true - def handle_info(:timeout, state) do - handle_info(:load, state) - end - - def handle_info(:load, state) do - {:noreply, load(state)} - end - - def handle_info(_, state) do - {:noreply, state} - end - - defp load(%State{topology: topology, meta: meta} = state) do - new_nodelist = MapSet.new(get_nodes(state)) - added = MapSet.difference(new_nodelist, meta) - removed = MapSet.difference(meta, new_nodelist) - - new_nodelist = - case Cluster.Strategy.disconnect_nodes( - topology, - state.disconnect, - state.list_nodes, - MapSet.to_list(removed) - ) do - :ok -> - new_nodelist - - {:error, bad_nodes} -> - # Add back the nodes which 
should have been removed, but which couldn't be for some reason - Enum.reduce(bad_nodes, new_nodelist, fn {n, _}, acc -> - MapSet.put(acc, n) - end) - end - - new_nodelist = - case Cluster.Strategy.connect_nodes( - topology, - state.connect, - state.list_nodes, - MapSet.to_list(added) - ) do - :ok -> - new_nodelist - - {:error, bad_nodes} -> - # Remove the nodes which should have been added, but couldn't be for some reason - Enum.reduce(bad_nodes, new_nodelist, fn {n, _}, acc -> - MapSet.delete(acc, n) - end) - end - - Process.send_after( - self(), - :load, - polling_interval(state) - ) - - %State{state | :meta => new_nodelist} - end - - @spec get_nodes(State.t()) :: [atom()] - defp get_nodes(%State{topology: topology, config: config}) do - app_name = Keyword.fetch!(config, :application_name) - service = Keyword.fetch!(config, :service) - namespace = Keyword.fetch!(config, :namespace) - service_k8s_path = "#{service}.#{namespace}.svc.cluster.local." - resolver = Keyword.get(config, :resolver, &:inet_res.getbyname(&1, :srv)) - - cond do - app_name != nil and service != nil -> - headless_service = to_charlist(service_k8s_path) - - case resolver.(headless_service) do - {:ok, {:hostent, _, _, :srv, _count, addresses}} -> - parse_response(addresses, app_name) - - {:error, reason} -> - error( - topology, - "#{inspect(headless_service)} : lookup against #{service} failed: #{inspect(reason)}" - ) - - [] - end - - app_name == nil -> - warn( - topology, - "kubernetes.DNS strategy is selected, but :application_name is not configured!" 
- ) - - [] - - service == nil -> - warn(topology, "kubernetes strategy is selected, but :service is not configured!") - [] - - :else -> - warn(topology, "kubernetes strategy is selected, but is not configured!") - [] - end - end - - defp polling_interval(%State{config: config}) do - Keyword.get(config, :polling_interval, @default_polling_interval) - end - - defp parse_response(addresses, app_name) do - addresses - |> Enum.map(&:erlang.list_to_binary(elem(&1, 3))) - |> Enum.map(&"#{app_name}@#{&1}") - |> Enum.map(&String.to_atom(&1)) - end -end diff --git a/test/kubernetes_dns_srv_test.exs b/test/kubernetes_dns_srv_test.exs index 55969bb..d25dbf6 100644 --- a/test/kubernetes_dns_srv_test.exs +++ b/test/kubernetes_dns_srv_test.exs @@ -4,7 +4,7 @@ defmodule Cluster.Strategy.KubernetesSRVDNSTest do use ExUnit.Case, async: true import ExUnit.CaptureLog - alias Cluster.Strategy.Kubernetes.DNSSRV + alias Cluster.Strategy.Kubernetes.DNS, as: DNSSRV alias Cluster.Strategy.State alias Cluster.Nodes @@ -21,6 +21,7 @@ defmodule Cluster.Strategy.KubernetesSRVDNSTest do service: "elixir-plug-poc", namespace: "default", application_name: "node", + method: :srv, resolver: fn _query -> {:ok, {:hostent, 'elixir-plug-poc.default.svc.cluster.local', [], :srv, 2, @@ -57,6 +58,7 @@ defmodule Cluster.Strategy.KubernetesSRVDNSTest do service: "elixir-plug-poc", namespace: "default", application_name: "node", + method: :srv, resolver: fn _query -> {:ok, {:hostent, 'elixir-plug-poc.default.svc.cluster.local', [], :srv, 1, @@ -100,20 +102,22 @@ defmodule Cluster.Strategy.KubernetesSRVDNSTest do service: "app", namespace: "default", application_name: "node", + method: :srv, resolver: fn _query -> {:ok, {:hostent, 'elixir-plug-poc.default.svc.cluster.local', [], :srv, 2, [ {10, 50, 0, 'elixir-plug-poc-1.elixir-plug-poc.default.svc.cluster.local'} ]}} - - end ], connect: {Nodes, :connect, [self()]}, disconnect: {Nodes, :disconnect, [self()]}, - list_nodes: {Nodes, :list_nodes, 
[[:"node@elixir-plug-poc-1.elixir-plug-poc.default.svc.cluster.local"]]}, - meta: MapSet.new([:"node@elixir-plug-poc-1.elixir-plug-poc.default.svc.cluster.local"]) + list_nodes: + {Nodes, :list_nodes, + [[:"node@elixir-plug-poc-1.elixir-plug-poc.default.svc.cluster.local"]]}, + meta: + MapSet.new([:"node@elixir-plug-poc-1.elixir-plug-poc.default.svc.cluster.local"]) } ] |> DNSSRV.start_link() @@ -133,6 +137,7 @@ defmodule Cluster.Strategy.KubernetesSRVDNSTest do service: "app", namespace: "default", application_name: "node", + method: :srv, resolver: fn _query -> {:error, :nxdomain} end ], connect: {Nodes, :connect, [self()]},