You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

198 lines
7.2 KiB

import sys
import os
import socket
import threading
import atexit
import preserves.schema
import time
import re
from syndicate import patterns as P, relay, turn, dataspace, Symbol, stringify
from syndicate.actor import find_loop
from syndicate.during import During
# Load the compiled Preserves schema bundle. The first path is tried first;
# if loading it fails for any ordinary reason, fall back to the second path
# (apparently a developer source checkout -- TODO confirm).
try:
    schemas = preserves.schema.load_schema_file('/usr/share/synit/schemas/schema-bundle.prb')
except Exception:
    # `except Exception` rather than a bare `except:` so that KeyboardInterrupt
    # and SystemExit are not mistaken for "schema file not usable".
    schemas = preserves.schema.load_schema_file('/home/tonyg/src/synit/protocols/schema-bundle.bin')

# Shorthand for the `network` schema module used throughout this file.
network = schemas.network

# Directory in which the per-interface control sockets are looked up
# (see WPASupplicantClient.__init__).
server_socket_dir = '/run/wpa_supplicant'
class WPASupplicantClient:
    """Client for a wpa_supplicant control-interface socket (Unix datagram).

    https://w1.fi/wpa_supplicant/devel/ctrl_iface_page.html

    According to the code (!) any response starting with '<' is an event, not a reply.
    This class does not distinguish the two; `recv` returns whatever datagram
    arrives next.
    """

    def __init__(self, interface, log):
        """Bind a client socket, connect to the interface's socket and ATTACH.

        interface -- interface name; used as the socket file name under
                     `server_socket_dir` and in the client socket path.
        log       -- logger-like object; only `.debug(...)` is used here.

        Raises AssertionError if the reply to ATTACH is not 'OK' (including
        no reply within 3 seconds). Any socket error is propagated. On every
        failure the socket and the client socket file are released first.
        """
        self.interface = interface
        self.log = log
        # Set only once fully attached; `close` uses it as the "open" flag.
        self.cleanup = None
        self.server_socket_path = os.path.join(server_socket_dir, interface)
        self.client_socket_path = f'/tmp/wpa_cli_py_{interface}_{os.getpid()}'
        self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        try:
            self.sock.bind(self.client_socket_path)
            self.sock.connect(self.server_socket_path)
            self.send('ATTACH')
            answer = self.recv(timeout_sec=3.0)
            if answer != 'OK':
                raise AssertionError(f'wpa_supplicant replied {repr(answer)} to ATTACH')
        except BaseException:
            # Do not leak the descriptor or the bound socket file when setup fails.
            self._release()
            raise
        self.cleanup = lambda: self.close()
        atexit.register(self.cleanup)

    def _release(self):
        """Close the socket and remove the client socket file if it exists."""
        self.sock.close()
        try:
            os.unlink(self.client_socket_path)
        except FileNotFoundError:
            pass

    def close(self):
        """Send DETACH, then release the socket and its file. Idempotent.

        An error raised while sending DETACH still propagates, but only after
        the socket has been closed and the socket file removed.
        """
        if self.cleanup is None:
            return
        atexit.unregister(self.cleanup)
        self.cleanup = None
        try:
            self.send('DETACH')
        finally:
            self._release()

    def send(self, command):
        """Send `command` (a str) as one UTF-8 encoded datagram."""
        self.sock.send(command.encode('utf-8'))

    def recv(self, timeout_sec=None):
        """Receive one datagram and return it stripped and decoded as UTF-8.

        timeout_sec -- seconds to wait; None blocks indefinitely.

        Returns None if the timeout expires. The socket is put back into
        blocking mode afterwards.
        """
        self.sock.settimeout(timeout_sec)
        try:
            reply = self.sock.recv(65536)
            self.log.debug(f'Raw input: {repr(reply)}')
            return reply.strip().decode('utf-8')
        except socket.timeout:
            # socket.timeout is an alias of TimeoutError from Python 3.10 on,
            # but an unrelated OSError subclass before that; naming it
            # explicitly works on both.
            return None
        finally:
            self.sock.settimeout(None)
def gather_events(facet, callback, client, loop):
    """Read messages from `client` forever, handing each one to `callback`.

    Intended as the body of a background thread. Each message returned by
    `client.recv()` is wrapped in a zero-argument thunk and scheduled with
    `turn.external(facet, thunk, loop=loop)`. Any exception ends the loop; it
    is logged at debug level on `facet.log` and not re-raised.
    """
    def make_thunk(message):
        # A factory is used so that each thunk closes over its own `message`
        # binding instead of a loop variable that is rebound on every
        # iteration (Python closures capture variables, not values).
        return lambda: callback(message)

    facet.log.debug('Background socket read thread started')
    try:
        while True:
            facet.log.debug('waiting for event...')
            message = client.recv()
            thunk = make_thunk(message)
            turn.external(facet, thunk, loop=loop)
    except Exception as failure:
        facet.log.debug(failure)
    finally:
        facet.log.debug('Background socket read thread terminated')
@relay.service(name='wifi_daemon', debug=False)
@During().add_handler
def main(args):
    """Service entry point: manage wpa_supplicant networks for one interface.

    `args` is indexed by the symbols `machine` (an embedded reference to the
    machine dataspace) and `ifname` (the interface name). The function:

    * opens a WPASupplicantClient for the interface;
    * serialises commands through a queue so that only one reply-bearing
      command is outstanding at a time;
    * at startup sends LEVEL 3, then LIST_NETWORKS, removes every listed
      network and starts a SCAN;
    * for each SelectedWifiNetwork assertion for this interface, creates a
      NetworkState that configures the network and publishes a
      WifiAssociation record;
    * starts a background thread that feeds every datagram from the socket
      to `handle_event`.
    """
    machine_ds = args[Symbol('machine')].embeddedValue
    ifname = args[Symbol('ifname')]

    client = WPASupplicantClient(ifname, turn.log)

    # Queue of (command, reply_handler) pairs. The head is the command that
    # has been sent and whose reply is awaited; the rest are not yet sent.
    commands = []

    def send_next_command():
        """Send queued commands until one that expects a reply is in flight."""
        while len(commands) > 0:
            turn.log.info(f'Sending {commands[0][0]}')
            client.send(commands[0][0])
            if commands[0][1] is None:
                # A handler of None means no reply is awaited: drop it and
                # move straight on to the next command.
                commands.pop(0)
                continue
            else:
                break

    def send_command(c, handler=lambda reply: None):
        """Queue command `c`; `handler(reply)` is called with its reply.

        The command is sent immediately only when the queue was empty;
        otherwise it goes out once the commands ahead of it have been
        answered (see `handle_event`).
        """
        turn.log.info(f'Scheduling {c}')
        commands.append((c, handler))
        if len(commands) == 1:
            send_next_command()

    def cleanout_networks(nets):
        """Reply handler for LIST_NETWORKS: remove each network, then SCAN.

        `nets` is the reply text, one network per line with tab-separated
        fields, the first field being the id passed to REMOVE_NETWORK.
        """
        for line in nets.split('\n'):
            # Skip the header row, and blank lines, which would otherwise
            # produce a REMOVE_NETWORK command with an empty network id.
            if not line.strip() or line.startswith('network id'):
                continue
            turn.log.info(f'Cleaning out old network: {repr(line)}')
            send_command('REMOVE_NETWORK ' + line.split('\t')[0])
        send_command('SCAN', handle_scan_failure)

    def handle_scan_failure(reply):
        """Reply handler for SCAN: on 'FAIL', send TERMINATE."""
        if reply == 'FAIL':
            send_command('TERMINATE')

    # Startup sequence.
    send_command('LEVEL 3')
    send_command('LIST_NETWORKS', cleanout_networks)

    class NetworkState:
        """Tracks one selected network from ADD_NETWORK through removal."""

        def __init__(self, ssid, auth, machine_ds, facet):
            """Publish an in-progress association and issue ADD_NETWORK.

            ssid       -- SSID captured from the SelectedWifiNetwork assertion.
            auth       -- decoded network.WifiAuthentication value.
            machine_ds -- dataspace reference, stored on the instance.
            facet      -- facet used later to update the association state.
            """
            turn.log.info(f'Selected network ssid={repr(ssid)} auth={auth.VARIANT}')
            self.ssid = ssid
            self.auth = auth
            self.machine_ds = machine_ds
            self.facet = facet
            # Cleared by on_no_longer_wanted; checked when ADD_NETWORK replies.
            self.wanted = True
            # Filled in from the ADD_NETWORK reply.
            self.network_id = None
            self.association_state = turn.field(
                initial_value=network.WifiAssociationState.inProgress(),
                name='association_state')
            # The published record is computed from the association_state field.
            turn.publish_dataflow(lambda: (
                machine_ds, network.WifiAssociation(ifname, self.ssid, self.association_state.value)))
            send_command('ADD_NETWORK', self.on_network_created)

        def on_network_created(self, new_network_id):
            """Reply handler for ADD_NETWORK; the reply is used as the network id.

            If the network is still wanted, configure and enable it; if it was
            withdrawn while ADD_NETWORK was pending, remove it again.
            """
            turn.log.info(f'Added network id={new_network_id}')
            self.network_id = new_network_id
            if self.wanted:
                send_command(f'SET_NETWORK {self.network_id} ssid {stringify(self.ssid)}')
                if self.auth.VARIANT == Symbol('psk'):
                    send_command(f'SET_NETWORK {self.network_id} psk {stringify(self.auth.password)}')
                send_command(f'ENABLE_NETWORK {self.network_id}', self.on_network_enabled)
            else:
                send_command(f'REMOVE_NETWORK {self.network_id}')
                self.network_id = None

        def on_network_enabled(self, reply):
            """Reply handler for ENABLE_NETWORK: mark the association ready."""
            turn.log.info(f'Network enabled: {repr(reply)}')

            def update_association_state():
                self.association_state.value = network.WifiAssociationState.ready()

            # The field update is scheduled as a turn on this network's facet.
            turn.external(self.facet, update_association_state)

        def on_no_longer_wanted(self):
            """Stop handler: remove the network if it has already been created."""
            self.wanted = False
            if self.network_id is not None:
                send_command(f'REMOVE_NETWORK {self.network_id}')
                self.network_id = None

    @dataspace.during(machine_ds,
                      P.quote(network.SelectedWifiNetwork(ifname, P.uCAPTURE, P.uCAPTURE)))
    def handle_selected_network(ssid, auth):
        """Create a NetworkState for each SelectedWifiNetwork on this interface."""
        auth = network.WifiAuthentication.try_decode(auth)
        if auth is None: return
        state = NetworkState(ssid, auth, machine_ds, facet)
        turn.on_stop(state.on_no_longer_wanted)

    def handle_event(e):
        """Dispatch one message from wpa_supplicant.

        Messages starting with '<' are events; anything else is taken to be
        the reply to the command at the head of the queue.
        """
        # startswith rather than e[0], so that an empty message is treated as
        # a (blank) reply instead of raising IndexError.
        if e.startswith('<'):
            # It's an event
            m = re.match(r'<(\d+)>(.*)', e)
            if m:
                (_level, message) = m.groups()
                if message == 'CTRL-EVENT-SCAN-RESULTS':
                    send_command('SCAN_RESULTS')
                turn.log.info(f'Level {_level}: {message}')
            else:
                turn.log.info(f'Unusual event: {e}')
        else:
            if len(commands) > 0:
                turn.log.info(f'REPLY: {e}')
                # The handler runs while its command is still at the head, so
                # anything it queues is only sent by send_next_command below.
                commands[0][1](e)
                commands.pop(0)
                send_next_command()
            else:
                turn.log.warning(f'Unexpected reply: {e}')

    facet = turn.active_facet()
    loop = find_loop()
    turn.log.info('Starting background read thread')
    threading.Thread(
        name='background-wpa_supplicant-read-thread',
        target=lambda: gather_events(facet, handle_event, client, loop)).start()