Files
moodconsole/processor.py

174 lines
4.6 KiB
Python

import asyncio
import socket
import copy
import logging
from artnet import buildPacket
from effects import *
import state
AVAILABLE_EFFECTS = [
{
"function": black,
"params": None,
},
{
"function": rainbow_whole,
"params": None,
},
{
"function": rainbow_whole_offset_stat,
"params": None,
},
{
"function": rainbow_whole_offset_dyn,
"params": None,
},
{
"function": two_colors_red_blue,
"params": None,
},
{
"function": two_colors_red_red,
"params": None
},
{
"function": two_colors_cyan_blue,
"params": None
},
{
"function": two_colors_cyan_orange,
"params": None
},
{
"function": adjust_saturation,
"params": {
"factor": {
"type": "slider",
"min": 0,
"max": 1,
"default": 1,
"step": 0.01
}
}
},
{
"function": rgb,
"params": {
"red": {
"type": "slider",
"min": 0,
"max": 255,
"default": 128
},
"green": {
"type": "slider",
"min": 0,
"max": 255,
"default": 128
},
"blue": {
"type": "slider",
"min": 0,
"max": 255,
"default": 128
},
"index": {
"type": "slider",
"min": -1,
"max": 3,
"default": -1
}
}
}
]
effect_chain = [ { "function": rainbow_whole, "params": None }, { "function": black, "params": None } ]
framecount = 0
def function_str(func):
return str(func).split(" ")[1]
def serialize_effect(effect):
return {
"function": function_str(effect["function"]),
"params": effect["params"],
}
def serialize_chain(chain):
return [ serialize_effect(e) for e in chain ]
def get_options():
return {
"effects": {
"available": [ serialize_effect(e) for e in AVAILABLE_EFFECTS ]
}
}
def serialize_internal_state():
global effect_chain
return { "chain": serialize_chain(effect_chain) }
def deserialize_effect(effect_input):
effect = [ x for x in AVAILABLE_EFFECTS if function_str(x["function"]) == effect_input["function"] ][0]
if (effect_input["params"] == {} or effect_input["params"] is None) and effect["params"] is not None:
# assign default params
params = { k: v["default"] for k,v in effect["params"].items() }
else:
params = effect_input["params"]
return {
"function": effect["function"],
"params": params
}
def deserialize_chain(chain):
return [ deserialize_effect(effect) for effect in chain ]
def set_immidiate(framecount, channels, update, params):
for inx, (current, target) in enumerate(zip(channels["current"], channels["target"])):
if params is None or params == {}:
update(framecount, inx, target)
else:
update(framecount, inx, target, params)
for i in range (0,3):
current[i] = target[i]
async def handle_state_change(old, new):
global effect_chain
global framecount
deserialized = deserialize_chain(new["chain"])
effect_chain = deserialized
serialized = serialize_internal_state()
await state.mutate(serialized)
async def main(target_ips):
global effect_chain
global framecount
channels = { "target": [], "current": [] }
for x in range(0,4):
channels["target"].append( [0,0,0] )
channels["current"].append( [0,0,0] )
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
debug = False
current_state = state.get_data()
state.register_on_input_callback(handle_state_change)
await state.mutate(serialize_internal_state())
await state.inc_initialized()
while True:
old_channels = copy.deepcopy(channels["current"])
for effect in effect_chain:
#print("effect: {}".format(effect["function"]))
set_immidiate(framecount, channels, effect["function"], effect["params"])
if old_channels != channels["current"] or framecount == 0:
if debug:
print("frame:{} channels:{}".format(framecount, channels))
channels_transmit = channels["current"]
for ip in target_ips:
sock.sendto(buildPacket(1, 255, channels_transmit), (ip, 6454))
await asyncio.sleep(0.02)
framecount+=1
sock.close()