@@ -235,3 +235,346 @@ def __init__(self):
235235 "Checkout called with: {'update': True, 'force': True, 'submodules': 'always'}" ,
236236 ]
237237 caplog .clear ()
238+
239+
240+ def test_which_windows (mocker ):
241+ """Test which() on Windows platform."""
242+ mocker .patch ("platform.system" , return_value = "Windows" )
243+ mocker .patch .dict ("os.environ" , {"PATHEXT" : ".COM;.EXE;.BAT" , "PATH" : "/fake/path" })
244+ mocker .patch ("os.path.exists" , return_value = False )
245+ mocker .patch ("os.access" , return_value = False )
246+
247+ # Test with default value
248+ result = common .which ("test" , default = "/default/test" )
249+ assert result == "/default/test"
250+
251+ # Test without default - should exit
252+ exit_mock = mocker .patch ("sys.exit" )
253+ common .which ("test" )
254+ exit_mock .assert_called_once_with (1 )
255+
256+
257+ def test_which_unix (mocker ):
258+ """Test which() on Unix platform."""
259+ mocker .patch ("platform.system" , return_value = "Linux" )
260+ mocker .patch .dict ("os.environ" , {"PATH" : "/usr/bin:/bin" })
261+
262+ def exists_side_effect (path ):
263+ return path == "/usr/bin/python"
264+
265+ def access_side_effect (path , mode ):
266+ return path == "/usr/bin/python"
267+
268+ mocker .patch ("os.path.exists" , side_effect = exists_side_effect )
269+ mocker .patch ("os.access" , side_effect = access_side_effect )
270+
271+ result = common .which ("python" )
272+ assert result == "/usr/bin/python"
273+
274+
275+ def test_get_workingcopytypes_duplicate_error (mocker , caplog ):
276+ """Test get_workingcopytypes() with duplicate entry."""
277+
278+ class FakeEntryPoint :
279+ name = "duplicate"
280+ value = "fake.module:FakeClass"
281+
282+ def load (self ):
283+ return "FakeWorkingCopy"
284+
285+ # Save the original workingcopytypes
286+ original_wct = common ._workingcopytypes .copy ()
287+
288+ try :
289+ exit_mock = mocker .patch ("sys.exit" )
290+ mocker .patch (
291+ "mxdev.vcs.common.load_eps_by_group" ,
292+ return_value = [FakeEntryPoint (), FakeEntryPoint ()],
293+ )
294+ common ._workingcopytypes .clear ()
295+
296+ common .get_workingcopytypes ()
297+ exit_mock .assert_called_once_with (1 )
298+ finally :
299+ # Restore the original workingcopytypes
300+ common ._workingcopytypes .clear ()
301+ common ._workingcopytypes .update (original_wct )
302+
303+
304+ def test_WorkingCopies_matches (mocker , caplog ):
305+ """Test WorkingCopies.matches() method."""
306+
307+ class TestWorkingCopy (common .BaseWorkingCopy ):
308+ def checkout (self , ** kwargs ):
309+ return None
310+
311+ def status (self , ** kwargs ):
312+ return "clean"
313+
314+ def matches (self ):
315+ return True
316+
317+ def update (self , ** kwargs ):
318+ return None
319+
320+ exit_mock = mocker .patch ("sys.exit" )
321+ mocker .patch ("mxdev.vcs.common._workingcopytypes" , {"test" : TestWorkingCopy })
322+
323+ wc = common .WorkingCopies (
324+ sources = {"package" : {"vcs" : "test" , "name" : "package" , "url" : "test://url" }}
325+ )
326+
327+ # Test successful match
328+ result = wc .matches ({"name" : "package" })
329+ assert result is True
330+
331+ # Test with missing source
332+ try :
333+ wc .matches ({"name" : "missing" })
334+ except KeyError :
335+ pass # Expected - sys.exit() is mocked so code continues
336+ exit_mock .assert_called_with (1 )
337+ assert "Checkout failed. No source defined for 'missing'." in caplog .text
338+ caplog .clear ()
339+ exit_mock .reset_mock ()
340+
341+ # Test with unregistered VCS type
342+ wc .sources = {"package" : {"vcs" : "unknown" , "name" : "package" }}
343+ try :
344+ wc .matches ({"name" : "package" })
345+ except TypeError :
346+ pass # Expected - sys.exit() is mocked so code continues
347+ exit_mock .assert_called_with (1 )
348+ assert "Unregistered repository type unknown" in caplog .text
349+ caplog .clear ()
350+ exit_mock .reset_mock ()
351+
352+ # Test with WCError exception
353+ class ErrorWorkingCopy (TestWorkingCopy ):
354+ def matches (self ):
355+ raise common .WCError ("Test error" )
356+
357+ wc .workingcopytypes = {"test" : ErrorWorkingCopy }
358+ wc .sources = {"package" : {"vcs" : "test" , "name" : "package" }}
359+ try :
360+ wc .matches ({"name" : "package" })
361+ except (TypeError , common .WCError ):
362+ pass # Expected - WCError is raised
363+ assert "Can not get matches!" in caplog .text
364+ exit_mock .assert_called_with (1 )
365+
366+
367+ def test_WorkingCopies_status (mocker , caplog ):
368+ """Test WorkingCopies.status() method."""
369+
370+ class TestWorkingCopy (common .BaseWorkingCopy ):
371+ def checkout (self , ** kwargs ):
372+ return None
373+
374+ def status (self , ** kwargs ):
375+ return "clean"
376+
377+ def matches (self ):
378+ return True
379+
380+ def update (self , ** kwargs ):
381+ return None
382+
383+ exit_mock = mocker .patch ("sys.exit" )
384+ mocker .patch ("mxdev.vcs.common._workingcopytypes" , {"test" : TestWorkingCopy })
385+
386+ wc = common .WorkingCopies (
387+ sources = {"package" : {"vcs" : "test" , "name" : "package" , "url" : "test://url" }}
388+ )
389+
390+ # Test successful status
391+ result = wc .status ({"name" : "package" })
392+ assert result == "clean"
393+
394+ # Test with missing source
395+ try :
396+ wc .status ({"name" : "missing" })
397+ except KeyError :
398+ pass # Expected - sys.exit() is mocked so code continues
399+ exit_mock .assert_called_with (1 )
400+ assert "Status failed. No source defined for 'missing'." in caplog .text
401+ caplog .clear ()
402+ exit_mock .reset_mock ()
403+
404+ # Test with unregistered VCS type
405+ wc .sources = {"package" : {"vcs" : "unknown" , "name" : "package" }}
406+ try :
407+ wc .status ({"name" : "package" })
408+ except TypeError :
409+ pass # Expected - sys.exit() is mocked so code continues
410+ assert "Unregistered repository type unknown" in caplog .text
411+ exit_mock .assert_called_with (1 )
412+ caplog .clear ()
413+ exit_mock .reset_mock ()
414+
415+ # Test with WCError exception
416+ class ErrorWorkingCopy (TestWorkingCopy ):
417+ def status (self , ** kwargs ):
418+ raise common .WCError ("Test error" )
419+
420+ wc .workingcopytypes = {"test" : ErrorWorkingCopy }
421+ wc .sources = {"package" : {"vcs" : "test" , "name" : "package" }}
422+ try :
423+ wc .status ({"name" : "package" })
424+ except (TypeError , common .WCError ):
425+ pass # Expected - WCError is raised
426+ assert "Can not get status!" in caplog .text
427+ exit_mock .assert_called_with (1 )
428+
429+
430+ def test_WorkingCopies_update (mocker , caplog , tmp_path ):
431+ """Test WorkingCopies.update() method."""
432+ caplog .set_level (logging .INFO )
433+
434+ class TestWorkingCopy (common .BaseWorkingCopy ):
435+ package_status = "clean"
436+
437+ def checkout (self , ** kwargs ):
438+ return None
439+
440+ def status (self , ** kwargs ):
441+ return self .package_status
442+
443+ def matches (self ):
444+ return True
445+
446+ def update (self , ** kwargs ):
447+ common .logger .info (f"Update called with: { kwargs } " )
448+ return None
449+
450+ exit_mock = mocker .patch ("sys.exit" )
451+ mocker .patch ("mxdev.vcs.common._workingcopytypes" , {"test" : TestWorkingCopy })
452+
453+ package_dir = tmp_path / "package"
454+ package_dir .mkdir ()
455+ wc = common .WorkingCopies (
456+ sources = {
457+ "package" : {"vcs" : "test" , "name" : "package" , "path" : str (package_dir )}
458+ },
459+ threads = 1 ,
460+ )
461+
462+ # Test clean update
463+ wc .update (packages = ["package" ])
464+ assert "Queued 'package' for update." in caplog .text
465+ assert "Update called with:" in caplog .text
466+ caplog .clear ()
467+
468+ # Test with missing package - should skip
469+ wc .update (packages = ["missing" ])
470+ assert "missing" not in caplog .text
471+ caplog .clear ()
472+
473+ # Test with unregistered VCS type
474+ wc .sources = {
475+ "package" : {"vcs" : "unknown" , "name" : "package" , "path" : str (package_dir )}
476+ }
477+ try :
478+ wc .update (packages = ["package" ])
479+ except TypeError :
480+ # Expected - sys.exit() is mocked so code continues and tries to call None
481+ pass
482+ exit_mock .assert_called_with (1 )
483+ assert "Unregistered repository type unknown" in caplog .text
484+ caplog .clear ()
485+ exit_mock .reset_mock ()
486+
487+ # Test dirty package with user declining update
488+ input_mock = mocker .patch ("mxdev.vcs.common.input" , new_callable = Input )
489+ input_mock .answer = "n"
490+ print_stderr = mocker .patch ("mxdev.vcs.common.print_stderr" )
491+
492+ TestWorkingCopy .package_status = "dirty"
493+ wc .sources = {
494+ "package" : {"vcs" : "test" , "name" : "package" , "path" : str (package_dir )}
495+ }
496+ wc .update (packages = ["package" ])
497+ print_stderr .assert_called_with ("The package 'package' is dirty." )
498+ assert "Skipped update of 'package'." in caplog .text
499+ caplog .clear ()
500+
501+ # Test dirty package with user accepting update
502+ input_mock .answer = "y"
503+ wc .update (packages = ["package" ])
504+ assert "Queued 'package' for update." in caplog .text
505+ assert "'force': True" in caplog .text
506+ caplog .clear ()
507+
508+ # Test dirty package with 'all' answer
509+ input_mock .answer = "all"
510+ wc .update (packages = ["package" ])
511+ assert "Queued 'package' for update." in caplog .text
512+ caplog .clear ()
513+
514+
515+ def test_worker_with_error (mocker , caplog ):
516+ """Test worker() function with WCError exception."""
517+
518+ class TestWorkingCopy (common .BaseWorkingCopy ):
519+ def checkout (self , ** kwargs ):
520+ raise common .WCError ("Test error" )
521+
522+ def status (self , ** kwargs ):
523+ return "clean"
524+
525+ def matches (self ):
526+ return True
527+
528+ def update (self , ** kwargs ):
529+ return None
530+
531+ wc = TestWorkingCopy (source = {"url" : "test://url" })
532+ wc .output ((logging .error , "Error message" ))
533+
534+ working_copies = common .WorkingCopies (sources = {})
535+ test_queue = queue .Queue ()
536+ test_queue .put ((wc , wc .checkout , {}))
537+
538+ common .worker (working_copies , test_queue )
539+
540+ assert working_copies .errors is True
541+ assert "Can not execute action!" in caplog .text
542+
543+
544+ def test_worker_with_bytes_output (mocker ):
545+ """Test worker() function with bytes output."""
546+
547+ class TestWorkingCopy (common .BaseWorkingCopy ):
548+ def checkout (self , ** kwargs ):
549+ return b"bytes output"
550+
551+ def status (self , ** kwargs ):
552+ return "clean"
553+
554+ def matches (self ):
555+ return True
556+
557+ def update (self , ** kwargs ):
558+ return None
559+
560+ wc = TestWorkingCopy (source = {"url" : "test://url" })
561+
562+ working_copies = common .WorkingCopies (sources = {})
563+ test_queue = queue .Queue ()
564+ test_queue .put ((wc , wc .checkout , {"verbose" : True }))
565+
566+ print_mock = mocker .patch ("builtins.print" )
567+ common .worker (working_copies , test_queue )
568+
569+ print_mock .assert_called_once_with ("bytes output" )
570+
571+
572+ def test_worker_errors_flag (mocker ):
573+ """Test worker() respects the errors flag."""
574+ working_copies = common .WorkingCopies (sources = {})
575+ working_copies .errors = True
576+ test_queue = queue .Queue ()
577+
578+ # Should return immediately without processing queue
579+ common .worker (working_copies , test_queue )
580+ assert test_queue .qsize () == 0 # Queue should not be modified
0 commit comments