Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 70 additions & 59 deletions pkg/encapsulation/cilium.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,95 +17,106 @@ package encapsulation
import (
"fmt"
"net"
"sync"

"github.com/vishvananda/netlink"

"github.com/cozystack/kilo/pkg/iproute"
"github.com/cozystack/kilo/pkg/iptables"
)

const ciliumDeviceName = "cilium_host"
const ciliumHostIface = "cilium_host"

type cilium struct {
iface int
strategy Strategy
ch chan netlink.LinkUpdate
done chan struct{}
// mu guards updates to the iface field.
mu sync.Mutex
}

// NewCilium returns an encapsulator that uses Cilium.
// NewCilium returns an encapsulator that uses IPIP tunnels
// routed through Cilium's VxLAN overlay.
func NewCilium(strategy Strategy) Encapsulator {
return &cilium{
ch: make(chan netlink.LinkUpdate),
done: make(chan struct{}),
strategy: strategy,
}
return &cilium{strategy: strategy}
}

// CleanUp close done channel
func (f *cilium) CleanUp() error {
close(f.done)
return nil
// CleanUp will remove any created IPIP devices.
func (c *cilium) CleanUp() error {
if err := iproute.DeleteAddresses(c.iface); err != nil {
return err
}
return iproute.RemoveInterface(c.iface)
}

// Gw returns the correct gateway IP associated with the given node.
func (f *cilium) Gw(_, _ net.IP, subnet *net.IPNet) net.IP {
// It returns the Cilium internal IP so that the IPIP outer packets are routed
// through Cilium's VxLAN overlay rather than the host network.
func (c *cilium) Gw(_, _, ciliumIP net.IP, subnet *net.IPNet) net.IP {
if ciliumIP != nil {
return ciliumIP
}
return subnet.IP
}

// Index returns the index of the Cilium interface.
func (f *cilium) Index() int {
f.mu.Lock()
defer f.mu.Unlock()
return f.iface
}

// Init finds the Cilium interface index.
func (f *cilium) Init(_ int) error {
if err := netlink.LinkSubscribe(f.ch, f.done); err != nil {
return fmt.Errorf("failed to subscribe to updates to %s: %v", ciliumDeviceName, err)
}
go func() {
var lu netlink.LinkUpdate
for {
select {
case lu = <-f.ch:
if lu.Attrs().Name == ciliumDeviceName {
f.mu.Lock()
f.iface = lu.Attrs().Index
f.mu.Unlock()
}
case <-f.done:
return
}
}
}()
i, err := netlink.LinkByName(ciliumDeviceName)
if _, ok := err.(netlink.LinkNotFoundError); ok {
// LocalIP returns the IP address of the cilium_host interface.
// This IP is advertised to other nodes so they can route IPIP outer
// packets through Cilium's overlay.
func (c *cilium) LocalIP() net.IP {
iface, err := net.InterfaceByName(ciliumHostIface)
if err != nil {
return nil
}
addrs, err := iface.Addrs()
if err != nil {
return fmt.Errorf("failed to query for Cilium interface: %v", err)
return nil
}
for _, a := range addrs {
if ipNet, ok := a.(*net.IPNet); ok && ipNet.IP.To4() != nil {
return ipNet.IP
}
}
f.mu.Lock()
f.iface = i.Attrs().Index
f.mu.Unlock()
return nil
}

// Rules is a no-op.
func (f *cilium) Rules(_ []*net.IPNet) iptables.RuleSet {
return iptables.RuleSet{}
// Index returns the index of the IPIP tunnel interface.
func (c *cilium) Index() int {
return c.iface
}

// Set is a no-op.
func (f *cilium) Set(_ *net.IPNet) error {
// Init initializes the IPIP tunnel interface.
func (c *cilium) Init(base int) error {
iface, err := iproute.NewIPIP(base)
if err != nil {
return fmt.Errorf("failed to create tunnel interface: %v", err)
}
if err := iproute.Set(iface, true); err != nil {
return fmt.Errorf("failed to set tunnel interface up: %v", err)
}
c.iface = iface
return nil
}

// Rules returns a set of iptables rules that are necessary
// when traffic between nodes must be encapsulated.
func (c *cilium) Rules(nodes []*net.IPNet) iptables.RuleSet {
rules := iptables.RuleSet{}
proto := ipipProtocolName()
rules.AddToAppend(iptables.NewIPv4Chain("filter", "KILO-IPIP"))
rules.AddToAppend(iptables.NewIPv6Chain("filter", "KILO-IPIP"))
rules.AddToAppend(iptables.NewIPv4Rule("filter", "INPUT", "-p", proto, "-m", "comment", "--comment", "Kilo: jump to IPIP chain", "-j", "KILO-IPIP"))
rules.AddToAppend(iptables.NewIPv6Rule("filter", "INPUT", "-p", proto, "-m", "comment", "--comment", "Kilo: jump to IPIP chain", "-j", "KILO-IPIP"))
for _, n := range nodes {
// Accept encapsulated traffic from peers.
rules.AddToPrepend(iptables.NewRule(iptables.GetProtocol(n.IP), "filter", "KILO-IPIP", "-s", n.String(), "-m", "comment", "--comment", "Kilo: allow IPIP traffic", "-j", "ACCEPT"))
}
// Drop all other IPIP traffic.
rules.AddToAppend(iptables.NewIPv4Rule("filter", "INPUT", "-p", proto, "-m", "comment", "--comment", "Kilo: reject other IPIP traffic", "-j", "DROP"))
rules.AddToAppend(iptables.NewIPv6Rule("filter", "INPUT", "-p", proto, "-m", "comment", "--comment", "Kilo: reject other IPIP traffic", "-j", "DROP"))

return rules
}

// Set sets the IP address of the IPIP tunnel interface.
func (c *cilium) Set(cidr *net.IPNet) error {
return iproute.SetAddress(c.iface, cidr)
}

// Strategy returns the configured strategy for encapsulation.
func (f *cilium) Strategy() Strategy {
return f.strategy
func (c *cilium) Strategy() Strategy {
return c.strategy
}
5 changes: 4 additions & 1 deletion pkg/encapsulation/encapsulation.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,12 @@ const (
// * clean up any changes applied to the backend.
type Encapsulator interface {
CleanUp() error
Gw(net.IP, net.IP, *net.IPNet) net.IP
Gw(net.IP, net.IP, net.IP, *net.IPNet) net.IP
Index() int
Init(int) error
// LocalIP returns the local overlay IP that should be advertised
// to other nodes. For Cilium, this is the IP of the cilium_host interface.
LocalIP() net.IP
Rules([]*net.IPNet) iptables.RuleSet
Set(*net.IPNet) error
Strategy() Strategy
Expand Down
7 changes: 6 additions & 1 deletion pkg/encapsulation/flannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,15 @@ func (f *flannel) CleanUp() error {
}

// Gw returns the correct gateway IP associated with the given node.
func (f *flannel) Gw(_, _ net.IP, subnet *net.IPNet) net.IP {
func (f *flannel) Gw(_, _, _ net.IP, subnet *net.IPNet) net.IP {
return subnet.IP
}

// LocalIP is a no-op for Flannel.
func (f *flannel) LocalIP() net.IP {
return nil
}

// Index returns the index of the Flannel interface.
func (f *flannel) Index() int {
f.mu.Lock()
Expand Down
7 changes: 6 additions & 1 deletion pkg/encapsulation/ipip.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,15 @@ func (i *ipip) CleanUp() error {
}

// Gw returns the correct gateway IP associated with the given node.
func (i *ipip) Gw(_, internal net.IP, _ *net.IPNet) net.IP {
func (i *ipip) Gw(_, internal, _ net.IP, _ *net.IPNet) net.IP {
return internal
}

// LocalIP is a no-op for IPIP.
func (i *ipip) LocalIP() net.IP {
return nil
}

// Index returns the index of the IPIP interface.
func (i *ipip) Index() int {
return i.iface
Expand Down
7 changes: 6 additions & 1 deletion pkg/encapsulation/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ func (n Noop) CleanUp() error {
}

// Gw will also do nothing.
func (n Noop) Gw(_ net.IP, _ net.IP, _ *net.IPNet) net.IP {
func (n Noop) Gw(_, _, _ net.IP, _ *net.IPNet) net.IP {
return nil
}

// LocalIP will also do nothing.
func (n Noop) LocalIP() net.IP {
return nil
}

Expand Down
41 changes: 27 additions & 14 deletions pkg/k8s/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,21 @@ import (

const (
// Backend is the name of this mesh backend.
Backend = "kubernetes"
endpointAnnotationKey = "kilo.squat.ai/endpoint"
forceEndpointAnnotationKey = "kilo.squat.ai/force-endpoint"
forceInternalIPAnnotationKey = "kilo.squat.ai/force-internal-ip"
internalIPAnnotationKey = "kilo.squat.ai/internal-ip"
keyAnnotationKey = "kilo.squat.ai/key"
lastSeenAnnotationKey = "kilo.squat.ai/last-seen"
leaderAnnotationKey = "kilo.squat.ai/leader"
locationAnnotationKey = "kilo.squat.ai/location"
persistentKeepaliveKey = "kilo.squat.ai/persistent-keepalive"
wireGuardIPAnnotationKey = "kilo.squat.ai/wireguard-ip"
discoveredEndpointsKey = "kilo.squat.ai/discovered-endpoints"
allowedLocationIPsKey = "kilo.squat.ai/allowed-location-ips"
granularityKey = "kilo.squat.ai/granularity"
Backend = "kubernetes"
endpointAnnotationKey = "kilo.squat.ai/endpoint"
forceEndpointAnnotationKey = "kilo.squat.ai/force-endpoint"
forceInternalIPAnnotationKey = "kilo.squat.ai/force-internal-ip"
internalIPAnnotationKey = "kilo.squat.ai/internal-ip"
keyAnnotationKey = "kilo.squat.ai/key"
lastSeenAnnotationKey = "kilo.squat.ai/last-seen"
leaderAnnotationKey = "kilo.squat.ai/leader"
locationAnnotationKey = "kilo.squat.ai/location"
persistentKeepaliveKey = "kilo.squat.ai/persistent-keepalive"
wireGuardIPAnnotationKey = "kilo.squat.ai/wireguard-ip"
discoveredEndpointsKey = "kilo.squat.ai/discovered-endpoints"
allowedLocationIPsKey = "kilo.squat.ai/allowed-location-ips"
granularityKey = "kilo.squat.ai/granularity"
ciliumInternalIPAnnotationKey = "kilo.squat.ai/cilium-internal-ip"
// RegionLabelKey is the key for the well-known Kubernetes topology region label.
RegionLabelKey = "topology.kubernetes.io/region"
jsonPatchSlash = "~1"
Expand Down Expand Up @@ -241,6 +242,11 @@ func (nb *nodeBackend) Set(ctx context.Context, name string, node *mesh.Node) er
n.ObjectMeta.Annotations[discoveredEndpointsKey] = string(discoveredEndpoints)
}
n.ObjectMeta.Annotations[granularityKey] = string(node.Granularity)
if node.CiliumInternalIP != nil {
n.ObjectMeta.Annotations[ciliumInternalIPAnnotationKey] = node.CiliumInternalIP.String()
} else {
n.ObjectMeta.Annotations[ciliumInternalIPAnnotationKey] = ""
}
oldData, err := json.Marshal(old)
if err != nil {
return err
Expand Down Expand Up @@ -342,6 +348,12 @@ func translateNode(node *v1.Node, topologyLabel string) *mesh.Node {
// TODO log some error or warning.
key, _ := wgtypes.ParseKey(node.ObjectMeta.Annotations[keyAnnotationKey])

// Parse the Cilium internal IP if present.
var ciliumInternalIP net.IP
if cipStr, ok := node.ObjectMeta.Annotations[ciliumInternalIPAnnotationKey]; ok && cipStr != "" {
ciliumInternalIP = net.ParseIP(cipStr)
}

return &mesh.Node{
// Endpoint and InternalIP should only ever fail to parse if the
// remote node's agent has not yet set its IP address;
Expand All @@ -352,6 +364,7 @@ func translateNode(node *v1.Node, topologyLabel string) *mesh.Node {
Endpoint: endpoint,
NoInternalIP: noInternalIP,
InternalIP: internalIP,
CiliumInternalIP: ciliumInternalIP,
Key: key,
LastSeen: lastSeen,
Leader: leader,
Expand Down
9 changes: 5 additions & 4 deletions pkg/mesh/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,11 @@ const (

// Node represents a node in the network.
type Node struct {
Endpoint *wireguard.Endpoint
Key wgtypes.Key
NoInternalIP bool
InternalIP *net.IPNet
Endpoint *wireguard.Endpoint
Key wgtypes.Key
NoInternalIP bool
InternalIP *net.IPNet
CiliumInternalIP net.IP
// LastSeen is a Unix time for the last time
// the node confirmed it was live.
LastSeen int64
Expand Down
2 changes: 2 additions & 0 deletions pkg/mesh/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ func (m *Mesh) handleLocal(ctx context.Context, n *Node) {
Key: m.pub,
NoInternalIP: n.NoInternalIP,
InternalIP: n.InternalIP,
CiliumInternalIP: m.enc.LocalIP(),
LastSeen: time.Now().Unix(),
Leader: n.Leader,
Location: n.Location,
Expand Down Expand Up @@ -699,6 +700,7 @@ func nodesAreEqual(a, b *Node) bool {
return a.Key.String() == b.Key.String() &&
ipNetsEqual(a.WireGuardIP, b.WireGuardIP) &&
ipNetsEqual(a.InternalIP, b.InternalIP) &&
a.CiliumInternalIP.Equal(b.CiliumInternalIP) &&
a.Leader == b.Leader &&
a.Location == b.Location &&
a.Name == b.Name &&
Expand Down
12 changes: 7 additions & 5 deletions pkg/mesh/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (t *Topology) Routes(kiloIfaceName string, kiloIface, privIface, tunlIface
var gw net.IP
for _, segment := range t.segments {
if segment.location == t.location {
gw = enc.Gw(t.updateEndpoint(segment.endpoint, segment.key, &segment.persistentKeepalive).IP(), segment.privateIPs[segment.leader], segment.cidrs[segment.leader])
gw = enc.Gw(t.updateEndpoint(segment.endpoint, segment.key, &segment.persistentKeepalive).IP(), segment.privateIPs[segment.leader], segment.ciliumInternalIPs[segment.leader], segment.cidrs[segment.leader])
break
}
}
Expand All @@ -61,10 +61,11 @@ func (t *Topology) Routes(kiloIfaceName string, kiloIface, privIface, tunlIface
if segment.privateIPs[i].Equal(t.privateIP.IP) {
continue
}
nodeGw := enc.Gw(nil, segment.privateIPs[i], segment.ciliumInternalIPs[i], segment.cidrs[i])
routes = append(routes, encapsulateRoute(&netlink.Route{
Dst: segment.cidrs[i],
Flags: int(netlink.FLAG_ONLINK),
Gw: segment.privateIPs[i],
Gw: nodeGw,
LinkIndex: privIface,
Protocol: unix.RTPROT_STATIC,
}, enc.Strategy(), t.privateIP, tunlIface))
Expand All @@ -74,7 +75,7 @@ func (t *Topology) Routes(kiloIfaceName string, kiloIface, privIface, tunlIface
routes = append(routes, &netlink.Route{
Dst: oneAddressCIDR(segment.privateIPs[i]),
Flags: int(netlink.FLAG_ONLINK),
Gw: segment.privateIPs[i],
Gw: nodeGw,
LinkIndex: tunlIface,
Src: t.privateIP.IP,
Protocol: unix.RTPROT_STATIC,
Expand Down Expand Up @@ -155,10 +156,11 @@ func (t *Topology) Routes(kiloIfaceName string, kiloIface, privIface, tunlIface
if segment.privateIPs[i].Equal(t.privateIP.IP) {
continue
}
nodeGw := enc.Gw(nil, segment.privateIPs[i], segment.ciliumInternalIPs[i], segment.cidrs[i])
routes = append(routes, encapsulateRoute(&netlink.Route{
Dst: segment.cidrs[i],
Flags: int(netlink.FLAG_ONLINK),
Gw: segment.privateIPs[i],
Gw: nodeGw,
LinkIndex: privIface,
Protocol: unix.RTPROT_STATIC,
}, enc.Strategy(), t.privateIP, tunlIface))
Expand All @@ -168,7 +170,7 @@ func (t *Topology) Routes(kiloIfaceName string, kiloIface, privIface, tunlIface
routes = append(routes, &netlink.Route{
Dst: oneAddressCIDR(segment.privateIPs[i]),
Flags: int(netlink.FLAG_ONLINK),
Gw: segment.privateIPs[i],
Gw: nodeGw,
LinkIndex: tunlIface,
Src: t.privateIP.IP,
Protocol: unix.RTPROT_STATIC,
Expand Down
Loading