house/tracking/track.py

87 lines
3.1 KiB
Python
Executable File

#!/usr/bin/env python3
import argparse
import json
import sys
import time

import cv2
import numpy as np

from preserves.schema import load_schema_file
from syndicate import relay, turn, Symbol
from syndicate.during import During
# Wire-protocol schemas shared with the rest of the house system.
schemas = load_schema_file('../protocols/house-schemas.bin')
LiteralVector3 = schemas.shapes.LiteralVector3
tracking = schemas.tracking

parser = argparse.ArgumentParser(description='Head tracker')
# NOTE: argparse ignores `default=` on a required positional (no nargs='?'),
# so the previous `default=None` here was dead code and has been removed.
parser.add_argument('camera_name', help='name for this camera')
cli_args = parser.parse_args()
def open_video():
    """Open capture device 0 at 640x480, retrying until it succeeds.

    Blocks indefinitely: any failure is logged and retried after 2 seconds.

    Returns:
        An opened ``cv2.VideoCapture``.
    """
    while True:
        try:
            video = cv2.VideoCapture(0)
            # VideoCapture() usually signals failure by returning an unopened
            # device rather than raising, so check explicitly and make that
            # path retryable too.
            if not video.isOpened():
                raise RuntimeError('failed to open camera 0')
            video.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
            video.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
            return video
        except Exception:
            # Narrowed from bare `except:` so KeyboardInterrupt/SystemExit
            # still propagate and can stop the process.
            import traceback
            traceback.print_exc()
            time.sleep(2)
# Camera intrinsics produced by a separate calibration step; required by
# estimatePoseSingleMarkers/drawFrameAxes below. (`import json` now lives in
# the top-of-file import block.)
with open('calibration.json', 'r', encoding='utf-8') as f:
    calibration = json.load(f)
cameraMatrix = np.asarray(calibration['cameraMatrix'])
distCoeffs = np.asarray(calibration['distCoeffs'])
# 4x4-bit ArUco dictionary with 50 ids; only ids 0-10 are consumed (see main).
d = cv2.aruco.getPredefinedDictionary(cv2.aruco.DICT_4X4_50)
@relay.service(name = 'headtracker', debug = True)
@During().add_handler
def main(args):
    """Syndicate service entry point: detect ArUco markers and publish poses.

    `args` is the service configuration assertion; it must carry a
    `mainDataspace` embedded entity reference, to which one
    ``tracking.Marker`` message is sent per detected marker with id <= 10.
    """
    turn.log.info(f'headtracker {cli_args} {args}')
    turn.on_stop(lambda: turn.log.info('stopping'))
    main_ds = args.get(Symbol('mainDataspace')).embeddedValue

    @turn.linked_task(run_in_executor=True)
    def capture_task(facet):
        # Runs on an executor thread; loops until the facet is torn down.
        video = open_video()
        while facet.alive:
            (ok, frame) = video.read()
            now = time.time()
            if not ok:
                # Camera dropped out: log, back off briefly, reopen.
                facet.log.info('video.read() yielded false')
                time.sleep(0.2)
                video = open_video()
                continue
            (corners, ids, _rejected) = \
                cv2.aruco.detectMarkers(cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY), d)
            cv2.aruco.drawDetectedMarkers(frame, corners, ids)
            if ids is not None:
                (rvecs, tvecs, _objPoints) = cv2.aruco.estimatePoseSingleMarkers(
                    corners, 0.02, cameraMatrix, distCoeffs)
                for i in range(len(rvecs)):
                    rv = rvecs[i]
                    tv = tvecs[i]
                    mi = ids[i]
                    if mi[0] <= 10: ## we only use markers [0-10]
                        marker = tracking.Marker(cli_args.camera_name,
                                                 mi[0],
                                                 LiteralVector3(*rv[0].tolist()),
                                                 LiteralVector3(*tv[0].tolist()),
                                                 now)
                        # BUG FIX: bind `marker` via a default argument.
                        # turn.external defers this thunk to the main turn;
                        # a late-bound closure over the loop variable could
                        # send a later iteration's marker instead.
                        turn.external(facet,
                                      lambda marker=marker: turn.send(main_ds, marker))
                    # Axes drawn for every detected marker, like
                    # drawDetectedMarkers above (not only ids <= 10).
                    cv2.drawFrameAxes(frame, cameraMatrix, distCoeffs, rv, tv, 0.02)
            cv2.imshow('track', frame)
            cv2.waitKey(1) # for liveness??