Skip to content

Commit 4a8ed17

Browse files
committed
resolve Christoph's comments
1 parent f929711 commit 4a8ed17

6 files changed

Lines changed: 174 additions & 117 deletions

File tree

internal/controller/sync/controller.go

Lines changed: 119 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636

3737
corev1 "k8s.io/api/core/v1"
3838
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
39+
"k8s.io/apimachinery/pkg/api/meta"
3940
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4041
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
4142
"k8s.io/apimachinery/pkg/labels"
@@ -164,11 +165,13 @@ func Create(
164165
return nil, fmt.Errorf("failed to setup local-side watch: %w", err)
165166
}
166167

167-
// Watch origin:kcp related resources so that changes to them trigger reconciliation
168-
// of the owning primary object. Only related resources with a Watch config are covered.
169-
watchedGVKs := sets.New[schema.GroupVersionKind]()
168+
// Watch related resources on their origin side so that changes to them trigger
169+
// reconciliation of the owning primary object. Only related resources with a Watch
170+
// config are covered. Deduplication is per-origin to allow the same GVK on both sides.
171+
watchedKcpGVKs := sets.New[schema.GroupVersionKind]()
172+
watchedServiceGVKs := sets.New[schema.GroupVersionKind]()
170173
for _, relRes := range pubRes.Spec.Related {
171-
if relRes.Origin != syncagentv1alpha1.RelatedResourceOriginKcp || relRes.Watch == nil {
174+
if relRes.Watch == nil {
172175
continue
173176
}
174177

@@ -178,57 +181,134 @@ func Create(
178181
Resource: relRes.Resource,
179182
}
180183

181-
// Use the local REST mapper to determine the Kind.
182-
gvk, err := localManager.GetRESTMapper().KindFor(gvr)
183-
if err != nil {
184-
log.Warnw("Failed to determine Kind for origin:kcp related resource, skipping watch", "gvr", gvr, "error", err)
185-
continue
184+
// Use the REST mapper of the origin side: related resources may have projected GVKs
185+
// that differ between kcp and the service cluster, so we must resolve using the
186+
// mapper that actually knows about the GVR on that side.
187+
var originRESTMapper meta.RESTMapper
188+
if relRes.Origin == syncagentv1alpha1.RelatedResourceOriginKcp {
189+
originRESTMapper = remoteManager.GetLocalManager().GetRESTMapper()
190+
} else {
191+
originRESTMapper = localManager.GetRESTMapper()
186192
}
187193

188-
// Deduplicate: only set up one watch per GVK.
189-
if watchedGVKs.Has(gvk) {
190-
continue
194+
gvk, err := originRESTMapper.KindFor(gvr)
195+
if err != nil {
196+
return nil, fmt.Errorf("failed to determine Kind for related resource %v (origin: %s): %w", gvr, relRes.Origin, err)
191197
}
192-
watchedGVKs.Insert(gvk)
193198

194199
relatedDummy := &unstructured.Unstructured{}
195200
relatedDummy.SetGroupVersionKind(gvk)
196201

197-
var enqueueForRelated mchandler.TypedEventHandlerFunc[*unstructured.Unstructured, mcreconcile.Request]
202+
if relRes.Origin == syncagentv1alpha1.RelatedResourceOriginKcp {
203+
// Deduplicate: only set up one watch per GVK per origin side.
204+
if watchedKcpGVKs.Has(gvk) {
205+
continue
206+
}
207+
watchedKcpGVKs.Insert(gvk)
208+
209+
// The related resource lives in the kcp workspace; watch it via MultiClusterWatch.
210+
var enqueueForRelated mchandler.TypedEventHandlerFunc[*unstructured.Unstructured, mcreconcile.Request]
211+
212+
switch {
213+
case relRes.Watch.ByOwner != nil:
214+
ownerGVK := remoteDummy.GroupVersionKind()
215+
enqueueForRelated = func(clusterName string, _ cluster.Cluster) handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request] {
216+
return &byOwnerEventHandler{
217+
clusterName: clusterName,
218+
ownerGVK: ownerGVK,
219+
}
220+
}
198221

199-
switch {
200-
case relRes.Watch.ByOwner != nil:
201-
ownerKind := relRes.Watch.ByOwner.Kind
202-
enqueueForRelated = func(clusterName string, _ cluster.Cluster) handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request] {
203-
return &byOwnerEventHandler{
204-
clusterName: clusterName,
205-
ownerKind: ownerKind,
222+
case relRes.Watch.BySelector != nil:
223+
labelSelector := relRes.Watch.BySelector
224+
primaryDummy := remoteDummy.DeepCopy()
225+
enqueueForRelated = func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request] {
226+
return &bySelectorEventHandler{
227+
clusterName: clusterName,
228+
client: cl.GetClient(),
229+
primaryDummy: primaryDummy,
230+
labelSelector: labelSelector,
231+
log: log,
232+
}
206233
}
234+
235+
default:
236+
return nil, fmt.Errorf("related resource %v (origin: %s) has Watch set but neither byOwner nor bySelector configured", gvk, relRes.Origin)
207237
}
208238

209-
case relRes.Watch.ByLabel != nil:
210-
labelTemplates := relRes.Watch.ByLabel
211-
primaryDummy := remoteDummy.DeepCopy()
212-
enqueueForRelated = func(clusterName string, cl cluster.Cluster) handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request] {
213-
return &byLabelEventHandler{
214-
clusterName: clusterName,
215-
client: cl.GetClient(),
216-
primaryDummy: primaryDummy,
217-
labelTemplates: labelTemplates,
218-
log: log,
239+
if err := c.MultiClusterWatch(mcsource.TypedKind(relatedDummy, enqueueForRelated)); err != nil {
240+
return nil, fmt.Errorf("failed to setup watch for kcp-origin related resource %v: %w", gvk, err)
241+
}
242+
} else {
243+
// Deduplicate: only set up one watch per GVK per origin side.
244+
if watchedServiceGVKs.Has(gvk) {
245+
continue
246+
}
247+
watchedServiceGVKs.Insert(gvk)
248+
249+
// The related resource lives on the service cluster; watch it via the local manager.
250+
// Map the changed related resource back to the remote (kcp) primary object by going
251+
// through the local primary, which carries the remote cluster/name in its metadata.
252+
localClient := localManager.GetClient()
253+
254+
var enqueueForRelated handler.TypedEventHandler[*unstructured.Unstructured, mcreconcile.Request]
255+
256+
switch {
257+
case relRes.Watch.ByOwner != nil:
258+
ownerGVK := localDummy.GroupVersionKind()
259+
primaryDummy := localDummy.DeepCopy()
260+
enqueueForRelated = handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, obj *unstructured.Unstructured) []mcreconcile.Request {
261+
for _, ref := range obj.GetOwnerReferences() {
262+
refGV, err := schema.ParseGroupVersion(ref.APIVersion)
263+
if err != nil || refGV.Group != ownerGVK.Group || refGV.Version != ownerGVK.Version || ref.Kind != ownerGVK.Kind {
264+
continue
265+
}
266+
localPrimary := primaryDummy.DeepCopy()
267+
if err := localClient.Get(ctx, types.NamespacedName{Namespace: obj.GetNamespace(), Name: ref.Name}, localPrimary); err != nil {
268+
log.Warnw("Failed to fetch local primary for byOwner watch", "owner", ref.Name, "error", err)
269+
return nil
270+
}
271+
if req := sync.RemoteNameForLocalObject(localPrimary); req != nil {
272+
return []mcreconcile.Request{*req}
273+
}
274+
return nil
275+
}
276+
return nil
277+
})
278+
279+
case relRes.Watch.BySelector != nil:
280+
selector, err := metav1.LabelSelectorAsSelector(relRes.Watch.BySelector)
281+
if err != nil {
282+
return nil, fmt.Errorf("failed to convert bySelector for service-origin related resource %v: %w", gvk, err)
219283
}
284+
primaryDummy := localDummy.DeepCopy()
285+
enqueueForRelated = handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, _ *unstructured.Unstructured) []mcreconcile.Request {
286+
primaryList := &unstructured.UnstructuredList{}
287+
primaryList.SetAPIVersion(primaryDummy.GetAPIVersion())
288+
primaryList.SetKind(primaryDummy.GetKind() + "List")
289+
if err := localClient.List(ctx, primaryList, &ctrlruntimeclient.ListOptions{LabelSelector: selector}); err != nil {
290+
log.Warnw("Failed to list local primary objects for bySelector watch", "selector", selector.String(), "error", err)
291+
return nil
292+
}
293+
var reqs []mcreconcile.Request
294+
for i := range primaryList.Items {
295+
if req := sync.RemoteNameForLocalObject(&primaryList.Items[i]); req != nil {
296+
reqs = append(reqs, *req)
297+
}
298+
}
299+
return reqs
300+
})
301+
302+
default:
303+
return nil, fmt.Errorf("related resource %v (origin: %s) has Watch set but neither byOwner nor bySelector configured", gvk, relRes.Origin)
220304
}
221305

222-
default:
223-
log.Warnw("origin:kcp related resource has Watch set but neither byOwner nor byLabel configured, skipping", "gvk", gvk)
224-
continue
225-
}
226-
227-
if err := c.MultiClusterWatch(mcsource.TypedKind(relatedDummy, enqueueForRelated)); err != nil {
228-
return nil, fmt.Errorf("failed to setup watch for origin:kcp related resource %v: %w", gvk, err)
306+
if err := c.Watch(source.TypedKind(localManager.GetCache(), relatedDummy, enqueueForRelated)); err != nil {
307+
return nil, fmt.Errorf("failed to setup watch for service-origin related resource %v: %w", gvk, err)
308+
}
229309
}
230310

231-
log.Infow("Set up watch for origin:kcp related resource", "gvk", gvk)
311+
log.Infow("Set up watch for related resource", "gvk", gvk, "origin", relRes.Origin)
232312
}
233313

234314
log.Info("Done setting up unmanaged controller.")

internal/controller/sync/related_handlers.go

Lines changed: 30 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@ package sync
1818

1919
import (
2020
"context"
21+
"fmt"
2122

2223
"go.uber.org/zap"
2324

24-
"github.com/kcp-dev/api-syncagent/internal/sync/templating"
25-
25+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2626
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
27+
"k8s.io/apimachinery/pkg/runtime/schema"
2728
"k8s.io/apimachinery/pkg/types"
2829
"k8s.io/client-go/util/workqueue"
2930
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
@@ -33,10 +34,10 @@ import (
3334
)
3435

3536
// byOwnerEventHandler enqueues the primary object by inspecting the OwnerReferences
36-
// of the changed related object and finding one with the configured Kind.
37+
// of the changed related object and finding one matching the configured GVK.
3738
type byOwnerEventHandler struct {
3839
clusterName string
39-
ownerKind string
40+
ownerGVK schema.GroupVersionKind
4041
}
4142

4243
func (h *byOwnerEventHandler) Create(_ context.Context, evt event.TypedCreateEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
@@ -57,7 +58,11 @@ func (h *byOwnerEventHandler) Generic(_ context.Context, evt event.TypedGenericE
5758

5859
func (h *byOwnerEventHandler) enqueue(obj *unstructured.Unstructured, q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
5960
for _, ref := range obj.GetOwnerReferences() {
60-
if ref.Kind == h.ownerKind {
61+
refGV, err := schema.ParseGroupVersion(ref.APIVersion)
62+
if err != nil {
63+
continue
64+
}
65+
if refGV.Group == h.ownerGVK.Group && refGV.Version == h.ownerGVK.Version && ref.Kind == h.ownerGVK.Kind {
6166
q.Add(mcreconcile.Request{
6267
ClusterName: h.clusterName,
6368
Request: reconcile.Request{
@@ -72,60 +77,46 @@ func (h *byOwnerEventHandler) enqueue(obj *unstructured.Unstructured, q workqueu
7277
}
7378
}
7479

75-
// byLabelEventHandler enqueues primary objects by evaluating label templates against
76-
// the changed related object and listing primaries matching the resulting label selector.
77-
type byLabelEventHandler struct {
78-
clusterName string
79-
client ctrlruntimeclient.Client
80-
primaryDummy *unstructured.Unstructured
81-
labelTemplates map[string]string
82-
log *zap.SugaredLogger
80+
// bySelectorEventHandler enqueues primary objects by listing primaries matching the configured
81+
// label selector whenever a related object changes.
82+
type bySelectorEventHandler struct {
83+
clusterName string
84+
client ctrlruntimeclient.Client
85+
primaryDummy *unstructured.Unstructured
86+
labelSelector *metav1.LabelSelector
87+
log *zap.SugaredLogger
8388
}
8489

85-
func (h *byLabelEventHandler) Create(ctx context.Context, evt event.TypedCreateEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
90+
func (h *bySelectorEventHandler) Create(ctx context.Context, evt event.TypedCreateEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
8691
h.enqueue(ctx, evt.Object, q)
8792
}
8893

89-
func (h *byLabelEventHandler) Update(ctx context.Context, evt event.TypedUpdateEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
94+
func (h *bySelectorEventHandler) Update(ctx context.Context, evt event.TypedUpdateEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
9095
h.enqueue(ctx, evt.ObjectNew, q)
9196
}
9297

93-
func (h *byLabelEventHandler) Delete(ctx context.Context, evt event.TypedDeleteEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
98+
func (h *bySelectorEventHandler) Delete(ctx context.Context, evt event.TypedDeleteEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
9499
h.enqueue(ctx, evt.Object, q)
95100
}
96101

97-
func (h *byLabelEventHandler) Generic(ctx context.Context, evt event.TypedGenericEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
102+
func (h *bySelectorEventHandler) Generic(ctx context.Context, evt event.TypedGenericEvent[*unstructured.Unstructured], q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
98103
h.enqueue(ctx, evt.Object, q)
99104
}
100105

101-
func (h *byLabelEventHandler) enqueue(ctx context.Context, obj *unstructured.Unstructured, q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
102-
// Build the template context using the changed related object.
103-
data := map[string]any{
104-
"watchObject": map[string]any{
105-
"name": obj.GetName(),
106-
"namespace": obj.GetNamespace(),
107-
"labels": obj.GetLabels(),
108-
},
109-
}
110-
111-
// Evaluate each label template to build the selector.
112-
matchingLabels := ctrlruntimeclient.MatchingLabels{}
113-
for key, tpl := range h.labelTemplates {
114-
value, err := templating.Render(tpl, data)
115-
if err != nil {
116-
h.log.Warnw("Failed to evaluate byLabel template", "key", key, "template", tpl, "error", err)
117-
return
118-
}
119-
matchingLabels[key] = value
106+
func (h *bySelectorEventHandler) enqueue(ctx context.Context, _ *unstructured.Unstructured, q workqueue.TypedRateLimitingInterface[mcreconcile.Request]) {
107+
selector, err := metav1.LabelSelectorAsSelector(h.labelSelector)
108+
if err != nil {
109+
h.log.Warnw("Failed to convert bySelector selector", "error", err)
110+
return
120111
}
121112

122-
// List primary objects matching the derived label selector.
113+
// List primary objects matching the label selector.
123114
primaryList := &unstructured.UnstructuredList{}
124115
primaryList.SetAPIVersion(h.primaryDummy.GetAPIVersion())
125116
primaryList.SetKind(h.primaryDummy.GetKind() + "List")
126117

127-
if err := h.client.List(ctx, primaryList, matchingLabels); err != nil {
128-
h.log.Warnw("Failed to list primary objects for byLabel watch", "selector", matchingLabels, "error", err)
118+
if err := h.client.List(ctx, primaryList, &ctrlruntimeclient.ListOptions{LabelSelector: selector}); err != nil {
119+
h.log.Warnw("Failed to list primary objects for bySelector watch", "selector", fmt.Sprintf("%v", selector), "error", err)
129120
return
130121
}
131122

sdk/apis/syncagent/v1alpha1/published_resource.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -266,27 +266,27 @@ type RelatedResourceSpec struct {
266266

267267
// RelatedResourceWatch configures how the watch handler maps a changed related resource
268268
// back to its owning primary object.
269-
// Exactly one of ByOwner or ByLabel must be set.
269+
// Exactly one of ByOwner or BySelector must be set.
270+
// +kubebuilder:validation:XValidation:rule="has(self.byOwner) != has(self.bySelector)",message="exactly one of byOwner or bySelector must be set"
270271
type RelatedResourceWatch struct {
271272
// ByOwner configures the watch handler to inspect the OwnerReferences of the changed
272273
// object. When an OwnerReference with the given Kind is found, the referenced owner
273274
// is enqueued as the primary object.
274275
// +optional
275276
ByOwner *RelatedResourceWatchByOwner `json:"byOwner,omitempty"`
276277

277-
// ByLabel configures the watch handler to list primary objects matching a label selector
278-
// derived from the changed object. Each map key is a label key on the primary object;
279-
// each value is a Go template expression evaluated with the changed object available as
280-
// .watchObject (with fields .name, .namespace, .labels).
278+
// BySelector configures the watch handler to list primary objects matching the given label
279+
// selector. When a related object changes, all primary objects matching this selector
280+
// are enqueued for reconciliation.
281281
// +optional
282-
ByLabel map[string]string `json:"byLabel,omitempty"`
282+
BySelector *metav1.LabelSelector `json:"bySelector,omitempty"`
283283
}
284284

285285
// RelatedResourceWatchByOwner configures reverse lookup via OwnerReferences.
286-
type RelatedResourceWatchByOwner struct {
287-
// Kind is the Kind to look for in the OwnerReferences of the changed related object.
288-
Kind string `json:"kind"`
289-
}
286+
// The agent already knows the GVK of the primary object, so no further configuration
287+
// is needed: when a related object changes, its OwnerReferences are inspected for a
288+
// reference whose Kind matches the primary object's Kind.
289+
type RelatedResourceWatchByOwner struct{}
290290

291291
// RelatedResourceProjection describes how the source GVK of a related resource (i.e.
292292
// the GVK on the related resource's origin side) should be modified when an object

sdk/apis/syncagent/v1alpha1/zz_generated.deepcopy.go

Lines changed: 4 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)