Source code for slicops.sliclet.screen

"""Profile monitor Sliclet

:copyright: Copyright (c) 2025 The Board of Trustees of the Leland Stanford Junior University, through SLAC National Accelerator Laboratory (subject to receipt of any required approvals from the U.S. Dept. of Energy).  All Rights Reserved.
:license: http://github.com/slaclab/slicops/LICENSE
"""

from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdc, pkdexc, pkdlog, pkdp
import pykern.pkcompat
import pykern.pkconfig
import pykern.util
import queue
import slicops.device
import slicops.device.screen
import slicops.device_db
import slicops.plot
import slicops.sliclet
import threading

# Device type passed to slicops.device_db.device_names() when building the
# list of camera choices for a beam path.
_DEVICE_TYPE = "PROF"

# Module configuration; assigned by _init() from pykern.pkconfig.init().
_cfg = None

# The tables below are tuples of ("<field>.<group>.<attr>", value) pairs.
# They are passed (alone or concatenated) to txn.multi_group_attr_set().

# Disable all three acquisition buttons (used while a put is in flight).
_BUTTONS_DISABLE = (
    ("single_button.ui.enabled", False),
    ("start_button.ui.enabled", False),
    ("stop_button.ui.enabled", False),
)

# Disable both target move buttons (used while a target move is in flight).
_TARGET_DISABLE = (
    ("target_in_button.ui.enabled", False),
    ("target_out_button.ui.enabled", False),
)

# Hide the target buttons and the target status field.
_TARGET_INVISIBLE = (
    ("target_in_button.ui.visible", False),
    ("target_out_button.ui.visible", False),
    ("target_status.ui.visible", False),
)

# Show the target buttons and the target status field; applied only when the
# device reports a "target_status" accessor.
_TARGET_VISIBLE = (
    ("target_in_button.ui.visible", True),
    ("target_out_button.ui.visible", True),
    ("target_status.ui.visible", True),
)

# Hide the acquisition buttons.
_BUTTONS_INVISIBLE = (
    ("single_button.ui.visible", False),
    ("start_button.ui.visible", False),
    ("stop_button.ui.visible", False),
)

# Show the acquisition buttons and the images_to_average field.
# NOTE(review): images_to_average is made visible here but is not hidden by
# _BUTTONS_INVISIBLE or _DEVICE_DISABLE -- confirm this asymmetry is intended.
_BUTTONS_VISIBLE = (
    ("images_to_average.ui.visible", True),
    ("single_button.ui.visible", True),
    ("start_button.ui.visible", True),
    ("stop_button.ui.visible", True),
)

# Full "no device" UI state: plot controls, plot, csi_name, save button,
# acquisition buttons and target controls are all disabled and/or hidden.
_DEVICE_DISABLE = (
    (
        ("color_map.ui.enabled", False),
        ("color_map.ui.visible", False),
        ("curve_fit_method.ui.enabled", False),
        ("curve_fit_method.ui.visible", False),
        ("plot.ui.visible", False),
        # Useful to avoid large ctx sends
        ("plot.value", None),
        ("csi_name.ui.visible", False),
        ("csi_name.value", None),
        ("save_to_file.ui.enabled", False),
        ("save_to_file.ui.visible", False),
    )
    + _BUTTONS_DISABLE
    + _BUTTONS_INVISIBLE
    + _TARGET_DISABLE
    + _TARGET_INVISIBLE
)

# UI state applied once a device has been created successfully.
_DEVICE_ENABLE = (("csi_name.ui.visible", True),) + _BUTTONS_VISIBLE

# UI state applied the first time a plot is produced (plot not yet visible).
_PLOT_ENABLE = (
    ("color_map.ui.enabled", True),
    ("color_map.ui.visible", True),
    ("curve_fit_method.ui.enabled", True),
    ("curve_fit_method.ui.visible", True),
    ("plot.ui.visible", True),
    ("save_to_file.ui.enabled", True),
    ("save_to_file.ui.visible", True),
)


class Screen(slicops.sliclet.Base):
    """Profile monitor sliclet.

    Lets the user pick a beam path and camera, creates a
    `slicops.device.screen.Screen` for the selection, and translates device
    callbacks (image, acquire, target_status) into UI field updates.
    """

    def __init__(self, *args):
        """Create per-instance state, then initialize the base class.

        Args:
            *args: passed through unchanged to `slicops.sliclet.Base`.
        """
        # Last value reported by the device for each accessor
        self.__current_value = PKDict(acquire=None, image=None, target=None)
        # Defined before the base initializer runs so that handlers invoked
        # before handle_init (e.g. handle_destroy) find the attributes.
        self.__device = None
        self.__handler = None
        self.__image_set = None
        self.__single_button = False
        super().__init__(*args)

    def handle_destroy(self):
        """Release the current device (if any)."""
        self.__device_destroy()

    def on_change_camera(self, txn, value, **kwargs):
        """Switch to camera `value` on the currently selected beam path."""
        self.__device_change(txn, txn.field_value("beam_path"), value)

    def on_change_beam_path(self, txn, value, **kwargs):
        """Update camera choices and UI state for beam path `value`."""
        self.__beam_path_change(txn, value)

    def on_change_curve_fit_method(self, txn, value, **kwargs):
        """Start a new image set so it uses the new curve fit method."""
        # TODO(robnagler) optimize with ImageSet.update_curve_fit_method()
        self.__new_image_set(txn)

    def on_change_images_to_average(self, txn, value, **kwargs):
        """Start a new image set so it uses the new averaging count."""
        # TODO(robnagler) optimize with ImageSet.update_images_to_average()
        self.__new_image_set(txn)

    def on_click_save_to_file(self, txn, **kwargs):
        """Save the current image set to `self.save_file_path()`.

        Raises:
            pykern.util.APIError: if there is no image set to save.
        """
        if self.__image_set is None:
            # The button is disabled without a device, but a request may
            # still arrive; fail cleanly instead of with AttributeError.
            raise pykern.util.APIError("no camera selected, nothing to save")
        # TODO(pjm) provide UI notice with file info, download link
        self.__image_set.save_file(self.save_file_path())

    def on_click_single_button(self, txn, **kwargs):
        """Start acquiring; `__handle_image` stops after the first plot."""
        self.__single_button = True
        self.__set(txn, "acquire", True, _BUTTONS_DISABLE)

    def on_click_start_button(self, txn, **kwargs):
        """Start acquiring images."""
        self.__set(txn, "acquire", True, _BUTTONS_DISABLE)

    def on_click_stop_button(self, txn, **kwargs):
        """Stop acquiring images."""
        self.__set(txn, "acquire", False, _BUTTONS_DISABLE)

    def on_click_target_in_button(self, txn, **kwargs):
        """Ask the device to move the target in."""
        self.__set(txn, "target", True, _TARGET_DISABLE, method="move_target")

    def on_click_target_out_button(self, txn, **kwargs):
        """Stop acquiring, then ask the device to move the target out."""
        self.__set(txn, "acquire", False, _BUTTONS_DISABLE)
        self.__set(txn, "target", False, _TARGET_DISABLE, method="move_target")

    def handle_init(self, txn):
        """Reset state and initialize beam path/camera choices and values.

        In dev mode the initial beam path and camera come from `_cfg.dev`;
        otherwise both are None.
        """
        self.__device = None
        self.__handler = None
        self.__single_button = False
        txn.multi_group_attr_set(
            ("beam_path.constraints.choices", slicops.device_db.beam_paths())
        )
        self.__beam_path_change(txn, None)
        self.__device_change(txn, None, None)
        b = c = None
        if pykern.pkconfig.in_dev_mode():
            b = _cfg.dev.beam_path
            c = _cfg.dev.camera
        # the values are None by default, but this initializes
        # the state of the choices, buttons and fields appropriately
        txn.field_value_set("beam_path", b)
        self.__beam_path_change(txn, b)
        txn.field_value_set("camera", c)

    def handle_start(self, txn):
        """Create the device for the current beam_path and camera values."""
        self.__device_setup(
            txn, txn.field_value("beam_path"), txn.field_value("camera")
        )

    def __beam_path_change(self, txn, value):
        """Apply beam path `value` to the camera field and current device.

        Camera choices are rebuilt and the camera value cleared. If a device
        exists and its name is still a valid camera, the value is restored;
        otherwise the device is torn down.
        """

        def _choices():
            if value is None:
                return ()
            return slicops.device_db.device_names(_DEVICE_TYPE, value)

        txn.multi_group_attr_set(
            ("camera.constraints.choices", _choices()),
            ("camera.value", None),
        )
        # This technically shouldn't happen
        if value is None:
            txn.multi_group_attr_set(
                _DEVICE_DISABLE
                + (("camera.ui.enabled", False), ("camera.ui.visible", False))
            )
        else:
            txn.multi_group_attr_set(
                (("camera.ui.enabled", True), ("camera.ui.visible", True))
            )
        if not self.__device:
            # No device change
            return
        c = self.__device.device_name
        if txn.is_field_value_valid("camera", c):
            # Camera is the same so restore the value, no device change
            txn.field_value_set("camera", c)
        else:
            self.__device_change(txn, value, None)

    def __device_change(self, txn, beam_path, camera):
        """Destroy the current device, reset the UI, and set up `camera`."""
        self.__device_destroy(txn)
        txn.multi_group_attr_set(_DEVICE_DISABLE)
        self.__device_setup(txn, beam_path, camera)

    def __device_destroy(self, txn=None):
        """Tear down handler and device; a no-op when there is no device.

        Errors from the device's own `destroy` are logged, not raised.
        """
        if not self.__device:
            return
        self.__image_set = None
        self.__single_button = False
        self.__handler.destroy()
        self.__handler = None
        try:
            n = self.__device.device_name
        except Exception:
            # Name is only used for the log message below
            n = None
        try:
            self.__device.destroy()
        except Exception as e:
            pkdlog("destroy device={} error={}", n, e)
        self.__device = None

    def __device_setup(self, txn, beam_path, camera):
        """Create a handler and, if `camera` is not None, the device.

        On `slicops.device.DeviceError` the error is logged, the user alert
        hook is called, and no device is left attached.
        """
        self.__handler = _Handler(
            self.__handle_device_error,
            PKDict(
                image=self.__handle_image,
                acquire=self.__handle_acquire,
                target_status=self.__handle_target_status,
            ),
        )
        if camera is None:
            return
        try:
            # If there's an epics issues, we have to clear the device
            self.__device = slicops.device.screen.Screen(
                beam_path,
                camera,
                self.__handler,
            )
        except slicops.device.DeviceError as e:
            pkdlog("error={} setting up {}, clearing; stack={}", e, camera, pkdexc())
            self.__device_destroy(txn)
            self.__user_alert(txn, "unable to connect to camera={} error={}", camera, e)
            return
        s = PKDict(_DEVICE_ENABLE + (("csi_name.value", self.__device.meta.csi_name),))
        if self.__device.has_accessor("target_status"):
            s.update(_TARGET_VISIBLE)
        txn.multi_group_attr_set(s)
        self.__new_image_set(txn)

    def __handle_acquire(self, acquire):
        """Device callback: record `acquire` and update button enablement."""
        with self.lock_for_update() as txn:
            self.__current_value["acquire"] = acquire
            n = not acquire
            # Leave plot alone
            txn.multi_group_attr_set(
                ("single_button.ui.enabled", n),
                ("start_button.ui.enabled", n),
                (
                    "stop_button.ui.enabled",
                    acquire and not self.__single_button,
                ),
            )
            if not acquire:
                self.__single_button = False

    def __handle_device_error(self, exc):
        """Device callback: forward `exc` to `put_exception`."""
        self.put_exception(exc)

    def __handle_image(self, image):
        """Device callback: record `image` and update the plot.

        When a single-shot acquisition produced a plot, acquisition is
        stopped and the single/start buttons are re-enabled.
        """
        with self.lock_for_update() as txn:
            self.__current_value["image"] = image
            if self.__update_plot(txn) and self.__single_button:
                self.__set(txn, "acquire", False, _BUTTONS_DISABLE)
                txn.multi_group_attr_set(
                    ("single_button.ui.enabled", True),
                    ("start_button.ui.enabled", True),
                )

    def __new_image_set(self, txn):
        """Replace the image set with one built from current field values."""
        self.__image_set = slicops.plot.ImageSet(
            txn.multi_field_value(
                (
                    "beam_path",
                    "camera",
                    "curve_fit_method",
                    "images_to_average",
                    "csi_name",
                )
            ),
        )

    def __handle_target_status(self, status):
        """Device callback: record target `status` and update the UI.

        `status` is compared against `slicops.device.screen.TargetStatus`
        members and its `name` is shown in the target_status field.
        """
        with self.lock_for_update() as txn:
            self.__current_value["target"] = status
            i = status == slicops.device.screen.TargetStatus.IN
            txn.multi_group_attr_set(
                # Same "<field>.value" form as every other value update here
                ("target_status.value", status.name),
                (
                    "target_in_button.ui.enabled",
                    status == slicops.device.screen.TargetStatus.OUT,
                ),
                ("start_button.ui.enabled", i),
                ("single_button.ui.enabled", i),
                ("target_out_button.ui.enabled", i),
            )

    def __set(self, txn, accessor, value, txn_set, method=None):
        """Send `value` to the device unless it is already the current value.

        Args:
            txn: update transaction
            accessor (str): key in `__current_value`; also the device
                accessor name when `method` is None
            value: value to put or pass to `method`
            txn_set: attribute pairs applied before talking to the device
            method (str): device method to call instead of `put`

        Raises:
            pykern.util.APIError: if the device raises `DeviceError`.
        """
        if not self.__device or not self.__handler:
            # buttons already disabled
            return
        v = self.__current_value[accessor]
        if v is not None and v == value:
            # No button disable since nothing changed
            return
        txn.multi_group_attr_set(txn_set)
        try:
            if method is None:
                self.__device.put(accessor, value)
            else:
                m = getattr(self.__device, method)
                m(value)
        except slicops.device.DeviceError as e:
            pkdlog(
                "error={} on {}, clearing camera; stack={}", e, self.__device, pkdexc()
            )
            raise pykern.util.APIError(e) from e

    def __update_plot(self, txn):
        """Add the current image to the image set and publish the plot.

        Returns:
            bool: True if the plot field was updated. False if there is no
            device/handler, no non-empty image, or `add_frame` returned None.
        """
        if not self.__device or not self.__handler:
            return False
        if (i := self.__current_value["image"]) is None or not i.size:
            return False
        if (p := self.__image_set.add_frame(i, pykern.pkcompat.utcnow())) is None:
            return False
        if not txn.group_attr("plot", "ui", "visible"):
            # First plot for this device: reveal plot and its controls
            txn.multi_group_attr_set(_PLOT_ENABLE)
        txn.field_value_set("plot", p)
        return True

    def __user_alert(self, txn, fmt, *args):
        """Placeholder for user notification; currently only logs."""
        pkdlog("TODO: USER ALERT: " + fmt, *args)
# Module-level alias for the sliclet class defined in this module.
CLASS = Screen


class _Handler(slicops.device.screen.EventHandler):
    """Forwards device events to sliclet callbacks until destroyed.

    After `destroy`, events are dropped instead of being forwarded.
    """

    def __init__(
        self,
        handle_device_error,
        handle_device_update,
    ):
        """Store the callbacks.

        Args:
            handle_device_error (callable): called with the exception
            handle_device_update (dict): accessor name -> callable(value)
        """
        self.__destroyed = False
        # Guards the destroyed flag and the callback references
        self.__lock = threading.Lock()
        self.__handle_device_error = handle_device_error
        self.__handle_device_update = handle_device_update

    def destroy(self):
        """Drop the callbacks; safe to call more than once."""
        with self.__lock:
            if self.__destroyed:
                return
            self.__destroyed = True
            self.__handle_device_error = None
            self.__handle_device_update = None

    def on_screen_device_error(self, exc):
        """Forward `exc` to the error callback unless destroyed."""
        with self.__lock:
            h = self.__handle_device_error
        if h is None:
            # Destroyed: late event, nothing to notify
            return
        # Called outside the lock so the callback may take its own locks
        h(exc)

    def on_screen_device_update(self, accessor_name, value):
        """Forward `value` to the callback registered for `accessor_name`.

        Raises:
            AssertionError: if no callback is registered for `accessor_name`.
        """
        # TODO move prev value to sliclet within txn
        with self.__lock:
            u = self.__handle_device_update
        if u is None:
            # Destroyed: late event, nothing to notify
            return
        if accessor_name not in u:
            raise AssertionError(f"unsupported accessor={accessor_name} {self}")
        # Called outside the lock so the callback may take its own locks
        u[accessor_name](value)


def _init():
    """Initialize module configuration `_cfg` from pykern.pkconfig."""
    global _cfg
    _cfg = pykern.pkconfig.init(
        dev=PKDict(
            beam_path=("DEV_BEAM_PATH", str, "dev beam path name"),
            camera=("DEV_CAMERA", str, "dev camera name"),
        ),
    )


_init()