diff --git a/analyze/analyze.go b/analyze/analyze.go index 865183a..7a91c06 100644 --- a/analyze/analyze.go +++ b/analyze/analyze.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "io" - "os" "path/filepath" "regexp" "strings" @@ -22,7 +21,6 @@ import ( scm_domain "github.com/boostsecurityio/poutine/providers/scm/domain" "github.com/boostsecurityio/poutine/scanner" "github.com/rs/zerolog/log" - "github.com/schollz/progressbar/v3" ) const TEMP_DIR_PREFIX = "poutine-*" @@ -96,6 +94,14 @@ type Analyzer struct { Formatter Formatter Config *models.Config Opa *opa.Opa + Observer ProgressObserver +} + +func (a *Analyzer) observer() ProgressObserver { + if a.Observer != nil { + return a.Observer + } + return noopObserver{} } func (a *Analyzer) AnalyzeOrg(ctx context.Context, org string, numberOfGoroutines *int) ([]*models.PackageInsights, error) { @@ -114,8 +120,9 @@ func (a *Analyzer) AnalyzeOrg(ctx context.Context, org string, numberOfGoroutine pkgsupplyClient := pkgsupply.NewStaticClient() inventory := scanner.NewInventory(a.Opa, pkgsupplyClient, provider, providerVersion) + obs := a.observer() + obs.OnAnalysisStarted("Discovering repositories") log.Debug().Msgf("Starting repository analysis for organization: %s on %s", org, provider) - bar := a.ProgressBar(0, "Analyzing repositories") var reposWg sync.WaitGroup errChan := make(chan error, 1) @@ -137,23 +144,24 @@ func (a *Analyzer) AnalyzeOrg(ctx context.Context, org string, numberOfGoroutine } }() + discoveryCompleted := false for repoBatch := range orgReposBatches { if repoBatch.Err != nil { log.Error().Err(repoBatch.Err).Msg("failed to fetch batch of repos, skipping batch") continue } - if repoBatch.TotalCount != 0 { - bar.ChangeMax(repoBatch.TotalCount) + if !discoveryCompleted && repoBatch.TotalCount != 0 { + discoveryCompleted = true + obs.OnDiscoveryCompleted(org, repoBatch.TotalCount) } for _, repo := range repoBatch.Repositories { if a.Config.IgnoreForks && repo.GetIsFork() { - bar.ChangeMax(repoBatch.TotalCount - 1) 
+ obs.OnRepoSkipped(repo.GetRepoIdentifier(), "fork") continue } if repo.GetSize() == 0 { - bar.ChangeMax(repoBatch.TotalCount - 1) - log.Info().Str("repo", repo.GetRepoIdentifier()).Msg("Skipping empty repository") + obs.OnRepoSkipped(repo.GetRepoIdentifier(), "empty") continue } if err := goRoutineLimitSem.Acquire(ctx, 1); err != nil { @@ -166,9 +174,11 @@ func (a *Analyzer) AnalyzeOrg(ctx context.Context, org string, numberOfGoroutine defer goRoutineLimitSem.Release(1) defer reposWg.Done() repoNameWithOwner := repo.GetRepoIdentifier() + obs.OnRepoStarted(repoNameWithOwner) repoKey, err := a.cloneRepo(ctx, repo.BuildGitURL(a.ScmClient.GetProviderBaseURL()), a.ScmClient.GetToken(), "HEAD") if err != nil { log.Error().Err(err).Str("repo", repoNameWithOwner).Msg("failed to clone repo") + obs.OnRepoError(repoNameWithOwner, err) return } defer a.GitClient.Cleanup(repoKey) @@ -176,12 +186,14 @@ func (a *Analyzer) AnalyzeOrg(ctx context.Context, org string, numberOfGoroutine pkg, err := a.GeneratePackageInsights(ctx, repoKey, repo, "HEAD") if err != nil { log.Error().Err(err).Str("repo", repoNameWithOwner).Msg("failed to generate package insights") + obs.OnRepoError(repoNameWithOwner, err) return } files, err := a.GitClient.ListFiles(repoKey, []string{".yml", ".yaml"}) if err != nil { log.Error().Err(err).Str("repo", repoNameWithOwner).Msg("failed to list files") + obs.OnRepoError(repoNameWithOwner, err) return } @@ -199,6 +211,7 @@ func (a *Analyzer) AnalyzeOrg(ctx context.Context, org string, numberOfGoroutine scannedPkg, err := inventory.ScanPackageScanner(ctx, *pkg, memScanner) if err != nil { log.Error().Err(err).Str("repo", repoNameWithOwner).Msg("failed to scan package") + obs.OnRepoError(repoNameWithOwner, err) return } @@ -208,7 +221,7 @@ func (a *Analyzer) AnalyzeOrg(ctx context.Context, org string, numberOfGoroutine log.Error().Msg("Context canceled while sending package to channel") return } - _ = bar.Add(1) + obs.OnRepoCompleted(repoNameWithOwner, 
scannedPkg) }(repo) } } @@ -227,12 +240,12 @@ func (a *Analyzer) AnalyzeOrg(ctx context.Context, org string, numberOfGoroutine } } - _ = bar.Finish() - + obs.OnFinalizeStarted(len(scannedPackages)) err = a.finalizeAnalysis(ctx, scannedPackages) if err != nil { return scannedPackages, err } + obs.OnFinalizeCompleted() return scannedPackages, nil } @@ -259,27 +272,28 @@ func (a *Analyzer) AnalyzeStaleBranches(ctx context.Context, repoString string, inventory := scanner.NewInventory(a.Opa, pkgsupplyClient, provider, providerVersion) + obs := a.observer() + obs.OnAnalysisStarted("Cloning repository") log.Debug().Msgf("Starting repository analysis for: %s/%s on %s", org, repoName, provider) - bar := a.ProgressBar(3, "Cloning repository") - _ = bar.RenderBlank() + obs.OnRepoStarted(repoString) repoUrl := repo.BuildGitURL(a.ScmClient.GetProviderBaseURL()) repoKey, err := a.fetchCone(ctx, repoUrl, a.ScmClient.GetToken(), "refs/heads/*:refs/remotes/origin/*", ".github/workflows") if err != nil { + obs.OnRepoError(repoString, err) return nil, fmt.Errorf("failed to fetch cone: %w", err) } defer a.GitClient.Cleanup(repoKey) - bar.Describe("Listing unique workflows") - _ = bar.Add(1) + obs.OnStepCompleted("Listing unique workflows") workflows, err := a.GitClient.GetUniqWorkflowsBranches(ctx, repoKey) if err != nil { + obs.OnRepoError(repoString, err) return nil, fmt.Errorf("failed to get unique workflow: %w", err) } - bar.Describe("Check which workflows match regex: " + regex.String()) - _ = bar.Add(1) + obs.OnStepCompleted("Checking workflows match regex: " + regex.String()) errChan := make(chan error, 1) maxGoroutines := 5 @@ -340,13 +354,14 @@ func (a *Analyzer) AnalyzeStaleBranches(ctx context.Context, repoString string, close(filesChan) wgConsumer.Wait() for err := range errChan { + obs.OnRepoError(repoString, err) return nil, err } - bar.Describe("Scanning package") - _ = bar.Add(1) + obs.OnStepCompleted("Scanning package") pkg, err := a.GeneratePackageInsights(ctx, 
repoKey, repo, "HEAD") if err != nil { + obs.OnRepoError(repoString, err) return nil, fmt.Errorf("failed to generate package insight: %w", err) } @@ -359,10 +374,13 @@ func (a *Analyzer) AnalyzeStaleBranches(ctx context.Context, repoString string, scannedPackage, err := inventory.ScanPackageScanner(ctx, *pkg, &inventoryScanner) if err != nil { + obs.OnRepoError(repoString, err) return nil, fmt.Errorf("failed to scan package: %w", err) } - _ = bar.Finish() + obs.OnRepoCompleted(repoString, scannedPackage) + + obs.OnFinalizeStarted(1) if *expand { expanded := []results.Finding{} for _, finding := range scannedPackage.FindingsResults.Findings { @@ -400,8 +418,8 @@ func (a *Analyzer) AnalyzeStaleBranches(ctx context.Context, repoString string, if err := a.Formatter.FormatWithPath(ctx, []*models.PackageInsights{scannedPackage}, results); err != nil { return nil, fmt.Errorf("failed to finalize analysis of package: %w", err) } - } + obs.OnFinalizeCompleted() return scannedPackage, nil } @@ -427,26 +445,31 @@ func (a *Analyzer) AnalyzeRepo(ctx context.Context, repoString string, ref strin pkgsupplyClient := pkgsupply.NewStaticClient() inventory := scanner.NewInventory(a.Opa, pkgsupplyClient, provider, providerVersion) + obs := a.observer() + obs.OnAnalysisStarted("Cloning repository") log.Debug().Msgf("Starting repository analysis for: %s/%s on %s", org, repoName, provider) - bar := a.ProgressBar(2, "Cloning repository") - _ = bar.RenderBlank() + obs.OnRepoStarted(repoString) repoKey, err := a.cloneRepo(ctx, repo.BuildGitURL(a.ScmClient.GetProviderBaseURL()), a.ScmClient.GetToken(), ref) if err != nil { + obs.OnRepoError(repoString, err) return nil, err } defer a.GitClient.Cleanup(repoKey) - bar.Describe("Analyzing repository") - _ = bar.Add(1) + obs.OnStepCompleted("Cloned repository") pkg, err := a.GeneratePackageInsights(ctx, repoKey, repo, ref) if err != nil { + obs.OnRepoError(repoString, err) return nil, err } + obs.OnStepCompleted("Generated package insights") + 
files, err := a.GitClient.ListFiles(repoKey, []string{".yml", ".yaml"}) if err != nil { + obs.OnRepoError(repoString, err) return nil, fmt.Errorf("failed to list files: %w", err) } @@ -463,14 +486,19 @@ func (a *Analyzer) AnalyzeRepo(ctx context.Context, repoString string, ref strin scannedPackage, err := inventory.ScanPackageScanner(ctx, *pkg, memScanner) if err != nil { + obs.OnRepoError(repoString, err) return nil, err } - _ = bar.Finish() + obs.OnStepCompleted("Scanned repository") + obs.OnRepoCompleted(repoString, scannedPackage) + + obs.OnFinalizeStarted(1) err = a.finalizeAnalysis(ctx, []*models.PackageInsights{scannedPackage}) if err != nil { return nil, err } + obs.OnFinalizeCompleted() return scannedPackage, nil } @@ -721,18 +749,3 @@ func (a *Analyzer) cloneRepo(ctx context.Context, gitURL string, token string, r } return key, nil } - -func (a *Analyzer) ProgressBar(maxValue int64, description string) *progressbar.ProgressBar { - if a.Config.Quiet { - return progressbar.DefaultSilent(maxValue, description) - } else { - return progressbar.NewOptions64( - maxValue, - progressbar.OptionSetDescription(description), - progressbar.OptionShowCount(), - progressbar.OptionSetWriter(os.Stderr), - progressbar.OptionClearOnFinish(), - ) - - } -} diff --git a/analyze/analyze_test.go b/analyze/analyze_test.go index 3fe4dc6..dc95a3a 100644 --- a/analyze/analyze_test.go +++ b/analyze/analyze_test.go @@ -2,8 +2,10 @@ package analyze import ( "context" + "errors" "fmt" "strings" + "sync" "testing" "github.com/boostsecurityio/poutine/formatters/noop" @@ -188,3 +190,131 @@ jobs: require.NotNil(t, result) }) } + +// mockObserver records observer events for testing. 
+type mockObserver struct {
+	mu     sync.Mutex
+	events []string
+}
+
+func (m *mockObserver) OnAnalysisStarted(description string) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.events = append(m.events, "analysis_started:"+description)
+}
+func (m *mockObserver) OnDiscoveryCompleted(org string, totalCount int) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.events = append(m.events, fmt.Sprintf("discovery_completed:%s:%d", org, totalCount))
+}
+func (m *mockObserver) OnRepoStarted(repo string) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.events = append(m.events, "repo_started:"+repo)
+}
+func (m *mockObserver) OnRepoCompleted(repo string, _ *models.PackageInsights) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.events = append(m.events, "repo_completed:"+repo)
+}
+func (m *mockObserver) OnRepoError(repo string, _ error) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.events = append(m.events, "repo_error:"+repo)
+}
+func (m *mockObserver) OnRepoSkipped(repo string, reason string) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.events = append(m.events, fmt.Sprintf("repo_skipped:%s:%s", repo, reason))
+}
+func (m *mockObserver) OnStepCompleted(description string) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.events = append(m.events, "step_completed:"+description)
+}
+func (m *mockObserver) OnFinalizeStarted(total int) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.events = append(m.events, fmt.Sprintf("finalize_started:%d", total))
+}
+func (m *mockObserver) OnFinalizeCompleted() {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.events = append(m.events, "finalize_completed")
+}
+
+func TestProgressObserverNilSafe(t *testing.T) {
+	ctx := context.Background()
+	opaClient, err := newTestOpa(ctx)
+	require.NoError(t, err)
+
+	// Observer is nil — should not panic
+	analyzer := NewAnalyzer(nil, nil, &noop.Format{}, models.DefaultConfig(), opaClient)
+	assert.Nil(t, analyzer.Observer)
+
+	// AnalyzeManifest doesn't use observer, but this ensures nil Observer doesn't crash
+	_, err = analyzer.AnalyzeManifest(ctx, strings.NewReader("on: push\njobs:\n test:\n runs-on: ubuntu-latest\n steps:\n - uses: actions/checkout@v4"), "github-actions")
+	require.NoError(t, err)
+}
+
+func TestProgressObserverInterface(t *testing.T) {
+	obs := &mockObserver{}
+
+	// Verify the interface is implemented
+	var _ ProgressObserver = obs
+	var _ ProgressObserver = &noopObserver{}
+	var _ ProgressObserver = &ProgressBarObserver{}
+
+	// Test mock records events
+	obs.OnDiscoveryCompleted("test-org", 10)
+	obs.OnRepoStarted("test-org/repo1")
+	obs.OnRepoCompleted("test-org/repo1", nil)
+	obs.OnRepoSkipped("test-org/repo2", "fork")
+	obs.OnRepoError("test-org/repo3", errors.New("clone failed"))
+	obs.OnStepCompleted("Analyzing repository")
+	obs.OnFinalizeStarted(1)
+	obs.OnFinalizeCompleted()
+
+	assert.Equal(t, []string{
+		"discovery_completed:test-org:10",
+		"repo_started:test-org/repo1",
+		"repo_completed:test-org/repo1",
+		"repo_skipped:test-org/repo2:fork",
+		"repo_error:test-org/repo3",
+		"step_completed:Analyzing repository",
+		"finalize_started:1",
+		"finalize_completed",
+	}, obs.events)
+}
+
+func TestProgressObserverConcurrency(t *testing.T) {
+	// Exercise the concurrent methods on both observer implementations
+	// to verify no data races. Run with: go test -race
+	observers := []ProgressObserver{
+		&mockObserver{},
+		NewProgressBarObserver(true),
+	}
+
+	for _, obs := range observers {
+		obs.OnDiscoveryCompleted("org", 100)
+
+		var wg sync.WaitGroup
+		for i := 0; i < 50; i++ {
+			wg.Add(1)
+			go func(n int) {
+				defer wg.Done()
+				repo := fmt.Sprintf("org/repo-%d", n)
+				obs.OnRepoStarted(repo)
+				if n%5 == 0 {
+					obs.OnRepoError(repo, errors.New("error"))
+				} else {
+					obs.OnRepoCompleted(repo, nil)
+				}
+			}(i)
+		}
+		wg.Wait()
+
+		obs.OnFinalizeStarted(40)
+		obs.OnFinalizeCompleted()
+	}
+}
diff --git a/analyze/observer.go b/analyze/observer.go
new file mode 100644
index 0000000..199eb91
--- /dev/null
+++ b/analyze/observer.go
@@ -0,0 +1,87 @@
+package analyze
+
+import "github.com/boostsecurityio/poutine/models"
+
+// ProgressObserver receives structured progress events during analysis.
+// All methods must be non-blocking — implementations should not perform
+// expensive I/O or hold locks for extended periods.
+//
+// # Concurrency
+//
+// During AnalyzeOrg, methods are called from two contexts:
+//
+// Main goroutine (sequential, never concurrent with each other):
+//   - OnAnalysisStarted
+//   - OnDiscoveryCompleted
+//   - OnRepoSkipped
+//   - OnFinalizeStarted
+//   - OnFinalizeCompleted
+//
+// Worker goroutines (called concurrently from multiple goroutines):
+//   - OnRepoStarted
+//   - OnRepoCompleted
+//   - OnRepoError
+//
+// Implementations MUST ensure that the worker-goroutine methods are
+// goroutine-safe. Note that worker methods may also run concurrently
+// with the main-goroutine methods above.
+//
+// During AnalyzeRepo and AnalyzeStaleBranches all methods are called
+// from a single goroutine.
+type ProgressObserver interface {
+	// OnAnalysisStarted is called from the main goroutine at the very
+	// beginning of any analysis, before any work begins. Useful for
+	// rendering an initial status indicator while discovery or cloning runs.
+	OnAnalysisStarted(description string)
+
+	// OnDiscoveryCompleted is called from the main goroutine when the
+	// total repo count is known (first batch with a non-zero TotalCount).
+	OnDiscoveryCompleted(org string, totalCount int)
+
+	// OnRepoStarted is called when analysis of a repo begins; during
+	// AnalyzeOrg that happens on a worker goroutine. Must be goroutine-safe.
+	OnRepoStarted(repo string)
+
+	// OnRepoCompleted is called when a repo finishes successfully; during
+	// AnalyzeOrg that happens on a worker goroutine. Must be goroutine-safe.
+	OnRepoCompleted(repo string, pkg *models.PackageInsights)
+
+	// OnRepoError is called when a repo analysis fails. During AnalyzeOrg it
+	// runs on a worker goroutine and the failure is non-fatal; AnalyzeRepo
+	// and AnalyzeStaleBranches also return the error to their caller.
+	// Must be goroutine-safe.
+	OnRepoError(repo string, err error)
+
+	// OnRepoSkipped is called from the main goroutine when a repo is
+	// skipped; AnalyzeOrg passes "fork" or "empty" as the reason.
+	OnRepoSkipped(repo string, reason string)
+
+	// OnStepCompleted reports step-level progress within a single-repo
+	// analysis. AnalyzeRepo passes the step that just finished (e.g.
+	// "Cloned repository"); AnalyzeStaleBranches passes the step that is
+	// about to run (e.g. "Listing unique workflows").
+	// Called from the main goroutine.
+	OnStepCompleted(description string)
+
+	// OnFinalizeStarted is called from the main goroutine when the
+	// formatting/output phase begins, after all repos are processed.
+	OnFinalizeStarted(totalPackages int)
+
+	// OnFinalizeCompleted is called from the main goroutine when
+	// formatting is done. It is not called when finalization fails.
+	OnFinalizeCompleted()
+}
+
+// noopObserver ignores every event; Analyzer falls back to it when no
+// Observer is set.
+type noopObserver struct{}
+
+func (noopObserver) OnAnalysisStarted(string) {}
+func (noopObserver) OnDiscoveryCompleted(string, int) {}
+func (noopObserver) OnRepoStarted(string) {}
+func (noopObserver) OnRepoCompleted(string, *models.PackageInsights) {}
+func (noopObserver) OnRepoError(string, error) {}
+func (noopObserver) OnRepoSkipped(string, string) {}
+func (noopObserver) OnStepCompleted(string) {}
+func (noopObserver) OnFinalizeStarted(int) {}
+func (noopObserver) OnFinalizeCompleted() {}
diff --git a/analyze/progress_bar_observer.go b/analyze/progress_bar_observer.go
new file mode 100644
index 0000000..1d149dd
--- /dev/null
+++ b/analyze/progress_bar_observer.go
@@ -0,0 +1,144 @@
+package analyze
+
+import (
+	"os"
+	"sync"
+
+	"github.com/boostsecurityio/poutine/models"
+	"github.com/schollz/progressbar/v3"
+)
+
+// ProgressBarObserver implements ProgressObserver by rendering a CLI progress bar.
+// For org analysis it shows a repo-count bar (created on OnDiscoveryCompleted).
+// For single-repo analysis it shows a step-level bar (created on first OnStepCompleted).
+//
+// Every method that touches the bar holds mu, so a single observer can be
+// shared by the AnalyzeOrg worker goroutines and the main goroutine.
+type ProgressBarObserver struct {
+	mu       sync.Mutex // guards bar, spinner and stepMode
+	bar      *progressbar.ProgressBar
+	quiet    bool
+	spinner  bool // true while bar is the indeterminate bar from OnAnalysisStarted
+	stepMode bool // true when bar is step-driven (single-repo analysis)
+}
+
+// NewProgressBarObserver returns an observer that has no bar yet. When quiet
+// is true, every bar it creates later is silent.
+func NewProgressBarObserver(quiet bool) *ProgressBarObserver {
+	return &ProgressBarObserver{quiet: quiet}
+}
+
+// newBar builds a bar with the given maximum and description: a silent one
+// when quiet, otherwise one that writes to stderr, shows the count and
+// clears itself on finish.
+func (o *ProgressBarObserver) newBar(barMax int64, description string) *progressbar.ProgressBar {
+	if o.quiet {
+		return progressbar.DefaultSilent(barMax, description)
+	}
+	return progressbar.NewOptions64(barMax,
+		progressbar.OptionSetDescription(description),
+		progressbar.OptionShowCount(),
+		progressbar.OptionSetWriter(os.Stderr),
+		progressbar.OptionClearOnFinish(),
+	)
+}
+
+// advance ticks the repo-count bar by one. It does nothing before a bar
+// exists and in step mode, where OnStepCompleted drives the bar instead.
+func (o *ProgressBarObserver) advance() {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	if o.bar != nil && !o.stepMode {
+		_ = o.bar.Add(1) // a rendering error must not abort the analysis
+	}
+}
+
+// OnAnalysisStarted replaces any previous bar with an indeterminate one
+// (max=-1) so activity is visible before the total count is known or the
+// first step completes.
+func (o *ProgressBarObserver) OnAnalysisStarted(description string) {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	o.bar = o.newBar(-1, description)
+	o.spinner = true
+	// Start every analysis in repo-count mode so a reused observer is not
+	// stuck in the step mode left behind by an earlier single-repo run.
+	o.stepMode = false
+	_ = o.bar.RenderBlank()
+}
+
+// OnDiscoveryCompleted swaps the current bar for a counting bar sized to totalCount.
+func (o *ProgressBarObserver) OnDiscoveryCompleted(_ string, totalCount int) {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	// Finish the spinner before replacing with a counting bar.
+	if o.bar != nil {
+		_ = o.bar.Finish()
+	}
+	o.bar = o.newBar(int64(totalCount), "Analyzing repositories")
+	o.spinner = false
+}
+
+// OnRepoStarted is a no-op: the bar only moves when a repo finishes.
+func (o *ProgressBarObserver) OnRepoStarted(_ string) {}
+
+// OnRepoCompleted counts the repo as done on the repo-count bar.
+func (o *ProgressBarObserver) OnRepoCompleted(_ string, _ *models.PackageInsights) {
+	o.advance()
+}
+
+// OnRepoError counts a failed repo as done so the bar can still reach its maximum.
+func (o *ProgressBarObserver) OnRepoError(_ string, _ error) {
+	o.advance()
+}
+
+// OnRepoSkipped shrinks the counting bar by one, never below zero.
+func (o *ProgressBarObserver) OnRepoSkipped(_ string, _ string) {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	// The spinner has no real total to shrink; resizing it would give it one.
+	if o.bar == nil || o.spinner {
+		return
+	}
+	if newMax := o.bar.GetMax() - 1; newMax >= 0 {
+		o.bar.ChangeMax(newMax)
+	}
+}
+
+// OnStepCompleted switches to step mode, labels the bar with description and
+// advances it by one step, growing the bar so the step fits.
+func (o *ProgressBarObserver) OnStepCompleted(description string) {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	o.stepMode = true
+	if o.bar != nil && o.spinner {
+		// Finish the spinner, then create a step bar. The spinner flag is
+		// used rather than comparing GetMax64() with -1.
+		// NOTE(review): progressbar seems to replace a -1 max internally, so
+		// that comparison may never be true — confirm against the library.
+		_ = o.bar.Finish()
+		o.bar = nil
+	}
+	o.spinner = false
+	if o.bar == nil {
+		// First step — create the bar.
+		o.bar = o.newBar(1, description)
+	} else {
+		// Grow the bar to accommodate each new step, then advance.
+		o.bar.ChangeMax(o.bar.GetMax() + 1)
+		o.bar.Describe(description)
+	}
+	_ = o.bar.Add(1)
+}
+
+// OnFinalizeStarted finishes whatever bar is showing before output is formatted.
+func (o *ProgressBarObserver) OnFinalizeStarted(_ int) {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	if o.bar != nil {
+		_ = o.bar.Finish()
+	}
+}
+
+// OnFinalizeCompleted is a no-op: OnFinalizeStarted has already finished the bar.
+func (o *ProgressBarObserver) OnFinalizeCompleted() {}
diff --git a/cmd/root.go b/cmd/root.go
index 46e72fd..c9bf586 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -219,6 +219,7 @@ func GetAnalyzer(ctx context.Context, command string) (*analyze.Analyzer, error)
 	gitClient := gitops.NewGitClient(nil)
 
 	analyzer := analyze.NewAnalyzer(scmClient, gitClient, formatter, config, opaClient)
+	analyzer.Observer = analyze.NewProgressBarObserver(config.Quiet)
 	return analyzer, nil
 }
 
@@ -239,6 +240,7 @@ func GetAnalyzerWithConfig(ctx context.Context, command, scmProvider, scmBaseURL
 	gitClient := gitops.NewGitClient(nil)
 
 	analyzer := analyze.NewAnalyzer(scmClient, gitClient, formatter, cfg, opaClient)
+	analyzer.Observer = analyze.NewProgressBarObserver(cfg.Quiet)
 	return analyzer, nil
 }
 