|
1 | 1 | """Unit tests for contextlib.py, and other context managers.""" |
2 | 2 |
|
3 | 3 | import io |
| 4 | +import os |
4 | 5 | import sys |
5 | 6 | import tempfile |
6 | 7 | import threading |
| 8 | +import traceback |
7 | 9 | import unittest |
8 | 10 | from contextlib import * # Tests __all__ |
9 | 11 | from test import support |
@@ -86,6 +88,58 @@ def woohoo(): |
86 | 88 | raise ZeroDivisionError() |
87 | 89 | self.assertEqual(state, [1, 42, 999]) |
88 | 90 |
|
| 91 | + # TODO: RUSTPYTHON |
| 92 | + @unittest.expectedFailure |
| 93 | + def test_contextmanager_traceback(self): |
| 94 | + @contextmanager |
| 95 | + def f(): |
| 96 | + yield |
| 97 | + |
| 98 | + try: |
| 99 | + with f(): |
| 100 | + 1/0 |
| 101 | + except ZeroDivisionError as e: |
| 102 | + frames = traceback.extract_tb(e.__traceback__) |
| 103 | + |
| 104 | + self.assertEqual(len(frames), 1) |
| 105 | + self.assertEqual(frames[0].name, 'test_contextmanager_traceback') |
| 106 | + self.assertEqual(frames[0].line, '1/0') |
| 107 | + |
| 108 | + # Repeat with RuntimeError (which goes through a different code path) |
| 109 | + class RuntimeErrorSubclass(RuntimeError): |
| 110 | + pass |
| 111 | + |
| 112 | + try: |
| 113 | + with f(): |
| 114 | + raise RuntimeErrorSubclass(42) |
| 115 | + except RuntimeErrorSubclass as e: |
| 116 | + frames = traceback.extract_tb(e.__traceback__) |
| 117 | + |
| 118 | + self.assertEqual(len(frames), 1) |
| 119 | + self.assertEqual(frames[0].name, 'test_contextmanager_traceback') |
| 120 | + self.assertEqual(frames[0].line, 'raise RuntimeErrorSubclass(42)') |
| 121 | + |
| 122 | + class StopIterationSubclass(StopIteration): |
| 123 | + pass |
| 124 | + |
| 125 | + for stop_exc in ( |
| 126 | + StopIteration('spam'), |
| 127 | + StopIterationSubclass('spam'), |
| 128 | + ): |
| 129 | + with self.subTest(type=type(stop_exc)): |
| 130 | + try: |
| 131 | + with f(): |
| 132 | + raise stop_exc |
| 133 | + except type(stop_exc) as e: |
| 134 | + self.assertIs(e, stop_exc) |
| 135 | + frames = traceback.extract_tb(e.__traceback__) |
| 136 | + else: |
| 137 | + self.fail(f'{stop_exc} was suppressed') |
| 138 | + |
| 139 | + self.assertEqual(len(frames), 1) |
| 140 | + self.assertEqual(frames[0].name, 'test_contextmanager_traceback') |
| 141 | + self.assertEqual(frames[0].line, 'raise stop_exc') |
| 142 | + |
89 | 143 | def test_contextmanager_no_reraise(self): |
90 | 144 | @contextmanager |
91 | 145 | def whee(): |
@@ -126,19 +180,22 @@ def woohoo(): |
126 | 180 | self.assertEqual(state, [1, 42, 999]) |
127 | 181 |
|
128 | 182 | def test_contextmanager_except_stopiter(self): |
129 | | - stop_exc = StopIteration('spam') |
130 | 183 | @contextmanager |
131 | 184 | def woohoo(): |
132 | 185 | yield |
133 | | - try: |
134 | | - with self.assertWarnsRegex(DeprecationWarning, |
135 | | - "StopIteration"): |
136 | | - with woohoo(): |
137 | | - raise stop_exc |
138 | | - except Exception as ex: |
139 | | - self.assertIs(ex, stop_exc) |
140 | | - else: |
141 | | - self.fail('StopIteration was suppressed') |
| 186 | + |
| 187 | + class StopIterationSubclass(StopIteration): |
| 188 | + pass |
| 189 | + |
| 190 | + for stop_exc in (StopIteration('spam'), StopIterationSubclass('spam')): |
| 191 | + with self.subTest(type=type(stop_exc)): |
| 192 | + try: |
| 193 | + with woohoo(): |
| 194 | + raise stop_exc |
| 195 | + except Exception as ex: |
| 196 | + self.assertIs(ex, stop_exc) |
| 197 | + else: |
| 198 | + self.fail(f'{stop_exc} was suppressed') |
142 | 199 |
|
143 | 200 | # TODO: RUSTPYTHON |
144 | 201 | @unittest.expectedFailure |
@@ -230,6 +287,8 @@ class A: |
230 | 287 | def woohoo(a, b): |
231 | 288 | a = weakref.ref(a) |
232 | 289 | b = weakref.ref(b) |
| 290 | + # Allow test to work with a non-refcounted GC |
| 291 | + support.gc_collect() |
233 | 292 | self.assertIsNone(a()) |
234 | 293 | self.assertIsNone(b()) |
235 | 294 | yield |
@@ -318,13 +377,13 @@ def testWithOpen(self): |
318 | 377 | tfn = tempfile.mktemp() |
319 | 378 | try: |
320 | 379 | f = None |
321 | | - with open(tfn, "w") as f: |
| 380 | + with open(tfn, "w", encoding="utf-8") as f: |
322 | 381 | self.assertFalse(f.closed) |
323 | 382 | f.write("Booh\n") |
324 | 383 | self.assertTrue(f.closed) |
325 | 384 | f = None |
326 | 385 | with self.assertRaises(ZeroDivisionError): |
327 | | - with open(tfn, "r") as f: |
| 386 | + with open(tfn, "r", encoding="utf-8") as f: |
328 | 387 | self.assertFalse(f.closed) |
329 | 388 | self.assertEqual(f.read(), "Booh\n") |
330 | 389 | 1 / 0 |
@@ -486,26 +545,30 @@ def method(self, a, b, c=None): |
486 | 545 | self.assertEqual(test.b, 2) |
487 | 546 |
|
488 | 547 |
|
| 548 | + # TODO: RUSTPYTHON |
| 549 | + @unittest.expectedFailure |
489 | 550 | def test_typo_enter(self): |
490 | 551 | class mycontext(ContextDecorator): |
491 | 552 | def __unter__(self): |
492 | 553 | pass |
493 | 554 | def __exit__(self, *exc): |
494 | 555 | pass |
495 | 556 |
|
496 | | - with self.assertRaises(AttributeError): |
| 557 | + with self.assertRaisesRegex(TypeError, 'the context manager'): |
497 | 558 | with mycontext(): |
498 | 559 | pass |
499 | 560 |
|
500 | 561 |
|
| 562 | + # TODO: RUSTPYTHON |
| 563 | + @unittest.expectedFailure |
501 | 564 | def test_typo_exit(self): |
502 | 565 | class mycontext(ContextDecorator): |
503 | 566 | def __enter__(self): |
504 | 567 | pass |
505 | 568 | def __uxit__(self, *exc): |
506 | 569 | pass |
507 | 570 |
|
508 | | - with self.assertRaises(AttributeError): |
| 571 | + with self.assertRaisesRegex(TypeError, 'the context manager.*__exit__'): |
509 | 572 | with mycontext(): |
510 | 573 | pass |
511 | 574 |
|
@@ -608,9 +671,9 @@ def _exit(*args, **kwds): |
608 | 671 | stack.callback(arg=1) |
609 | 672 | with self.assertRaises(TypeError): |
610 | 673 | self.exit_stack.callback(arg=2) |
611 | | - with self.assertWarns(DeprecationWarning): |
| 674 | + with self.assertRaises(TypeError): |
612 | 675 | stack.callback(callback=_exit, arg=3) |
613 | | - self.assertEqual(result, [((), {'arg': 3})]) |
| 676 | + self.assertEqual(result, []) |
614 | 677 |
|
615 | 678 | def test_push(self): |
616 | 679 | exc_raised = ZeroDivisionError |
@@ -665,6 +728,27 @@ def _exit(): |
665 | 728 | result.append(2) |
666 | 729 | self.assertEqual(result, [1, 2, 3, 4]) |
667 | 730 |
|
| 731 | + # TODO: RUSTPYTHON |
| 732 | + @unittest.expectedFailure |
| 733 | + def test_enter_context_errors(self): |
| 734 | + class LacksEnterAndExit: |
| 735 | + pass |
| 736 | + class LacksEnter: |
| 737 | + def __exit__(self, *exc_info): |
| 738 | + pass |
| 739 | + class LacksExit: |
| 740 | + def __enter__(self): |
| 741 | + pass |
| 742 | + |
| 743 | + with self.exit_stack() as stack: |
| 744 | + with self.assertRaisesRegex(TypeError, 'the context manager'): |
| 745 | + stack.enter_context(LacksEnterAndExit()) |
| 746 | + with self.assertRaisesRegex(TypeError, 'the context manager'): |
| 747 | + stack.enter_context(LacksEnter()) |
| 748 | + with self.assertRaisesRegex(TypeError, 'the context manager'): |
| 749 | + stack.enter_context(LacksExit()) |
| 750 | + self.assertFalse(stack._exit_callbacks) |
| 751 | + |
668 | 752 | def test_close(self): |
669 | 753 | result = [] |
670 | 754 | with self.exit_stack() as stack: |
@@ -700,6 +784,40 @@ def test_exit_suppress(self): |
700 | 784 | stack.push(lambda *exc: True) |
701 | 785 | 1/0 |
702 | 786 |
|
| 787 | + # TODO: RUSTPYTHON |
| 788 | + @unittest.expectedFailure |
| 789 | + def test_exit_exception_traceback(self): |
| 790 | + # This test captures the current behavior of ExitStack so that we know |
| 791 | + # if we ever unintendedly change it. It is not a statement of what the |
| 792 | + # desired behavior is (for instance, we may want to remove some of the |
| 793 | + # internal contextlib frames). |
| 794 | + |
| 795 | + def raise_exc(exc): |
| 796 | + raise exc |
| 797 | + |
| 798 | + try: |
| 799 | + with self.exit_stack() as stack: |
| 800 | + stack.callback(raise_exc, ValueError) |
| 801 | + 1/0 |
| 802 | + except ValueError as e: |
| 803 | + exc = e |
| 804 | + |
| 805 | + self.assertIsInstance(exc, ValueError) |
| 806 | + ve_frames = traceback.extract_tb(exc.__traceback__) |
| 807 | + expected = \ |
| 808 | + [('test_exit_exception_traceback', 'with self.exit_stack() as stack:')] + \ |
| 809 | + self.callback_error_internal_frames + \ |
| 810 | + [('_exit_wrapper', 'callback(*args, **kwds)'), |
| 811 | + ('raise_exc', 'raise exc')] |
| 812 | + |
| 813 | + self.assertEqual( |
| 814 | + [(f.name, f.line) for f in ve_frames], expected) |
| 815 | + |
| 816 | + self.assertIsInstance(exc.__context__, ZeroDivisionError) |
| 817 | + zde_frames = traceback.extract_tb(exc.__context__.__traceback__) |
| 818 | + self.assertEqual([(f.name, f.line) for f in zde_frames], |
| 819 | + [('test_exit_exception_traceback', '1/0')]) |
| 820 | + |
703 | 821 | def test_exit_exception_chaining_reference(self): |
704 | 822 | # Sanity check to make sure that ExitStack chaining matches |
705 | 823 | # actual nested with statements |
@@ -781,6 +899,42 @@ def suppress_exc(*exc_details): |
781 | 899 | self.assertIsInstance(inner_exc, ValueError) |
782 | 900 | self.assertIsInstance(inner_exc.__context__, ZeroDivisionError) |
783 | 901 |
|
| 902 | + # TODO: RUSTPYTHON |
| 903 | + @unittest.expectedFailure |
| 904 | + def test_exit_exception_explicit_none_context(self): |
| 905 | + # Ensure ExitStack chaining matches actual nested `with` statements |
| 906 | + # regarding explicit __context__ = None. |
| 907 | + |
| 908 | + class MyException(Exception): |
| 909 | + pass |
| 910 | + |
| 911 | + @contextmanager |
| 912 | + def my_cm(): |
| 913 | + try: |
| 914 | + yield |
| 915 | + except BaseException: |
| 916 | + exc = MyException() |
| 917 | + try: |
| 918 | + raise exc |
| 919 | + finally: |
| 920 | + exc.__context__ = None |
| 921 | + |
| 922 | + @contextmanager |
| 923 | + def my_cm_with_exit_stack(): |
| 924 | + with self.exit_stack() as stack: |
| 925 | + stack.enter_context(my_cm()) |
| 926 | + yield stack |
| 927 | + |
| 928 | + for cm in (my_cm, my_cm_with_exit_stack): |
| 929 | + with self.subTest(): |
| 930 | + try: |
| 931 | + with cm(): |
| 932 | + raise IndexError() |
| 933 | + except MyException as exc: |
| 934 | + self.assertIsNone(exc.__context__) |
| 935 | + else: |
| 936 | + self.fail("Expected IndexError, but no exception was raised") |
| 937 | + |
784 | 938 | def test_exit_exception_non_suppressing(self): |
785 | 939 | # http://bugs.python.org/issue19092 |
786 | 940 | def raise_exc(exc): |
@@ -893,12 +1047,16 @@ def test_excessive_nesting(self): |
893 | 1047 | for i in range(10000): |
894 | 1048 | stack.callback(int) |
895 | 1049 |
|
| 1050 | + # TODO: RUSTPYTHON |
| 1051 | + @unittest.expectedFailure |
896 | 1052 | def test_instance_bypass(self): |
897 | 1053 | class Example(object): pass |
898 | 1054 | cm = Example() |
| 1055 | + cm.__enter__ = object() |
899 | 1056 | cm.__exit__ = object() |
900 | 1057 | stack = self.exit_stack() |
901 | | - self.assertRaises(AttributeError, stack.enter_context, cm) |
| 1058 | + with self.assertRaisesRegex(TypeError, 'the context manager'): |
| 1059 | + stack.enter_context(cm) |
902 | 1060 | stack.push(cm) |
903 | 1061 | self.assertIs(stack._exit_callbacks[-1][1], cm) |
904 | 1062 |
|
@@ -939,6 +1097,10 @@ def first(): |
939 | 1097 |
|
940 | 1098 | class TestExitStack(TestBaseExitStack, unittest.TestCase): |
941 | 1099 | exit_stack = ExitStack |
| 1100 | + callback_error_internal_frames = [ |
| 1101 | + ('__exit__', 'raise exc_details[1]'), |
| 1102 | + ('__exit__', 'if cb(*exc_details):'), |
| 1103 | + ] |
942 | 1104 |
|
943 | 1105 |
|
944 | 1106 | class TestRedirectStream: |
@@ -1064,5 +1226,59 @@ def test_cm_is_reentrant(self): |
1064 | 1226 | 1/0 |
1065 | 1227 | self.assertTrue(outer_continued) |
1066 | 1228 |
|
| 1229 | + |
| 1230 | +class TestChdir(unittest.TestCase): |
| 1231 | + def make_relative_path(self, *parts): |
| 1232 | + return os.path.join( |
| 1233 | + os.path.dirname(os.path.realpath(__file__)), |
| 1234 | + *parts, |
| 1235 | + ) |
| 1236 | + |
| 1237 | + # TODO: RUSTPYTHON |
| 1238 | + @unittest.expectedFailure |
| 1239 | + def test_simple(self): |
| 1240 | + old_cwd = os.getcwd() |
| 1241 | + target = self.make_relative_path('data') |
| 1242 | + self.assertNotEqual(old_cwd, target) |
| 1243 | + |
| 1244 | + with chdir(target): |
| 1245 | + self.assertEqual(os.getcwd(), target) |
| 1246 | + self.assertEqual(os.getcwd(), old_cwd) |
| 1247 | + |
| 1248 | + # TODO: RUSTPYTHON |
| 1249 | + @unittest.expectedFailure |
| 1250 | + def test_reentrant(self): |
| 1251 | + old_cwd = os.getcwd() |
| 1252 | + target1 = self.make_relative_path('data') |
| 1253 | + target2 = self.make_relative_path('ziptestdata') |
| 1254 | + self.assertNotIn(old_cwd, (target1, target2)) |
| 1255 | + chdir1, chdir2 = chdir(target1), chdir(target2) |
| 1256 | + |
| 1257 | + with chdir1: |
| 1258 | + self.assertEqual(os.getcwd(), target1) |
| 1259 | + with chdir2: |
| 1260 | + self.assertEqual(os.getcwd(), target2) |
| 1261 | + with chdir1: |
| 1262 | + self.assertEqual(os.getcwd(), target1) |
| 1263 | + self.assertEqual(os.getcwd(), target2) |
| 1264 | + self.assertEqual(os.getcwd(), target1) |
| 1265 | + self.assertEqual(os.getcwd(), old_cwd) |
| 1266 | + |
| 1267 | + # TODO: RUSTPYTHON |
| 1268 | + @unittest.expectedFailure |
| 1269 | + def test_exception(self): |
| 1270 | + old_cwd = os.getcwd() |
| 1271 | + target = self.make_relative_path('data') |
| 1272 | + self.assertNotEqual(old_cwd, target) |
| 1273 | + |
| 1274 | + try: |
| 1275 | + with chdir(target): |
| 1276 | + self.assertEqual(os.getcwd(), target) |
| 1277 | + raise RuntimeError("boom") |
| 1278 | + except RuntimeError as re: |
| 1279 | + self.assertEqual(str(re), "boom") |
| 1280 | + self.assertEqual(os.getcwd(), old_cwd) |
| 1281 | + |
| 1282 | + |
1067 | 1283 | if __name__ == "__main__": |
1068 | 1284 | unittest.main() |
0 commit comments