1+ import pytest
12import datajoint as dj
23import numpy as np
3-
4- from . import PREFIX , CONN_INFO
4+ from . import PREFIX
55from numpy .testing import assert_array_equal
6- from nose .tools import assert_true
7-
8-
9- schema_in = dj .Schema (
10- PREFIX + "_test_bypass_serialization_in" , connection = dj .conn (** CONN_INFO )
11- )
12-
13- schema_out = dj .Schema (
14- PREFIX + "_test_blob_bypass_serialization_out" , connection = dj .conn (** CONN_INFO )
15- )
16-
176
187test_blob = np .array ([1 , 2 , 3 ])
198
209
21- @schema_in
2210class Input (dj .Lookup ):
2311 definition = """
2412 id: int
@@ -28,7 +16,6 @@ class Input(dj.Lookup):
2816 contents = [(0 , test_blob )]
2917
3018
31- @schema_out
3219class Output (dj .Manual ):
3320 definition = """
3421 id: int
@@ -37,10 +24,34 @@ class Output(dj.Manual):
3724 """
3825
3926
40- def test_bypass_serialization ():
27+ @pytest .fixture
28+ def schema_in (connection_test ):
29+ schema = dj .Schema (
30+ PREFIX + "_test_bypass_serialization_in" ,
31+ context = dict (Input = Input ),
32+ connection = connection_test ,
33+ )
34+ schema (Input )
35+ yield schema
36+ schema .drop ()
37+
38+
39+ @pytest .fixture
40+ def schema_out (connection_test ):
41+ schema = dj .Schema (
42+ PREFIX + "_test_blob_bypass_serialization_out" ,
43+ context = dict (Output = Output ),
44+ connection = connection_test ,
45+ )
46+ schema (Output )
47+ yield schema
48+ schema .drop ()
49+
50+
51+ def test_bypass_serialization (schema_in , schema_out ):
4152 dj .blob .bypass_serialization = True
4253 contents = Input .fetch (as_dict = True )
43- assert_true ( isinstance (contents [0 ]["data" ], bytes ) )
54+ assert isinstance (contents [0 ]["data" ], bytes )
4455 Output .insert (contents )
4556 dj .blob .bypass_serialization = False
4657 assert_array_equal (Input .fetch1 ("data" ), Output .fetch1 ("data" ))
0 commit comments