#!/usr/bin/env python
"""WS server example that synchronizes state across clients."""

import asyncio
import json
import logging
import random
import time

import websockets

import processor
import state

logging.basicConfig()

# Websocket connections of every client that is currently registered.
USERS = set()


def state_event(state):
    """Return the JSON text of a ``"state"`` event carrying ``state``.

    ``state`` must be a mapping; its items are copied into the payload.
    Note that inside this function the parameter shadows the imported
    ``state`` module.
    """
    return json.dumps({"type": "state", "payload": {**state}})


def options_event():
    """Return the JSON text of an ``"options"`` event.

    The payload is a copy of whatever ``processor.get_options()`` returns.
    """
    return json.dumps({"type": "options", "payload": {**processor.get_options()}})


def users_event():
    """Return the JSON text of a ``"users"`` event with the client count."""
    return json.dumps({"type": "users", "count": len(USERS)})


async def _broadcast(message):
    """Send ``message`` to every registered client concurrently.

    A failed send to one client does not prevent delivery to the others;
    failures are logged instead of being raised to the caller.
    """
    if not USERS:
        return
    # Snapshot the set: clients may register/unregister while sends are
    # in flight.
    recipients = list(USERS)
    # asyncio.wait() no longer accepts bare coroutines (removed in Python
    # 3.11), so fan out with gather(); return_exceptions=True keeps one
    # broken connection from aborting the whole broadcast.
    results = await asyncio.gather(
        *(user.send(message) for user in recipients),
        return_exceptions=True,
    )
    for result in results:
        if isinstance(result, Exception):
            logging.warning("failed to deliver message to a client: %r", result)


async def notify_state(old_state, new_state):
    """Broadcast ``new_state`` to all clients.

    Registered with ``state.register_callback`` in ``serve``. ``old_state``
    is accepted to match the callback signature but is not used.
    """
    if USERS:  # skip serialization when nobody is connected
        await _broadcast(state_event(new_state))


async def notify_users():
    """Broadcast the current client count to all clients."""
    if USERS:  # skip serialization when nobody is connected
        await _broadcast(users_event())


async def register(websocket):
    """Add ``websocket`` to ``USERS`` and announce the new client count."""
    USERS.add(websocket)
    await notify_users()


async def unregister(websocket):
    """Remove ``websocket`` from ``USERS`` and announce the new client count."""
    USERS.remove(websocket)
    await notify_users()


async def counter(websocket, path=None):
    """Handle one client connection for its whole lifetime.

    Registers the client, sends it the current state and options, then
    applies every ``"mutation"`` message it sends via ``state.mutate``.
    The client is always unregistered when the connection ends.

    ``path`` is unused; it defaults to ``None`` so the handler works both
    when it is called with ``(websocket, path)`` and when it is called with
    the connection only.
    """
    # register(websocket) sends users_event() to websocket
    await register(websocket)
    try:
        await websocket.send(state_event(await state.get_data()))
        await websocket.send(options_event())
        async for message in websocket:
            try:
                data = json.loads(message)
            except ValueError:
                # Covers json.JSONDecodeError and undecodable binary frames:
                # one bad message should not tear down the connection.
                logging.error("malformed message: %r", message)
                continue

            is_mutation = (
                isinstance(data, dict)
                and data.get("type") == "mutation"
                and "payload" in data
            )
            if is_mutation:
                await state.mutate(data["payload"])
            else:
                # logging uses %-style lazy formatting, not str.format().
                logging.error("unsupported event: %s", data)
    finally:
        await unregister(websocket)


async def serve():
    """Hook state notifications up and start the websocket server.

    Returns once the server is listening on ``localhost:6789``; the server
    keeps running for as long as the event loop does.
    """
    state.register_callback(notify_state)
    # NOTE(review): presumably marks this component as initialized in the
    # state module -- confirm against state.inc_initialized.
    await state.inc_initialized()
    await websockets.serve(counter, "localhost", 6789)


async def tick():
    """Log ``"tick"`` at warning level once per second, forever."""
    while True:
        logging.warning("tick")
        await asyncio.sleep(1)


async def main():
    """Seed ``random`` and run the ticker, the server and the processor."""
    random.seed(time.time())
    await asyncio.gather(
        tick(),
        serve(),
        # NOTE(review): the arguments look like device IP addresses --
        # confirm against processor.main.
        processor.main(["192.168.178.123", "192.168.178.124"]),
    )


if __name__ == "__main__":
    asyncio.run(main())