move hog class to submodule
This commit is contained in:
parent
56ea58d7f8
commit
9532b1c003
|
@ -2,12 +2,12 @@
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from pythonosc import osc_message_builder
|
|
||||||
from time import sleep
|
|
||||||
|
|
||||||
from .commentmacro.CommentMacroParser import CommentMacroParser
|
from .commentmacro.CommentMacroParser import CommentMacroParser
|
||||||
from .commentmacro.CommentMacroListener import CommentMacroListener
|
from .commentmacro.CommentMacroListener import CommentMacroListener
|
||||||
|
|
||||||
|
from .hog4 import HogNet
|
||||||
|
|
||||||
|
|
||||||
# https://raw.githubusercontent.com/jszheng/py3antlr4book/master/bin/pygrun
|
# https://raw.githubusercontent.com/jszheng/py3antlr4book/master/bin/pygrun
|
||||||
# this is a python version of TestRig
|
# this is a python version of TestRig
|
||||||
|
@ -30,49 +30,9 @@ def beautify_lisp_string(in_string):
|
||||||
return out_string
|
return out_string
|
||||||
|
|
||||||
|
|
||||||
class HogDevice():
|
|
||||||
# button state constants
|
|
||||||
buttonDOWN = 1
|
|
||||||
buttonUP = 0
|
|
||||||
|
|
||||||
def __init__(self, servers):
|
|
||||||
self.servers = servers
|
|
||||||
|
|
||||||
def button_press(self, device, path, delay=0.05):
|
|
||||||
self.send_message(device, path, HogDevice.buttonDOWN) # button down
|
|
||||||
sleep(delay)
|
|
||||||
self.send_message(device, path, HogDevice.buttonUP) # button up
|
|
||||||
|
|
||||||
# utility function to send simple messages with one argument
|
|
||||||
def send_message(self, device, path, arg):
|
|
||||||
msg = osc_message_builder.OscMessageBuilder(address=path)
|
|
||||||
msg.add_arg(arg)
|
|
||||||
self.send(device, msg.build())
|
|
||||||
|
|
||||||
# send python-osc messages
|
|
||||||
def send(self, device, msg):
|
|
||||||
if device is None:
|
|
||||||
osc = list(self.servers.values())[0] # first configured server
|
|
||||||
else:
|
|
||||||
if (device.nodeType().getText().lower() != 'h'):
|
|
||||||
logging.error("ERROR: Only Hog type devices are supported.")
|
|
||||||
return
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
osc = self.servers[device.number().value]
|
|
||||||
except KeyError:
|
|
||||||
logging.error("ERROR: Net# " + str(device.number().value) +
|
|
||||||
" not found.")
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
osc.send(msg)
|
|
||||||
except Exception as e:
|
|
||||||
logging.error(e)
|
|
||||||
|
|
||||||
|
|
||||||
class OscCommentMacroListener(CommentMacroListener):
|
class OscCommentMacroListener(CommentMacroListener):
|
||||||
def __init__(self, servers):
|
def __init__(self, servers):
|
||||||
self.osc = HogDevice(servers)
|
self.osc = HogNet(servers)
|
||||||
|
|
||||||
def exitWait(self, ctx: CommentMacroParser.WaitContext):
|
def exitWait(self, ctx: CommentMacroParser.WaitContext):
|
||||||
logging.info("Waiting " + str(ctx.number().value) + " seconds.")
|
logging.info("Waiting " + str(ctx.number().value) + " seconds.")
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
import logging
|
||||||
|
from pythonosc import osc_message_builder
|
||||||
|
from time import sleep
|
||||||
|
|
||||||
|
|
||||||
|
class HogNet():
|
||||||
|
# button state constants
|
||||||
|
buttonDOWN = 1
|
||||||
|
buttonUP = 0
|
||||||
|
|
||||||
|
def __init__(self, servers):
|
||||||
|
self.servers = servers
|
||||||
|
|
||||||
|
def button_press(self, device, path, delay=0.05):
|
||||||
|
self.send_message(device, path, self.buttonDOWN)
|
||||||
|
sleep(delay)
|
||||||
|
self.send_message(device, path, self.buttonUP)
|
||||||
|
|
||||||
|
# utility function to send simple messages with one argument
|
||||||
|
def send_message(self, device, path, arg):
|
||||||
|
msg = osc_message_builder.OscMessageBuilder(address=path)
|
||||||
|
msg.add_arg(arg)
|
||||||
|
self.send(device, msg.build())
|
||||||
|
|
||||||
|
# send python-osc messages
|
||||||
|
def send(self, device, msg):
|
||||||
|
if device is None:
|
||||||
|
osc = list(self.servers.values())[0] # first configured server
|
||||||
|
else:
|
||||||
|
if (device.nodeType().getText().lower() != 'h'):
|
||||||
|
logging.error("ERROR: Only Hog type devices are supported.")
|
||||||
|
return
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
osc = self.servers[device.number().value]
|
||||||
|
except KeyError:
|
||||||
|
logging.error("ERROR: Net# " + str(device.number().value) +
|
||||||
|
" not found.")
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
osc.send(msg)
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(e)
|
Loading…
Reference in New Issue