Source code for slicops.device.screen

"""Control a Screen

:copyright: Copyright (c) 2024 The Board of Trustees of the Leland Stanford Junior University, through SLAC National Accelerator Laboratory (subject to receipt of any required approvals from the U.S. Dept. of Energy).  All Rights Reserved.
:license: http://github.com/slaclab/slicops/LICENSE
"""

from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdc, pkdexc, pkdlog, pkdp, pkdformat
import pykern.pkasyncio
import abc
import enum
import logging
import pykern.pkconfig
import queue
import slicops.device
import slicops.device_db

# TODO(robnagler) these should be reused for both cases
_MOVE_TARGET_IN = PKDict({False: 0, True: 1})

_BLOCKING_MSG = "upstream target is {}"
_TIMEOUT_MSG = "upstream target status accessor timed out"
_ERROR_PREFIX_MSG = "upstream target error: "


[docs] class Screen(slicops.device.Device): """Augment `Device` with screen specific operations""" def __init__(self, beam_path, device_name, handler, *args, **kwargs): super().__init__(device_name, *args, **kwargs) if not isinstance(handler, EventHandler): raise AssertionError( f"handler is not subclass EventHandler type={type(handler)}" ) self.__worker = _Worker(beam_path, handler, self)
[docs] def destroy(self): self.__worker.destroy() super().destroy()
[docs] def move_target(self, want_in): """Insert or remove the target Args: want_in (bool): True to insert, and False to remove """ self.__worker.req_action("req_move_target", PKDict(want_in=want_in))
[docs] class ErrorKind(enum.Enum): """Errors passed to on_screen_device_error""" fsm = enum.auto() monitor = enum.auto() upstream = enum.auto()
[docs] class EventHandler: """Clients of DeviceScreen must implement this"""
[docs] @abc.abstractmethod def on_screen_device_error(self, exc): pass
[docs] @abc.abstractmethod def on_screen_device_update(self, accessor_name, value): pass
class _FSM: """Finite State Machine called by `_Worker` exclusively Manages state via `event` calls and schedules actions in `_Worker`. """ def __init__(self, worker): self.worker = worker self.curr = PKDict( acquire=False, # status of screen acquire move_target_arg=None, # where do we want the target to be? target_status=None, # where is the status right now? await_upstream=False, # are we checking upstream? upstream_problems=None, # are there problems upstream? ) self.prev = self.curr.copy() def event(self, name, arg): self.prev = self.curr.copy() if u := getattr(self, f"_event_{name}")(arg, **self.curr): self.curr.update(u) def _event_handle_monitor( self, arg, await_upstream, move_target_arg, target_status, upstream_problems, **kwargs, ): n = arg.accessor.accessor_name if "error" in arg: self.worker.action( "call_handler", ScreenError( device=self.worker.device.device_name, error_kind=ErrorKind.monitor, accessor_name=n, error_msg=arg.error, ), ) if n == "target_status": # TODO(robnagler) is resetting move_target_arg right? return PKDict(target_status=None, move_target_arg=None) return if "connected" in arg: return if n == "image": v = arg.value rv = None elif n == "acquire": v = arg.value rv = PKDict(acquire=arg.value) elif n == "target_status": v = TargetStatus(arg.value) rv = PKDict(target_status=v) if target_status is None and move_target_arg: rv.await_upstream = self.__move_target_upstream_check( move_target_arg, upstream_problems, await_upstream ) else: rv.move_target_arg = None else: raise AssertionError(f"unsupported accessor={n} {self}") self.worker.action("call_handler", PKDict(accessor_name=n, value=v)) return rv def _event_move_target( self, arg, await_upstream, move_target_arg, target_status, upstream_problems, **kwargs, ): # If target_status hasn't initialized, defer to monitor fire. if target_status == None: rv = PKDict(move_target_arg=arg) return rv if move_target_arg or target_status in ( TargetStatus.MOVING, TargetStatus.INCONSISTENT, ): self.worker.action( "call_handler", ScreenError( device=self.worker.device.device_name, error_kind=ErrorKind.fsm, error_msg="target already moving, inconsistent, or not intialized", ), ) return if arg.want_in == (target_status == TargetStatus.IN): # TODO(robnagler) could be a race condition so probably fine to do nothing pkdlog("same target_status={} self.want_in={}", target_status, arg.want_in) return # TODO(robnagler) allow moving without checking upstream rv = PKDict(move_target_arg=arg) rv.await_upstream = self.__move_target_upstream_check( arg.want_in, upstream_problems, await_upstream ) return rv def _event_upstream_status(self, arg, move_target_arg, **kwargs): rv = PKDict(await_upstream=False, upstream_problems=arg.problems) if arg.problems: self.worker.action( "call_handler", ScreenError( device=self.worker.device.device_name, error_kind=ErrorKind.upstream, error_msg=arg.problems, ), ) return rv.pkupdate(move_target_arg=None) self.worker.action("move_target", move_target_arg) return rv def __move_target_upstream_check(self, want_in, upstream_problems, await_upstream): rv = False if want_in and upstream_problems is None or upstream_problems: # Recheck the upstream if not await_upstream: self.worker.action("check_upstream", None) rv = True else: arg = PKDict(want_in=want_in) self.worker.action("move_target", arg) return rv def __repr__(self): def _states(curr): return " ".join(f"{k}={curr[k]}" for k in sorted(curr.keys())) return f"<_FSM {self.worker.device.device_name} {_states(self.curr)}>"
[docs] class ScreenError(Exception): def __init__(self, **kwargs): def _arg_str(): return pkdformat( " ".join(k + "={" + k + "}" for k in sorted(kwargs)), **kwargs, ) super().__init__(_arg_str())
[docs] class TargetStatus(enum.Enum): """Errors passed to on_screen_device_error""" INCONSISTENT = 3 IN = 2 OUT = 1 MOVING = 0
class _Upstream(pykern.pkasyncio.ActionLoop): """Action loop to check targets of upstream screens""" def __init__(self, worker): def _names(): return slicops.device_db.upstream_devices( "PROF", "target_control", worker.beam_path, worker.device.device_name ) self.__worker = worker self.__problems = PKDict() self.__devices = PKDict({u: slicops.device.Device(u) for u in _names()}) if len(self.__devices) == 0: self.__done() self._destroy() return self._loop_timeout_secs = _cfg.upstream_timeout_secs super().__init__() def action_handle_target_status(self, arg): n = arg.accessor.device.device_name self.__devices.pkdel(n).destroy() if e := arg.get("error"): pkdlog("device={} error={}", n, e) self.__problems[n] = f"{_ERROR_PREFIX_MSG}{e}" elif arg.value != TargetStatus.OUT.value: s = TargetStatus(arg.value) self.__problems[n] = _BLOCKING_MSG.format(s.name) if not self.__devices: return self.__done() return None def action_loop_timeout(self): for n in self.__devices: self.__problems[n] = _TIMEOUT_MSG return self.__done() def _destroy(self): (d, self.__devices) = (self.__devices, PKDict()) for x in d.values(): x.destroy() def __done(self): self.__worker.action("upstream_status", PKDict(problems=self.__problems)) return self._LOOP_END def __handle_target_status(self, kwargs): if "connected" in kwargs: return self.action("handle_target_status", kwargs) def _start(self, *args, **kwargs): for d in self.__devices.values(): d.accessor("target_status").monitor(self.__handle_target_status) super()._start(*args, **kwargs) def _repr(self): return f"pending={sorted(self.__devices)} problems={sorted(self.__problems)}" class _Worker(pykern.pkasyncio.ActionLoop): """Action loop for Screen _Worker uses `_FSM` to translate events to actions. Monitor calls from device are translated to actions to avoid locking in callback. Similarly, when Screen requests to move target, this is a queued action as well. """ def __init__(self, beam_path, handler, device): self.beam_path = beam_path self.device = device self.__handler = handler self.__upstream = None self.__status = None # self.monitors = pkdict... self.__fsm = _FSM(self) # self.monitors ... self.__target_control = None self._loop_timeout_secs = 0 super().__init__() # get from ready queue with timeout # except # exit def action_call_handler(self, arg): m = ( self.__handler.on_screen_device_error if isinstance(arg, Exception) else self.__handler.on_screen_device_update ) # Denormalized state so no need for lock during call if isinstance(arg, dict): return lambda: m(**arg) else: return lambda: m(arg) def action_check_upstream(self, arg): if self.__upstream is None or self.__upstream.destroyed: self.__upstream = _Upstream(self) return None def action_handle_monitor(self, arg): self.__fsm.event("handle_monitor", arg) return None def action_move_target(self, arg): if not self.__target_control: self.__target_control = self.device.accessor("target_control") self.__target_control.put(_MOVE_TARGET_IN[arg.want_in]) return None def action_req_move_target(self, arg): self.__fsm.event("move_target", arg) return None def action_upstream_status(self, arg): self.__fsm.event("upstream_status", arg) self.__upstream = None return None def req_action(self, method, arg): """Called by DeviceScreen which has separate life cycle""" # __fsm.is_ready if self.destroyed: raise AssertionError("object is destroyed") self.action(method, arg) def _destroy(self): if self.__upstream: (u, self.__upstream) = (self.__upstream, None) u.destroy() def _handle_exception(self, exc): self.__handler.on_screen_device_error( ScreenError( device=self.device.device_name, error=exc, ) ) def __handle_monitor(self, change): self.action("handle_monitor", change) def _start(self, *args, **kwargs): for a in "acquire", "image": # self.monitors ... self.device.accessor(a).monitor(self.__handle_monitor) if self.device.has_accessor("target_status"): self.device.accessor("target_status").monitor(self.__handle_monitor) super()._start(*args, **kwargs) def _repr(self): return f"device={self.device.device_name}" _cfg = pykern.pkconfig.init( upstream_timeout_secs=( 15, pykern.pkconfig.parse_seconds, "how long to wait for updates from devices", ), )