|
4 | 4 | edge cases, update display, and display helpers. |
5 | 5 | """ |
6 | 6 |
|
| 7 | +import functools |
| 8 | +import io |
7 | 9 | import sys |
| 10 | +import tempfile |
8 | 11 | import time |
9 | 12 | import unittest |
10 | 13 | from unittest import mock |
11 | | -from test.support import requires |
| 14 | +from test.support import requires, is_emscripten |
12 | 15 | from test.support.import_helper import import_module |
13 | 16 |
|
14 | 17 | # Only run these tests if curses is available |
15 | 18 | requires("curses") |
16 | 19 | curses = import_module("curses") |
17 | 20 |
|
18 | 21 | from profiling.sampling.live_collector import LiveStatsCollector, MockDisplay |
| 22 | +from profiling.sampling.cli import main |
19 | 23 | from ._live_collector_helpers import ( |
20 | 24 | MockThreadInfo, |
21 | 25 | MockInterpreterInfo, |
@@ -816,5 +820,76 @@ def test_get_all_lines_full_display(self): |
816 | 820 | self.assertTrue(any("PID" in line for line in lines)) |
817 | 821 |
|
818 | 822 |
|
| 823 | +class TestLiveModeErrors(unittest.TestCase): |
| 824 | + """Tests running error commands in the live mode fails gracefully.""" |
| 825 | + |
| 826 | + def mock_curses_wrapper(self, func): |
| 827 | + func(mock.MagicMock()) |
| 828 | + |
| 829 | + def mock_init_curses_side_effect(self, n_times, mock_self, stdscr): |
| 830 | + mock_self.display = MockDisplay() |
| 831 | + # Allow the loop to run for a bit (approx 0.5s) before quitting |
| 832 | + # This ensures we don't exit too early while the subprocess is |
| 833 | + # still failing |
| 834 | + for _ in range(n_times): |
| 835 | + mock_self.display.simulate_input(-1) |
| 836 | + if n_times >= 500: |
| 837 | + mock_self.display.simulate_input(ord('q')) |
| 838 | + |
| 839 | + @unittest.skipIf(is_emscripten, "subprocess not available") |
| 840 | + def test_run_failed_module_live(self): |
| 841 | + """Test that running a existing module that fails exists with clean error.""" |
| 842 | + |
| 843 | + args = [ |
| 844 | + "profiling.sampling.cli", "run", "--live", "-m", "test", |
| 845 | + "test_asdasd" |
| 846 | + ] |
| 847 | + |
| 848 | + with ( |
| 849 | + mock.patch( |
| 850 | + 'profiling.sampling.live_collector.collector.LiveStatsCollector.init_curses', |
| 851 | + autospec=True, |
| 852 | + side_effect=functools.partial(self.mock_init_curses_side_effect, 1000) |
| 853 | + ), |
| 854 | + mock.patch('curses.wrapper', side_effect=self.mock_curses_wrapper), |
| 855 | + mock.patch("sys.argv", args), |
| 856 | + mock.patch('sys.stderr', new=io.StringIO()) as fake_stderr |
| 857 | + ): |
| 858 | + main() |
| 859 | + self.assertStartsWith( |
| 860 | + fake_stderr.getvalue(), |
| 861 | + '\x1b[31mtest test_asdasd crashed -- Traceback (most recent call last):' |
| 862 | + ) |
| 863 | + |
| 864 | + @unittest.skipIf(is_emscripten, "subprocess not available") |
| 865 | + def test_run_failed_script_live(self): |
| 866 | + """Test that running a failing script exits with clean error.""" |
| 867 | + script = tempfile.NamedTemporaryFile(suffix=".py") |
| 868 | + script.write(b'1/0\n') |
| 869 | + script.seek(0) |
| 870 | + |
| 871 | + args = ["profiling.sampling.cli", "run", "--live", script.name] |
| 872 | + |
| 873 | + with ( |
| 874 | + mock.patch( |
| 875 | + 'profiling.sampling.live_collector.collector.LiveStatsCollector.init_curses', |
| 876 | + autospec=True, |
| 877 | + side_effect=functools.partial(self.mock_init_curses_side_effect, 200) |
| 878 | + ), |
| 879 | + mock.patch('curses.wrapper', side_effect=self.mock_curses_wrapper), |
| 880 | + mock.patch("sys.argv", args), |
| 881 | + mock.patch('sys.stderr', new=io.StringIO()) as fake_stderr |
| 882 | + ): |
| 883 | + main() |
| 884 | + stderr = fake_stderr.getvalue() |
| 885 | + self.assertIn( |
| 886 | + 'sample(s) collected (minimum 200 required for TUI)', stderr |
| 887 | + ) |
| 888 | + self.assertEndsWith( |
| 889 | + stderr, |
| 890 | + 'ZeroDivisionError\x1b[0m: \x1b[35mdivision by zero\x1b[0m\n\n' |
| 891 | + ) |
| 892 | + |
| 893 | + |
819 | 894 | if __name__ == "__main__": |
820 | 895 | unittest.main() |
0 commit comments