"""Syndicate service `wifi_daemon`: drives wpa_supplicant via its control socket.

The service attaches to the wpa_supplicant control interface for one network
interface, clears out previously configured networks, starts a scan, and then
adds/enables/removes networks in response to `SelectedWifiNetwork` assertions
observed in the machine dataspace.
"""

import atexit
import os
import re
import socket
import sys
import threading
import time

import preserves.schema
from syndicate import patterns as P, relay, turn, dataspace, Symbol, stringify
from syndicate.actor import find_loop
from syndicate.during import During

try:
    schemas = preserves.schema.load_schema_file('/usr/share/synit/schemas/schema-bundle.prb')
except Exception:
    # Installed bundle unavailable or unreadable: fall back to the development
    # checkout. `Exception` (rather than a bare `except`) lets KeyboardInterrupt
    # and SystemExit propagate.
    schemas = preserves.schema.load_schema_file('/home/tonyg/src/synit/protocols/schema-bundle.bin')

network = schemas.network

server_socket_dir = '/run/wpa_supplicant'


class WPASupplicantClient:
    """Datagram client for the wpa_supplicant control interface.

    See https://w1.fi/wpa_supplicant/devel/ctrl_iface_page.html

    According to the wpa_supplicant code, any response starting with '<' is an
    event, not a reply.
    """

    def __init__(self, interface, log):
        """Bind a local socket, connect to wpa_supplicant and ATTACH.

        Args:
            interface: network interface name; selects the server socket
                inside `server_socket_dir`.
            log: logger used for raw-input debug output.

        Raises:
            AssertionError: if wpa_supplicant does not answer 'OK' to ATTACH
                (including no answer within 3 seconds).
            OSError: if binding or connecting the socket fails.
        """
        self.interface = interface
        self.log = log
        self.cleanup = None
        self.server_socket_path = os.path.join(server_socket_dir, interface)
        self.client_socket_path = f'/tmp/wpa_cli_py_{interface}_{os.getpid()}'
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        bound = False
        try:
            self.sock.bind(self.client_socket_path)
            bound = True
            self.sock.connect(self.server_socket_path)
            self.send('ATTACH')
            answer = self.recv(timeout_sec=3.0)
            if answer != 'OK':
                raise AssertionError(f'wpa_supplicant replied {repr(answer)} to ATTACH')
        except BaseException:
            # Do not leak the descriptor or the socket file we created when
            # setup fails part-way through.
            self.sock.close()
            if bound:
                try:
                    os.unlink(self.client_socket_path)
                except FileNotFoundError:
                    pass
            raise
        self.cleanup = lambda: self.close()
        atexit.register(self.cleanup)

    def close(self):
        """DETACH from wpa_supplicant and release the local socket.

        Safe to call more than once; calls after the first do nothing. If
        sending DETACH fails, the socket is still closed and the socket file
        removed before the error propagates.
        """
        if self.cleanup is None:
            return
        atexit.unregister(self.cleanup)
        self.cleanup = None
        try:
            self.send('DETACH')
        finally:
            try:
                self.sock.close()
            finally:
                os.unlink(self.client_socket_path)

    def send(self, command):
        """Send one control-interface command, encoded as UTF-8."""
        self.sock.send(command.encode('utf-8'))

    def recv(self, timeout_sec=None):
        """Receive one datagram and return it stripped and decoded as UTF-8.

        Args:
            timeout_sec: seconds to wait, or None to block indefinitely.

        Returns:
            The decoded message, or None if the timeout expired.
        """
        self.sock.settimeout(timeout_sec)
        try:
            reply = self.sock.recv(65536)
            self.log.debug(f'Raw input: {repr(reply)}')
            return reply.strip().decode('utf-8')
        except socket.timeout:
            # socket.timeout is an alias of TimeoutError only from Python 3.10
            # onwards; catching socket.timeout works on every version.
            return None
        finally:
            # Always restore blocking mode for subsequent callers.
            self.sock.settimeout(None)


def gather_events(facet, callback, client, loop):
    """Read messages from `client` forever, delivering each to `callback`.

    Intended to run on a background thread. Each message is handed to
    `callback` via `turn.external` on `facet`, using `loop`. Any exception
    ends the loop and is logged at debug level.
    """

    def deliver(event):
        # The action runs later, after the loop variable has been rebound, so
        # each action must capture its own event; a factory gives it a
        # dedicated binding.
        return lambda: callback(event)

    facet.log.debug('Background socket read thread started')
    try:
        while True:
            facet.log.debug('waiting for event...')
            event = client.recv()
            turn.external(facet, deliver(event), loop=loop)
    except Exception as exc:
        facet.log.debug(exc)
    finally:
        facet.log.debug('Background socket read thread terminated')


@relay.service(name='wifi_daemon', debug=False)
@During().add_handler
def main(args):
    """Service entry point.

    Reads the `machine` dataspace reference and `ifname` from `args`, attaches
    to wpa_supplicant, queues the initial housekeeping commands, installs the
    `SelectedWifiNetwork` handler and starts the background read thread.
    """
    machine_ds = args[Symbol('machine')].embeddedValue
    ifname = args[Symbol('ifname')]

    client = WPASupplicantClient(ifname, turn.log)

    # FIFO of (command, reply_handler). Only the head command is in flight;
    # its handler consumes the next non-event message received.
    commands = []

    def send_next_command():
        """Send the head of the queue and wait for its reply.

        Commands whose handler is None expect no reply handling, so they are
        sent and dropped immediately.
        """
        while len(commands) > 0:
            turn.log.info(f'Sending {commands[0][0]}')
            client.send(commands[0][0])
            if commands[0][1] is None:
                commands.pop(0)
                continue
            else:
                break

    def send_command(c, handler=lambda reply: None):
        """Queue command `c`; `handler` is called with its reply text."""
        turn.log.info(f'Scheduling {c}')
        commands.append((c, handler))
        if len(commands) == 1:
            # Nothing was in flight, so start sending right away.
            send_next_command()

    def cleanout_networks(nets):
        """Remove every network listed in a LIST_NETWORKS reply, then SCAN."""
        for line in nets.split('\n'):
            network_id = line.split('\t')[0]
            # Skips the 'network id / ...' header, blank lines and a 'FAIL'
            # reply: only rows that start with a numeric id are networks.
            if not network_id.isdigit():
                continue
            turn.log.info(f'Cleaning out old network: {repr(line)}')
            send_command('REMOVE_NETWORK ' + network_id)
        send_command('SCAN', handle_scan_failure)

    def handle_scan_failure(reply):
        """Ask wpa_supplicant to TERMINATE if the SCAN request failed."""
        if reply == 'FAIL':
            send_command('TERMINATE')

    send_command('LEVEL 3')
    send_command('LIST_NETWORKS', cleanout_networks)

    class NetworkState:
        """Lifecycle of one selected network inside wpa_supplicant."""

        def __init__(self, ssid, auth, machine_ds, facet):
            turn.log.info(f'Selected network ssid={repr(ssid)} auth={auth.VARIANT}')
            self.ssid = ssid
            self.auth = auth
            self.machine_ds = machine_ds
            self.facet = facet
            self.wanted = True
            self.network_id = None
            self.association_state = turn.field(
                initial_value=network.WifiAssociationState.inProgress(),
                name='association_state')
            # Publish a WifiAssociation record that tracks association_state.
            turn.publish_dataflow(lambda: (
                machine_ds,
                network.WifiAssociation(ifname, self.ssid, self.association_state.value)))
            send_command('ADD_NETWORK', self.on_network_created)

        def on_network_created(self, new_network_id):
            """Handle the ADD_NETWORK reply: configure and enable the network.

            If the selection was withdrawn while ADD_NETWORK was in flight,
            the freshly created network is removed again instead.
            """
            if not new_network_id.isdigit():
                # ADD_NETWORK replies with the new id, or FAIL on failure;
                # never use a failure reply as a network id.
                turn.log.warning(f'ADD_NETWORK failed: {repr(new_network_id)}')
                return
            turn.log.info(f'Added network id={new_network_id}')
            self.network_id = new_network_id
            if self.wanted:
                send_command(f'SET_NETWORK {self.network_id} ssid {stringify(self.ssid)}')
                if self.auth.VARIANT == Symbol('psk'):
                    send_command(f'SET_NETWORK {self.network_id} psk {stringify(self.auth.password)}')
                send_command(f'ENABLE_NETWORK {self.network_id}', self.on_network_enabled)
            else:
                send_command(f'REMOVE_NETWORK {self.network_id}')
                self.network_id = None

        def on_network_enabled(self, reply):
            """Handle the ENABLE_NETWORK reply by marking the state ready."""
            # NOTE(review): the state becomes ready whatever `reply` says;
            # confirm whether a 'FAIL' reply should be treated differently.
            turn.log.info(f'Network enabled: {repr(reply)}')

            def update_association_state():
                self.association_state.value = network.WifiAssociationState.ready()

            turn.external(self.facet, update_association_state)

        def on_no_longer_wanted(self):
            """Selection withdrawn: remove the network if it already exists."""
            self.wanted = False
            if self.network_id is not None:
                send_command(f'REMOVE_NETWORK {self.network_id}')
                self.network_id = None

    @dataspace.during(machine_ds, P.quote(network.SelectedWifiNetwork(ifname, P.uCAPTURE, P.uCAPTURE)))
    def handle_selected_network(ssid, auth):
        auth = network.WifiAuthentication.try_decode(auth)
        if auth is None:
            return
        state = NetworkState(ssid, auth, machine_ds, facet)
        turn.on_stop(state.on_no_longer_wanted)

    def handle_event(e):
        """Dispatch one message: '<'-prefixed events, otherwise a reply."""
        # startswith (not e[0]) so an empty datagram cannot raise IndexError.
        if e.startswith('<'):
            # It's an event
            m = re.match(r'<(\d+)>(.*)', e)
            if m:
                (_level, message) = m.groups()
                if message == 'CTRL-EVENT-SCAN-RESULTS':
                    send_command('SCAN_RESULTS')
                turn.log.info(f'Level {_level}: {message}')
            else:
                turn.log.info(f'Unusual event: {e}')
        else:
            if len(commands) > 0:
                turn.log.info(f'REPLY: {e}')
                # Run the handler BEFORE popping: commands it queues must wait
                # behind the current head rather than being sent immediately.
                commands[0][1](e)
                commands.pop(0)
                send_next_command()
            else:
                turn.log.warning(f'Unexpected reply: {e}')

    facet = turn.active_facet()
    loop = find_loop()
    turn.log.info('Starting background read thread')
    threading.Thread(
        name='background-wpa_supplicant-read-thread',
        target=lambda: gather_events(facet, handle_event, client, loop)).start()