Source code for slicops.pkcli.epics

"""SlicOps EPICS utilities.

:copyright: Copyright (c) 2024 The Board of Trustees of the Leland Stanford Junior University, through SLAC National Accelerator Laboratory (subject to receipt of any required approvals from the U.S. Dept. of Energy).  All Rights Reserved.
:license: http://github.com/slaclab/slicops/LICENSE
"""

from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdc, pkdlog, pkdp
import epics
import os
import pykern.pkcli
import pykern.pkio
import subprocess
import threading
import time

# Local so should connect quickly
_SIM_DETECTOR_TIMEOUT = 5
_LOG_BASE = "sim_detector.log"
_CAM1_STATUS_PV = "13SIM1:cam1:ShutterMode"
_CAM1_CONTROL_PV = "13SIM1:cam1:TriggerMode"
_INITIAL_TARGET_STATUS = 1


[docs] def init_sim_detector(): """Initialize the ADSimDetector module with useful default values.""" for name, value in { "13SIM1:cam1:SimMode": 1, "13SIM1:cam1:PeakStartX": 600, "13SIM1:cam1:PeakStartY": 400, "13SIM1:cam1:PeakNumX": 1, "13SIM1:cam1:PeakNumY": 2, "13SIM1:cam1:PeakStepX": 300, "13SIM1:cam1:PeakStepY": 140, "13SIM1:cam1:PeakWidthX": 90, "13SIM1:cam1:PeakWidthY": 60, "13SIM1:cam1:PeakVariation": 10, "13SIM1:cam1:Noise": 50, "13SIM1:cam1:Gain": 100, "13SIM1:cam1:AcquirePeriod": 0.5, "13SIM1:cam1:SizeX": 1024, "13SIM1:cam1:SizeY": 768, "13SIM1:image1:EnableCallbacks": 1, _CAM1_STATUS_PV: _INITIAL_TARGET_STATUS, _CAM1_CONTROL_PV: 0, }.items(): pv = epics.PV(name) v = pv.put(value, wait=True, timeout=_SIM_DETECTOR_TIMEOUT) if not pv.connected: raise RuntimeError(f"PV={name} failed to connect") if v is None: raise RuntimeError(f"PV={name} ")
[docs] def sim_detector(ioc_sim_detector_dir=None): """Run an EPICS IOC Sim Detector Args: ioc_sim_detector_dir (str, optional): Directory containing EPICS code for iocSimDetector """ # TODO(robnagler) use https://github.com/ralphlange/procServ # Macs don't have /dev/stdin|out so /dev/tty is more portable def _app_path(dir_path): p = dir_path.join("../../bin/*/simDetectorApp") f = pykern.pkio.sorted_glob(p) if len(f) == 0: pykern.pkcli.command_error("no files matching pattern={}", p) if len(f) > 1: pykern.pkcli.command_error("too many simDetectorApps={}", f) return str(f[0]) def _dir(): return pykern.pkio.py_path( ioc_sim_detector_dir or "~/.local/epics/extensions/synApps/support/areaDetector-R3-12-1/ADSimDetector/iocs/simDetectorIOC/iocBoot/iocSimDetector" ) def _log(): f = pykern.pkio.py_path(_LOG_BASE) pkdlog("log: {}", f) return f.open("w+") def _st_cmd(dir_path): """POSIT: st.cmd contains `<envPaths <st_base.cmd` so we don't have to write a temporary file. """ return dir_path.join("envPaths").read("rb") + dir_path.join("st_base.cmd").read( "rb" ) def _watch_target(): def _control_monitor(pvname, value, **kwargs): # control value: 0: move out, 1: move in # status: 1: out, 2: in state.new_status = value + 1 state.trigger.set() def _watch_status(): while True: state.trigger.wait() state.trigger.clear() if state.status != state.new_status: state.status_pv.put(0, wait=True, timeout=3) time.sleep(2) state.status = state.new_status state.status_pv.put(state.status, wait=True, timeout=3) state = PKDict( new_status=_INITIAL_TARGET_STATUS, status=_INITIAL_TARGET_STATUS, status_pv=epics.PV(_CAM1_STATUS_PV), trigger=threading.Event(), ) epics.PV(_CAM1_CONTROL_PV).add_callback(_control_monitor) threading.Thread(target=_watch_status, daemon=True).start() d = _dir() with _log() as o: p = subprocess.Popen( [_app_path(d)], stdin=subprocess.PIPE, stdout=o, stderr=subprocess.STDOUT, ) try: p.stdin.write(_st_cmd(d)) p.stdin.flush() # do not close: input hangs, because ioc exits at EOF pkdlog("started pid={}; sleep 2 seconds", p.pid) # Wait a little bit for the process to initialize and print time.sleep(2) pkdlog("initializing sim detector") init_sim_detector() _watch_target() pkdlog("waiting for pid={} to exit", p.pid) p.wait() except KeyboardInterrupt: pass finally: if p.poll() is not None: p.terminate() time.sleep(1) p.kill()