1
0
Fork 0
armCtl/robot.py

99 lines
3.0 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""robot.py: class up your robot arm."""
__author__ = "Kevin Matz"
__copyright__ = "Copyright 2019, Company 235, LLC"
__credits__ = ["Kevin Matz"]
__license__ = "MIT"
__version__ = "0.1"
__maintainer__ = "Kevin Matz"
__email__ = "kevin@company235.com"
__status__ = "Prototype"
import logging
import configparser
import serial
import threading
logger = logging.getLogger('__main__')
class Robot:
def __init__(self):
self.serial_port = None
self.config = None
self.preset = None
self.serial_read_thread = None
self.angle = []
self.target = []
self.buffer = bytes()
def move(self):
hexvals = [hex(dec).lstrip("0x").rstrip("L") for dec in self.target]
for index, hexval in enumerate(hexvals):
if len(hexvals[index]) < 2:
hexvals[index] = "0" + hexval
str = '{'+','.join(hexvals)+'}\n'
str = str.lower().encode()
try:
self.serial_port.write(str)
except serial.serialutil.SerialTimeoutException:
pass
def isMoving(self):
return self.angle != self.target
@staticmethod
def read_preset(str):
if str.startswith('{') and str.endswith('}'):
str = str.lstrip('{').rstrip('}')
else:
return None # malformed packet
data = str.split(',')
return [int(val) for val in data]
def serial_read_handler(self, data):
data = data.strip()
if data.startswith(b'{') and data.endswith(b'}'):
data = data.lstrip(b'{').rstrip(b'}')
else:
return # malformed packet
data = data.split(b',')
try:
self.angle = [int(hex, 16) for hex in data]
except ValueError:
return # malformed packet
def serial_read_worker(self):
while True:
try:
self.buffer = self.buffer + self.serial_port.readline()
except serial.serialutil.SerialException as e:
logger.error(e)
break
if b'\n' in self.buffer:
list = self.buffer.split(b'\n', 1)
self.buffer = list[1]
self.serial_read_handler(list[0])
def start(self):
# Start a serial read thread, if the serial port can be opened.
try:
self.serial_port = serial.Serial(self.config.get("port", None),
self.config.get("baud", 9600),
timeout=0)
self.serial_read_thread = threading.Thread(
target=self.serial_read_worker,
daemon=True)
self.serial_read_thread.start()
except NameError:
logger.error("ERROR: configuration hasn't been set!")
raise
except serial.serialutil.SerialException:
logger.error("ERROR: Unable to start serial port.")