|
| 1 | +from diffpy.utils.parsers import serialize_data, deserialize_data |
| 2 | +from diffpy.utils.parsers import loadData |
| 3 | +from diffpy.utils.tests.testhelpers import datafile |
| 4 | + |
| 5 | +from diffpy.utils.parsers.custom_exceptions import UnsupportedTypeError, ImproperSizeError |
| 6 | + |
| 7 | +import os |
| 8 | +import pytest |
| 9 | +import numpy |
| 10 | + |
| 11 | +tests_dir = os.path.dirname(os.path.abspath(locals().get('__file__', 'file.py'))) |
| 12 | + |
| 13 | +targetjson = datafile('targetjson.json') |
| 14 | +schemaname = datafile('strumining.json') |
| 15 | +wrongtype = datafile('wrong.type') |
| 16 | +loadfile = datafile('loadfile.txt') |
| 17 | +nodt = datafile('loaddatawithheaders.txt') |
| 18 | + |
| 19 | + |
| 20 | +def test_load_multiple(tmp_path): |
| 21 | + # generate json and apply schema |
| 22 | + generatedjson = tmp_path / "generated_serialization.json" |
| 23 | + tlm_list = os.listdir(os.path.join(tests_dir, "testdata", "dbload")) |
| 24 | + tlm_list.sort() |
| 25 | + generated_data = None |
| 26 | + for hfname in tlm_list: |
| 27 | + # gather data using loadData |
| 28 | + headerfile = os.path.normpath(os.path.join(tests_dir, "testdata", "dbload", hfname)) |
| 29 | + hdata = loadData(headerfile, headers=True) |
| 30 | + data_table = loadData(headerfile) |
| 31 | + |
| 32 | + # check path extraction |
| 33 | + generated_data = serialize_data(headerfile, hdata, data_table, dt_colnames=['r', 'gr'], show_path=True) |
| 34 | + assert headerfile == os.path.normpath(generated_data[hfname].pop('path')) |
| 35 | + |
| 36 | + # rerun without path information and save to file |
| 37 | + generated_data = serialize_data(headerfile, hdata, data_table, dt_colnames=['r', 'gr'], |
| 38 | + show_path=False, serial_file=generatedjson) |
| 39 | + |
| 40 | + # compare to target |
| 41 | + target_data = deserialize_data(targetjson) |
| 42 | + assert target_data == generated_data |
| 43 | + # ensure file saved properly |
| 44 | + assert target_data == deserialize_data(generatedjson) |
| 45 | + |
| 46 | + |
| 47 | +def test_exceptions(): |
| 48 | + hdata = loadData(loadfile, headers=True) |
| 49 | + data_table = loadData(loadfile) |
| 50 | + |
| 51 | + # improper file types |
| 52 | + with pytest.raises(UnsupportedTypeError): |
| 53 | + serialize_data(loadfile, hdata, data_table, serial_file=wrongtype) |
| 54 | + with pytest.raises(UnsupportedTypeError): |
| 55 | + deserialize_data(wrongtype) |
| 56 | + |
| 57 | + # various dt_colnames inputs |
| 58 | + with pytest.raises(ImproperSizeError): |
| 59 | + serialize_data(loadfile, hdata, data_table, dt_colnames=["one", "two", "three is too many"]) |
| 60 | + # check proper output |
| 61 | + normal = serialize_data(loadfile, hdata, data_table, dt_colnames=['r', 'gr']) |
| 62 | + data_name = list(normal.keys())[0] |
| 63 | + r_list = normal[data_name]['r'] |
| 64 | + gr_list = normal[data_name]['gr'] |
| 65 | + # three equivalent ways to denote no column names |
| 66 | + missing_parameter = serialize_data(loadfile, hdata, data_table, show_path=False) |
| 67 | + empty_parameter = serialize_data(loadfile, hdata, data_table, show_path=False, dt_colnames=[]) |
| 68 | + none_entry_parameter = serialize_data(loadfile, hdata, data_table, show_path=False, dt_colnames=[None, None]) |
| 69 | + # check equivalence |
| 70 | + assert missing_parameter == empty_parameter |
| 71 | + assert missing_parameter == none_entry_parameter |
| 72 | + print(data_table) |
| 73 | + print(missing_parameter[data_name]['data table prime']) |
| 74 | + assert numpy.allclose(missing_parameter[data_name]['data table prime'], data_table) |
| 75 | + # extract a single column |
| 76 | + r_extract = serialize_data(loadfile, hdata, data_table, show_path=False, dt_colnames=['r']) |
| 77 | + gr_extract = serialize_data(loadfile, hdata, data_table, show_path=False, dt_colnames=[None, 'gr']) |
| 78 | + incorrect_r_extract = serialize_data(loadfile, hdata, data_table, show_path=False, dt_colnames=[None, 'r']) |
| 79 | + # check proper columns extracted |
| 80 | + assert numpy.allclose(gr_extract[data_name]['gr'], incorrect_r_extract[data_name]['r']) |
| 81 | + assert 'r' not in gr_extract[data_name] |
| 82 | + assert 'gr' not in r_extract[data_name] and 'gr' not in incorrect_r_extract[data_name] |
| 83 | + # check correct values extracted |
| 84 | + assert numpy.allclose(r_extract[data_name]['r'], r_list) |
| 85 | + assert numpy.allclose(gr_extract[data_name]['gr'], gr_list) |
| 86 | + # no datatable |
| 87 | + nodt_hdata = loadData(nodt, headers=True) |
| 88 | + nodt_dt = loadData(nodt) |
| 89 | + no_dt = serialize_data(nodt, nodt_hdata, nodt_dt, show_path=False) |
| 90 | + nodt_data_name = list(no_dt.keys())[0] |
| 91 | + assert numpy.allclose(no_dt[nodt_data_name]['data table'], nodt_dt) |
0 commit comments