110 lines
2.8 KiB
Python
110 lines
2.8 KiB
Python
|
#!/usr/bin/env python3
|
||
|
import os
|
||
|
from fcntl import ioctl
|
||
|
from time import sleep
|
||
|
import struct
|
||
|
from collections import namedtuple
|
||
|
import cffi
|
||
|
import select
|
||
|
import datetime
|
||
|
import sys
|
||
|
|
||
|
ffi = cffi.FFI()
|
||
|
|
||
|
def mkstruct(name, fields, spec):
|
||
|
cls = namedtuple(name, fields)
|
||
|
def iter_load(bs):
|
||
|
return map(cls._make, struct.iter_unpack(spec, bs))
|
||
|
cls.iter_load = iter_load
|
||
|
def load(bs):
|
||
|
return cls._make(struct.unpack(spec, bs))
|
||
|
cls.load = load
|
||
|
def iter_save(items):
|
||
|
return b''.join(struct.pack(spec, *item) for item in items)
|
||
|
cls.iter_save = iter_save
|
||
|
def save(item):
|
||
|
return struct.pack(spec, *item)
|
||
|
cls.save = save
|
||
|
return cls
|
||
|
|
||
|
# struct sipc_fmt_hdr, modem_prj.h
|
||
|
Packet = mkstruct('Packet', 'len msg_seq ack_seq main_cmd sub_cmd cmd_type', '<HBBBBB')
|
||
|
|
||
|
# enum modem_state from modem_prj.h
|
||
|
enum_modem_state = [
|
||
|
'STATE_OFFLINE',
|
||
|
'STATE_CRASH_RESET',
|
||
|
'STATE_CRASH_EXIT',
|
||
|
'STATE_BOOTING',
|
||
|
'STATE_ONLINE',
|
||
|
'STATE_NV_REBUILDING',
|
||
|
'STATE_LOADER_DONE',
|
||
|
'STATE_SIM_ATTACH',
|
||
|
'STATE_SIM_DETACH',
|
||
|
'STATE_CRASH_WATCHDOG',
|
||
|
]
|
||
|
|
||
|
def modem_status(fd):
|
||
|
return enum_modem_state[ioctl(fd, 0x6f27)]
|
||
|
|
||
|
class Driver(object):
|
||
|
def __init__(self, name, fd):
|
||
|
self.name = name
|
||
|
self.fd = fd
|
||
|
|
||
|
def read_ready(self):
|
||
|
i = os.read(self.fd, 65536)
|
||
|
self.decode(i)
|
||
|
print(str(datetime.datetime.now()), self.name, '({:04x})'.format(len(i)), self.statusreport())
|
||
|
|
||
|
def decode(self, bs):
|
||
|
raise Exception('subclassResponsibility')
|
||
|
|
||
|
def statusreport(self):
|
||
|
raise Exception('subclassResponsibility')
|
||
|
|
||
|
class IpcDriver(Driver):
|
||
|
def __init__(self, *args):
|
||
|
super().__init__(*args)
|
||
|
self.head = None
|
||
|
self.body = b''
|
||
|
self.leftover = b''
|
||
|
|
||
|
def decode(self, bs):
|
||
|
self.head = Packet.load(bs[:7])
|
||
|
bodysize = self.head.len - 7
|
||
|
self.body = bs[7:self.head.len]
|
||
|
self.leftover = bs[self.head.len:]
|
||
|
|
||
|
def statusreport(self):
|
||
|
return ' '.join([modem_status(self.fd), repr(self.head), self.body.hex(), repr(self.body), self.leftover.hex()])
|
||
|
|
||
|
class RfsDriver(Driver):
|
||
|
def __init__(self, *args):
|
||
|
super().__init__(*args)
|
||
|
self.packet = b''
|
||
|
|
||
|
def decode(self, bs):
|
||
|
self.packet = bs
|
||
|
|
||
|
def statusreport(self):
|
||
|
return self.packet.hex()
|
||
|
|
||
|
ipc0_path = '/dev/umts_ipc0'
|
||
|
rfs0_path = '/dev/umts_rfs0'
|
||
|
|
||
|
ipc0_fd = os.open(ipc0_path, os.O_RDWR)
|
||
|
rfs0_fd = os.open(rfs0_path, os.O_RDWR)
|
||
|
drivers = {}
|
||
|
drivers[ipc0_fd] = IpcDriver('ipc0', ipc0_fd)
|
||
|
drivers[rfs0_fd] = RfsDriver('rfs0', rfs0_fd)
|
||
|
|
||
|
while True:
|
||
|
(readfds, _writefds, errorfds) = select.select([ipc0_fd, rfs0_fd], [], [ipc0_fd, rfs0_fd])
|
||
|
if len(errorfds) != 0:
|
||
|
print('AIEEE', errorfds)
|
||
|
break
|
||
|
for fd in readfds:
|
||
|
drivers[fd].read_ready()
|
||
|
sys.stdout.flush()
|