#!/usr/bin/env python3 # -*- coding: utf-8 -*- """robot.py: class up your robot arm.""" __author__ = "Kevin Matz" __copyright__ = "Copyright 2019, Company 235, LLC" __credits__ = ["Kevin Matz"] __license__ = "MIT" __version__ = "0.1" __maintainer__ = "Kevin Matz" __email__ = "kevin@company235.com" __status__ = "Prototype" import logging import configparser import serial import threading logger = logging.getLogger('__main__') class Robot: def __init__(self): self.serial_port = None self.config = None self.serial_read_thread = None self.angle = [] self.target = [] self.buffer = bytes() def move(self, coords=None): if coords is not None: self.target = coords hex = [hex(dec).lstrip("0x").rstrip("L") for dec in self.target] str = '{'+','.join(hex)+'}' str = str.lower() try: self.serial_port.write(str) except serial.serialutil.SerialTimeoutException: pass def isMoving(self): if len(self.angle) != len(self.target): values = [] for i in range(len(self.angle)): try: values.append(self.target[i]) except Exception as e: values.append(0) return self.angle != values return self.angle != self.target def serial_read_handler(self, data): data = data.strip() if data.startswith(b'{') and data.endswith(b'}'): data = data.lstrip(b'{').rstrip(b'}') else: return # malformed packet data = data.split(b',') try: self.angle = [int(hex, 16) for hex in data] except ValueError: return # malformed packet def serial_read_worker(self): while True: self.buffer = self.buffer + self.serial_port.readline() if b'\n' in self.buffer: list = self.buffer.split(b'\n', 1) self.buffer = list[1] self.serial_read_handler(list[0]) def start(self): # Start a serial read thread, if the serial port can be opened. try: self.serial_port = serial.Serial(self.config.get("port", None), self.config.get("baud", 9600), timeout=0) self.serial_read_thread = threading.Thread( target=self.serial_read_worker, daemon=True) self.serial_read_thread.start() except NameError: logger.error("ERROR: configuration hasn't been set!") raise except serial.serialutil.SerialException: logger.error("ERROR: Unable to start serial port.")