"""
A module for reading microlab MDF file.
Usage
import mdf_parser.mdf_parser as mdf
mdf_object = mdf.MDFParser(file_name)
Author: Eelco van Vliet
29-2-2015
"""
__author__ = "Eelco van Vliet"
__copyright__ = "Eelco van Vliet"
__license__ = "MIT"
import argparse
import logging
import struct
import sys
from builtins import object, range
from os.path import getsize, join, split, splitext
from re import IGNORECASE, compile, match, search, sub
from sys import version_info
import numpy as np
import pandas as pd
from . import __version__
from .mdf_blocks import DataSetRecord, MDHFileHeader, set_logging_level
PythonVersion = version_info[0]
logging.basicConfig(format="%(message)s", level=logging.INFO)
_logger = logging.getLogger(__name__)
MDF_EXTENSION = ".mdf"
[docs]
def decode_ymdhms(ymdhms):
"""The year month day hour minute seconds are stored in the 4-byte integer
Parameters
----------
ymdhms : int
A 4-byte integer containing the date time according to the MTF manual
Returns
-------
type
The ISO Data time string
"""
ymdhms = int(ymdhms)
year = np.right_shift(ymdhms & 0b11111100000000000000000000000000, 26) + 1980
months = np.right_shift(ymdhms & 0b00000011110000000000000000000000, 22)
days = np.right_shift(ymdhms & 0b00000000001111100000000000000000, 17)
hours = np.right_shift(ymdhms & 0b00000000000000011111000000000000, 12)
minutes = np.right_shift(ymdhms & 0b00000000000000000000111111000000, 6)
seconds = np.right_shift(ymdhms & 0b00000000000000000000000000111111, 0)
return year, months, days, hours, minutes, seconds
[docs]
def convert_ymdhms_to_data_time(ymdhrs_array, sample_rate=1, constant_sample_rate=True):
"""Convert the binary year month day hour minutes seconds representation into a
readable data/time string
Parameters
----------
ymdhrs_array : binary
array with the ymdhrs datatime integers
sample_rate : float, optional
the sampling rate of the signal (Default value = 1)
constant_sample_rate : bool, optional
If True assume that the sample read is leading (Default value = True)
Returns
-------
type
DateTime pandas index array
Notes
-----
In the first version of the script, the ymdhrs was taken as leading and the number
of samples per seconds we corrected to take care of missing samples or too many
samples in a second.
It appears that the sample rate is really constant and that the clock time may vary.
Setting this flag true takes the sample rate leading
"""
# initialise some variables
number_of_samples = ymdhrs_array.shape[0]
delta_t_ms = int(1000.0 / sample_rate)
# create an empty datetime64 array
date_time_array = np.empty(number_of_samples, dtype="datetime64[ms]")
# the date time format string: YYYY-MM-DDThh:mm:ss.msec
dts_format = "{:04d}-{:02d}-{:02d}T{:02d}:{:02d}:{:02d}.{:03d}"
if constant_sample_rate:
# This routine return the date time based on the assumption the the sample rate
# is leading and constant
# get the start time first
(year, month, day, hour, minutes, seconds) = decode_ymdhms(ymdhrs_array[0])
start_date_time = np.datetime64(
dts_format.format(year, month, day, hour, minutes, seconds, 0), "ms"
)
end_date_time = start_date_time + np.timedelta64(
delta_t_ms * number_of_samples, "ms"
)
date_time_array = np.arange(start_date_time, end_date_time, step=delta_t_ms)
else:
# This routine assumes that the ymdhrs is leading and exact and that the sample
# rate is not constant. This means that the number of samples per second can
# slightly vary. In this routine, samples are removed and added such that the
# ymdhrs is correct. This routine may not be needed as it appears that in fact,
# the sampling rate is very constant and the PC clock is not.
# To use the 'constant_sample_rate' by default, initialize the milliseconds on
# 0; the ymdhrs integer does not hold milliseconds, we get it our by counting
ms = 0
ms_prev = None
old_sec = None
i_first_change = None # Keep the index where the seconds changed
cnt_per_sec = 0
cnt_list = list()
sec_step = 0
for i in range(number_of_samples):
# convert the ymdhrs 4-byte integer into readable integers for each item
(year, month, day, hour, minutes, seconds) = decode_ymdhms(ymdhrs_array[i])
# update the milliseconds-counter if the seconds increase
if old_sec is not None and ((old_sec < seconds) or (sec_step == -59)):
# set the milliseconds to zero again
ms = 0
cnt_list.append(cnt_per_sec)
cnt_per_sec = 0
if i_first_change is None:
# Store the position where we change for the first time to a new
# second
i_first_change = i
else:
# increase the milliseconds with the time step in milliseconds
ms += delta_t_ms
# correct the milliseconds if too many samples occurred
if ms_prev is not None and ms >= 1000:
# Add a small time-step when we get closer to the 1000.
ms = ms_prev + (1000 - ms_prev) / 2.0
# create a data time string and store it in the datetime64 numpy array
date_time_array[i] = dts_format.format(
year, month, day, hour, minutes, seconds, ms
)
cnt_per_sec += 1
if old_sec is not None:
# This is to monitor when we step back from 59 s to 0 s at the
# minute/hour change
sec_step = seconds - old_sec
old_sec = seconds
ms_prev = ms
# Correct for the first i_first_change sample as the ms may not have started at
# 0 as we assumed
date_time_array[:i_first_change] += np.timedelta64(
delta_t_ms * (int(sample_rate) - i_first_change - 1), "ms"
)
return pd.DatetimeIndex(date_time_array)
[docs]
class MDFParser(object):
"""
The MDFParser class contains methods for reading mdf files.
Parameters
----------
mdf_file : str
Path to a binary the follows MDF 3.3 specification
import_data: bool , optional
Flag to enable to import the data, default = True. If False, only the header
information is read.
include_columns : list
List with columns to import. Default value = []. If empty, all columns are
included
exclude_columns: list
List with colums to exclude. Default value =[]. If empty, none are excluded
verbose: int
Set the logging level. Obsolete
0. Silent
1. Normal info
2. Debugging
convert_datetime: int, optional
Translate the ymdhms integer into a data time string
resample_data: bool, optional, False
The sampled data is not completely uniformly sampled. To enforce an equidistant
sampling, set this flag to true
constant_sample_rate: bool, optional
If true, use the sample rate for the clock, otherwise, the ymdhms is leading.
Defaults to *True*
replace_record_names : dict, optional
A dictionary with records names which we want to replace from A1 to B1
date_time_label : str, optional
Default label to assign to the Date time string. Default = "DateTime"
date_time_match_string : str, optional
The date time column is selected based on this match string.
Default = "_DateTime32"
load_date_time: bool, optional
Always read the date time information channel, even if it is not explicitly
mentioned in the filter list. Defaults to *True*
set_relative_time_column: bool, optional
If true, create a column *time_r* in seconds with the relative time starting at
t=0 s. Defaults to *False*
Examples
--------
Reading an MDF file is done by creating a MDFParser object with a file_name as first
argument.
>>> file_name = "../data/AMS_BALDER_110225T233000_UTC222959.mdf"
>>> header_object = MDFParser(mdf_file=file_name, import_data=False)
>>> names = header_object.make_report()
If the *import_data* flag would have been set to *True*, the *header_object* class
would have been created and all MDF data would be put in a data frame
*header_object.data*.
In this example, however, we only read the header information of the MDF file first.
As a next step, we can make a selection of the data columns we want to import.
In this way the reading time of an MDF data file can be reduced significantly as
only the selected data needs to be imported.
The data available in the mdf file can be explored by using the *make_report()*
method. which writes all channels to screen. Now, we are going to select the
MRU_Roll data first.
>>> from tabulate import tabulate
>>> names_labels_and_groups = header_object.set_column_selection(
... filter_list=["MRU_Roll"], include_date_time=True)
>>> header_object.import_data()
>>> print(tabulate(header_object.data.head(5), headers="keys", tablefmt="psql"))
+----------------------------+------------+
| DateTime | MRU_Roll |
|----------------------------+------------|
| 2011-02-25 23:30:00 | 0.01207 |
| 2011-02-25 23:30:00.040000 | 0.01207 |
| 2011-02-25 23:30:00.080000 | 0.01207 |
| 2011-02-25 23:30:00.120000 | 0.01204 |
| 2011-02-25 23:30:00.160000 | 0.01204 |
+----------------------------+------------+
The *names_labels_and_groups* now contains 3 lists, but we don't use it now.
For more information about the return values, look at the docstring of the
*set_column_selection* method.
Because we have added the *include_date_time* flag, the DataTime column is read by
default and set as the index of the DataFrame. You can do this multiple times if
you want to add more columns.
The *include_data_time* does not have to be given again as we already have imported
the DateTime.
So let's import the MRU Roll Pitch Heave channels as well. We do this with a regular
expression matching all the channels names starting with MRU_R, MRU_P, or MRU_H
>>> names_labels_and_groups = header_object.set_column_selection(
... filter_list=["MRU_[RPH]"])
>>> header_object.import_data()
>>> print(tabulate(header_object.data.head(5), headers="keys", tablefmt="psql"))
+----------------------------+------------+-------------+-------------+
| DateTime | MRU_Roll | MRU_Heave | MRU_Pitch |
|----------------------------+------------+-------------+-------------|
| 2011-02-25 23:30:00 | 0.01207 | -0.1051 | -0.0001869 |
| 2011-02-25 23:30:00.040000 | 0.01207 | -0.1051 | -0.0001869 |
| 2011-02-25 23:30:00.080000 | 0.01207 | -0.1051 | -0.0001869 |
| 2011-02-25 23:30:00.120000 | 0.01204 | -0.1078 | -0.0002593 |
| 2011-02-25 23:30:00.160000 | 0.01204 | -0.1078 | -0.0002593 |
+----------------------------+------------+-------------+-------------+
Since all data is stored in the Pandas Dataframe *header_object.data* we can plot
the data using all Pandas/matplotlib plotting capabilities.
This is demonstrated in the example notebook.
"""
def __init__(
self,
mdf_file,
import_data=True,
include_columns=None,
exclude_columns=None,
verbose=1,
convert_datetime=True,
resample_data=False,
constant_sample_rate=True,
replace_record_names=dict(),
log_level=logging.WARNING,
date_time_label="DateTime",
date_time_match_string=r"^_DateTime32$",
load_date_time=True,
set_relative_time_column=False,
include_date_time=False,
):
self.include_columns = include_columns
self.exclude_columns = exclude_columns
self.convert_datetime = convert_datetime
self.resample_data = resample_data
self.constant_sample_rate = constant_sample_rate
self.verbose = verbose
self.replace_record_names = replace_record_names
self.date_time_label = date_time_label
self.date_time_match_string = date_time_match_string
self.load_date_time = load_date_time
self.include_date_time = include_date_time
self.set_relative_time_column = set_relative_time_column
if self.set_relative_time_column:
# If we want to set a relative data time column, the include date time
# must be set
self.include_date_time = True
# set the logger and update the level if you want to debug
self.log = logging.getLogger(__name__)
self.log.setLevel(log_level)
self.header = None
self.data = None # The dataframe will contain all the data at the end
mdf_file_base, extension = splitext(mdf_file)
if not bool(match(MDF_EXTENSION, extension, IGNORECASE)):
raise ImportError("file name should be of type {}".format(MDF_EXTENSION))
self.mdf_path_to_file = split(mdf_file_base)[0]
self.dta_filename = None
# initialise some attributes
self.dataset_records = []
# dictionary to hold the index belong to each record name
self.dataset_records_index = dict()
self.filename_record = None
self.time_record = None
self.data = None
# import the mdf header
self.log.debug("Starting reading mdf_file header")
self.import_header(mdf_file)
if self.include_date_time:
# To prevent problems later, just put the date time column in the include
# list
self.set_column_selection([self.date_time_match_string])
# only import the data if import_data was set true
if import_data:
self.log.debug("Starting reading dta data ")
self.import_data()
[docs]
def import_data(self, set_relative_time_column=None):
"""
Import the binary data from the dta file
Parameters
----------
set_relative_time_column: bool or None, optional
If true, store the relative time in the time_r column. Default is None,
which means that the value as stored during initialization of the class is
taken. This is *False* by default, but can also be passed through the
constructor arguments.
"""
self.log = logging.getLogger(__name__)
set_logging_level(self.log, self.verbose)
if set_relative_time_column is not None:
# only if the argument is explicitly passed, we can overrule the value
self.set_relative_time_column = set_relative_time_column
self.dta_filename = join(self.mdf_path_to_file, self.filename_record.filename)
file_size = getsize(self.dta_filename)
n_frames_to_read = int(file_size / self.header.frame_size)
self.log.debug(
"Reading {} frames from {}".format(n_frames_to_read, self.dta_filename)
)
with open(self.dta_filename, "rb") as fp_dta:
byte_array = fp_dta.read(n_frames_to_read * self.header.frame_size)
data_columns = {}
date_time_column_name = None
# Loop over all the data sets records and convert them from the binary array and
# put them in the dataframe
for index, record in enumerate(self.dataset_records):
# Check if we want to skip the column based on the include_columns and
# exclude_columns lists
if (
self.include_columns is not None
and record.name not in self.include_columns
):
continue
if self.exclude_columns is not None and record.name in self.exclude_columns:
continue
if self.data is not None and record.name in self.data.columns:
_logger.debug(f"Column {record.name} was already imported. skipping")
continue
# Proceed with reading the data
self.log.debug(
f"Decoding data set nr {index} : {record.name}/{record.label} "
f"(format {record.data_format})"
)
record_data = record.byte_to_ndarray(
byte_array=byte_array,
n_frames_to_read=n_frames_to_read,
frame_size=self.header.frame_size,
)
# Copy the numpy array in the Pandas data frame
if record.data_format == "ymdhms" and self.convert_datetime:
# convert the ymdhms integer into a datatime-index.
date_time_index = convert_ymdhms_to_data_time(
record_data,
self.time_record.sample_rate,
constant_sample_rate=self.constant_sample_rate,
)
record_data = date_time_index.values
record.name = self.date_time_label
date_time_column_name = record.name
record.loaded_data = True
data_columns[record.name] = record_data
# Done with the loop. Add all the data
data_to_add = pd.DataFrame.from_dict(data_columns)
if date_time_column_name is not None:
data_to_add = data_to_add.set_index(date_time_column_name, drop=True)
if self.data is None:
self.data = data_to_add
else:
# add new data to the data frame.
data_to_add.index = self.data.index
overlap = data_to_add.columns.intersection(self.data.columns)
if not overlap.empty:
_logger.warning("Overlapping columns detected. This should not happen")
self.data = pd.concat([self.data, data_to_add], axis=1)
if self.convert_datetime and self.resample_data:
# resampling is only required if the
delta_time = int(1000.0 / self.time_record.sample_rate)
self.log.debug("Resampling the data set with dt = {}".format(delta_time))
# Resample get a parameter of the sampling frequency in ms as a
# string-argument.
# The resampled result needs to be passed to interpolate to fill the gaps
self.data = self.data.resample("{}ms".format(delta_time)).interpolate()
if self.set_relative_time_column:
# set the relative time in a separate column
self.data["time_r"] = (self.data.index - self.data.index[0]) / pd.Timedelta(
1, "s"
)
[docs]
def set_column_selection(
self, filter_list, set_on_exclude_list=False, include_date_time=None
):
"""Select the data to import based on a list of regular expressions
Parameters
----------
filter_list : list
A list with regular expression in which the first filter is always applied
on the name field and the next filters are all applied to the label field
of the record.
set_on_exclude_list : bool, optional
By default, the selected columns are added to the include list.
If this value is true, set the selection on the excluded list.
Defaults to False
include_date_time: bool, optional
Include the date time field by default (without specification in the
*filter_list*).
Handy for the examples as you don't have to specify the DateTime explicitly.
Defaults to None, implying that the setting is taken from the constructor
and is set to *False*.
Returns
-------
tuple (name_list, label_list, group_list)
Selection of name columns along with a list of the () group selection
Notes
-----
The data reader allows passing a list of exclude_columns and include_columns by
which you can select which column is actually read. With the routine, lists can
be created by a regular expression filter
"""
if include_date_time is not None:
# we can overrule the include date time
self.include_date_time = include_date_time
# Create the regular expression belong to the name and labels.
# For the labels, multiple regular expression can be defined.
# Note that filtering takes place based on the name field and label fields of
# the record.
# The name field only has one item, whereas the label fields can have multiple
# items. Tol select a record, define a reg exp for first the name field, and
# then the multiple labels field if required.
regular_expression_labels = list()
regular_expression_names = None
for index, reg_filter in enumerate(filter_list):
if index == 0:
# The first in the list is always the filter applied on the name field
# of the record
if (
self.load_date_time
and self.include_date_time
and not bool(search(self.date_time_match_string, reg_filter))
):
# if the date time label is not present yet in the filter
reg_filter += "|{}".format(self.date_time_match_string)
regular_expression = compile(reg_filter)
self.log.debug("Filtering on name with {}".format(reg_filter))
regular_expression_names = regular_expression
else:
# The remaining of the filter all apply on the label field of the record
self.log.debug("Filtering on label with {}".format(reg_filter))
regular_expression = compile(reg_filter)
regular_expression_labels.append(regular_expression)
name_list = []
label_list = []
group_lists = []
if not regular_expression_labels:
# In case no second filter is given (ie no filter for the label fields), set
# it to fit everything with .*
regular_expression_labels.append(compile(".*"))
for record in self.dataset_records:
# first check if the name field matches the namefield regular expression
date_time_match = match(self.date_time_match_string, record.name)
if bool(regular_expression_names.search(record.name)) or (
date_time_match and self.include_date_time
):
if record.name in name_list:
# We have this record already in the list. Continue to the next
continue
# It matches, now check if the label fields match the label field
# regular expression by looping
self.log.debug("Checking = {} ".format(record.name))
for reg_exp in regular_expression_labels:
label_match = reg_exp.search(record.label)
if bool(label_match):
# This label match the regular expression, to add this record to
# the name list
self.log.debug(
"Found match name = {} label = {}"
"".format(record.name, record.label)
)
name_list.append(record.name)
label_list.append(record.label)
group_number = 1
groups = []
while group_number:
# A regular expression stores all patterns in between () in
# a group.
# These patterns are here stored in the group list such that
# it can be used later if needed
try:
groups.append(label_match.group(group_number))
group_number += 1
except (IndexError, AttributeError):
# No () were used in the regular expression to no groups
# need to be stored
group_number = 0
# the group list eventually contains a list of lists with stored
# matches per label
group_lists.append(groups)
# The lists of names created by this routine are normally stored in the
# include_list, such that all matches are imported.
# You can also specify it to be excluded by the set_on_exclude_list flag
def append_missing_name(column_list, names):
if column_list is None:
column_list = names
else:
for name in names:
if name not in column_list:
column_list.append(name)
return column_list
if set_on_exclude_list:
self.exclude_columns = append_missing_name(self.exclude_columns, name_list)
else:
self.include_columns = append_missing_name(self.include_columns, name_list)
return name_list, label_list, group_lists
[docs]
def make_report(self, show_loaded_data_only=False):
"""
Make a report of the records available in the mdf file
Parameters
----------
show_loaded_data_only: bool, optional
If True, only show the data columns that have been loaded. Default = False,
which means that all channels are shown
Returns
-------
list
List of the reported columns. We can use this list to obtain the channel
name by the index
"""
self.log.setLevel(logging.INFO)
msg1 = "{:3s} {:5s} {:6s} : {:50s} : {}"
msg2 = "{:3d} {:5d} {:6d} : {:50s} : {}"
self.log.info(msg1.format("cnt", "index", "Loaded", "Name", "Label"))
self.log.info(msg1.format("-" * 3, "-" * 5, "-" * 6, "-" * 50, "-" * 30))
cnt = 0
loaded_data_list = list()
for index, record in enumerate(self.dataset_records):
if show_loaded_data_only and not record.loaded_data:
# Skip if not loaded this data and the show loaded_data_only flag is
# True
continue
self.log.info(
msg2.format(cnt, index, record.loaded_data, record.name, record.label)
)
cnt += 1
loaded_data_list.append(record.name)
return loaded_data_list
[docs]
def parse_args(args):
"""Parse command line parameters
Parameters
----------
args : list
Command line parameters as a list of strings
Returns
-------
:obj:`argparse.Namespace`
command line parameters
"""
parser = argparse.ArgumentParser(description="A tool to read mdf fils")
parser.add_argument(
"--version",
action="version",
version="mdf_reader {ver}".format(ver=__version__),
)
parser.add_argument(
"-v",
"--verbose",
dest="loglevel",
help="set loglevel to INFO",
action="store_const",
const=logging.INFO,
)
parser.add_argument(
"-vv",
"--very-verbose",
dest="loglevel",
help="set loglevel to DEBUG",
action="store_const",
const=logging.DEBUG,
)
return parser.parse_args(args)
[docs]
def main(args):
"""
The main routine for testing purpose
Parameters
----------
args : list
Command line arguments
"""
args = parse_args(args)
logging.basicConfig(level=args.loglevel, stream=sys.stdout)
_logger.debug("Writing pickling data to tests directory...")
_logger.info("Script ends here")
[docs]
def run():
""" """
main(sys.argv[1:])
if __name__ == "__main__":
run()