brukeropus.file.parse

import os, struct, errno
import numpy as np
from brukeropus.file.constants import STRUCT_3D_INFO_BLOCK, SUBREPORT_TYPE_FMT


__docformat__ = "google"


def read_opus_file_bytes(filepath) -> bytes:
    '''Returns `bytes` of an OPUS file specified by `filepath` (or `None`).

    Function determines if `filepath` points to an OPUS file by reading the first four bytes, which are always the same
    for OPUS files.  If `filepath` is not a file, or points to a non-OPUS file, the function returns `None`.  Otherwise
    the function returns the entire file as raw `bytes`.

    Args:
        filepath (str or Path): full filepath to OPUS file

    Returns:
        **filebytes (bytes):** raw bytes of OPUS file or `None` (if filepath does not point to an OPUS file)
    '''
    filebytes = None
    if os.path.isfile(filepath):
        with open(filepath, 'rb') as f:
            try:
                first_four = f.read(4)
                if first_four == b'\n\n\xfe\xfe':
                    filebytes = first_four + f.read()
            except Exception:
                pass  # Empty file (or file with fewer than 4 bytes)
    else:
        raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), filepath)
    return filebytes


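# A minimal usage sketch: 'sample.0' is a hypothetical filename, so a real call should point at an
# OPUS file on disk.  Non-OPUS files return `None`, and missing files raise `FileNotFoundError`.
def _example_read_opus_file_bytes():
    filebytes = read_opus_file_bytes('sample.0')
    if filebytes is None:
        print('Not an OPUS file (magic bytes missing)')
    else:
        print(f'Read {len(filebytes)} bytes, starting with {filebytes[:4]}')

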
def get_block_type(type_int: int) -> tuple:
    '''Converts an int32 block type code to a six-integer tuple `block_type`.

    This function is used to decode the `type_int` from the directory block of an OPUS file into a tuple of integers.
    Each integer in the tuple provides information about the associated data block.

    Args:
        type_int: 32-bit integer decoded from file directory block

    Returns:
        **block_type (tuple):** six-integer tuple which specifies the block type
    '''
    type_bit_str = format(type_int, '#034b')  # binary representation as string
    block_type = (
        int(type_bit_str[-2:], 2),
        int(type_bit_str[-4:-2], 2),
        int(type_bit_str[-10:-4], 2),
        int(type_bit_str[-17:-10], 2),
        int(type_bit_str[-19:-17], 2),
        int(type_bit_str[-22:-19], 2)
    )
    return block_type


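# Bit-field illustration with an arbitrary integer (no OPUS meaning implied): 15 sets the lowest
# four bits, so the two least-significant 2-bit fields decode to 3 and the remaining fields to 0.
def _example_get_block_type():
    assert get_block_type(15) == (3, 3, 0, 0, 0, 0)

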
def decode_str(size: int, blockbytes: bytes, offset: int) -> str:
    '''Decode string that is packed as bytes in `blockbytes` starting from `offset`.

    Strings are frequently stored in OPUS files with a size designation that is larger than the actual string. The end
    of the string is designated by a terminator byte: b'\x00'. This function unpacks the string using the size
    designator, truncates at the terminator byte if found, and decodes as "latin-1".

    Args:
        size: size (number of bytes) of the string
        blockbytes: raw bytes of an OPUS file block
        offset: offset location where string begins in blockbytes

    Returns:
        string: decoded string
    '''
    fmt = '<' + str(size) + 's'
    try:
        val = struct.unpack_from(fmt, blockbytes, offset)[0]
        x00_pos = val.find(b'\x00')
        if x00_pos != -1:
            val = val[:x00_pos].decode('latin-1')
        else:
            val = val.decode('latin-1')
    except Exception as e:
        val = 'Failed to decode: ' + str(e)
    return val


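# Deterministic illustration: the size designator (8) exceeds the actual string length, and the
# b'\x00' terminator truncates the decoded value.
def _example_decode_str():
    assert decode_str(8, b'OPUS\x00\x00\x00\x00', 0) == 'OPUS'

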
def parse_header(filebytes: bytes) -> tuple:
    '''Parses the OPUS file header.

    The header of an OPUS file contains some basic information about the file, including the version number, location
    of the directory block, and number of blocks in the file. This header is parsed first because it specifies how to
    read the file directory block (which contains information about each block in the file).

    Args:
        filebytes: raw bytes of OPUS file (all bytes)

    Returns:
        **header_info (tuple):**  
            (  
                **version (float64):** program version number as a floating-point date (later versions always greater)  
                **directory_start (int32):** pointer to start location of directory block (number of bytes)  
                **max_blocks (int32):** maximum number of blocks supported by the directory block (this should only be
                    relevant when trying to edit an OPUS file, i.e. when adding data blocks to a file)  
                **num_blocks (int32):** total number of blocks in the OPUS file  
            )
    '''
    version = struct.unpack_from('<d', filebytes, 4)[0]
    directory_start = struct.unpack_from('<i', filebytes, 12)[0]
    max_blocks = struct.unpack_from('<i', filebytes, 16)[0]
    num_blocks = struct.unpack_from('<i', filebytes, 20)[0]
    return version, directory_start, max_blocks, num_blocks


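# A minimal usage sketch: read the raw bytes and unpack the fixed-offset header fields.
# 'sample.0' is a hypothetical filename.
def _example_parse_header():
    filebytes = read_opus_file_bytes('sample.0')
    if filebytes is not None:
        version, directory_start, max_blocks, num_blocks = parse_header(filebytes)
        print(version, directory_start, max_blocks, num_blocks)

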
def parse_directory(blockbytes: bytes) -> list:
    '''Parses directory block of OPUS file and returns a list of block info tuples: (type, size, start).

    The directory block of an OPUS file contains information about every block in the file. The block information is
    stored as three int32 values: `type_int`, `size_int`, `start`.  `type_int` is an integer representation of the block
    type. The bits of this `type_int` have meaning and are parsed into a tuple using `get_block_type`. The `size_int` is
    the size of the block in 32-bit words. `start` is the starting location of the block (in number of bytes).

    Args:
        blockbytes: raw bytes of an OPUS file directory block

    Returns:
        **blocks (list):** list of block_info tuples  
            **block_info (tuple):**  
                (  
                    **block_type (tuple):** six-integer tuple which specifies the block type (see: `get_block_type`)  
                    **size (int):** size (number of bytes) of the block  
                    **start (int):** pointer to start location of the block (number of bytes)  
                )
    '''
    loc = 0
    blocks = []
    while loc < len(blockbytes):
        type_int, size_int, start = struct.unpack_from('<3i', blockbytes, loc)
        loc = loc + 12
        if start > 0:
            block_type = get_block_type(type_int)
            size = size_int * 4
            blocks.append((block_type, size, start))
        else:
            break
    return blocks


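# A hedged sketch of walking the directory: the header supplies the directory location and its
# maximum entry count, and each directory entry occupies 12 bytes (three int32 values).
# 'sample.0' is a hypothetical filename and assumes read_opus_file_bytes succeeds.
def _example_parse_directory():
    filebytes = read_opus_file_bytes('sample.0')
    version, directory_start, max_blocks, num_blocks = parse_header(filebytes)
    dir_bytes = filebytes[directory_start: directory_start + max_blocks * 12]
    for block_type, size, start in parse_directory(dir_bytes):
        print(block_type, size, start)

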
def parse_params(blockbytes: bytes) -> dict:
    '''Parses the bytes in a parameter block and returns a dict containing the decoded keys and vals.

    Parameter blocks are in the form: `XXX`, `dtype_code`, `size`, `val`.  `XXX` is a three-char abbreviation of the
    parameter (key). The value of the parameter is decoded according to the `dtype_code` and size integers to be either:
    `int`, `float`, or `string`.

    Args:
        blockbytes: raw bytes of an OPUS file parameter block

    Returns:
        **params (dict):** dictionary of (key, value) pairs where key is a three-char string (lowercase) and value can
            be `int`, `float` or `string`.
    '''
    loc = 0
    params = dict()
    while loc < len(blockbytes):
        key = blockbytes[loc:loc + 3].decode('utf-8')
        if key == 'END':
            break
        dtype_code, val_size = struct.unpack_from('<2h', blockbytes[loc + 4:loc + 8])
        val_size = val_size * 2
        if dtype_code == 0:
            val = struct.unpack_from('<i', blockbytes, loc + 8)[0]
        elif dtype_code == 1:
            val = struct.unpack_from('<d', blockbytes, loc + 8)[0]
        else:
            val = decode_str(val_size, blockbytes, loc + 8)
        params[key.lower()] = val
        loc = loc + val_size + 8
    return params


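# Deterministic illustration with a hand-built parameter block: a single integer parameter
# (key 'NPT', arbitrary value) followed by the 'END' marker that terminates parsing.
def _example_parse_params():
    block = b'NPT\x00' + struct.pack('<2hi', 0, 2, 1024) + b'END'
    assert parse_params(block) == {'npt': 1024}

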
def get_dpf_dtype_count(dpf: int, size: int) -> tuple:
    '''Returns numpy dtype and array count from the data point format (dpf) and block size (in bytes).

    Args:
        dpf: data point format integer stored in data status block.
            dpf = 1 -> array of float32
            dpf = 2 -> array of int32
        size: block size in bytes.

    Returns:
        **dtype (numpy.dtype):** `numpy` dtype for defining an `ndarray` to store the data  
        **count (int):** length of array calculated from the block size and byte size of the dtype.
    '''
    if dpf == 2:
        dtype = np.int32
        count = round(size/4)
    else:
        dtype = np.float32
        count = round(size/4)
    return dtype, count


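# Quick worked example: a 4096-byte block of 32-bit floats (dpf = 1) holds 1024 data points.
def _example_get_dpf_dtype_count():
    assert get_dpf_dtype_count(dpf=1, size=4096) == (np.float32, 1024)

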
def parse_data(blockbytes: bytes, dpf: int = 1) -> np.ndarray:
    '''Parses the bytes in a data block and returns a `numpy` array.

    Data blocks contain no metadata, only the y-values of a data array. Data arrays include: single-channel sample,
    reference, phase, interferograms, and a variety of resultant data (transmission, absorption, etc.).  Every data
    block should have a corresponding data status parameter block which can be used to generate the x-array values for
    the data block. The data status block also specifies the data type of the data array with the `DPF` parameter. It
    appears that OPUS currently exclusively stores data blocks as 32-bit floats, but has a reservation for 32-bit
    integers when `DPF` = 2.

    Args:
        blockbytes: raw bytes of data block
        dpf: data-point-format integer stored in corresponding data status block.

    Returns:
        **y_array (numpy.ndarray):** `numpy` array of y values contained in the data block
    '''
    dtype, count = get_dpf_dtype_count(dpf=dpf, size=len(blockbytes))
    return np.frombuffer(blockbytes, dtype=dtype, count=count)


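# Round-trip illustration with synthetic data: a data block is simply the packed y-values, so
# packing a float32 array and parsing it back recovers the same values.
def _example_parse_data():
    y = np.arange(5, dtype=np.float32)
    assert np.array_equal(parse_data(y.tobytes(), dpf=1), y)

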
def parse_data_compact(blockbytes: bytes, npt: int, dpf: int = 1) -> np.ndarray:
    '''Parses the bytes in a data compact block and returns a `numpy` array.

    Some data blocks are stored in the "Compact" block format that includes some metadata that precedes the raw data.
    At this time, the metadata is ignored, and the compact spectrum is extracted from the last bytes of the block that
    fit the known array size (specified with "npt" in the corresponding data status block).

    Args:
        blockbytes: raw bytes of data block
        npt: number of data points in the spectrum (from data status block)
        dpf: data-point-format integer stored in corresponding data status block.

    Returns:
        **y_array (numpy.ndarray):** `numpy` array of y values contained in the data block
    '''
    dtype, _ = get_dpf_dtype_count(dpf=dpf, size=len(blockbytes))
    # Both supported dtypes (float32 and int32) are 4 bytes wide: take the trailing npt points
    return np.frombuffer(blockbytes[-npt * 4:], dtype=dtype, count=npt)


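# Synthetic illustration of the trailing-byte extraction above: 16 bytes of ignored metadata
# followed by four float32 points.
def _example_parse_data_compact():
    y = np.arange(4, dtype=np.float32)
    block = bytes(16) + y.tobytes()
    assert np.array_equal(parse_data_compact(block, npt=4), y)

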
def parse_data_series(blockbytes: bytes, dpf: int = 1) -> dict:
    '''Parses the bytes in a 3D data block (series of spectra) and returns a data `dict` containing data and metadata.

    3D data blocks are structured differently than standard data blocks. In addition to the series of spectra, they
    include metadata for each spectrum.  This function returns a `dict` containing all the extracted information from
    the data block.  The series of spectra is formed into a 2D array, while the metadata captured for each spectrum is
    formed into 1D arrays (length = number of spectral measurements in the series).

    Args:
        blockbytes: raw bytes of the data series block
        dpf: data-point-format integer stored in corresponding data status block.

    Returns:
        **data_dict (dict):** `dict` containing all extracted information from the data block  
            {  
                **version:** file format version number (should be 0)  
                **num_blocks:** number of sub blocks; each sub block features a data spectrum and associated metadata  
                **offset:** offset in bytes to the first sub data block  
                **data_size:** size in bytes of each sub data block  
                **info_size:** size in bytes of the metadata info block immediately following the sub data block  
                **store_table:** run numbers of the first and last blocks to keep track of skipped spectra  
                **y:** 2D `numpy` array containing all spectra (C-order)  
                **metadata arrays:** series of metadata arrays in 1D array format (e.g. `npt`, `mny`, `mxy`, `ert`).
                    The most useful one is generally `ert`, which can be used as the time axis for 3D data plots.  
            }
    '''
    header = struct.unpack_from('<6i', blockbytes, 0)
    data = {
        'version': header[0],
        'num_blocks': header[1],
        'offset': header[2],
        'data_size': header[3],
        'info_size': header[4],
    }
    data['store_table'] = [struct.unpack_from('<2i', blockbytes, 24 + i * 8) for i in range(header[5])]
    dtype, count = get_dpf_dtype_count(dpf, data['data_size'])
    data['y'] = np.zeros((data['num_blocks'], count), dtype=dtype)
    for entry in STRUCT_3D_INFO_BLOCK:
        data[entry['key']] = np.zeros((data['num_blocks']), dtype=entry['dtype'])
    offset = data['offset']
    for i in range(data['num_blocks']):
        data['y'][i] = np.frombuffer(blockbytes[offset:], dtype=dtype, count=count)
        offset = offset + data['data_size']
        info_vals = struct.unpack_from('<' + ''.join([e['fmt'] for e in STRUCT_3D_INFO_BLOCK]), blockbytes, offset)
        for j, entry in enumerate(STRUCT_3D_INFO_BLOCK):
            data[entry['key']][i] = info_vals[j]
        offset = offset + data['info_size']
    return data


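# A hedged sketch: given the raw bytes of a 3D (series) data block, the parsed dict holds the
# spectra as a 2D array and the per-spectrum metadata as 1D arrays (the docstring above lists
# 'ert' as a usable time axis).
def _example_parse_data_series(series_blockbytes: bytes):
    series = parse_data_series(series_blockbytes, dpf=1)
    print(series['y'].shape)   # (num_blocks, points per spectrum)
    print(series['ert'][:5])   # time-axis values for the first five spectra

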
def parse_text(block_bytes: bytes, encoding='utf-8', backup_encoding='utf-16') -> str:
    '''Parses an OPUS file block as text (e.g. history or file-log block).

    The history (aka file-log) block of an OPUS file contains some information about how the file was generated and
    edits that have been performed on the file.  This function parses the text block but does not take any steps to
    parameterize what is contained in the text.  The history block is generally not needed to retrieve the file data
    and metadata, but might be useful for inspecting the file.

    Args:
        block_bytes: raw bytes of the text block (e.g. history or file-log)
        encoding: string representing text encoding type. Can be set to "utf-16" for Chinese character support
        backup_encoding: if default encoding fails, this encoding string will be attempted.

    Returns:
        text: string of text contained in the file block.
    '''
    byte_string = struct.unpack('<' + str(len(block_bytes)) + 's', block_bytes)[0]
    byte_strings = byte_string.split(b'\x00')
    strings = []
    for entry in byte_strings:
        if entry != b'':
            try:
                strings.append(entry.decode(encoding))
            except Exception:
                try:
                    strings.append(entry.decode(backup_encoding))
                except Exception:
                    try:
                        strings.append(entry.decode('latin-1'))
                    except Exception as e:
                        strings.append('<Decode Exception>: ' + str(e))
    return '\n'.join(strings)


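# Deterministic illustration with synthetic bytes: entries are null-terminated, so two terminated
# strings decode to two lines of text.
def _example_parse_text():
    block = b'First line\x00Second line\x00'
    assert parse_text(block) == 'First line\nSecond line'

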
def parse_subreport(subreport_bytes: bytes) -> dict:
    '''Parses the bytes of a subreport and returns the extracted data as a dictionary.

    Subreports are contained within a report block (e.g. Multi-Evaluation Test Report). A report can contain multiple
    subreports, and they generally follow a table format. This sub-block is organized with a mini parameter block
    followed by packed data. The mini parameter block contains information about how to read the packed data:
        nco: number of columns
        nln: number of rows
        siz: size of mini parameter block (number of bytes)
        src: size in bytes of entire row of data (offset for extracting column data from row 2, 3 ...)
        f00, f01 ... fxx: start position of data in column 0, 1 ... xx (relative to end of mini param block)
        t00, t01 ... txx: integer representing type of data (e.g. int32, float32, float64, str, etc.)
        s00, s01 ... sxx: column header label
        p00, p01 ... pxx: formatting string for numbers in column 0, 1 ... xx (not included for every column)

    Args:
        subreport_bytes: raw bytes of the subreport. Needs to start precisely where the subreport begins, but can
            include data beyond the end of the subreport (i.e. the end of the subreport does not need to be determined
            a priori).

    Returns:
        **subreport (dict):** `dict` containing subreport data and extraction/formatting parameters  
            {  
                **info:** `dict` of parameters extracted directly from the subreport that describes how to read the
                    data table and provides some basic metadata about the table (e.g. column header labels).  
                **data:** `list` of lists of data (table format) contained in the subreport  
            }
    '''
    info = parse_params(subreport_bytes)
    data = []
    for row in range(info['nln']):
        data.append([])
        for col in range(info['nco']):
            offset = info['siz'] + row * info['src'] + info['f' + f'{col:02}']
            type_int = info['t' + f'{col:02}']
            if col < info['nco'] - 1:
                size = min([type_int - 1000, info['f' + f'{col + 1:02}'] - info['f' + f'{col:02}']])
            else:
                size = info['src'] - info['f' + f'{col:02}']
            if type_int > 1000:
                val = decode_str(size, subreport_bytes, offset)
            elif type_int in SUBREPORT_TYPE_FMT.keys():
                fmt = SUBREPORT_TYPE_FMT[type_int]
                val = struct.unpack_from(fmt, subreport_bytes, offset)[0]
            else:
                val = subreport_bytes[offset:offset + size]
            data[row].append(val)
    return {'info': info, 'data': data}


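# A hedged usage sketch: print a subreport table as rows of {column label: value}, using the
# column-count ('nco') and header-label ('s00', 's01', ...) parameters described above.
def _example_parse_subreport(subreport_bytes: bytes):
    sub = parse_subreport(subreport_bytes)
    headers = [sub['info'].get(f's{col:02}') for col in range(sub['info']['nco'])]
    for row in sub['data']:
        print(dict(zip(headers, row)))

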
def parse_report(blockbytes: bytes) -> dict:
    '''Parses the report block of an OPUS file, such as Multi-Evaluation test reports, returning the report as a dict.

    Report blocks are formatted in a very general way, potentially enabling a variety of different report structures.
    This algorithm was developed using several OPUS files with a variety of different Multi-Evaluation Test Reports.
    It is possible that other classes of test reports could be generated by OPUS that might change the structure, but
    the overall organization and decoding methods should be similar.  In particular, the report block structure might
    support multiple reports, but no such file has been available for testing to date.  This algorithm will extract a
    single report and all the associated subreports.

    Report blocks start with a mini parameter block that begins after the 12th byte.  It contains the following:
        tit: Title of the report
        f00: Starting position of the report summary data
        Known unused parameters: bid, nrp, siz, e00, z00
    This is followed by the report summary. For a multi-evaluation test report, this is a pair of tables summarizing the
    methods applied to the spectrum.  It also specifies the number of subreports that follow, and the starting position
    and title of each subreport. Some of the keys in this parameter set are described in the `parse_subreport` method.
    Other parameters in the report summary include:
        sub: Number of subreports
        h00, h01 ... hxx: header labels of first summary table
        v00, v01 ... vxx: corresponding values of first summary table
        g00, g01 ... gxx: starting positions of each subreport relative to the start of this report summary
        u00, u01 ... uxx: titles of each subreport
    It should be noted that the only class of reports used for testing this algorithm were a variety of multi-evaluation
    test reports. It is possible there are other similar report blocks OPUS writes that follow a similar structure but
    could vary in some way that is incompatible with some of the assumptions made by this algorithm.

    Args:
        blockbytes: raw bytes of an OPUS file report block

    Returns:
        **report (dict):** `dict` containing report and subreport data  
            {  
                **header:** `dict` of parameters from first mini param block  
                **info:** `dict` of parameters extracted from second mini param block  
                **data:** `list` of data that comprises second summary table  
                **subreports:** `list` of subreport `dict`s beneath the main report.  
            }
    '''
    header_ints = struct.unpack('<3i', blockbytes[:12])
    header_info = parse_params(blockbytes[12:])
    header_info['ints'] = header_ints
    report_info = parse_subreport(blockbytes[header_info['f00']:])
    report = {'header': header_info, 'info': report_info['info'], 'data': report_info['data']}
    subreports = []
    for idx in range(report_info['info']['sub']):
        offset = header_info['f00'] + report_info['info']['g' + f'{idx:02}']
        subreports.append(parse_subreport(blockbytes[offset:]))
    report['subreports'] = subreports
    return report
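

# A hedged sketch of reading a Multi-Evaluation style report block: the report title comes from
# the first mini parameter block ('tit'), and each subreport carries its own table.
def _example_parse_report(report_blockbytes: bytes):
    report = parse_report(report_blockbytes)
    print(report['header'].get('tit'))
    for sub in report['subreports']:
        print(len(sub['data']), 'rows x', sub['info']['nco'], 'columns')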