Skip to content

Commit d58fc65

Browse files
authored
Merge pull request #771 from jetstack/fetch-ctx
Add context to DataGatherer.Fetch
2 parents b9e1889 + 7e9a310 commit d58fc65

File tree

9 files changed

+21
-22
lines changed

9 files changed

+21
-22
lines changed

pkg/agent/dummy_data_gatherer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func (g *dummyDataGatherer) WaitForCacheSync(ctx context.Context) error {
3939
return nil
4040
}
4141

42-
func (c *dummyDataGatherer) Fetch() (any, int, error) {
42+
func (c *dummyDataGatherer) Fetch(ctx context.Context) (any, int, error) {
4343
var err error
4444
if c.attemptNumber < c.FailedAttempts {
4545
err = fmt.Errorf("First %d attempts will fail", c.FailedAttempts)

pkg/agent/run.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ const schemaVersion string = "v2.0.0"
5151

5252
// Run starts the agent process
5353
func Run(cmd *cobra.Command, args []string) (returnErr error) {
54-
ctx, cancel := context.WithCancel(cmd.Context())
54+
baseCtx, cancel := context.WithCancel(cmd.Context())
5555
defer cancel()
56-
log := klog.FromContext(ctx).WithName("Run")
56+
log := klog.FromContext(baseCtx).WithName("Run")
5757

5858
log.Info("Starting", "version", version.PreflightVersion, "commit", version.Commit)
5959

@@ -78,7 +78,7 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) {
7878
return fmt.Errorf("While evaluating configuration: %v", err)
7979
}
8080

81-
group, gctx := errgroup.WithContext(ctx)
81+
group, gctx := errgroup.WithContext(baseCtx)
8282
defer func() {
8383
cancel()
8484
if groupErr := group.Wait(); groupErr != nil {
@@ -123,13 +123,14 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) {
123123
})
124124

125125
group.Go(func() error {
126+
listenCtx := klog.NewContext(gctx, log)
126127
err := listenAndServe(
127-
klog.NewContext(gctx, log),
128+
listenCtx,
128129
&http.Server{
129130
Addr: serverAddress,
130131
Handler: server,
131132
BaseContext: func(_ net.Listener) context.Context {
132-
return gctx
133+
return listenCtx
133134
},
134135
},
135136
)
@@ -239,7 +240,7 @@ func Run(cmd *cobra.Command, args []string) (returnErr error) {
239240
// be cancelled, which will cause this blocking loop to exit
240241
// instead of waiting for the time period.
241242
for {
242-
if err := gatherAndOutputData(klog.NewContext(ctx, log), eventf, config, preflightClient, dataGatherers); err != nil {
243+
if err := gatherAndOutputData(gctx, eventf, config, preflightClient, dataGatherers); err != nil {
243244
return err
244245
}
245246

@@ -316,7 +317,7 @@ func gatherAndOutputData(ctx context.Context, eventf Eventf, config CombinedConf
316317
}
317318
} else {
318319
var err error
319-
readings, err = gatherData(klog.NewContext(ctx, log), config, dataGatherers)
320+
readings, err = gatherData(ctx, config, dataGatherers)
320321
if err != nil {
321322
return err
322323
}
@@ -338,7 +339,7 @@ func gatherAndOutputData(ctx context.Context, eventf Eventf, config CombinedConf
338339
postCtx, cancel := context.WithTimeout(ctx, config.BackoffMaxTime)
339340
defer cancel()
340341

341-
return struct{}{}, postData(klog.NewContext(postCtx, log), config, preflightClient, readings)
342+
return struct{}{}, postData(postCtx, config, preflightClient, readings)
342343
}
343344

344345
group.Go(func() error {
@@ -361,7 +362,7 @@ func gatherData(ctx context.Context, config CombinedConfig, dataGatherers map[st
361362

362363
var dgError *multierror.Error
363364
for k, dg := range dataGatherers {
364-
dgData, count, err := dg.Fetch()
365+
dgData, count, err := dg.Fetch(ctx)
365366
if err != nil {
366367
dgError = multierror.Append(dgError, fmt.Errorf("error in datagatherer %s: %w", k, err))
367368

@@ -406,7 +407,6 @@ func gatherData(ctx context.Context, config CombinedConfig, dataGatherers map[st
406407

407408
func postData(ctx context.Context, config CombinedConfig, preflightClient client.Client, readings []*api.DataReading) error {
408409
log := klog.FromContext(ctx).WithName("postData")
409-
ctx = klog.NewContext(ctx, log)
410410
err := preflightClient.PostDataReadingsWithOptions(ctx, readings, client.Options{
411411
ClusterName: config.ClusterName,
412412
ClusterDescription: config.ClusterDescription,

pkg/datagatherer/datagatherer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ type DataGatherer interface {
1414
// Fetch retrieves data.
1515
// count is the number of items that were discovered. A negative count means the number
1616
// of items was indeterminate.
17-
Fetch() (data any, count int, err error)
17+
Fetch(ctx context.Context) (data any, count int, err error)
1818
// Run starts the data gatherer's informers for resource collection.
1919
// Returns error if the data gatherer informer wasn't initialized
2020
Run(ctx context.Context) error

pkg/datagatherer/k8sdiscovery/discovery.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ func (g *DataGathererDiscovery) WaitForCacheSync(ctx context.Context) error {
7676
}
7777

7878
// Fetch will fetch discovery data from the apiserver, or return an error
79-
func (g *DataGathererDiscovery) Fetch() (any, int, error) {
79+
func (g *DataGathererDiscovery) Fetch(ctx context.Context) (any, int, error) {
8080
data, err := g.cl.ServerVersion()
8181
if err != nil {
8282
return nil, -1, fmt.Errorf("failed to get server version: %v", err)

pkg/datagatherer/k8sdynamic/dynamic.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,7 +385,7 @@ func (g *DataGathererDynamic) WaitForCacheSync(ctx context.Context) error {
385385

386386
// Fetch will fetch the requested data from the apiserver, or return an error
387387
// if fetching the data fails.
388-
func (g *DataGathererDynamic) Fetch() (any, int, error) {
388+
func (g *DataGathererDynamic) Fetch(ctx context.Context) (any, int, error) {
389389
if g.groupVersionResource.String() == "" {
390390
return nil, -1, fmt.Errorf("resource type must be specified")
391391
}

pkg/datagatherer/k8sdynamic/dynamic_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ func sortGatheredResources(list []*api.GatheredResource) {
117117

118118
func TestNewDataGathererWithClientAndDynamicInformer(t *testing.T) {
119119
ctx := t.Context()
120+
120121
config := ConfigDynamic{
121122
ExcludeNamespaces: []string{"kube-system"},
122123
GroupVersionResource: schema.GroupVersionResource{Group: "foobar", Version: "v1", Resource: "foos"},
@@ -748,7 +749,7 @@ func TestDynamicGatherer_Fetch(t *testing.T) {
748749
if waitTimeout(&wg, 30*time.Second) {
749750
t.Fatalf("unexpected timeout")
750751
}
751-
res, expectCount, err := dynamiDg.Fetch()
752+
res, expectCount, err := dynamiDg.Fetch(ctx)
752753
if err != nil && !tc.err {
753754
t.Errorf("expected no error but got: %v", err)
754755
}
@@ -1061,7 +1062,7 @@ func TestDynamicGathererNativeResources_Fetch(t *testing.T) {
10611062
if waitTimeout(&wg, 5*time.Second) {
10621063
t.Fatalf("unexpected timeout")
10631064
}
1064-
rawRes, count, err := dynamiDg.Fetch()
1065+
rawRes, count, err := dynamiDg.Fetch(ctx)
10651066
if tc.err {
10661067
require.Error(t, err)
10671068
} else {

pkg/datagatherer/local/local.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func (g *DataGatherer) WaitForCacheSync(ctx context.Context) error {
4949
}
5050

5151
// Fetch loads and returns the data from the LocalDatagatherer's dataPath
52-
func (g *DataGatherer) Fetch() (any, int, error) {
52+
func (g *DataGatherer) Fetch(ctx context.Context) (any, int, error) {
5353
dataBytes, err := os.ReadFile(g.dataPath)
5454
if err != nil {
5555
return nil, -1, err

pkg/datagatherer/oidc/oidc.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,7 @@ func (g *DataGathererOIDC) WaitForCacheSync(ctx context.Context) error {
6666
}
6767

6868
// Fetch will fetch the OIDC discovery document and JWKS from the cluster API server.
69-
func (g *DataGathererOIDC) Fetch() (any, int, error) {
70-
ctx := context.Background()
71-
69+
func (g *DataGathererOIDC) Fetch(ctx context.Context) (any, int, error) {
7270
oidcResponse, oidcErr := g.fetchOIDCConfig(ctx)
7371
jwksResponse, jwksErr := g.fetchJWKS(ctx)
7472

pkg/datagatherer/oidc/oidc_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ func TestFetch_Success(t *testing.T) {
5151
rc := makeRESTClient(t, ts)
5252
g := &DataGathererOIDC{cl: rc}
5353

54-
anyRes, count, err := g.Fetch()
54+
anyRes, count, err := g.Fetch(t.Context())
5555
require.NoError(t, err)
5656
require.Equal(t, 1, count)
5757

@@ -197,7 +197,7 @@ func TestFetch_Errors(t *testing.T) {
197197
rc := makeRESTClient(t, ts)
198198
g := &DataGathererOIDC{cl: rc}
199199

200-
anyRes, count, err := g.Fetch()
200+
anyRes, count, err := g.Fetch(t.Context())
201201
require.NoError(t, err)
202202
require.Equal(t, 1, count)
203203

0 commit comments

Comments
 (0)