22import logging
33import os
44from collections import namedtuple
5- from typing import Optional , List , Set , Dict , Union , Any
5+ from datetime import datetime
6+ from typing import Optional , List , Set , Dict , Union , Any , Tuple
67
78import click
89import coloredlogs
910import yaml
10- from github import Github , UnknownObjectException
11+ from github import Github , UnknownObjectException , GitRelease
1112from github .Organization import Organization
1213from github .PaginatedList import PaginatedList
14+ from github .Repository import Repository
1315
1416from gha_cli .scanner import Org , print_orgs_as_csvs
1517
1820
1921ActionVersion = namedtuple ("ActionVersion" , ["name" , "current" , "latest" ])
2022
21- FLAG_COMPARE_EXACT_VERSION = False
22-
23-
24- def compare_versions (orig_v1 : str , orig_v2 : str , major_only : bool ) -> int :
25- """Compare two versions, return 1 if v1 > v2, 0 if v1 == v2, -1 if v1 < v2"""
26- if orig_v1 .startswith ("v" ):
27- orig_v1 = orig_v1 [1 :]
28- if orig_v2 .startswith ("v" ):
29- orig_v2 = orig_v2 [1 :]
30- v1 = orig_v1 .split ("." )
31- v2 = orig_v2 .split ("." )
32- if major_only :
33- v1 = [v1 [0 ]]
34- v2 = [v2 [0 ]]
35- try :
36- compare_count = max (len (v1 ), len (v2 )) if FLAG_COMPARE_EXACT_VERSION else 1
37- for i in range (compare_count ):
38- v1_i = int (v1 [i ]) if i < len (v1 ) else 0
39- v2_i = int (v2 [i ]) if i < len (v2 ) else 0
40- if v1_i > v2_i :
41- return 1
42- if v1_i < v2_i :
43- return - 1
44- except ValueError :
45- logging .warning (f"Could not compare versions { orig_v1 } and { orig_v2 } " )
46- return 0
47-
48-
49- def _fix_version (tag_name : str , major_only : bool ) -> str :
50- if major_only :
51- return tag_name .split ("." )[0 ]
52- return tag_name
23+
24+ def _is_sha (current_version : str ) -> bool :
25+ """Check if the current version is a SHA (40 characters long)"""
26+ return len (current_version ) == 40 and all (c in "0123456789abcdef" for c in current_version .lower ())
5327
5428
5529class GithubActionsTools (object ):
5630 _wf_cache : dict [str , dict [str , Any ]] = dict () # repo_name -> [path -> workflow/yaml]
57- actions_latest_release : dict [str , str ] = dict () # action_name@current_release -> latest_release_tag
31+ __actions_latest_release : dict [str , Tuple [ str , datetime ] ] = dict () # action_name@current_release -> latest_release_tag
5832
59- def __init__ (self , github_token : str ):
33+ def __init__ (self , github_token : str , update_major_version_only : bool = False ):
6034 self .client = Github (login_or_token = github_token )
35+ self ._update_major_version_only = update_major_version_only
36+
37+ def _fix_version (self , tag_name : str ) -> str :
38+ if self ._update_major_version_only :
39+ return tag_name .split ("." )[0 ]
40+ return tag_name
41+
42+ def _compare_versions (self , orig_v1 : str , orig_v2 : str ) -> int :
43+ """Compare two versions, return 1 if v1 > v2, 0 if v1 == v2, -1 if v1 < v2"""
44+ if orig_v1 .startswith ("v" ):
45+ orig_v1 = orig_v1 [1 :]
46+ if orig_v2 .startswith ("v" ):
47+ orig_v2 = orig_v2 [1 :]
48+ v1 = orig_v1 .split ("." )
49+ v2 = orig_v2 .split ("." )
50+ if self ._update_major_version_only :
51+ v1 = [v1 [0 ]]
52+ v2 = [v2 [0 ]]
53+ compare_count = max (len (v1 ), len (v2 ))
54+ try :
55+ for i in range (compare_count ):
56+ v1_i = int (v1 [i ]) if i < len (v1 ) else 0
57+ v2_i = int (v2 [i ]) if i < len (v2 ) else 0
58+ if v1_i > v2_i :
59+ return 1
60+ if v1_i < v2_i :
61+ return - 1
62+ except ValueError :
63+ logging .warning (f"Could not compare versions { orig_v1 } and { orig_v2 } " )
64+ return 0
65+
66+ def get_action_latest_release (self , uses_tag_value : str ) -> Optional [str ]:
67+ """Check whether an action has an update, and return the latest version if it does syntax for uses:
68+ https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_iduses
69+ """
70+ if "@" not in uses_tag_value :
71+ return None
72+ action_name , current_version = uses_tag_value .split ("@" )
73+ if action_name in self .__actions_latest_release :
74+ latest_release = self .__actions_latest_release [action_name ]
75+ logging .debug (f"Found in cache { action_name } : { latest_release } " )
76+ if _is_sha (current_version ):
77+ logging .debug (f"Current version for { action_name } is a SHA: { current_version } , checking whether latest release is newer" )
78+ if latest_release [1 ] > datetime .now ():
79+ return latest_release [0 ]
80+ return latest_release if self ._compare_versions (latest_release [0 ], current_version ) > 0 else None
81+
82+ logging .debug (f"Checking for updates for { action_name } @{ current_version } : Getting repo { action_name } " )
83+ repo : Repository = self .client .get_repo (action_name )
84+ logging .info (f"Getting latest release for repository: { action_name } " )
85+ latest_release : GitRelease
86+ try :
87+ latest_release = repo .get_latest_release ()
88+ if latest_release is None :
89+ logging .warning (f"No latest release found for repository: { action_name } " )
90+ return None
91+ except UnknownObjectException :
92+ logging .warning (f"No releases found for repository: { action_name } " )
93+
94+ if _is_sha (current_version ):
95+ logging .debug (
96+ f"Current version for { action_name } is a SHA: { current_version } , checking whether latest release is newer" )
97+ current_version_commit = repo .get_commit (current_version )
98+ if latest_release .last_modified_datetime > current_version_commit .last_modified_datetime :
99+ self .__actions_latest_release [action_name ] = self ._fix_version (latest_release .tag_name ), latest_release .last_modified_datetime
100+ return latest_release .tag_name
101+ if self ._compare_versions (latest_release .tag_name , current_version ) > 0 :
102+ self .__actions_latest_release [action_name ] = self ._fix_version (latest_release .tag_name ), latest_release .last_modified_datetime
103+ return latest_release .tag_name
104+ return None
61105
62106 @staticmethod
63107 def is_local_repo (repo_name : str ) -> bool :
@@ -69,7 +113,7 @@ def list_full_paths(path: str) -> set[str]:
69113 return set ()
70114 return {os .path .join (path , file ) for file in os .listdir (path ) if file .endswith ((".yml" , ".yaml" ))}
71115
72- def get_workflow_actions (self , repo_name : str , workflow_path : str ) -> Set [str ]:
116+ def get_workflow_action_names (self , repo_name : str , workflow_path : str ) -> Set [str ]:
73117 workflow_content = self ._get_workflow_file_content (repo_name , workflow_path )
74118 workflow = yaml .load (workflow_content , Loader = yaml .CLoader )
75119 res = set ()
@@ -79,45 +123,27 @@ def get_workflow_actions(self, repo_name: str, workflow_path: str) -> Set[str]:
79123 res .add (step ["uses" ])
80124 return res
81125
82- def check_for_updates (self , action_name : str , major_only : bool ) -> Optional [str ]:
83- """Check whether an action has an update, and return the latest version if it does syntax for uses:
84- https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_iduses
85- """
86- if "@" not in action_name :
87- return None
88- repo_name , current_version = action_name .split ("@" )
89- logging .debug (f"Checking for updates for { action_name } : Getting repo { repo_name } " )
90- if repo_name in self .actions_latest_release :
91- latest_release = self .actions_latest_release [repo_name ]
92- logging .debug (f"Found in cache { repo_name } : { latest_release } " )
93- return latest_release if compare_versions (latest_release , current_version , major_only ) else None
94- repo = self .client .get_repo (repo_name )
95- logging .debug (f"Getting latest release for repository: { repo_name } " )
96- try :
97- latest_release = repo .get_latest_release ()
98- if compare_versions (latest_release .tag_name , current_version , major_only ):
99- self .actions_latest_release [repo_name ] = _fix_version (latest_release .tag_name , major_only )
100- return latest_release .tag_name
101- except UnknownObjectException :
102- logging .warning (f"No releases found for repository: { repo_name } " )
103- return None
104-
105- def get_repo_actions_latest (self , repo_name : str , major_only : bool ) -> Dict [str , List [ActionVersion ]]:
126+ def get_repo_actions_latest (self , repo_name : str ) -> Dict [str , List [ActionVersion ]]:
106127 workflow_paths = self ._get_github_workflow_filenames (repo_name )
107128 res = dict ()
129+ all_actions = set ()
130+ all_actions_no_version = set () # actions without version, e.g., actions/checkout
108131 for path in workflow_paths :
109132 res [path ] = list ()
110- actions = self .get_workflow_actions (repo_name , path )
111- for action in actions :
133+ actions = self .get_workflow_action_names (repo_name , path )
134+ for action in all_actions :
112135 if "@" not in action :
113136 continue
114- action_name , curr_version = action .split ("@" )
115- if action not in self .actions_latest_release :
116- latest = self .check_for_updates (action , major_only )
117- self .actions_latest_release [action ] = latest
118- else :
119- latest = self .actions_latest_release [action ]
120- res [path ].append (ActionVersion (action_name , curr_version , latest ))
137+ action_name , _ = action .split ("@" )
138+ all_actions_no_version .add (action_name )
139+ all_actions = all_actions .union (actions )
140+ logging .info (f"Found { len (all_actions_no_version )} actions in workflows: { ", " .join (all_actions_no_version )} " )
141+ for action in all_actions :
142+ if "@" not in action :
143+ continue
144+ action_name , curr_version = action .split ("@" )
145+ latest_version = self .get_action_latest_release (action )
146+ res [path ].append (ActionVersion (action_name , curr_version , latest_version ))
121147 return res
122148
123149 def get_repo_workflow_names (self , repo_name : str ) -> Dict [str , str ]:
@@ -158,7 +184,7 @@ def _update_workflow_content(self, repo_name: str, workflow_path: str, workflow_
158184 return
159185
160186 # remote
161- repo = self .client .get_repo (repo_name )
187+ repo : Repository = self .client .get_repo (repo_name )
162188 current_content = repo .get_contents (workflow_path )
163189 res = repo .update_file (
164190 workflow_path ,
@@ -179,7 +205,7 @@ def _get_github_workflow_filenames(self, repo_name: str) -> Set[str]:
179205 click .secho (f"{ repo_name } is not a local repo and does not start with owner/repo" , fg = "red" , err = True )
180206 raise ValueError (f"{ repo_name } is not a local repo and does not start with owner/repo" )
181207 # Remote
182- repo = self .client .get_repo (repo_name )
208+ repo : Repository = self .client .get_repo (repo_name )
183209 self ._wf_cache [repo_name ] = {wf .path : wf for wf in repo .get_workflows () if wf .path .startswith (".github/" )}
184210 return set (self ._wf_cache [repo_name ].keys ())
185211
@@ -202,7 +228,7 @@ def _get_workflow_file_content(self, repo_name: str, workflow_path: str) -> Unio
202228 f"possible values: { workflow_paths } " ,
203229 err = True ,
204230 )
205- repo = self .client .get_repo (repo_name )
231+ repo : Repository = self .client .get_repo (repo_name )
206232 try :
207233 workflow_content = repo .get_contents (workflow_path )
208234 except UnknownObjectException :
@@ -235,23 +261,24 @@ def _get_workflow_file_content(self, repo_name: str, workflow_path: str) -> Unio
235261 help = "GitHub token to use, by default will use GITHUB_TOKEN environment variable" ,
236262)
237263@click .option (
238- "--compare-exact-versions" ,
264+ "-m" ,
265+ "--major-only" ,
239266 is_flag = True ,
240267 default = False ,
241- help = "Compare versions using all semantic and not only major versions, e.g., v1 will be upgraded to v1.2.3 " ,
268+ help = "Update major versions only , e.g., v1.2.3 will not be upgraded to v1.2.4 but to v2 " ,
242269)
243270@click .pass_context
244- def cli (ctx , verbose : int , repo : str , github_token : Optional [str ], compare_exact_versions : bool ):
271+ def cli (ctx , verbose : int , repo : str , github_token : Optional [str ], major_only : bool ):
245272 if verbose == 1 :
246273 coloredlogs .install (level = "INFO" )
247274 if verbose > 1 :
248275 coloredlogs .install (level = "DEBUG" )
249276 ctx .ensure_object (dict )
250- global FLAG_COMPARE_EXACT_VERSION
251- FLAG_COMPARE_EXACT_VERSION = compare_exact_versions
277+ repo_name = os . getcwd () if repo == "." else repo
278+ click . secho ( f"GitHub Actions CLI, scanning repo in { repo_name } " , fg = "green" , bold = True )
252279 if not github_token :
253280 click .secho (GITHUB_ACTION_NOT_PROVIDED_MSG , fg = "yellow" , err = True )
254- ctx .obj ["gh" ] = GithubActionsTools (github_token )
281+ ctx .obj ["gh" ] = GithubActionsTools (github_token , major_only )
255282 ctx .obj ["repo" ] = repo
256283 if not ctx .invoked_subcommand :
257284 ctx .invoke (update_actions )
@@ -272,18 +299,12 @@ def cli(ctx, verbose: int, repo: str, github_token: Optional[str], compare_exact
272299 show_default = True ,
273300 help = "Commit msg, only relevant when remote repo" ,
274301)
275- @click .option (
276- "-m" ,
277- "--major-only" ,
278- is_flag = True ,
279- default = False ,
280- help = "Compare major versions only, e.g., v1.2.3 will not be upgraded to v1.2.4 but to v2" ,
281- )
282302@click .pass_context
283- def update_actions (ctx , update : bool , commit_msg : str , major_only : bool ) -> None :
284- gh , repo = ctx .obj ["gh" ], ctx .obj ["repo" ]
285- workflow_names = gh .get_repo_workflow_names (repo )
286- workflow_action_versions = gh .get_repo_actions_latest (repo , major_only )
303+ def update_actions (ctx , update : bool , commit_msg : str ) -> None :
304+ gh , repo_name = ctx .obj ["gh" ], ctx .obj ["repo" ]
305+ workflow_names = gh .get_repo_workflow_names (repo_name )
306+ logging .info (f"Found { len (workflow_names )} workflows in { repo_name } : { ', ' .join (list (workflow_names .keys ()))} " )
307+ workflow_action_versions = gh .get_repo_actions_latest (repo_name )
287308 max_action_name_length , max_version_length = 0 , 0
288309 for workflow_path , actions in workflow_action_versions .items ():
289310 for action in workflow_action_versions [workflow_path ]:
@@ -302,14 +323,14 @@ def update_actions(ctx, update: bool, commit_msg: str, major_only: bool) -> None
302323 if not update :
303324 return
304325 for workflow in workflow_action_versions :
305- gh .update_actions (repo , workflow , workflow_action_versions [workflow ], commit_msg )
326+ gh .update_actions (repo_name , workflow , workflow_action_versions [workflow ], commit_msg )
306327
307328
308329@cli .command (help = "List actions in a workflow" )
309330@click .argument ("workflow" )
310331@click .pass_context
311332def list_actions (ctx , workflow : str ):
312- actions = ctx .obj ["gh" ].get_workflow_actions (ctx .obj ["repo" ], workflow )
333+ actions = ctx .obj ["gh" ].get_workflow_action_names (ctx .obj ["repo" ], workflow )
313334 for action in actions :
314335 click .echo (action )
315336
0 commit comments