Source code for slicops.sliclet

"""Base for sliclets

:copyright: Copyright (c) 2025 The Board of Trustees of the Leland Stanford Junior University, through SLAC National Accelerator Laboratory (subject to receipt of any required approvals from the U.S. Dept. of Energy).  All Rights Reserved.
:license: http://github.com/slaclab/slicops/LICENSE
"""

from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdc, pkdexc, pkdlog, pkdp, pkdformat
import asyncio
import contextlib
import enum
import importlib
import inspect
import itertools
import pykern.pkconfig
import pykern.pkconst
import pykern.pkinspect
import pykern.pkio
import pykern.util
import queue
import re
import slicops.config
import slicops.ctx
import slicops.field
import threading


class _Work(enum.IntEnum):
    error = 1
    session_end = 2
    ctx_write = 3
    start = 4


_ON_METHODS_RE = re.compile(r"^on_(click|change)_(\w+)$")

_CTX_WRITE_ARGS = frozenset(["field_values"])

_NAMES = None


[docs] def instance(name, queue): def _import(name): # TODO(robnagler) move to pykern, copied from sirepo.util # NOTE: fixed a bug (s = None) s = None for p in slicops.config.cfg().package_path: n = None try: n = f"{p}.sliclet.{name}" return importlib.import_module(n) except ModuleNotFoundError as e: if n is not None and n != e.name: # import is failing due to ModuleNotFoundError in a sub-import # not the module we are looking for raise s = pkdexc() pass # gives more debugging info (perhaps more confusion) if s: pkdc(s) raise AssertionError( f"cannot find sliclet={name} in package_path={slicops.config.cfg().package_path}" ) if not name: name = _cfg.default return _import(name).CLASS(name, queue)
[docs] def names(): def _find(): # TODO(robnagler) move to pykern, copied from sirepo for p in slicops.config.cfg().package_path: yield pykern.pkinspect.package_module_names(f"{p}.sliclet") global _NAMES if _NAMES is None: _NAMES = tuple(sorted(itertools.chain.from_iterable(_find()))) return _NAMES
[docs] class Base: def __init__(self, name, ctx_update_q): self.name = name self.title = self.__class__.__name__ self.__loop = asyncio.get_event_loop() self.__ctx_update_q = ctx_update_q # This might fail due to errors in the yaml self.__locked = False self.__ctx = slicops.ctx.Ctx(self.name, self.title) self.__work_q = queue.Queue() self.__lock = threading.RLock() self.__on_methods = self.__inspect_on_methods() txn = slicops.ctx.Txn(self.__ctx) self.handle_init(txn) txn.commit(None) self.__ctx_update(self.__ctx.first_time()) self.__put_work(_Work.start, None) self.__thread = threading.Thread(target=self.__run, daemon=True) self.__thread.start()
[docs] def ctx_write(self, field_values): for k, v in field_values.items(): if not (f := self.__ctx.fields.get(k)): raise pykern.util.APIError("unknown field={}", k) # This is pre-emptive so errors make sense in context of write if isinstance((v := f.value_check(v)), slicops.field.InvalidFieldValue): raise pykern.util.APIError("invalid value for field={} error={}", k, v) self.__put_work(_Work.ctx_write, field_values)
[docs] def handle_destroy(self): pass
[docs] def handle_init(self, txn): pass
[docs] def handle_start(self, txn): pass
[docs] @contextlib.contextmanager def lock_for_update(self, log_op=None): ok = True try: with self.__lock: if self.__locked: ok = False raise AssertionError("may only lock once") txn = None try: self.__locked = True txn = slicops.ctx.Txn(self.__ctx) yield txn except Exception: if txn: txn.rollback() raise else: txn.commit(self.__ctx_update) finally: self.__locked = False except Exception as e: if not ok: raise if not (d := str(e)): d = str(type(e)) d = f"error={d}" try: if log_op: d += f" op={log_op}" except Exception as e2: pkdlog("error={} during exception stack={}", e2, pkdexc(simplify=True)) if not isinstance(e, pykern.util.APIError): pkdlog("stack={}", pkdexc(simplify=True)) pkdlog("ERROR {}", d) self.__put_work(_Work.error, PKDict(desc=d))
[docs] def put_exception(self, exc): self.__put_work(_Work.error, exc)
[docs] def save_file_path(self): return _cfg.save_file_root.join(self.__class__.__name__).ensure(dir=True)
[docs] def session_end(self): self.__put_work(_Work.session_end, None)
def __inspect_on_methods(self): rv = PKDict() for k, v in inspect.getmembers(self): if (m := _ON_METHODS_RE.search(k)) and inspect.ismethod(v): if m.group(2) in rv: raise AssertionError( f"only one of on_click or on_change field={m.group(2)}" ) rv[m.group(2)] = PKDict(kind=m.group(1), func=v) return rv def __put_work(self, work, arg): self.__work_q.put_nowait((work, arg)) def __run(self): def _destroy(): try: self.handle_destroy() except Exception: pass try: # Try to end session. Might already be ended self.__loop.call_soon_threadsafe(self.__ctx_update_q.put_nowait, None) except Exception: pass try: while True: w = a = None try: w, a = self.__work_q.get() self.__work_q.task_done() if not getattr(self, f"_work_{w._name_}")(a): return except Exception as e: pkdlog("{}={} error={} stack={}", w, a, e, pkdexc()) if w == _Work.error: pkdlog("error during error handling error={}", e) return self.__put_work(_Work.error, f"error={e} op={w}") except Exception as e: try: pkdlog("error={} stack={}", e, pkdexc()) self._work_error(e) except Exception: pass finally: _destroy() def __ctx_update(self, result): self.__loop.call_soon_threadsafe(self.__ctx_update_q.put_nowait, result) def _work_error(self, msg): self.__ctx_update( msg if isinstance(msg, pykern.util.APIError) else pykern.util.APIError("{}", msg) ) return False def _work_ctx_write(self, field_values): def _change(updates): for u in updates: if u.on_method.kind == "change": u.pkdel("on_method").func(**u) else: yield u def _click(updates): for u in updates: u.pkdel("on_method").func(**u) def _updates(): m = self.__on_methods for k, v in field_values.items(): if c := txn.field_value_set_via_api(k, v, m.get(k)): yield c with self.lock_for_update(log_op="ctx_write") as txn: _click(tuple(_change(sorted(_updates(), key=lambda x: x.field_name)))) return True def _work_session_end(self, unused): # TODO(robnagler) maybe if there are too many errors fail or stop logging? return False def _work_start(self, unused): with self.lock_for_update(log_op="start") as txn: self.handle_start(txn=txn) return True
def _cfg_py_path(value): if isinstance(value, str): if not os.path.isabs(value): pykern.pkconfig.raise_error(f"not an absolute path={value}") return pykern.pkio.py_path(value) if not isinstance(value, pykern.pkconst.PY_PATH_LOCAL_TYPE): pykern.pkconfig.raise_error(f"not a py.path type={type(value)} value={value}") return value def _init(): global _cfg def _path(): rv = (_cfg_py_path, "root path for screen save files") if pykern.pkconfig.in_dev_mode(): return (pykern.util.dev_run_dir(_path).join("save").ensure(dir=True),) + rv return pykern.pkconfig.Required(rv) _cfg = pykern.pkconfig.init( default=("screen", str, "default sliclet"), save_file_root=_path(), ) _init()