|
| 1 | +""" |
| 2 | + Utilities for reading data from filterbank file |
| 3 | +""" |
| 4 | + |
| 5 | +import os |
| 6 | +import numpy as np |
| 7 | +from filterbank.header import read_header, len_header |
| 8 | + |
| 9 | + |
| 10 | +class Filterbank: |
| 11 | + """ |
| 12 | + Processing .fil files |
| 13 | + """ |
| 14 | + |
| 15 | + # pylint: disable=too-many-instance-attributes |
| 16 | + |
| 17 | + def __init__(self, filename, freq_range=None, time_range=None): |
| 18 | + """ |
| 19 | + Initialize Filterbank object |
| 20 | +
|
| 21 | + Args: |
| 22 | + freq_range, tuple of freq_start and freq_stop in MHz |
| 23 | + time_range, tuple of time_start and time_stop |
| 24 | + """ |
| 25 | + self.freqs = None |
| 26 | + self.timestamps = None |
| 27 | + if os.path.isfile(filename): |
| 28 | + self.filename = filename |
| 29 | + self.header = read_header(filename) |
| 30 | + self.idx_data = len_header(filename) |
| 31 | + self.n_bytes = int(self.header[b'nbits'] / 8) |
| 32 | + self.n_chans = self.header[b'nchans'] |
| 33 | + self.n_ifs = self.header[b'nifs'] |
| 34 | + self.read_filterbank(freq_range, time_range) |
| 35 | + else: |
| 36 | + raise FileNotFoundError(filename) |
| 37 | + |
| 38 | + def read_filterbank(self, freq_range=None, time_range=None): |
| 39 | + """ |
| 40 | + Read filterbank file to 3d numpy array |
| 41 | + """ |
| 42 | + fil = open(self.filename, 'rb') |
| 43 | + fil.seek(self.idx_data) |
| 44 | + |
| 45 | + ii_start, n_ints = self.setup_time(time_range) |
| 46 | + # search for start of data |
| 47 | + fil.seek(int(ii_start * self.n_bytes * self.n_ifs * self.n_chans), 1) |
| 48 | + |
| 49 | + i_0, i_1 = self.setup_chans(freq_range) |
| 50 | + |
| 51 | + n_chans_selected = self.freqs.shape[0] |
| 52 | + |
| 53 | + # decide appropriate datatype |
| 54 | + if self.n_bytes == 4: |
| 55 | + dd_type = b'float32' |
| 56 | + elif self.n_bytes == 2: |
| 57 | + dd_type = b'uint16' |
| 58 | + elif self.n_bytes == 1: |
| 59 | + dd_type = b'uint8' |
| 60 | + # create numpy array with 3d shape |
| 61 | + self.data = np.zeros((n_ints, self.n_ifs, n_chans_selected), dtype=dd_type) |
| 62 | + |
| 63 | + for i_i in range(n_ints): |
| 64 | + for j_j in range(self.n_ifs): |
| 65 | + fil.seek(self.n_bytes * i_0, 1) |
| 66 | + # add to matrix |
| 67 | + self.data[i_i, j_j] = np.fromfile(fil, count=n_chans_selected, dtype=dd_type) |
| 68 | + # search for start of next chunk |
| 69 | + fil.seek(self.n_bytes * (self.n_chans - i_1), 1) |
| 70 | + |
| 71 | + def setup_freqs(self, freq_range=None): |
| 72 | + """ |
| 73 | + Calculate the frequency range |
| 74 | + """ |
| 75 | + f_delt = self.header[b'foff'] |
| 76 | + f_0 = self.header[b'fch1'] |
| 77 | + |
| 78 | + i_start, i_stop = 0, self.n_chans |
| 79 | + |
| 80 | + if freq_range: |
| 81 | + if freq_range[0]: |
| 82 | + i_start = int((freq_range[0] - f_0) / f_delt) |
| 83 | + if freq_range[1]: |
| 84 | + i_stop = int((freq_range[1] - f_0) / f_delt) |
| 85 | + |
| 86 | + chan_start_idx = np.int(i_start) |
| 87 | + chan_stop_idx = np.int(i_stop) |
| 88 | + |
| 89 | + if i_start < i_stop: |
| 90 | + i_vals = np.arange(chan_start_idx, chan_stop_idx) |
| 91 | + else: |
| 92 | + i_vals = np.arange(chan_stop_idx, chan_start_idx) |
| 93 | + |
| 94 | + self.freqs = f_delt * i_vals + f_0 |
| 95 | + |
| 96 | + if chan_stop_idx < chan_start_idx: |
| 97 | + chan_stop_idx, chan_start_idx = chan_start_idx, chan_stop_idx |
| 98 | + |
| 99 | + return chan_start_idx, chan_stop_idx |
| 100 | + |
| 101 | + def setup_time(self, time_range=None): |
| 102 | + """ |
| 103 | + Calculate the time range |
| 104 | + """ |
| 105 | + t_delt = self.header[b'tsamp'] |
| 106 | + t_0 = self.header[b'tstart'] |
| 107 | + |
| 108 | + n_bytes_data = os.path.getsize(self.filename) - self.idx_data |
| 109 | + n_ints_data = int(n_bytes_data / (self.n_bytes * self.n_chans * self.n_ifs)) |
| 110 | + |
| 111 | + ii_start, ii_stop = 0, n_ints_data |
| 112 | + |
| 113 | + if time_range: |
| 114 | + if time_range[0]: |
| 115 | + ii_start = time_range[0] |
| 116 | + if time_range[1]: |
| 117 | + ii_stop = time_range[1] |
| 118 | + |
| 119 | + n_ints = ii_stop - ii_start |
| 120 | + |
| 121 | + self.timestamps = np.arange(0, n_ints) * t_delt / 24. / 60. / 60. + t_0 |
| 122 | + |
| 123 | + return ii_start, n_ints |
| 124 | + |
| 125 | + def setup_chans(self, freq_range=None): |
| 126 | + """ |
| 127 | + Calculate the channel range |
| 128 | + """ |
| 129 | + chan_start_idx, chan_stop_idx = self.setup_freqs(freq_range) |
| 130 | + |
| 131 | + i_0 = np.min((chan_start_idx, chan_stop_idx)) |
| 132 | + i_1 = np.max((chan_start_idx, chan_stop_idx)) |
| 133 | + |
| 134 | + return i_0, i_1 |
| 135 | + |
| 136 | + def select_data(self, freq_start=None, freq_stop=None, time_start=None, time_stop=None): |
| 137 | + """ |
| 138 | + Select a range of data from the filterbank file |
| 139 | + """ |
| 140 | + # if no frequency range is specified, select all frequencies |
| 141 | + if freq_start is None: |
| 142 | + freq_start = self.freqs[0] |
| 143 | + if freq_stop is None: |
| 144 | + freq_stop = self.freqs[-1] |
| 145 | + |
| 146 | + i_0 = np.argmin(np.abs(self.freqs - freq_start)) |
| 147 | + i_1 = np.argmin(np.abs(self.freqs - freq_stop)) |
| 148 | + |
| 149 | + if i_0 < i_1: |
| 150 | + freq_data = self.freqs[i_0:i_1 + 1] |
| 151 | + fil_data = np.squeeze(self.data[time_start:time_stop, ..., i_0:i_1 + 1]) |
| 152 | + else: |
| 153 | + freq_data = self.freqs[i_1:i_0 + 1] |
| 154 | + fil_data = np.squeeze(self.data[time_start:time_stop, ..., i_1:i_0 + 1]) |
| 155 | + |
| 156 | + return freq_data, fil_data |
0 commit comments