"""Render a chain of colour effects and stream the result over UDP.

The module keeps a module-level ``effect_chain`` (an ordered list of effect
callables plus their parameters), applies it to a small set of RGB channels
on every frame and sends the packet built by ``artnet.buildPacket`` to each
target IP.  The chain can be replaced at runtime through the ``state``
module's input callback.
"""
import asyncio
import socket
import copy
import logging

from artnet import buildPacket
from effects import *
import state

# UDP port the packets are sent to (the standard Art-Net port).
_ARTNET_PORT = 6454
# Number of RGB channels rendered per frame.
_CHANNEL_COUNT = 4
# Delay between two frames, in seconds.
_FRAME_INTERVAL = 0.02

# Catalogue of selectable effects.  "params" is either None (effect takes no
# parameters) or a mapping of parameter name -> UI description that carries at
# least a "default" value (used by deserialize_effect()).
AVAILABLE_EFFECTS = [
    {
        "function": black,
        "params": None,
    },
    {
        "function": rainbow_whole,
        "params": None,
    },
    {
        "function": rainbow_whole_offset_stat,
        "params": None,
    },
    {
        "function": rainbow_whole_offset_dyn,
        "params": None,
    },
    {
        "function": two_colors_red_blue,
        "params": None,
    },
    {
        "function": two_colors_red_red,
        "params": None,
    },
    {
        "function": two_colors_cyan_blue,
        "params": None,
    },
    {
        "function": two_colors_cyan_orange,
        "params": None,
    },
    {
        "function": adjust_saturation,
        "params": {
            "factor": {
                "type": "slider",
                "min": 0,
                "max": 1,
                "default": 1,
                "step": 0.01,
            },
        },
    },
    {
        "function": rgb,
        "params": {
            "red": {
                "type": "slider",
                "min": 0,
                "max": 255,
                "default": 128,
            },
            "green": {
                "type": "slider",
                "min": 0,
                "max": 255,
                "default": 128,
            },
            "blue": {
                "type": "slider",
                "min": 0,
                "max": 255,
                "default": 128,
            },
            "index": {
                "type": "slider",
                "min": -1,
                "max": 3,
                "default": -1,
            },
        },
    },
]

# Effects applied, in order, on every frame.  Replaced by
# handle_state_change().
effect_chain = [
    {
        "function": rainbow_whole,
        "params": None,
    },
    {
        "function": black,
        "params": None,
    },
]

# Number of frames rendered so far; advanced by main().
framecount = 0


def function_str(func):
    """Return the name used to identify *func* in serialized state.

    For plain Python functions this is the qualified name, which is exactly
    what the previous ``str(func).split(" ")[1]`` parse extracted from
    ``<function NAME at 0x...>``.  Reading the attribute directly avoids
    depending on the repr layout; the repr parse is kept only as a fallback
    for objects without ``__qualname__``.
    """
    name = getattr(func, "__qualname__", None)
    if name is not None:
        return name
    return str(func).split(" ")[1]


def serialize_effect(effect):
    """Return *effect* with its callable replaced by its name."""
    return {
        "function": function_str(effect["function"]),
        "params": effect["params"],
    }


def serialize_chain(chain):
    """Serialize every effect of *chain*, preserving order."""
    return [serialize_effect(e) for e in chain]


def get_options():
    """Return the serialized catalogue of available effects."""
    return {
        "effects": {
            "available": [serialize_effect(e) for e in AVAILABLE_EFFECTS],
        },
    }


def serialize_internal_state():
    """Return the current effect chain in serialized form."""
    return {
        "chain": serialize_chain(effect_chain),
    }


def deserialize_effect(effect_input):
    """Turn a serialized effect back into a callable + params mapping.

    Args:
        effect_input: mapping with a "function" name and optional "params".
            Missing, ``None`` or empty params are replaced by the defaults
            declared in AVAILABLE_EFFECTS (when the effect declares any).

    Returns:
        ``{"function": <callable>, "params": <dict or None>}``.

    Raises:
        IndexError: if no available effect has the requested name.  The
            exception type matches the previous ``[...][0]`` lookup so
            existing handlers keep working; it now carries a message.
    """
    wanted = effect_input["function"]
    effect = next(
        (x for x in AVAILABLE_EFFECTS if function_str(x["function"]) == wanted),
        None,
    )
    if effect is None:
        raise IndexError("unknown effect: {!r}".format(wanted))

    requested = effect_input.get("params")
    no_params_given = requested is None or requested == {}
    if no_params_given and effect["params"] is not None:
        # assign default params
        params = {k: v["default"] for k, v in effect["params"].items()}
    else:
        params = requested

    return {
        "function": effect["function"],
        "params": params,
    }


def deserialize_chain(chain):
    """Deserialize every effect of *chain*, preserving order."""
    return [deserialize_effect(effect) for effect in chain]


def set_immidiate(framecount, channels, update, params):
    """Apply *update* to every target channel and copy it to "current".

    Args:
        framecount: frame counter forwarded to *update*.
        channels: mapping with parallel "current" and "target" lists of
            3-element lists.
        update: effect callable, invoked as ``update(framecount, index,
            target)`` or with *params* appended when params are given.  Its
            return value is ignored, so it is expected to modify ``target``
            in place.
        params: effect parameters, or ``None`` / ``{}`` for none.
    """
    has_params = not (params is None or params == {})
    pairs = zip(channels["current"], channels["target"])
    for inx, (current, target) in enumerate(pairs):
        if has_params:
            update(framecount, inx, target, params)
        else:
            update(framecount, inx, target)
        # Copy element-wise so "current" keeps its own list objects.
        for i in range(3):
            current[i] = target[i]


async def handle_state_change(old, new):
    """Replace the effect chain from *new* and publish the resulting state.

    *old* is accepted for the callback signature but not used.
    """
    global effect_chain
    effect_chain = deserialize_chain(new["chain"])
    await state.mutate(serialize_internal_state())


async def main(target_ips):
    """Render frames forever and send changed frames to *target_ips*.

    Args:
        target_ips: iterable of destination IP addresses; it is iterated on
            every transmitted frame.

    The UDP socket is closed when the coroutine exits for any reason
    (cancellation or error).  Previously the close call sat after the
    infinite loop and could never run.
    """
    global effect_chain
    global framecount

    channels = {
        "target": [[0, 0, 0] for _ in range(_CHANNEL_COUNT)],
        "current": [[0, 0, 0] for _ in range(_CHANNEL_COUNT)],
    }
    debug = False

    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        # Result is unused; the call is kept in case get_data() has side
        # effects -- TODO confirm and remove if it is a pure getter.
        state.get_data()
        state.register_on_input_callback(handle_state_change)
        await state.mutate(serialize_internal_state())
        await state.inc_initialized()

        while True:
            old_channels = copy.deepcopy(channels["current"])

            for effect in effect_chain:
                set_immidiate(
                    framecount, channels, effect["function"], effect["params"]
                )

            # Transmit only when the output changed, plus once on the very
            # first frame.
            # NOTE(review): nesting reconstructed from a whitespace-collapsed
            # source -- confirm the send is meant to be conditional.
            if old_channels != channels["current"] or framecount == 0:
                if debug:
                    print("frame:{} channels:{}".format(framecount, channels))
                channels_transmit = channels["current"]
                for ip in target_ips:
                    sock.sendto(
                        buildPacket(1, 255, channels_transmit),
                        (ip, _ARTNET_PORT),
                    )

            await asyncio.sleep(_FRAME_INTERVAL)
            framecount += 1