Source code for pyvmu.vmu931

import serial
import time
import struct
import logging
import pyvmu.messages as messages


[docs]class VMU931Parser(object): """ This class is responsible for communicating with and parsing data from the VMU931 inertial measurement unit. """
[docs] def __init__(self, device="/dev/tty.usbmodem1411", accelerometer=False, magnetometer=False, gyroscope=False, euler=False, quaternion=False, heading=False ): """ Opens a connection to the VMU931 device :param device: Serial device name (on Windows) or path (nix, including OS X). :param accelerometer: Enable/disable accelerometer data streaming. :param magnetometer: Enable/disable magnetometer data streaming. :param gyroscope: Enable/disable gyroscope data streaming. :param euler: Enable/disable euler angle data streaming. :param quaternion: Enable/disable quaternion data streaming. :param heading: Enable/disable compass heading data streaming. """ self.ser = serial.Serial(device) self.device_status = None self.parse() self.set_accelerometer(accelerometer) self.set_magnetometer(magnetometer) self.set_gyroscope(gyroscope) self.set_euler(euler) self.set_quaternion(quaternion) self.set_heading(heading)
def __enter__(self): return self def __exit__(self, t, value, traceback): self.ser.close()
[docs] def set_quaternion(self, state): """ Enable/disable streaming of quaternion data. :param state: True/False, desired state """ assert self.device_status is not None, "Device status is not set" if self.device_status.quaternions_streaming != state: self._toggle_quaternion()
[docs] def set_euler(self, state): """ Enable/disable streaming of euler angle data. :param state: True/False, desired state """ assert self.device_status is not None, "Device status is not set" if self.device_status.euler_streaming != state: self._toggle_euler()
[docs] def set_accelerometer(self, state): """ Enable/disable streaming of accelerometer data. :param state: True/False, desired state """ assert self.device_status is not None, "Device status is not set" if self.device_status.accelerometer_streaming != state: self._toggle_accelerometer()
[docs] def set_magnetometer(self, state): """ Enable/disable streaming of magnetometer data. :param state: True/False, desired state """ assert self.device_status is not None, "Device status is not set" if self.device_status.magnetometer_streaming != state: self._toggle_magnetometer()
[docs] def set_gyroscope(self, state): """ Enabled/disable streaming of gyroscope data. :param state: True/False, desired state """ assert self.device_status is not None, "Device status is not set" if self.device_status.gyroscope_streaming != state: self._toggle_gyroscope()
[docs] def set_heading(self, state): """ Enable/disable streaming of compass heading data. :param state: True/False, desired state """ assert self.device_status is not None, "Device status is not set" if self.device_status.heading_streaming != state: self._toggle_heading()
def _toggle_quaternion(self): """ Toggles quaternion output from the VMU931 device. """ self._send_message("varq") def _toggle_euler(self): """ Toggles quaternion output from the VMU931 device. """ self._send_message("vare") def _toggle_heading(self): """ Toggles heading output from the VMU931 device. """ self._send_message("varh") def _toggle_accelerometer(self): """ Toggles accelerometer output from the VMU931 device. """ self._send_message("vara") def _toggle_gyroscope(self): """ Toggles gyroscope output from the VMU931 device. """ self._send_message("varg") def _toggle_magnetometer(self): """ Toggles magnetometer output from the VMU931 device. """ self._send_message("varc")
[docs] def set_gyroscope_resolution(self, resolution): """ Sets the gyroscope output resolution of the VMU931 device. :param resolution: 250, 500, 1000 or 2000. """ assert resolution in (250, 500, 1000, 2000), "Invalid gyroscope resolution, must be 250, 500, 1000 or 2000" mapping = {250: 0, 500: 1, 1000: 2, 2000: 3} command = "var{}".format(mapping[resolution]) self._send_message(command)
[docs] def set_accelerometer_resolution(self, resolution): """ Sets the accelerometer output resolution of the VMU931 device. :param resolution: 2, 4, 8 or 16. """ assert resolution in (2, 4, 8, 16), "Invalid accelerometer resolution, must be 2, 4, 8 or 18" mapping = {2: 4, 4: 5, 8: 6, 16: 7} command = "var{}".format(mapping[resolution]) self._send_message(command)
def _send_message(self, message, update_status=True): """ Sends a message to the VMU931 device, with 5ms delay between each character. :param message: Message to send to device :param update_status: Update sensor status after message send (defaults to True) """ byte_message = message.encode('ascii') # bytes must be sent with 1ms+ interval to be recognised by device. for c in byte_message: bs = bytes([c]) self.ser.write(bs) logging.debug("Sent {}".format(bs)) time.sleep(0.01) time.sleep(0.05) if update_status: self.request_status() time.sleep(0.100)
[docs] def request_status(self): """ Request a new status packet from the VMU931 """ # We don't want to update the status again after sending the message, otherwise we'd be in an infinite loop. logging.info("Requesting status update") self._send_message("vars", update_status=False)
[docs] def calibrate(self): """ Calibrate the VMU931 (be sure to lay the device on a flat surface, with Z axis pointing upward) """ # We don't want to update the status again after sending the message, otherwise we'd be in an infinite loop. logging.info("Requesting calibration...") self._send_message("varl", update_status=False)
[docs] def parse(self, callback=None): """ Parses a single packet from the VMU931 device, returning a namedtuple. Typically called multiple times from within a loop. If device status is currently known, we wait for an incoming status packet and parse it. This method will block until status is received (so that we're in a known state). This should never happen outside of the automatic call to parse() made during initialisation. When a status packet is received, self.device_status is updated to represent the new state. If a callback method is specified (through the `callback` argument) when calling parse(), that method will be called when the packet is parsed. :param callback: Method to call after processing each packet :return: processed packet """ # If we don't know the current device status, request it if self.device_status is None: self.request_status() # Loop until we get a status packet. Will normally only loop once per call. while True: # Find start of data message -- we might start processing data mid-stream so need to synchronise. # We are looking for the magic byte 0x01. There's a chance that this will be randomly encountered, but # we also check the footer value. message_start = self.ser.read()[0] while message_start != 0x01: logging.debug("Skipping invalid message_start, got {} expected 0x01".format(hex(message_start))) message_start = self.ser.read()[0] continue message_size = self.ser.read()[0] - 4 # Unsure why we have to subtract 4bytes from this... but we do. logging.debug("Message size: {}".format(message_size)) message_type = chr(self.ser.read()[0]) logging.debug("Message type: {}".format(message_type)) message_text = self.ser.read(message_size) message_end = self.ser.read()[0] # If we have an invalid footer, skip this packet, otherwise continue. if message_end != 0x04: logging.warning( "Invalid Message footer (was {}, expected 0x04), skipping this packet".format(message_end)) else: data = None if message_type == 'e': logging.info("Parsing Euler") data = VMU931Parser._parse_euler(message_text) elif message_type == 'q': logging.info("Parsing Quaternion") data = VMU931Parser._parse_quaternion(message_text) elif message_type == 'h': logging.info("Parsing Heading") data = VMU931Parser._parse_heading(message_text) elif message_type == 'a': logging.info("Parsing Accelerometer") data = VMU931Parser._parse_accelerometer(message_text) elif message_type == 'g': logging.info("Parsing Gyroscope") data = VMU931Parser._parse_gyroscope(message_text) elif message_type == 'c': logging.info("Parsing Magnetometer") data = VMU931Parser._parse_magnetometer(message_text) elif message_type == 's': logging.info("Parsing status message") data = VMU931Parser._parse_status(message_text) self.device_status = data else: logging.warning("No parser for {}".format(message_type)) if self.device_status is not None: if callback is not None and data is not None: callback(data) return data
@staticmethod def _parse_status(data): """ Parse the contents of a status message according to the VMU931 User Guide (http://variense.com/Docs/VMU931/VMU931_UserGuide.pdf) :param data: Bytes to process :return: Device Status """ status, res, low_output, data = struct.unpack(">BBBI", data[:7]) mag_status = status & 0b00000100 != 0 gyro_status = status & 0b00000010 != 0 acc_status = status & 0b00000001 != 0 gyro_res = None if res & 0b10000000 != 0: gyro_res = 2000 elif res & 0b01000000 != 0: gyro_res = 1000 elif res & 0b00100000 != 0: gyro_res = 500 elif res & 0b00010000 != 0: gyro_res = 250 acc_res = None if res & 0b00001000 != 0: acc_res = 16 elif res & 0b000000100 != 0: acc_res = 8 elif res & 0b00000010 != 0: acc_res = 4 elif res & 0b00000001 != 0: acc_res = 2 low_output_rate = low_output & 0b00000001 != 0 heading_streaming = data & 0b01000000 != 0 euler_streaming = data & 0b00010000 != 0 mag_streaming = data & 0b00001000 != 0 quat_streaming = data & 0b00000100 != 0 gyro_streaming = data & 0b00000010 != 0 acc_streaming = data & 0b00000001 != 0 return messages.Status( magnetometer_enabled=mag_status, gyroscope_enabled=gyro_status, accelerometer_enabled=acc_status, gyroscope_resolution=gyro_res, accelerometer_resolution=acc_res, low_output_rate=low_output_rate, heading_streaming=heading_streaming, euler_streaming=euler_streaming, magnetometer_streaming=mag_streaming, quaternions_streaming=quat_streaming, gyroscope_streaming=gyro_streaming, accelerometer_streaming=acc_streaming ) @staticmethod def _parse_quaternion(data): """ Parse a quaternion data packet :param data: Bytes to parse :return: Parsed quaternion packet """ ts, w, x, y, z = struct.unpack(">Iffff", data[:20]) return messages.Quaternion(timestamp=ts, w=w, x=x, y=y, z=z) @staticmethod def _parse_euler(data): """ Parse a euler angle data packet :param data: Bytes to parse :return: Parsed euler angle packet """ ts, x, y, z = struct.unpack(">Ifff", data[:16]) return messages.Euler(timestamp=ts, x=x, y=y, z=z) @staticmethod def _parse_accelerometer(data): """ Parse a euler angle data packet :param data: Bytes to parse :return: Parsed euler angle packet """ ts, x, y, z = struct.unpack(">Ifff", data[:16]) return messages.Accelerometer(timestamp=ts, x=x, y=y, z=z) @staticmethod def _parse_magnetometer(data): """ Parse a magnetometer data packet :param data: Bytes to parse :return: Parsed magnetometer packet """ ts, x, y, z = struct.unpack(">Ifff", data[:16]) return messages.Magnetometer(timestamp=ts, x=x, y=y, z=z) @staticmethod def _parse_gyroscope(data): """ Parse a gyroscope data packet :param data: Bytes to parse :return: Parsed gyroscope packet """ ts, x, y, z = struct.unpack(">Ifff", data[:16]) return messages.Gyroscope(timestamp=ts, x=x, y=y, z=z) @staticmethod def _parse_heading(data): """ Parse a compass heading data packet :param data: Bytes to parse :return: Parsed compass heading packet """ ts, h = struct.unpack(">If", data[:8]) return messages.Heading(timestamp=ts, h=h)