synit/packaging/packages/synit-config/files/usr/lib/synit/python/synit/daemon/interface_monitor.py

181 lines
6.6 KiB
Python

import sys
from syndicate import relay, turn, Symbol, Record, Formatter
from syndicate.actor import find_loop
from syndicate.during import During
from socket import AF_INET, AF_INET6
from dataclasses import dataclass
from typing import Optional
import logging
import preserves.schema
import threading
import pyroute2
from pr2modules.iwutil import IW
# Load the compiled Preserves schema bundle; its `network` module supplies
# the record and variant constructors used by the rest of this file.
try:
    schemas = preserves.schema.load_schema_file('/usr/share/synit/schemas/schema-bundle.prb')
except Exception:
    # The installed bundle could not be loaded, so try the second location
    # (this looks like a developer checkout path -- confirm it is still wanted).
    # `except Exception` is used instead of a bare `except:` so that
    # KeyboardInterrupt and SystemExit are not swallowed here.
    schemas = preserves.schema.load_schema_file('/home/tonyg/src/synit/protocols/schema-bundle.bin')
network = schemas.network
class LenientFormatter(Formatter):
    """Formatter variant that tolerates ``None`` values.

    When asked to handle a value it cannot format, a ``None`` is emitted as
    an empty record labelled ``null``; every other value is passed on to the
    base class implementation.
    """

    def cannot_format(self, v):
        """Emit a ``null`` record for ``None``; otherwise defer to the base class."""
        if v is not None:
            super().cannot_format(v)
            return
        self.append(Record(Symbol('null'), []))
def lenient_stringify(v, indent=None):
    """Return the text produced by a `LenientFormatter` for `v`.

    `indent` is forwarded unchanged to the formatter's constructor.
    """
    formatter = LenientFormatter(indent=indent)
    formatter.append(v)
    text = formatter.contents()
    return text
@dataclass
class Link:
    """Tracks the record most recently asserted for one interface."""

    # Handle returned by the latest `turn.replace` call (None before the first).
    handle: Optional[int]
    # Record passed to the latest `turn.replace` call (None before the first).
    record: Optional[network.Interface]

    def update(self, machine_ds, newRecord):
        """Replace the assertion in `machine_ds` when `newRecord` differs.

        Nothing happens if `newRecord` compares equal to the stored record.
        """
        differs = self.record != newRecord
        if not differs:
            return
        self.handle = turn.replace(machine_ds, self.handle, newRecord)
        self.record = newRecord
def parse_route(linktable, m):
    """Build a `network.Route` from the route message `m`.

    `linktable` maps the value of the message's ``RTA_OIF`` attribute to a
    `Link`; when no entry is found the route is reported without an
    interface name.
    """
    # Address family: ipv4/ipv6 get dedicated variants, anything else is
    # carried through as its raw number.
    family = m['family']
    if family == AF_INET6:
        af = network.AddressFamily.ipv6()
    elif family == AF_INET:
        af = network.AddressFamily.ipv4()
    else:
        af = network.AddressFamily.other(family)

    # A message without RTA_DST is treated as the default route.
    prefix = m.get_attr('RTA_DST')
    dst = (network.RouteDestination.default()
           if prefix is None
           else network.RouteDestination.prefix(prefix, m['dst_len']))

    priority = m.get_attr('RTA_PRIORITY', 0)
    tos = m['tos']

    # Resolve the outgoing interface through the link table.
    link = linktable.get(m.get_attr('RTA_OIF'), None)
    ifname = (network.RouteInterface.none()
              if link is None
              else network.RouteInterface.name(link.record.name))

    gateway = m.get_attr('RTA_GATEWAY')
    if gateway is None:
        gw = network.Gateway.none()
    else:
        gw = network.Gateway.addr(gateway)

    return network.Route(af, dst, priority, tos, ifname, gw)
# Lookup from the string found in a link message's IFLA_OPERSTATE attribute
# to the matching schema value. Names that are absent here are handled by the
# caller's `.get(..., unknown())` fallback.
operational_state_map = dict(
    UP=network.OperationalState.up(),
    DORMANT=network.OperationalState.dormant(),
    TESTING=network.OperationalState.testing(),
    LOWERLAYERDOWN=network.OperationalState.lowerLayerDown(),
    DOWN=network.OperationalState.down(),
)

# Lookup from a link message's 'state' field to the matching schema value.
administrative_state_map = dict(
    up=network.AdministrativeState.up(),
    down=network.AdministrativeState.down(),
)
def parse_interface(m, iw):
    """Build a `network.Interface` from the link message `m`.

    `iw` is queried by interface index; if that lookup succeeds the interface
    is classified as wireless, which takes precedence over the loopback flag.
    """
    try:
        wireless_info = iw.get_interface_by_ifindex(m['index'])
    except Exception:
        # Best effort: presumably this interface has no wireless extensions.
        wireless_info = None

    # Bit 8 of the flags marks loopback (presumably IFF_LOOPBACK -- confirm).
    loopback_flag = m['flags'] & 8
    if wireless_info is not None:
        iftype = network.InterfaceType.wireless()
    elif loopback_flag:
        iftype = network.InterfaceType.loopback()
    else:
        iftype = network.InterfaceType.normal()

    name = m.get_attr('IFLA_IFNAME')
    index = m['index']
    admin_state = administrative_state_map.get(
        m['state'], network.AdministrativeState.unknown())
    oper_state = operational_state_map.get(
        m.get_attr('IFLA_OPERSTATE', 'UNKNOWN'), network.OperationalState.unknown())
    if m.get_attr('IFLA_CARRIER', 0):
        carrier = network.CarrierState.carrier()
    else:
        carrier = network.CarrierState.noCarrier()
    address = m.get_attr('IFLA_ADDRESS', '')

    return network.Interface(name, index, iftype, admin_state, oper_state,
                             carrier, address)
def gather_events_from_socket(facet, callback, ip, loop):
    """Read event batches from `ip` forever, handing each to `callback`.

    Intended as a thread body: each batch returned by `ip.get()` is scheduled
    through `turn.external` on `facet` (with the given `loop`) so that
    `callback(batch)` is invoked from there. Any exception ends the loop and is
    logged at debug level.
    """
    def thunk_for(batch):
        # Taking the batch as a parameter gives each thunk its own binding;
        # a lambda closing over the loop variable would see later rebindings.
        return lambda: callback(batch)

    log = facet.log
    log.debug('Background netlink socket read thread started')
    try:
        while True:
            log.debug('waiting for event...')
            batch = ip.get()
            log.debug(f'... got {len(batch)} events')
            turn.external(facet, thunk_for(batch), loop=loop)
    except Exception as err:
        log.debug(err)
    finally:
        log.debug('Background netlink socket read thread terminated')
@relay.service(name='interface_monitor', debug=False)
@During().add_handler
def main(args):
    """Publish interface and main-table route records into the machine dataspace.

    Takes the dataspace reference from ``args[Symbol('machine')]``, opens an
    `IPRoute` netlink socket plus an `IW` helper, processes the current links
    and routes once, and then starts a background thread that feeds later
    netlink events to the same handler.
    """
    machine_ds = args[Symbol('machine')].embeddedValue
    ip = pyroute2.IPRoute()
    ip.bind()
    iw = IW()

    @turn.on_stop_or_crash
    def shutdown():
        # Release both netlink sockets. Previously only `ip` was closed, so
        # the socket held by `iw` was never released. try/finally ensures a
        # failure closing one does not leak the other.
        try:
            ip.close()
        finally:
            iw.close()

    RT_TABLE_MAIN = 254

    linktable = {}   # interface index -> Link
    routetable = {}  # network.Route -> handle returned by turn.publish

    def handle_events(events):
        """Apply a batch of netlink messages to the link and route tables."""
        for m in events:
            event_type = m['event']
            if event_type == 'RTM_NEWLINK':
                i = parse_interface(m, iw)
                if (wireless_extension := m.get_attr('IFLA_WIRELESS', None)) is not None \
                   and wireless_extension.get_attr('SIOCGIWSCAN', None) is not None:
                    turn.log.info(f'Interface {i.name} is performing a rescan')
                else:
                    # NOTE(review): this dumps every link message to a
                    # predictable path under /tmp; it looks like a debugging
                    # aid -- confirm it should remain in production.
                    with open('/tmp/' + i.name + '.info', 'w') as f:
                        f.write(lenient_stringify(m, indent=4))
                    # NOTE(review): the table update is kept in this branch,
                    # i.e. rescan notifications do not touch the link table --
                    # confirm against the intended layout.
                    if i.index not in linktable:
                        linktable[i.index] = Link(None, None)
                    linktable[i.index].update(machine_ds, i)
            elif event_type == 'RTM_DELLINK':
                i = parse_interface(m, iw)
                link = linktable.pop(i.index, None)
                if link is not None:
                    # Replacing with None withdraws the interface record.
                    link.update(machine_ds, None)
            elif event_type == 'RTM_NEWROUTE':
                if m.get_attr('RTA_TABLE') == RT_TABLE_MAIN:
                    r = parse_route(linktable, m)
                    if r not in routetable:
                        routetable[r] = turn.publish(machine_ds, r)
            elif event_type == 'RTM_DELROUTE':
                if m.get_attr('RTA_TABLE') == RT_TABLE_MAIN:
                    r = parse_route(linktable, m)
                    turn.retract(routetable.pop(r, None))
            elif event_type in ['RTM_NEWNEIGH', 'RTM_DELNEIGH',
                                'RTM_NEWADDR', 'RTM_DELADDR']:
                # Ignored
                pass
            else:
                turn.log.info(f'Unhandled netlink event: {lenient_stringify(m)}')

    # Seed the tables with the current state before listening for changes.
    handle_events(ip.get_links())
    handle_events(ip.get_routes())

    facet = turn.active_facet()
    loop = find_loop()
    facet.log.info('Starting background netlink thread')
    threading.Thread(
        name='background-netlink-socket-read-thread',
        target=lambda: gather_events_from_socket(facet, handle_events, ip, loop)).start()