brukeropus.file.parse

  1import os, struct, errno
  2import numpy as np
  3from brukeropus.file.constants import STRUCT_3D_INFO_BLOCK, SUBREPORT_TYPE_FMT
  4
  5
  6__docformat__ = "google"
  7
  8
  9def read_opus_file_bytes(filepath) -> bytes:
 10    '''Returns `bytes` of an OPUS file specified by `filepath` (or `None`).
 11
 12    Function determines if `filepath` points to an OPUS file by reading the first four bytes which are always the same
 13    for OPUS files.  If `filepath` is not a file, or points to a non-OPUS file, the function returns `None`.  Otherwise
 14    the function returns the entire file as raw `bytes`.
 15
 16    Args:
 17        filepath (str or Path): full filepath to OPUS file
 18
 19    Returns:
 20        **filebytes (bytes):** raw bytes of OPUS file or `None` (if filepath does not point to an OPUS file)
 21    '''
 22    filebytes = None
 23    if os.path.isfile(filepath):
 24        with open(filepath, 'rb') as f:
 25            try:
 26                first_four = f.read(4)
 27                if first_four == b'\n\n\xfe\xfe':
 28                    filebytes = first_four + f.read()
 29            except:
 30                pass # Empty file (or file with fewer than 4 bytes)
 31    else:
 32        raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), filepath)
 33    return filebytes
 34
 35
 36def get_block_type(type_int: int) -> tuple:
 37    '''Converts an int32 block type code to a six-integer tuple `block_type`.
 38
 39    This function is used to decode the `type_int` from the directory block of an OPUS file into a tuple of integers.
 40    Each integer in the tuple provides information about the associated data block.
 41
 42    Args:
 43        type_int: 32-bit integer decoded from file directory block
 44
 45    Returns:
 46        **block_type (tuple):** six-integer tuple which specifies the block type
 47    '''
 48    type_bit_str = format(type_int, '#034b')  # binary representation as string
 49    block_type = (
 50        int(type_bit_str[-2:], 2),
 51        int(type_bit_str[-4:-2], 2),
 52        int(type_bit_str[-10:-4], 2),
 53        int(type_bit_str[-17:-10], 2),
 54        int(type_bit_str[-19:-17], 2),
 55        int(type_bit_str[-22:-19], 2)
 56    )
 57    return block_type
 58
 59
 60def decode_str(size: int, blockbytes: bytes, offset: int) -> str:
 61    '''Decode string that is packed as bytes in `blockbytes` starting from `offset`.
 62
 63    Strings are frequently stored in OPUS files with a size designation that is larger than the actual string. The end
 64    of the string is designated by a terminator byte: b'\x00'. This function unpacks the string using the size
 65    designator, truncates at the terminator byte if found, and decodes as "latin-1".
 66
 67    Args:
 68        size: size (number of bytes) of the string
 69        blockbytes: raw bytes of an OPUS file block
 70        offset: offset location where string begins in blockbytes
 71
 72    Returns:
 73        string: decoded string
 74    '''
 75    fmt = '<' + str(size) + 's'
 76    try:
 77        val = struct.unpack_from(fmt, blockbytes, offset)[0]
 78        x00_pos = val.find(b'\x00')
 79        if x00_pos != -1:
 80            val = val[:x00_pos].decode('latin-1')
 81        else:
 82            val = val.decode('latin-1')
 83    except Exception as e:
 84        val = 'Failed to decode: ' + str(e)
 85    return val
 86
 87
 88def parse_header(filebytes: bytes) -> tuple:
 89    '''Parses the OPUS file header.
 90
 91    The header of an OPUS file contains some basic information about the file including the version number, location of
 92    the directory block, and number of blocks in the file. This header is parsed first, as it specifies how to
 93    read the file directory block (which contains information about each block in the file).
 94
 95    Args:
 96        filebytes: raw bytes of OPUS file (all bytes)
 97
 98    Returns:
 99        **header_info (tuple):**  
100            (  
101                **version (float64):** program version number as a floating-point date (later versions always greater)  
102                **directory_start (int32):** pointer to start location of directory block (number of bytes)  
103                **max_blocks (int32):** maximum number of blocks supported by the directory block (this should only be
104                    relevant when trying to edit an OPUS file, i.e. when adding data blocks to a file)  
105                **num_blocks (int32):** total number of blocks in the opus file  
106            )
107    '''
108    version = struct.unpack_from('<d', filebytes, 4)[0]
109    directory_start = struct.unpack_from('<i', filebytes, 12)[0]
110    max_blocks = struct.unpack_from('<i', filebytes, 16)[0]
111    num_blocks = struct.unpack_from('<i', filebytes, 20)[0]
112    return version, directory_start, max_blocks, num_blocks
113
114
115def parse_directory(blockbytes: bytes) -> list:
116    '''Parses directory block of OPUS file and returns a list of block info tuples: (type, size, start).
117
118    The directory block of an OPUS file contains information about every block in the file. The block information is
119    stored as three int32 values: `type_int`, `size_int`, `start`.  `type_int` is an integer representation of the block
120    type. The bits of this `type_int` have meaning and are parsed into a tuple using `get_block_type`. The `size_int` is
121    the size of the block in 32-bit words. `start` is the starting location of the block (in number of bytes).
122
123    Args:
124        blockbytes: raw bytes of an OPUS file directory block
125
126    Returns:
127        **blocks (list):** list of block_info tuples
128            **block_info (tuple):**
129                (  
130                    **block_type (tuple):** six-integer tuple which specifies the block type (see: `get_block_type`)  
131                    **size (int):** size (number of bytes) of the block  
132                    **start (int):** pointer to start location of the block (number of bytes)
133                )
134    '''
135    loc = 0
136    blocks = []
137    while loc < len(blockbytes):
138        type_int, size_int, start = struct.unpack_from('<3i', blockbytes, loc)
139        loc = loc + 12
140        if start > 0:
141            block_type = get_block_type(type_int)
142            size = size_int*4
143            blocks.append((block_type, size, start))
144        else:
145            break
146    return blocks
147
148
149def parse_params(blockbytes: bytes) -> dict:
150    '''Parses the bytes in a parameter block and returns a dict containing the decoded keys and vals.
151
152    Parameter blocks are in the form: `XXX`, `dtype_code`, `size`, `val`.  `XXX` is a three char abbreviation of the
153    parameter (key). The value of the parameter is decoded according to the `dtype_code` and size integers to be either:
154    `int`, `float`, or `string`.
155
156    Args:
157        blockbytes: raw bytes of an OPUS file parameter block
158
159    Returns:
160        **params (dict):** `dict` of (key, value) pairs where each key is a three-char string (lowercase) and each value
161            is an `int`, `float`, or `string`.
162    '''
163    loc = 0
164    params = dict()
165    while loc < len(blockbytes):
166        key = blockbytes[loc:loc + 3].decode('utf-8')
167        if key == 'END':
168            break
169        dtype_code, val_size = struct.unpack_from('<2h', blockbytes[loc + 4:loc + 8])
170        val_size = val_size * 2
171        if dtype_code == 0:
172            val = struct.unpack_from('<i', blockbytes, loc + 8)[0]
173        elif dtype_code == 1:
174            val = struct.unpack_from('<d', blockbytes, loc + 8)[0]
175        else:
176            val = decode_str(val_size, blockbytes, loc + 8)
177        params[key.lower()] = val
178        loc = loc + val_size + 8
179    return params
180
181
182def get_dpf_dtype_count(dpf: int, size: int) -> tuple:
183    '''Returns numpy dtype and array count from the data point format (dpf) and block size (in bytes).
184
185    Args:
186        dpf: data point format integer stored in data status block.
187            dpf = 1 -> array of float32
188            dpf = 2 -> array of int32
189        size: Block size in bytes.
190
191    Returns:
192        **dtype (numpy.dtype):** `numpy` dtype for defining an `ndarray` to store the data
193        **count (int):** length of array calculated from the block size and byte size of the dtype.
194    '''
195    if dpf == 1:
196        dtype = np.float32
197        count = round(size/4)
198    elif dpf == 2:
199        dtype = np.int32
200        count = round(size/4)
201    else:
202        print('Unknown Data Point Format Requested:', dpf, '[using default: `float32`]')
203        dtype = np.float32
204        count = round(size/4)
205    return dtype, count
206
207
208def parse_data(blockbytes: bytes, dpf: int = 1) -> np.ndarray:
209    '''Parses the bytes in a data block and returns a `numpy` array.
210
211    Data blocks contain no metadata, only the y-values of a data array. Data arrays include: single-channel sample,
212    reference, phase, interferograms, and a variety of resultant data (transmission, absorption, etc.).  Every data
213    block should have a corresponding data status parameter block which can be used to generate the x-array values for
214    the data block. The data status block also specifies the data type of the data array with the `DPF` parameter. It
215    appears that OPUS currently exclusively stores data blocks as 32-bit floats, but has a reservation for 32-bit
216    integers when `DPF` = 2.
217
218    Args:
219        blockbytes: raw bytes of data block
220        dpf: data-point-format integer stored in corresponding data status block.
221
222    Returns:
223        **y_array (numpy.ndarray):** `numpy` array of y values contained in the data block
224    '''
225    dtype, count = get_dpf_dtype_count(dpf=dpf, size=len(blockbytes))
226    return np.frombuffer(blockbytes, dtype=dtype, count=count)
227
228
229def parse_data_compact(blockbytes: bytes, npt: int, dpf: int = 1) -> np.ndarray:
230    '''Parses the bytes in a data compact block and returns a `numpy` array.
231
232    Some data blocks are stored in the "Compact" block format, which includes some metadata that precedes the raw data. At
233    this time, the metadata is ignored, and the compact spectrum is extracted from the last bytes of the block that fit
234    the known array size (specified by "npt" in the corresponding data status block).
235
236    Args:
237        blockbytes: raw bytes of data block
238        npt: number of data points in the spectra (from data status block)
239        dpf: data-point-format integer stored in corresponding data status block.
240
241    Returns:
242        **y_array (numpy.ndarray):** `numpy` array of y values contained in the data block
243    '''
244    dtype, count = get_dpf_dtype_count(dpf=dpf, size=len(blockbytes))
246    return np.frombuffer(blockbytes, dtype=dtype, count=count)[-npt:]
246
247
248def parse_data_series(blockbytes: bytes, dpf: int = 1) -> dict:
249    '''Parses the bytes in a 3D data block (series of spectra) and returns a data `dict` containing data and metadata.
250
251    3D data blocks are structured differently than standard data blocks. In addition to the series of spectra, they
252    include metadata for each spectrum.  This function returns a `dict` containing all the extracted information
253    from the data block.  The series of spectra is formed into a 2D array, while the metadata captured for each spectrum
254    is formed into 1D arrays (length = number of spectral measurements in the series).
255
256    Args:
257        blockbytes: raw bytes of the data series block
258        dpf: data-point-format integer stored in corresponding data status block.
259
260    Returns:
261        **data_dict (dict):** `dict` containing all extracted information from the data block  
262            {  
263                **version:** file format version number (should be 0)  
264                **num_blocks:** number of sub blocks; each sub block features a data spectra and associated metadata  
265                **offset:** offset in bytes to the first sub data block  
266                **data_size:** size in bytes of each sub data block  
267                **info_size:** size in bytes of the metadata info block immediately following the sub data block  
268                **store_table:** run numbers of the first and last blocks to keep track of skipped spectra  
269                **y:** 2D `numpy` array containing all spectra (C-order)  
270                **metadata arrays:** series of metadata arrays in 1D array format (e.g. `npt`, `mny`, `mxy`, `ert`).
271                    The most useful one is generally `ert`, which can be used as the time axis for 3D data plots.
272            }
273    '''
274    header = struct.unpack_from('<6i', blockbytes, 0)
275    data = {
276        'version': header[0],
277        'num_blocks': header[1],
278        'offset': header[2],
279        'data_size': header[3],
280        'info_size': header[4],
281    }
282    data['store_table'] = [struct.unpack_from('<2i', blockbytes, 24 + i * 8) for i in range(header[5])]
283    dtype, count = get_dpf_dtype_count(dpf, data['data_size'])
284    data['y'] = np.zeros((data['num_blocks'], count), dtype=dtype)
285    for entry in STRUCT_3D_INFO_BLOCK:
286        data[entry['key']] = np.zeros((data['num_blocks']), dtype=entry['dtype'])
287    offset = data['offset']
288    for i in range(data['num_blocks']):
289        data['y'][i] = np.frombuffer(blockbytes[offset:], dtype=dtype, count=count)
290        offset = offset + data['data_size']
291        info_vals = struct.unpack_from('<' + ''.join([e['fmt'] for e in STRUCT_3D_INFO_BLOCK]), blockbytes, offset)
292        for j, entry in enumerate(STRUCT_3D_INFO_BLOCK):
293            data[entry['key']][i] = info_vals[j]
294        offset = offset + data['info_size']
295        if offset >= len(blockbytes):
296            num_spectra = i + 1
297            break # Not all blocks are necessarily stored (see Store Table)
298        num_spectra = i + 1
299    data['y'] = data['y'][:num_spectra]
300    for entry in STRUCT_3D_INFO_BLOCK:
301        data[entry['key']] = data[entry['key']][:num_spectra]
302    return data
303
304
305def parse_text(block_bytes: bytes, encoding='utf-8', backup_encoding='utf-16') -> str:
307    '''Parses an OPUS file block as text (e.g. history or file-log block).
307
308    The history (aka file-log) block of an OPUS file contains some information about how the file was generated and
309    edits that have been performed on the file.  This function parses the text block but does not take any steps to
310    parameterizing what is contained in the text.  The history block is generally not needed to retrieve the file data
311    and metadata, but might be useful for inspecting the file.
312
313    Args:
314        block_bytes: raw bytes of the text block (e.g. history or file-log)
315        encoding: string representing the text encoding. Can be set to "utf-16" for Chinese character support
316        backup_encoding: if default encoding fails, this encoding string will be attempted.
317
318    Returns:
319        text: string of text contained in the file block.
320    '''
321    byte_string = struct.unpack('<' + str(len(block_bytes)) + 's', block_bytes)[0]
322    byte_strings = byte_string.split(b'\x00')
323    strings = []
324    for entry in byte_strings:
325        if entry != b'':
326            try:
327                strings.append(entry.decode(encoding))
328            except Exception:
329                try:
330                    strings.append(entry.decode(backup_encoding))
331                except Exception:
332                    try:
333                        strings.append(entry.decode('latin-1'))
334                    except Exception as e:
335                        strings.append('<Decode Exception>: ' + str(e))
336    return '\n'.join(strings)
337
338
339def parse_subreport(subreport_bytes: bytes) -> dict:
341    '''Parses the bytes of a subreport and returns the extracted data as a dictionary.
341
342    Subreports are contained within a report block (e.g. Multi-Evaluation Test Report). A report can contain multiple
343    subreports, and they generally follow a table format. This sub-block is organized with a mini parameter block
344    followed by packed data. The mini parameter block contains information about how to read the packed data:
345        nco: number of columns
346        nln: number of rows
347        siz: size of mini parameter block (number of bytes)
348        src: size in bytes of entire row of data (offset for extracting column data from row 2, 3 ...)
349        f00, f01 ... fxx: start position of data in column 0, 1 ... xx (relative to end of mini param block)
350        t00, t01 ... txx: integer representing type of data (e.g. int32, float32, float64, str, etc.)
351        s00, s01 ... sxx: column header label
352        p00, p01 ... pxx: formatting string for numbers in column 0, 1 ... xx (not included for every column)
353
354    Args:
355        subreport_bytes: raw bytes of the subreport. Needs to start precisely where subreport begins, but can include
356            data beyond the end of the subreport (i.e. end of subreport does not need to be determined a priori).
357
358    Returns:
359        **subreport (dict):** `dict` containing subreport data and extraction/formatting parameters  
360            {  
361                **info:** `dict` of parameters extracted directly from subreport that describes how to read the data
362                    table and provides some basic metadata about the table (e.g. column header labels).
363                **data:** `list` of lists of data (table format) contained in the subreport
364            }
365    '''
366    info = parse_params(subreport_bytes)
367    data = []
368    for row in range(info['nln']):
369        data.append([])
370        for col in range(info['nco']):
371            offset = info['siz'] + row * info['src'] + info['f' + f'{col:02}']
372            type_int = info['t' + f'{col:02}']
373            if col < info['nco'] - 1:
374                size = min([type_int - 1000, info['f' + f'{col + 1:02}'] - info['f' + f'{col:02}']])
375            else:
376                size = info['src'] - info['f' + f'{col:02}']
377            if type_int > 1000:
378                val = decode_str(size, subreport_bytes, offset)
379            elif type_int in SUBREPORT_TYPE_FMT.keys():
380                fmt = SUBREPORT_TYPE_FMT[type_int]
381                val = struct.unpack_from(fmt, subreport_bytes, offset)[0]
382            else:
383                val = subreport_bytes[offset:offset + size]
384            data[row].append(val)
385    return {'info': info, 'data': data}
386
387
388def parse_report(blockbytes: bytes) -> dict:
389    '''Parses the report block of an OPUS file, such as Multi-Evaluation test reports, returning the report as a dict.
390
391    Report blocks are formatted in a very general way, potentially enabling a variety of different report structures.
392    This algorithm was developed using several OPUS files with a variety of different Multi-Evaluation Test Reports.
393    It is possible that other classes of test reports could be generated by OPUS that might change the structure, but
394    the overall organization and decoding methods should be similar.  In particular, the report block structure might
395    support multiple reports, but no such file has been available for testing to date.  This algorithm will extract a
396    single report and all the associated subreports.
397
398    Report blocks start with a mini parameter block that begins after the 12th byte.  It contains the following:
399        tit: Title of the report
400        f00: Starting position of the report summary data
401        Known unused parameters: bid, nrp, siz, e00, z00
402    This is followed by the report summary. For a multi-evaluation test report, this is a pair of tables summarizing the
403    methods applied to the spectrum.  It also specifies the number of subreports that follow, and the starting position
404    and title of each subreport. Some of the keys in this parameter set are described in the `parse_subreport` method.
405    Other parameters in the report summary include:
406        sub: Number of subreports
407        h00, h01 ... hxx: header labels of first summary table
408        v00, v01 ... vxx: corresponding values of first summary table
409        g00, g01 ... gxx: starting positions of each subreport relative to the start of this report summary
410        u00, u01 ... uxx: titles of each subreport
411    It should be noted that the only class of reports used for testing this algorithm were a variety of multi-evaluation
412    test reports. It is possible there are other similar report blocks OPUS writes that follow a similar structure but
413    could vary in some way that is incompatible with some of the assumptions made by this algorithm.
414
415    Args:
416        blockbytes: raw bytes of an OPUS file report block
417
418    Returns:
419        **report (dict):** `dict` containing report and subreport data 
420            {
421                **header:** `dict` of parameters from first mini param block
422                **info:** `dict` of parameters extracted from second mini param block
423                **data:** `list` of data that comprises second summary table
424                **subreports:** `list` of subreport `dict`s beneath the main report.
425            }
426    '''
427    header_ints = struct.unpack('<3i', blockbytes[:12])
428    header_info = parse_params(blockbytes[12:])
429    header_info['ints'] = header_ints
430    report_info = parse_subreport(blockbytes[header_info['f00']:])
431    report = {'header': header_info, 'info': report_info['info'], 'data': report_info['data']}
432    subreports = []
433    for idx in range(report_info['info']['sub']):
434        offset = header_info['f00'] + report_info['info']['g' + f'{idx:02}']
435        subreports.append(parse_subreport(blockbytes[offset:]))
436    report['subreports'] = subreports
437    return report
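
The functions above compose into a simple read pipeline: the header locates the directory block, the directory locates every other block, and each block's bytes are then handed to the matching parser. The snippet below is a minimal sketch of that flow (the file path 'sample.0' is a hypothetical example, and the dispatch between parameter and data blocks is left as comments because interpreting the block-type codes is handled outside this module):

    from brukeropus.file.parse import (read_opus_file_bytes, parse_header,
                                       parse_directory, parse_params, parse_data)

    filebytes = read_opus_file_bytes('sample.0')  # hypothetical path; returns None for non-OPUS files
    if filebytes is not None:
        version, dir_start, max_blocks, num_blocks = parse_header(filebytes)
        # Each directory entry is three int32 values (12 bytes)
        directory = parse_directory(filebytes[dir_start:dir_start + max_blocks * 12])
        for block_type, size, start in directory:
            blockbytes = filebytes[start:start + size]
            print(block_type, size, start)
            # A parameter block decodes to a dict of (key, value) pairs:
            #     params = parse_params(blockbytes)
            # A data block decodes to a numpy array; `dpf` comes from the
            # matching data status (parameter) block:
            #     y = parse_data(blockbytes, dpf=1)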