From 6a9ecef2cff478a008bdc52605470fb474710c9e Mon Sep 17 00:00:00 2001 From: Kevin Matz Date: Sun, 21 Oct 2018 00:30:21 -0400 Subject: [PATCH] support for net devices --- OscCommentMacroListener.py | 66 +++++++++++++++++++++++++++----------- comment.py | 16 +++------ 2 files changed, 51 insertions(+), 31 deletions(-) diff --git a/OscCommentMacroListener.py b/OscCommentMacroListener.py index cd8b574..caa155f 100644 --- a/OscCommentMacroListener.py +++ b/OscCommentMacroListener.py @@ -23,12 +23,6 @@ from pythonosc import udp_client from time import sleep -def button_press(server, path, delay=0.05): - server.send_message(path, 1) # button down - sleep(delay) - server.send_message(path, 0) # button up - - def num(string): try: num = int(string) @@ -45,7 +39,7 @@ def _master_go(self, ctx): if (len(ctx.master.targets) == 0): print("Main GO") - button_press(self.server.osc, "/hog/hardware/maingo") + self.button_press(ctx.device, "/hog/hardware/maingo") return 1 else: for i in ctx.master.targets: @@ -58,14 +52,14 @@ def _master_go(self, ctx): continue master = str(i) print("GO on master " + master) - button_press(self.server.osc, "/hog/hardware/go/" + master) + self.button_press(ctx.device, "/hog/hardware/go/" + master) return 1 def _master_halt(self, ctx): if (len(ctx.master.targets) == 0): print("Main HALT") - button_press(self.server.osc, "/hog/hardware/mainhalt") + self.button_press(ctx.device, "/hog/hardware/mainhalt") return 1 else: for i in ctx.master.targets: @@ -78,7 +72,7 @@ def _master_halt(self, ctx): continue master = str(i) print("HALT on master " + master) - button_press(self.server.osc, "/hog/hardware/pause/" + master) + self.button_press(ctx.device, "/hog/hardware/pause/" + master) return 1 @@ -102,7 +96,7 @@ def _master_fade(self, ctx): master = str(i) print("Fade Master "+master+" to "+str(level)+"%") level *= 255 / 100 # percent in Macro, 0>255 in OSC - self.server.osc.send_message("/hog/hardware/fader/"+master, level) + self.send_message(ctx.device, "/hog/hardware/fader/"+master, level) return 1 @@ -113,7 +107,7 @@ def _master_fade_grand(self, ctx): return -1 print("Fading Grand Master to " + str(level) + "%") level *= 255 / 100 # percent in Macro, 0>255 in OSC - self.server.osc.send_message("/hog/hardware/fader/0", level) + self.send_message(ctx.device, "/hog/hardware/fader/0", level) return 1 @@ -123,7 +117,7 @@ def _master_choose(self, ctx): return -1 master = str(ctx.number.value) print("Choose Master " + master) - button_press(self.server.osc, "/hog/hardware/choose/" + master) + self.button_press(ctx.device, "/hog/hardware/choose/" + master) return 1 @@ -133,42 +127,42 @@ def _list_go(self, ctx): if ctx.number.value is not None: list += "." + str(ctx.number.value) print("Go on List " + list) - self.server.osc.send_message("/hog/playback/go/0", list) + self.send_message(ctx.device, "/hog/playback/go/0", list) return 1 def _list_halt(self, ctx): for i in ctx.targets: print("Halting List " + str(i)) - self.server.osc.send_message("/hog/playback/halt/0", i) + self.send_message(ctx.device, "/hog/playback/halt/0", i) return 1 def _list_release(self, ctx): for i in ctx.targets: print("Releasing List " + str(i)) - self.server.osc.send_message("/hog/playback/release/0", i) + self.send_message(ctx.device, "/hog/playback/release/0", i) return 1 def _scene_go(self, ctx): for i in ctx.targets: print("Go on Scene " + str(i)) - self.server.osc.send_message("/hog/playback/go/1", i) + self.send_message(ctx.device, "/hog/playback/go/1", i) return 1 def _scene_halt(self, ctx): for i in ctx.targets: print("Halt Scene " + str(i)) - self.server.osc.send_message("/hog/playback/halt/1", i) + self.send_message(ctx.device, "/hog/playback/halt/1", i) return 1 def _scene_release(self, ctx): for i in ctx.targets: print("Release Scene " + str(i)) - self.server.osc.send_message("/hog/playback/release/1", i) + self.send_message(ctx.device, "/hog/playback/release/1", i) return 1 @@ -213,10 +207,38 @@ class OscCommentMacroListener(CommentMacroListener): self.parser = parser self.server = server + def button_press(self, device, path, delay=0.05): + self.send_message(device, path, 1) # button down + sleep(delay) + self.send_message(device, path, 0) # button up + + def send_message(self, device, path, arg): + if device is None: + osc = list(self.server.values())[0] + else: + if (device.type.getText() != 'h'): + print("Only Hog type devices are curently supported.") + print("WARNIN: macro discarded!") + return -1 + else: + try: + osc = self.server[device.number.value] + except KeyError: + print("Net# "+str(device.number.value)+" not configured.") + print("WARNING: macro discarded!") + return -1 + osc.send_message(path, arg) + return 1 + + def exitDevice(self, ctx: CommentMacroParser.DeviceContext): + if isinstance(ctx.parentCtx, CommentMacroParser.MacroContext): + ctx.parentCtx.device = ctx + def enterMacro(self, ctx: CommentMacroParser.MacroContext): ctx.device = None ctx.number = None ctx.master = None + ctx.device = None ctx.targets = [] ctx.time = None @@ -248,6 +270,8 @@ class OscCommentMacroListener(CommentMacroListener): ctx.parentCtx.targets.append(ctx.value) if isinstance(ctx.parentCtx, CommentMacroParser.MacroContext): ctx.parentCtx.number = ctx + if isinstance(ctx.parentCtx, CommentMacroParser.DeviceContext): + ctx.parentCtx.number = ctx def exitSpan(self, ctx: CommentMacroParser.SpanContext): number1 = ctx.children[0].value @@ -268,3 +292,7 @@ class OscCommentMacroListener(CommentMacroListener): def exitTarget(self, ctx: CommentMacroParser.TargetContext): ctx.target = set(ctx.targets) # no duplicates ctx.parentCtx.targets.extend(ctx.targets) + + def exitNodeType(self, ctx: CommentMacroParser.NodeTypeContext): + if isinstance(ctx.parentCtx, CommentMacroParser.DeviceContext): + ctx.parentCtx.type = ctx diff --git a/comment.py b/comment.py index 650c3fe..e35bcfe 100755 --- a/comment.py +++ b/comment.py @@ -27,22 +27,14 @@ from OscCommentMacroListener import OscCommentMacroListener from pythonosc import udp_client -class hog4_osc: - 'A Hog 4 OSC device on the network' - def __init__(self, ip, port, net): - self.ip = ip - self.port = port - self.net = net - self.osc = udp_client.SimpleUDPClient(ip, port) - - # refactor this to support multiple net#s config = configparser.ConfigParser(allow_no_value=True) config.read('server.cfg') server = config['hog4'] -hog4 = hog4_osc(server.get("ip", "10.0.0.1"), - server.getint("port", 6600), - server.getint("net", 1)) +net = server.getint("net", 1) +hog4 = {} +hog4[net] = udp_client.SimpleUDPClient(server.get("ip", "10.0.0.1"), + server.getint("port", 6600)) def comment(text):