"""Basic device operations
:copyright: Copyright (c) 2024 The Board of Trustees of the Leland Stanford Junior University, through SLAC National Accelerator Laboratory (subject to receipt of any required approvals from the U.S. Dept. of Energy). All Rights Reserved.
:license: http://github.com/slaclab/slicops/LICENSE
"""
from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdc, pkdexc, pkdlog, pkdp
import epics
import slicops.device_db
import threading
# TODO(robnagler) configure via device_db
# Timeout used for PV connection, for `get`, and for waiting on another
# thread's accessor initialization. It is passed to `threading.Event.wait`,
# so the unit is seconds.
_TIMEOUT = 5
class AccessorPutError(RuntimeError):
    """Raised when a value cannot be written to an accessor.

    Used by `put` for read-only accessors and for accessors whose
    `py_type` has no conversion.
    """
class DeviceError(RuntimeError):
    """Raised when communication with an accessor fails.

    Used for failed or timed-out reads and writes and for disconnected
    accessors.
    """
class Device:
    """Wrapper around physical device

    Accessors are created on first use and cached by name until `destroy`
    is called.

    Attributes:
        device_name (str): name of device
        meta (slicops.device_db.DeviceMeta): information about device
        connected (bool): initialized to False; not updated by this class
    """

    def __init__(self, device_name):
        self.device_name = device_name
        self.meta = slicops.device_db.meta_for_device(device_name)
        self.connected = False
        self._destroyed = False
        # accessor_name -> _Accessor, filled in by `accessor`
        self._accessor = PKDict()

    def accessor(self, accessor_name):
        """Get `_Accessor` for more complex operations

        The accessor is created the first time it is requested and the same
        object is returned on later calls.

        Args:
            accessor_name (str): control system independent name
        Returns:
            _Accessor: object holding control system state
        Raises:
            AssertionError: if the device has been destroyed
        """
        if self._destroyed:
            raise AssertionError(f"destroyed {self}")
        cache = self._accessor
        if accessor_name not in cache:
            cache[accessor_name] = _Accessor(self, accessor_name)
        return cache[accessor_name]

    def destroy(self):
        """Disconnect from accessors and remove state about device

        Calling this more than once is a no-op.
        """
        if self._destroyed:
            return
        self._destroyed = True
        # Detach the cache before destroying so the device holds no
        # reference to an accessor that is being torn down.
        doomed, self._accessor = list(self._accessor.values()), PKDict()
        for accessor in doomed:
            accessor.destroy()

    def get(self, accessor_name):
        """Read from accessor

        Args:
            accessor_name (str): control system independent name
        Returns:
            object: the value from the control system converted to a Python type
        """
        target = self.accessor(accessor_name)
        return target.get()

    def has_accessor(self, accessor_name):
        """Check whether device has accessor

        Args:
            accessor_name (str): control system independent name
        Returns:
            bool: True if accessor is found for device
        """
        known = self.meta.accessor
        return accessor_name in known

    def put(self, accessor_name, value):
        """Set accessor to value

        Args:
            accessor_name (str): control system independent name
            value (object): Value to write to control system
        """
        target = self.accessor(accessor_name)
        return target.put(value)

    def __repr__(self):
        return f"<Device {self.device_name}>"
class _Accessor:
    """Container for a control system interface (CSI): value, metadata, and dynamic state

    The underlying `epics.PV` is created lazily by the first `get`, `put`,
    or `monitor` call. `monitor` must be called before `get`/`put` so that
    the value callback can be registered when the PV is created.

    Attributes:
        device (Device): object holding this accessor
        accessor_name (str): control system independent name
        meta (PKDict): meta data about the accessor, e.g. csi_name, writable
    """

    def __init__(self, device, accessor_name):
        self.device = device
        self.accessor_name = accessor_name
        self.meta = device.meta.accessor[accessor_name]
        self._callback = None
        self._destroyed = False
        # Guards _callback, _cs, _destroyed and _initializing
        self._lock = threading.Lock()
        # Signals threads waiting for another thread's PV creation
        self._initialized = threading.Event()
        self._initializing = False
        # Defer initialization
        self._cs = None

    def destroy(self):
        """Stop all monitoring and disconnect from accessor

        Calling this more than once is a no-op. Errors raised while
        disconnecting are logged, not propagated.
        """
        if self._destroyed:
            return
        with self._lock:
            if self._destroyed:
                return
            self._destroyed = True
            self._initializing = False
            self._callback = None
            pv, self._cs = self._cs, None
            # Always wake waiters, even when no PV exists yet, so they fail
            # promptly instead of blocking until the timeout.
            self._initialized.set()
        if pv is not None:
            self._disconnect(pv)

    def get(self):
        """Read from control system

        Returns:
            object: the value from the accessor converted to a Python type
        Raises:
            DeviceError: if no value was returned or the PV is disconnected
        """
        p = self.__cs()
        if (rv := p.get(timeout=_TIMEOUT)) is None:
            raise DeviceError(f"unable to get {self}")
        if not p.connected:
            raise DeviceError(f"disconnected {self}")
        return self._fixup_value(rv)

    def monitor(self, callback):
        """Monitor accessor and call callback with updates and connection changes

        The argument to the callback is a `PKDict` with `accessor` (this
        object) and one or more of:

        error : str
            error occurred in the values from the callback (unlikely)
        value : object
            control system reported this change
        connected : bool
            connection state changed: True if connected

        Args:
            callback (callable): accepts a single `PKDict` as arg
        Raises:
            AssertionError: if destroyed, already monitoring, or called
                after the control system object was (or is being) created
        """
        with self._lock:
            self._assert_not_destroyed()
            if self._callback:
                raise AssertionError("may only call monitor once")
            if self._cs is not None or self._initializing:
                raise AssertionError("monitor must be called before get/put")
            self._callback = callback
        # Outside the lock: __cs acquires the (non-reentrant) lock itself
        self.__cs()

    def monitor_stop(self):
        """Stops monitoring accessor"""
        with self._lock:
            if self._destroyed or not self._callback:
                return
            self._callback = None

    def put(self, value):
        """Set accessor to value

        Args:
            value (object): Value to write to control system
        Raises:
            AccessorPutError: if the accessor is read-only or its py_type
                is not bool, int, or float
            DeviceError: if the write fails or the PV is disconnected
        """
        if not self.meta.writable:
            raise AccessorPutError(f"read-only {self}")
        if self.meta.py_type == bool:
            v = bool(value)
        elif self.meta.py_type == int:
            v = int(value)
        elif self.meta.py_type == float:
            v = float(value)
        else:
            raise AccessorPutError(f"unhandled py_type={self.meta.py_type} {self}")
        p = self.__cs()
        # Only a return of 1 is accepted as success.
        # NOTE(review): an earlier comment here said 0 (ECA_NORMAL) and None
        # are normal too, which contradicts this check; confirm against the
        # pyepics `PV.put` documentation.
        if (e := p.put(v)) != 1:
            raise DeviceError(f"put error={e} value={v} {self}")
        if not p.connected:
            raise DeviceError(f"disconnected {self}")

    def _assert_not_destroyed(self):
        if self._destroyed:
            raise AssertionError(f"destroyed {self}")

    def _disconnect(self, pv):
        """Disconnect `pv`, logging rather than raising on failure"""
        try:
            # Clears all callbacks
            pv.disconnect()
        except Exception as e:
            pkdlog("error={} {} stack={}", e, self, pkdexc())

    def _fixup_value(self, raw):
        """Convert a raw control system value to its Python form"""
        if self.meta.py_type == bool:
            return bool(raw)
        if self.accessor_name == "image":
            # _image_shape is captured when the control system object is created
            return raw.reshape(self._image_shape)
        return raw

    def _new_pv(self):
        """Create the `epics.PV` for this accessor; called without the lock held"""
        k = (
            PKDict(callback=self._on_value, auto_monitor=True)
            if self._callback
            else PKDict()
        )
        if self.accessor_name == "image":
            # TODO(robnagler) this has to be done here, because you can't get accessor
            # from within a monitor callback.
            # TODO(robnagler) need a better way of dealing with this
            self._image_shape = (self.device.get("n_row"), self.device.get("n_col"))
        return epics.PV(
            self.meta.csi_name,
            connection_callback=self._on_connection,
            connection_timeout=_TIMEOUT,
            **k,
        )

    def _on_connection(self, **kwargs):
        """Connection callback: forward connection state to the monitor callback"""
        try:
            if "conn" not in kwargs:
                # This shouldn't happen
                pkdlog("missing 'conn' in kwargs={}", kwargs)
                self._run_callback(error="missing conn")
            else:
                self._run_callback(connected=kwargs["conn"])
        except Exception as e:
            pkdlog("error={} {} stack={}", e, self, pkdexc())
            raise

    def _on_value(self, **kwargs):
        """Value callback: forward the converted value to the monitor callback"""
        try:
            if (v := kwargs.get("value")) is None:
                pkdlog("missing 'value' in kwargs={} {}", kwargs, self)
                self._run_callback(error="missing value or None")
            else:
                # Use self.accessor_name, as _fixup_value does
                if self.accessor_name == "image" and not len(v):
                    pkdlog("empty image received {}", self)
                    return
                self._run_callback(value=self._fixup_value(v))
        except Exception as e:
            pkdlog("error={} {} stack={}", e, self, pkdexc())
            raise

    def __cs(self):
        """Return the control system object, creating it on first use

        Exactly one thread creates the PV; others wait up to `_TIMEOUT`
        for it to finish.

        Returns:
            epics.PV: the control system object
        Raises:
            AssertionError: if the accessor is (or becomes) destroyed
            DeviceError: if another thread's initialization failed or
                did not finish in time
        """
        with self._lock:
            self._assert_not_destroyed()
            if self._cs is not None:
                return self._cs
            creator = not self._initializing
            if creator:
                self._initializing = True
                # Reset in case an earlier, failed attempt set the event
                self._initialized.clear()
        if not creator:
            self._initialized.wait(timeout=_TIMEOUT)
            with self._lock:
                self._assert_not_destroyed()
                if self._cs is None:
                    raise DeviceError(f"initialization failed or timed out {self}")
                return self._cs
        pv = None
        try:
            pv = self._new_pv()
        finally:
            # Runs on success and failure so a failed attempt does not leave
            # the accessor stuck in the initializing state.
            with self._lock:
                stale = self._destroyed
                if pv is not None and not stale:
                    self._cs = pv
                self._initializing = False
                self._initialized.set()
        if stale:
            # destroy() ran while the PV was being created; do not leak it
            self._disconnect(pv)
            raise AssertionError(f"destroyed {self}")
        return pv

    def __repr__(self):
        return f"<_Accessor {self.device.device_name}.{self.accessor_name} {self.meta.csi_name}>"

    def _run_callback(self, **kwargs):
        """Invoke the monitor callback, if any, with `kwargs` plus this accessor"""
        k = PKDict(accessor=self, **kwargs)
        with self._lock:
            c = self._callback
        # Call outside the lock so the callback may use this accessor
        if c:
            c(k)