#!/usr/bin/env python3 # -*- coding: utf-8 -*- """robot.py: class up your robot arm.""" __author__ = "Kevin Matz" __copyright__ = "Copyright 2019, Company 235, LLC" __credits__ = ["Kevin Matz"] __license__ = "MIT" __version__ = "0.1" __maintainer__ = "Kevin Matz" __email__ = "kevin@company235.com" __status__ = "Prototype" import logging import configparser import serial import threading logger = logging.getLogger('__main__') class Robot: def __init__(self): self.serial_port = None self.config = None self.preset = None self.serial_read_thread = None self.angle = [] self.target = [] self.buffer = bytes() def move(self): hexvals = [hex(dec).lstrip("0x").rstrip("L") for dec in self.target] for index, hexval in enumerate(hexvals): if len(hexvals[index]) < 2: hexvals[index] = "0" + hexval str = '{'+','.join(hexvals)+'}\n' str = str.lower().encode() try: self.serial_port.write(str) except serial.serialutil.SerialTimeoutException: pass def isMoving(self): return self.angle != self.target @staticmethod def read_preset(str): if str.startswith('{') and str.endswith('}'): str = str.lstrip('{').rstrip('}') else: return None # malformed packet data = str.split(',') return [int(val) for val in data] def serial_read_handler(self, data): data = data.strip() if data.startswith(b'{') and data.endswith(b'}'): data = data.lstrip(b'{').rstrip(b'}') else: return # malformed packet data = data.split(b',') try: self.angle = [int(hex, 16) for hex in data] except ValueError: return # malformed packet def serial_read_worker(self): while True: try: self.buffer = self.buffer + self.serial_port.readline() except serial.serialutil.SerialException as e: logger.error(e) break if b'\n' in self.buffer: list = self.buffer.split(b'\n', 1) self.buffer = list[1] self.serial_read_handler(list[0]) def start(self): # Start a serial read thread, if the serial port can be opened. try: self.serial_port = serial.Serial(self.config.get("port", None), self.config.get("baud", 9600), timeout=0) self.serial_read_thread = threading.Thread( target=self.serial_read_worker, daemon=True) self.serial_read_thread.start() except NameError: logger.error("ERROR: configuration hasn't been set!") raise except serial.serialutil.SerialException: logger.error("ERROR: Unable to start serial port.")