1
0
Fork 0
baconscript/OscCommentMacroListener.py

279 lines
11 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""OscCommentMacroListener.py: Hog 4 comment macro antlr4 listener for OSC."""
__author__ = "Kevin Matz"
__copyright__ = "Copyright 2018, Company 235, LLC"
__credits__ = ["Kevin Matz"]
__license__ = "MIT"
__version__ = "3.9"
__maintainer__ = "Kevin Matz"
__email__ = "kevin@company235.com"
__status__ = "Prototype"
import logging
from .CommentMacroParser import CommentMacroParser
from .CommentMacroListener import CommentMacroListener
from pythonosc import osc_message_builder
from time import sleep
logger = logging.getLogger('__main__')
# https://raw.githubusercontent.com/jszheng/py3antlr4book/master/bin/pygrun
# this is a python version of TestRig
def beautify_lisp_string(in_string):
__author__ = 'jszheng'
indent_size = 2
add_indent = ' ' * indent_size
out_string = in_string[0] # no indent for 1st (
indent = ''
for i in range(1, len(in_string) - 1):
if in_string[i] == '(' and in_string[i + 1] != ' ':
indent += add_indent
out_string += "\n" + indent + '('
elif in_string[i] == ')':
out_string += ')'
if len(indent) > 0:
indent = indent.replace(add_indent, '', 1)
else:
out_string += in_string[i]
return out_string
class HogDevice():
# button state constants
buttonDOWN = 1
buttonUP = 0
def __init__(self, servers):
self.servers = servers
def button_press(self, device, path, delay=0.05):
self.send_message(device, path, HogDevice.buttonDOWN) # button down
sleep(delay)
self.send_message(device, path, HogDevice.buttonUP) # button up
# utility function to send simple messages with one argument
def send_message(self, device, path, arg):
msg = osc_message_builder.OscMessageBuilder(address=path)
msg.add_arg(arg)
self.send(device, msg.build())
# send python-osc messages
def send(self, device, msg):
if device is None:
osc = list(self.servers.values())[0] # first configured server
else:
if (device.nodeType().getText().lower() != 'h'):
logger.error("ERROR: Only Hog type devices are supported.")
return
else:
try:
osc = self.servers[device.number().value]
except KeyError:
logger.error("ERROR: Net# " + str(device.number().value) +
" not found.")
return
try:
osc.send(msg)
except Exception as e:
logger.error(e)
class OscCommentMacroListener(CommentMacroListener):
def __init__(self, servers):
self.osc = HogDevice(servers)
def exitWait(self, ctx: CommentMacroParser.WaitContext):
logger.info("Waiting " + str(ctx.number().value) + " seconds.")
sleep(ctx.number().value)
def enterStatement(self, ctx: CommentMacroParser.StatementContext):
# print the lisp tree of this macro
lisp_tree_str = ctx.toStringTree(recog=ctx.parser)
logger.debug(beautify_lisp_string(lisp_tree_str))
def enterTarget(self, ctx: CommentMacroParser.TargetContext):
ctx.targets = []
def exitTarget(self, ctx: CommentMacroParser.TargetContext):
ctx.targets = set(ctx.targets)
if isinstance(ctx.parentCtx, CommentMacroParser.TargetContext):
ctx.parentCtx.targets.extend(ctx.targets) # add to parent targets
def exitNumber(self, ctx: CommentMacroParser.NumberContext):
try:
ctx.value = int(ctx.getText())
except ValueError:
ctx.value = float(ctx.getText())
if isinstance(ctx.parentCtx, CommentMacroParser.TargetContext):
ctx.parentCtx.targets.append(ctx.value)
def exitSpan(self, ctx: CommentMacroParser.SpanContext):
lower = min(ctx.n1.value, ctx.n2.value)
upper = max(ctx.n1.value, ctx.n2.value)
while lower <= upper:
ctx.parentCtx.targets.append(lower)
lower += 1
def exitMasterGo(self, ctx: CommentMacroParser.MasterGoContext):
if ctx.target() is None:
logger.info("Main GO")
self.osc.button_press(ctx.device(), "/hog/hardware/maingo")
return
for i in ctx.target().targets:
if isinstance(i, int) is not True:
logger.warn("GO MASTER macro targets must be intigers. "
+ str(i) + " is not an intigers.")
continue
if (i < 0):
logger.warn("Master "+str(i)+" is not greater than 0.")
continue
master = str(i)
logger.info("GO on master " + master)
self.osc.button_press(ctx.device(), "/hog/hardware/go/" + master)
def exitMasterHalt(self, ctx: CommentMacroParser.MasterHaltContext):
if ctx.target() is None:
logger.info("Main HALT")
self.osc.button_press(ctx.device(), "/hog/hardware/mainhalt")
return
for i in ctx.target().targets:
if isinstance(i, int) is not True:
logger.warn("GO MASTER macro targets must be intigers. "
+ str(i) + " is not an intigers.")
continue
if (i < 0):
logger.warn("Master "+str(i)+" is not greater than 0.")
continue
master = str(i)
logger.info("HALT on master " + master)
self.osc.button_press(ctx.device(), "/hog/hardware/pause/"+master)
def exitMasterAssert(self, ctx: CommentMacroParser.MasterAssertContext):
if ctx.target()is not None:
logger.error("ERROR: limited to asserting current master only.")
return
logger.info("ASSERT on current master.")
self.osc.button_press(ctx.device(), "/hog/hardware/assert")
def exitMasterRelease(self, ctx: CommentMacroParser.MasterReleaseContext):
if ctx.target() is not None:
logger.error("ERROR: limited to releasing current master only.")
return
logger.info("RELEASE on current master.")
self.osc.button_press(ctx.device(), "/hog/hardware/release")
def exitMasterFade(self, ctx: CommentMacroParser.MasterFadeContext):
if ctx.target()is None:
logger.error("ERROR: limited to fading specified masters only.")
return
level = ctx.number().value
if (level < 0 or level > 100):
logger.error("Level must be between 0 and 100.")
return
for i in ctx.target().targets:
if isinstance(i, int) is not True:
logger.warn("FADE MASTER macro targets must be intigers. "
+ str(i) + " is not an intigers.")
continue
if (i < 0):
logger.warn("Master "+str(i)+" is not greater than 0.")
continue
master = str(i)
logger.info("Fade Master "+master+" to "+str(level)+"%")
level *= 255 / 100 # percent in Macro, 0>255 in OSC
self.osc.send_message(ctx.device(),
"/hog/hardware/fader/" + master,
level)
def exitFadeGrandMaster(self,
ctx: CommentMacroParser.FadeGrandMasterContext):
level = ctx.number().value
if (level < 0 or level > 100):
logger.error("Level must be between 0 and 100.")
return
logger.info("Fading Grand Master to " + str(level) + "%")
level *= 255 / 100 # percent in Macro, 0>255 in OSC
self.osc.send_message(ctx.device(), "/hog/hardware/fader/0", level)
def exitMasterChoose(self, ctx: CommentMacroParser.MasterChooseContext):
if (ctx.number().value < 0):
logger.error("Master must be greater than 0.")
return
master = str(ctx.number().value)
logger.info("Choose Master " + master)
self.osc.button_press(ctx.device(), "/hog/hardware/choose/" + master)
def exitReleaseAll(self, ctx: CommentMacroParser.ReleaseAllContext):
logger.info("Release All")
self.osc.send_message(ctx.device(), "/hog/hardware/pig",
self.osc.buttonDOWN)
self.osc.button_press(ctx.device(), "/hog/hardware/release")
self.osc.send_message(ctx.device(), "/hog/hardware/pig",
self.osc.buttonUP)
def exitListGo(self, ctx: CommentMacroParser.ListGoContext):
for i in ctx.target().targets:
logger.info("Go on List " + str(i))
self.osc.send_message(ctx.device(), "/hog/playback/go/0", i)
def exitListGoto(self, ctx: CommentMacroParser.ListGotoContext):
for i in ctx.target().targets:
list = str(i) + "." + str(ctx.number().value)
logger.info("Go on List " + str(list))
self.osc.send_message(ctx.device(), "/hog/playback/go/0", list)
def exitListHalt(self, ctx: CommentMacroParser.ListHaltContext):
for i in ctx.target().targets:
logger.info("Halting List " + str(i))
self.osc.send_message(ctx.device(), "/hog/playback/halt/0", i)
def exitListRelese(self, ctx: CommentMacroParser.ListReleseContext):
for i in ctx.target().targets:
logger.info("Releasing List " + str(i))
self.osc.send_message(ctx.device(), "/hog/playback/release/0", i)
def exitSceneGo(self, ctx: CommentMacroParser.SceneGoContext):
for i in ctx.target().targets:
logger.info("Go on Scene " + str(i))
self.osc.send_message(ctx.device(), "/hog/playback/go/1", i)
def exitSceneHalt(self, ctx: CommentMacroParser.SceneHaltContext):
for i in ctx.target().targets:
logger.info("Halt Scene " + str(i))
self.osc.send_message(ctx.device(), "/hog/playback/halt/1", i)
def exitSceneRelease(self, ctx: CommentMacroParser.SceneReleaseContext):
for i in ctx.target().targets:
logger.info("Release Scene " + str(i))
self.osc.send_message(ctx.device(), "/hog/playback/release/1", i)
def exitMacroGo(self, ctx: CommentMacroParser.MacroGoContext):
for i in ctx.target().targets:
logger.info("Go on Macro " + str(i))
self.osc.send_message(ctx.device(), "/hog/playback/go/2", i)
def exitMacroHalt(self, ctx: CommentMacroParser.MacroHaltContext):
for i in ctx.target().targets:
logger.info("Pause Macro " + str(i))
self.osc.send_message(ctx.device(), "/hog/playback/halt/2", i)
def exitMacroStop(self, ctx: CommentMacroParser.MacroStopContext):
for i in ctx.target().targets:
logger.info("Stop Macro " + str(i))
self.osc.send_message(ctx.device(), "/hog/playback/release/2", i)
def exitPageNext(self, ctx: CommentMacroParser.PageNextContext):
logger.info("Next Page")
self.osc.button_press(ctx.device(), "/hog/hardware/nextpage")
def exitPagePrev(self, ctx: CommentMacroParser.PagePrevContext):
logger.info("Prev Page")
self.osc.button_press(ctx.device(), "/hog/hardware/backpage")