Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
4aaaa56
Handle regions that are not enabled by default more gracefully.
dylanratcliffe Feb 3, 2026
1dd64cd
Revert "Handle regions that are not enabled by default more gracefully."
dylanratcliffe Feb 3, 2026
7531d18
ENG-2285 Update to sigs.k8s.io/controller-runtime v0.23.1 (#3706)
getinnocuous Feb 4, 2026
c52b7ab
Info log output stream (#3779)
tphoney Feb 4, 2026
03a4175
Cache "Not found" results in aws-source (#3661)
tphoney Feb 4, 2026
26cd002
Integrate the riverui in area51 (#3745)
tphoney Feb 4, 2026
c71ee89
maint, sources remove Cache() helper functions idiom (#3760)
tphoney Feb 4, 2026
1b86e3b
1password retries (#3747)
DavidS-ovm Feb 4, 2026
8f8658d
Fixed mappings for a number of broken adapters (#3782)
dylanratcliffe Feb 4, 2026
c4d8e46
Handle regions that are not enabled by default more gracefully. (#3781)
DavidS-ovm Feb 5, 2026
f50a06d
Improve mapping UI (#3795)
dylanratcliffe Feb 5, 2026
59a3a40
Fixed numeric project mapping (#3794)
dylanratcliffe Feb 5, 2026
2a1d699
fix(deps): update github.com/hashicorp/terraform-config-inspect diges…
tphoney Feb 6, 2026
8216dd4
chore(deps): update terraform (#3814)
renovate[bot] Feb 6, 2026
cf0e10f
fix(deps): update go (#3815)
DavidS-ovm Feb 6, 2026
bfd6d7e
Eng 2242 Azure multi-scope adapter implementation (#3790)
Lionel-Wilson Feb 6, 2026
29914ac
Source config error handling (#3803)
DavidS-ovm Feb 6, 2026
9e28be4
fix(deps): update google.golang.org/genproto/googleapis/rpc digest to…
DavidS-ovm Feb 6, 2026
b0690af
several small fixes (#3816)
DavidS-ovm Feb 6, 2026
acd4153
more source config error handling (#3821)
DavidS-ovm Feb 9, 2026
2c79382
Do start/end-change processing in the background (backend) (#3710)
DavidS-ovm Feb 9, 2026
e6e9557
K8s-source chart probes (#3826)
tphoney Feb 9, 2026
eab06cf
aws-source: fix kms-grant issue with service principal (#3831)
carabasdaniel Feb 9, 2026
75f13ac
Run go mod tidy
actions-user Feb 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
58 changes: 29 additions & 29 deletions .terraform.lock.hcl

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

84 changes: 57 additions & 27 deletions aws-source/adapters/adapterhelpers_always_get_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"

"buf.build/go/protovalidate"
Expand Down Expand Up @@ -80,7 +80,7 @@ type AlwaysGetAdapter[ListInput InputType, ListOutput OutputType, GetInput Input
ListFuncOutputMapper func(output ListOutput, input ListInput) ([]GetInput, error)

CacheDuration time.Duration // How long to cache items for
cache sdpcache.Cache // The cache for this adapter (set during creation, can be nil for tests)
cache sdpcache.Cache // This is mandatory
}

func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStruct, Options]) cacheDuration() time.Duration {
Expand All @@ -91,21 +91,6 @@ func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStru
return s.CacheDuration
}

var (
noOpCacheAlwaysGetOnce sync.Once
noOpCacheAlwaysGet sdpcache.Cache
)

func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStruct, Options]) Cache() sdpcache.Cache {
if s.cache == nil {
noOpCacheAlwaysGetOnce.Do(func() {
noOpCacheAlwaysGet = sdpcache.NewNoOpCache()
})
return noOpCacheAlwaysGet
}
return s.cache
}

// Validate Checks that the adapter has been set up correctly
func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStruct, Options]) Validate() error {
if !s.DisableList {
Expand Down Expand Up @@ -168,7 +153,7 @@ func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStru
return nil, WrapAWSError(err)
}

cacheHit, ck, cachedItems, qErr, done := s.Cache().Lookup(ctx, s.Name(), sdp.QueryMethod_GET, scope, s.ItemType, query, ignoreCache)
cacheHit, ck, cachedItems, qErr, done := s.cache.Lookup(ctx, s.Name(), sdp.QueryMethod_GET, scope, s.ItemType, query, ignoreCache)
defer done()
if qErr != nil {
return nil, qErr
Expand All @@ -187,12 +172,12 @@ func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStru
if err != nil {
err := WrapAWSError(err)
if !CanRetry(err) {
s.Cache().StoreError(ctx, err, s.cacheDuration(), ck)
s.cache.StoreError(ctx, err, s.cacheDuration(), ck)
}
return nil, err
}

s.Cache().StoreItem(ctx, item, s.cacheDuration(), ck)
s.cache.StoreItem(ctx, item, s.cacheDuration(), ck)
return item, nil
}

Expand All @@ -218,9 +203,13 @@ func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStru
return
}

cacheHit, ck, cachedItems, qErr, done := s.Cache().Lookup(ctx, s.Name(), sdp.QueryMethod_LIST, scope, s.ItemType, "", ignoreCache)
cacheHit, ck, cachedItems, qErr, done := s.cache.Lookup(ctx, s.Name(), sdp.QueryMethod_LIST, scope, s.ItemType, "", ignoreCache)
defer done()
if qErr != nil {
// For better semantics, convert cached NOTFOUND into empty result
if qErr.GetErrorType() == sdp.QueryError_NOTFOUND {
return
}
stream.SendError(qErr)
return
}
Expand All @@ -238,30 +227,53 @@ func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStru
paginator := s.ListFuncPaginatorBuilder(s.Client, input)
var newGetInputs []GetInput
p := pool.New().WithContext(ctx).WithMaxGoroutines(s.MaxParallel.Value())

// Track whether any items were found and if we had an error
var itemsSent atomic.Int64
var hadError atomic.Bool

defer func() {
// Always wait for everything to be completed before returning
err := p.Wait()
if err != nil {
sentry.CaptureException(err)
}

// Only cache not-found when no items were found AND no error occurred
// If we had an error, that error is already cached, don't overwrite it
shouldCacheNotFound := itemsSent.Load() == 0 && !hadError.Load()

if shouldCacheNotFound {
notFoundErr := &sdp.QueryError{
ErrorType: sdp.QueryError_NOTFOUND,
ErrorString: fmt.Sprintf("no %s found in scope %s", s.ItemType, scope),
Scope: scope,
SourceName: s.Name(),
ItemType: s.ItemType,
ResponderName: s.Name(),
}
s.cache.StoreError(ctx, notFoundErr, s.cacheDuration(), ck)
}
}()

for paginator.HasMorePages() {
output, err := paginator.NextPage(ctx)
if err != nil {
hadError.Store(true)
err := WrapAWSError(err)
if !CanRetry(err) {
s.Cache().StoreError(ctx, err, s.cacheDuration(), ck)
s.cache.StoreError(ctx, err, s.cacheDuration(), ck)
}
stream.SendError(err)
return
}

newGetInputs, err = s.ListFuncOutputMapper(output, input)
if err != nil {
hadError.Store(true)
err := WrapAWSError(err)
if !CanRetry(err) {
s.Cache().StoreError(ctx, err, s.cacheDuration(), ck)
s.cache.StoreError(ctx, err, s.cacheDuration(), ck)
}
stream.SendError(err)
return
Expand All @@ -276,10 +288,13 @@ func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStru
if err != nil {
// Don't cache individual errors as they are cheap to re-run
stream.SendError(WrapAWSError(err))
// Mark that we had an error so we don't cache NOTFOUND
hadError.Store(true)
}
if item != nil {
s.Cache().StoreItem(ctx, item, s.cacheDuration(), ck)
s.cache.StoreItem(ctx, item, s.cacheDuration(), ck)
stream.SendItem(item)
itemsSent.Add(1)
}

return nil
Expand Down Expand Up @@ -322,9 +337,13 @@ func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStru
// SearchCustom Searches using custom mapping logic. The SearchInputMapper is
// used to create an input for ListFunc, at which point the usual logic is used
func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStruct, Options]) SearchCustom(ctx context.Context, scope string, query string, ignoreCache bool, stream discovery.QueryResultStream) {
cacheHit, ck, cachedItems, qErr, done := s.Cache().Lookup(ctx, s.Name(), sdp.QueryMethod_SEARCH, scope, s.ItemType, query, ignoreCache)
cacheHit, ck, cachedItems, qErr, done := s.cache.Lookup(ctx, s.Name(), sdp.QueryMethod_SEARCH, scope, s.ItemType, query, ignoreCache)
defer done()
if qErr != nil {
// For better semantics, convert cached NOTFOUND into empty result
if qErr.GetErrorType() == sdp.QueryError_NOTFOUND {
return
}
stream.SendError(qErr)
return
}
Expand Down Expand Up @@ -356,15 +375,26 @@ func (s *AlwaysGetAdapter[ListInput, ListOutput, GetInput, GetOutput, ClientStru
if err != nil {
err := WrapAWSError(err)
if !CanRetry(err) {
s.Cache().StoreError(ctx, err, s.cacheDuration(), ck)
s.cache.StoreError(ctx, err, s.cacheDuration(), ck)
}
stream.SendError(err)
return
}

if item != nil {
s.Cache().StoreItem(ctx, item, s.cacheDuration(), ck)
s.cache.StoreItem(ctx, item, s.cacheDuration(), ck)
stream.SendItem(item)
} else {
// Cache not-found when item is nil
notFoundErr := &sdp.QueryError{
ErrorType: sdp.QueryError_NOTFOUND,
ErrorString: fmt.Sprintf("%s not found for search query '%s'", s.ItemType, query),
Scope: scope,
SourceName: s.Name(),
ItemType: s.ItemType,
ResponderName: s.Name(),
}
s.cache.StoreError(ctx, notFoundErr, s.cacheDuration(), ck)
}
} else {
stream.SendError(errors.New("SearchCustom called without SearchInputMapper or SearchGetInputMapper"))
Expand Down
11 changes: 10 additions & 1 deletion aws-source/adapters/adapterhelpers_always_get_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func TestAlwaysGetSourceGet(t *testing.T) {
GetInputMapper: func(scope, query string) string {
return ""
},
cache: sdpcache.NewNoOpCache(),
}

_, err := lgs.Get(context.Background(), "foo.bar", "", false)
Expand Down Expand Up @@ -108,6 +109,7 @@ func TestAlwaysGetSourceGet(t *testing.T) {
GetInputMapper: func(scope, query string) string {
return ""
},
cache: sdpcache.NewNoOpCache(),
}

_, err := lgs.Get(context.Background(), "foo.bar", "", false)
Expand Down Expand Up @@ -144,6 +146,7 @@ func TestAlwaysGetSourceList(t *testing.T) {
GetInputMapper: func(scope, query string) string {
return ""
},
cache: sdpcache.NewNoOpCache(),
}

stream := discovery.NewRecordingQueryResultStream()
Expand Down Expand Up @@ -183,6 +186,7 @@ func TestAlwaysGetSourceList(t *testing.T) {
GetInputMapper: func(scope, query string) string {
return ""
},
cache: sdpcache.NewNoOpCache(),
}

stream := discovery.NewRecordingQueryResultStream()
Expand Down Expand Up @@ -228,6 +232,7 @@ func TestAlwaysGetSourceList(t *testing.T) {
GetInputMapper: func(scope, query string) string {
return ""
},
cache: sdpcache.NewNoOpCache(),
}

stream := discovery.NewRecordingQueryResultStream()
Expand Down Expand Up @@ -275,6 +280,7 @@ func TestAlwaysGetSourceSearch(t *testing.T) {
GetInputMapper: func(scope, query string) string {
return scope + "." + query
},
cache: sdpcache.NewNoOpCache(),
}

t.Run("bad ARN", func(t *testing.T) {
Expand Down Expand Up @@ -338,6 +344,7 @@ func TestAlwaysGetSourceSearch(t *testing.T) {
GetInputMapper: func(scope, query string) string {
return scope + "." + query
},
cache: sdpcache.NewNoOpCache(),
}

t.Run("ARN", func(t *testing.T) {
Expand Down Expand Up @@ -400,6 +407,7 @@ func TestAlwaysGetSourceSearch(t *testing.T) {
GetInputMapper: func(scope, query string) string {
return ""
},
cache: sdpcache.NewNoOpCache(),
}

stream := discovery.NewRecordingQueryResultStream()
Expand Down Expand Up @@ -448,6 +456,7 @@ func TestAlwaysGetSourceSearch(t *testing.T) {
GetInputMapper: func(scope, query string) string {
return scope + "." + query
},
cache: sdpcache.NewNoOpCache(),
}

stream := discovery.NewRecordingQueryResultStream()
Expand Down Expand Up @@ -475,7 +484,7 @@ func TestAlwaysGetSourceCaching(t *testing.T) {
Region: "eu-west-2",
Client: struct{}{},
ListInput: "",
cache: sdpcache.NewCache(ctx),
cache: sdpcache.NewMemoryCache(),
ListFuncPaginatorBuilder: func(client struct{}, input string) Paginator[string, struct{}] {
return &TestPaginator{
DataFunc: func() string {
Expand Down
Loading