"""Profile monitor Sliclet
:copyright: Copyright (c) 2025 The Board of Trustees of the Leland Stanford Junior University, through SLAC National Accelerator Laboratory (subject to receipt of any required approvals from the U.S. Dept. of Energy). All Rights Reserved.
:license: http://github.com/slaclab/slicops/LICENSE
"""
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdc, pkdexc, pkdlog, pkdp
import pykern.pkcompat
import pykern.pkconfig
import pykern.util
import queue
import slicops.device
import slicops.device.screen
import slicops.device_db
import slicops.plot
import slicops.sliclet
import threading
_DEVICE_TYPE = "PROF"
_cfg = None
_BUTTONS_DISABLE = (
("single_button.ui.enabled", False),
("start_button.ui.enabled", False),
("stop_button.ui.enabled", False),
)
_TARGET_DISABLE = (
("target_in_button.ui.enabled", False),
("target_out_button.ui.enabled", False),
)
_TARGET_INVISIBLE = (
("target_in_button.ui.visible", False),
("target_out_button.ui.visible", False),
("target_status.ui.visible", False),
)
_TARGET_VISIBLE = (
("target_in_button.ui.visible", True),
("target_out_button.ui.visible", True),
("target_status.ui.visible", True),
)
_BUTTONS_INVISIBLE = (
("single_button.ui.visible", False),
("start_button.ui.visible", False),
("stop_button.ui.visible", False),
)
_BUTTONS_VISIBLE = (
("images_to_average.ui.visible", True),
("single_button.ui.visible", True),
("start_button.ui.visible", True),
("stop_button.ui.visible", True),
)
_DEVICE_DISABLE = (
(
("color_map.ui.enabled", False),
("color_map.ui.visible", False),
("curve_fit_method.ui.enabled", False),
("curve_fit_method.ui.visible", False),
("plot.ui.visible", False),
# Useful to avoid large ctx sends
("plot.value", None),
("csi_name.ui.visible", False),
("csi_name.value", None),
("save_to_file.ui.enabled", False),
("save_to_file.ui.visible", False),
)
+ _BUTTONS_DISABLE
+ _BUTTONS_INVISIBLE
+ _TARGET_DISABLE
+ _TARGET_INVISIBLE
)
_DEVICE_ENABLE = (("csi_name.ui.visible", True),) + _BUTTONS_VISIBLE
_PLOT_ENABLE = (
("color_map.ui.enabled", True),
("color_map.ui.visible", True),
("curve_fit_method.ui.enabled", True),
("curve_fit_method.ui.visible", True),
("plot.ui.visible", True),
("save_to_file.ui.enabled", True),
("save_to_file.ui.visible", True),
)
[docs]
class Screen(slicops.sliclet.Base):
def __init__(self, *args):
self.__current_value = PKDict(acquire=None, image=None, target=None)
super().__init__(*args)
[docs]
def handle_destroy(self):
self.__device_destroy()
[docs]
def on_change_camera(self, txn, value, **kwargs):
self.__device_change(txn, txn.field_value("beam_path"), value)
[docs]
def on_change_beam_path(self, txn, value, **kwargs):
self.__beam_path_change(txn, value)
[docs]
def on_change_curve_fit_method(self, txn, value, **kwargs):
# TODO(robnagler) optimize with ImageSet.update_curve_fit_method()
self.__new_image_set(txn)
[docs]
def on_change_images_to_average(self, txn, value, **kwargs):
# TODO(robnagler) optimize with ImageSet.update_images_to_average()
self.__new_image_set(txn)
[docs]
def on_click_save_to_file(self, txn, **kwargs):
# TODO(pjm) provide UI notice with file info, download link
self.__image_set.save_file(self.save_file_path())
[docs]
def handle_init(self, txn):
self.__device = None
self.__handler = None
self.__single_button = False
txn.multi_group_attr_set(
("beam_path.constraints.choices", slicops.device_db.beam_paths())
)
self.__beam_path_change(txn, None)
self.__device_change(txn, None, None)
b = c = None
if pykern.pkconfig.in_dev_mode():
b = _cfg.dev.beam_path
c = _cfg.dev.camera
# the values are None by default, but this initializes
# the state of the choices, buttons and fields appropriately
txn.field_value_set("beam_path", b)
self.__beam_path_change(txn, b)
txn.field_value_set("camera", c)
[docs]
def handle_start(self, txn):
self.__device_setup(
txn, txn.field_value("beam_path"), txn.field_value("camera")
)
def __beam_path_change(self, txn, value):
def _choices():
if value is None:
return ()
return slicops.device_db.device_names(_DEVICE_TYPE, value)
txn.multi_group_attr_set(
("camera.constraints.choices", _choices()),
("camera.value", None),
)
# This technically shouldn't happen
if value is None:
txn.multi_group_attr_set(
_DEVICE_DISABLE
+ (("camera.ui.enabled", False), ("camera.ui.visible", False))
)
else:
txn.multi_group_attr_set(
(("camera.ui.enabled", True), ("camera.ui.visible", True))
)
if not self.__device:
# No device change
return
c = self.__device.device_name
if txn.is_field_value_valid("camera", c):
# Camera is the same so restore the value, no device change
txn.field_value_set("camera", c)
else:
self.__device_change(txn, value, None)
def __device_change(self, txn, beam_path, camera):
self.__device_destroy(txn)
txn.multi_group_attr_set(_DEVICE_DISABLE)
self.__device_setup(txn, beam_path, camera)
def __device_destroy(self, txn=None):
if not self.__device:
return
self.__image_set = None
self.__single_button = False
self.__handler.destroy()
self.__handler = None
try:
n = self.__device.device_name
except Exception:
n = None
try:
self.__device.destroy()
except Exception as e:
pkdlog("destroy device={} error={}", n, e)
self.__device = None
def __device_setup(self, txn, beam_path, camera):
self.__handler = _Handler(
self.__handle_device_error,
PKDict(
image=self.__handle_image,
acquire=self.__handle_acquire,
target_status=self.__handle_target_status,
),
)
if camera is None:
return
try:
# If there's an epics issues, we have to clear the device
self.__device = slicops.device.screen.Screen(
beam_path,
camera,
self.__handler,
)
except slicops.device.DeviceError as e:
pkdlog("error={} setting up {}, clearing; stack={}", e, camera, pkdexc())
self.__device_destroy(txn)
self.__user_alert(txn, "unable to connect to camera={} error={}", camera, e)
return
s = PKDict(_DEVICE_ENABLE + (("csi_name.value", self.__device.meta.csi_name),))
if self.__device.has_accessor("target_status"):
s.update(_TARGET_VISIBLE)
txn.multi_group_attr_set(s)
self.__new_image_set(txn)
def __handle_acquire(self, acquire):
with self.lock_for_update() as txn:
self.__current_value["acquire"] = acquire
n = not acquire
# Leave plot alone
txn.multi_group_attr_set(
("single_button.ui.enabled", n),
("start_button.ui.enabled", n),
(
"stop_button.ui.enabled",
acquire and not self.__single_button,
),
)
if not acquire:
self.__single_button = False
def __handle_device_error(self, exc):
self.put_exception(exc)
def __handle_image(self, image):
with self.lock_for_update() as txn:
self.__current_value["image"] = image
if self.__update_plot(txn) and self.__single_button:
self.__set(txn, "acquire", False, _BUTTONS_DISABLE)
txn.multi_group_attr_set(
("single_button.ui.enabled", True),
("start_button.ui.enabled", True),
)
def __new_image_set(self, txn):
self.__image_set = slicops.plot.ImageSet(
txn.multi_field_value(
(
"beam_path",
"camera",
"curve_fit_method",
"images_to_average",
"csi_name",
)
),
)
def __handle_target_status(self, status):
with self.lock_for_update() as txn:
self.__current_value["target"] = status
txn.multi_group_attr_set(
("target_status", status.name),
(
"target_in_button.ui.enabled",
status == slicops.device.screen.TargetStatus.OUT,
),
(
"start_button.ui.enabled",
status == slicops.device.screen.TargetStatus.IN,
),
(
"single_button.ui.enabled",
status == slicops.device.screen.TargetStatus.IN,
),
(
"target_out_button.ui.enabled",
status == slicops.device.screen.TargetStatus.IN,
),
)
def __set(self, txn, accessor, value, txn_set, method=None):
if not self.__device or not self.__handler:
# buttons already disabled
return
v = self.__current_value[accessor]
if v is not None and v == value:
# No button disable since nothing changed
return
txn.multi_group_attr_set(txn_set)
try:
if method is None:
self.__device.put(accessor, value)
else:
m = getattr(self.__device, method)
m(value)
except slicops.device.DeviceError as e:
pkdlog(
"error={} on {}, clearing camera; stack={}", e, self.__device, pkdexc()
)
raise pykern.util.APIError(e)
def __update_plot(self, txn):
if not self.__device or not self.__handler:
return False
if (i := self.__current_value["image"]) is None or not i.size:
return False
if (p := self.__image_set.add_frame(i, pykern.pkcompat.utcnow())) is None:
return False
if not txn.group_attr("plot", "ui", "visible"):
txn.multi_group_attr_set(_PLOT_ENABLE)
txn.field_value_set("plot", p)
return True
def __user_alert(self, txn, fmt, *args):
pkdlog("TODO: USER ALERT: " + fmt, *args)
CLASS = Screen
class _Handler(slicops.device.screen.EventHandler):
def __init__(
self,
handle_device_error,
handle_device_update,
):
self.__destroyed = False
self.__lock = threading.Lock()
self.__handle_device_error = handle_device_error
self.__handle_device_update = handle_device_update
def destroy(self):
with self.__lock:
if self.__destroyed:
return
self.__destroyed = True
self.__handle_device_error = None
self.__handle_device_update = None
def on_screen_device_error(self, exc):
self.__handle_device_error(exc)
def on_screen_device_update(self, accessor_name, value):
# TODO move prev value to sliclet within txn
if not accessor_name in self.__handle_device_update:
raise AssertionError(f"unsupported accessor={n} {self}")
h = self.__handle_device_update[accessor_name]
h(value)
def _init():
global _cfg
_cfg = pykern.pkconfig.init(
dev=PKDict(
beam_path=("DEV_BEAM_PATH", str, "dev beam path name"),
camera=("DEV_CAMERA", str, "dev camera name"),
),
)
_init()