Source code for slicops.sliclet.yaml_db

"""YAML Db Sliclet

:copyright: Copyright (c) 2025 The Board of Trustees of the Leland Stanford Junior University, through SLAC National Accelerator Laboratory (subject to receipt of any required approvals from the U.S. Dept. of Energy).  All Rights Reserved.
:license: http://github.com/slaclab/slicops/LICENSE
"""

from pykern.pkcollections import PKDict
from pykern.pkdebug import pkdc, pkdexc, pkdlog, pkdp
import numpy
import pykern.pkio
import slicops.pkcli.fractals
import slicops.pkcli.yaml_db
import slicops.sliclet
import watchdog.events
import watchdog.observers
import watchdog.observers.polling


_EVENT_TYPES = frozenset(
    (
        watchdog.events.EVENT_TYPE_MOVED,
        watchdog.events.EVENT_TYPE_CREATED,
        watchdog.events.EVENT_TYPE_MODIFIED,
    ),
)


[docs] class YAMLDb(slicops.sliclet.Base):
[docs] def handle_destroy(self): # TODO(robnagler) Need to make idempotent if self.__db_watcher: self.__db_watcher.destroy() self.__db_watcher = None
[docs] def handle_init(self, txn): self.__db_watcher = None self.__db_cache = PKDict() if not self.__read_db(txn): self.__write(txn) self.__db_watcher = _DBWatcher( ( slicops.pkcli.yaml_db.path(self.name), slicops.pkcli.fractals.path(), ), self.__db_watcher_update, )
[docs] def on_click_save(self, txn, **kwargs): self.__write(txn)
[docs] def on_click_revert(self, txn, **kwargs): # TODO(robnagler) the read and the ctx_put could happen outside the context self.__db_cache = PKDict() self.__read_db(txn)
def __db_watcher_update(self): if not self.__db_watcher_update: # destroyed return with self.lock_for_update() as txn: self.__read_db(txn) # TODO(robnagler) can this be encapsulated in a base prototype? # field.new would need to evaluate in a particular order. # Perhaps there should be "related:" color_map, numpy_file_field, etc. def __numpy_file(self, txn, plot, links): def _visibility(value): yield (f"{plot}.ui.visible", value) if x := links.get("color_map"): yield (f"{x}.ui.visible", value) if not (n := links.get("numpy_file")): # Not numpy field return None # Set plot always, and raw_pixels may get filled in below p = PKDict(raw_pixels=None) v = False try: if not (l := txn.field_value(n)): return None p.raw_pixels = numpy.load(l) v = True return p except Exception as e: pkdlog("numpy.load error={} path={} link={} stack={}", e, l, n, pkdexc()) finally: txn.field_value_set(plot, p) txn.multi_group_attr_set(tuple(_visibility(v))) def __read_db(self, txn): def _numpy_files(): for k in txn.field_names(): if l := txn.group_attr(k, "links"): if v := self.__numpy_file(txn, k, l): yield k, v def _on_changes(changes): # TODO(robnagler) only needed for fractals # POSIT: same as sliclet.Base._work_ctx_write for k in sorted(changes.keys()): if f := getattr(self, f"on_change_{k}", None): # TODO(robnagler) fractals only needs these f(txn=txn, value=changes[k]) def _set(db): for k in txn.field_names(): # If cache (read/wrote last time) is unchanged, # there will be no updates. Avoids churn if k in db and db[k] != self.__db_cache.get(k): txn.field_value_set(k, db[k]) yield k, db[k] if not (r := slicops.pkcli.yaml_db.read(self.name)): return False c = PKDict(_set(r)).pkupdate(_numpy_files()) self.__db_cache = r _on_changes(c) return True def __write(self, txn): def _keys(): for k in txn.field_names(): g = txn.group_attr(k, "ui") if g.get("clickable") or not g.get("writable"): continue yield k # TODO(robnagler) work: maybe should happen outside lock self.__db_cache = PKDict((k, txn.field_value(k)) for k in _keys()) slicops.pkcli.yaml_db.write(self.name, self.__db_cache)
CLASS = YAMLDb class _DBWatcher(watchdog.events.FileSystemEventHandler): def __init__(self, paths, update_op): super().__init__() self.__update_op = update_op self.__paths = str(paths) self.__observer = watchdog.observers.polling.PollingObserver() self.__observer.schedule(self, paths[0].dirname, recursive=False) self.__observer.start() def on_any_event(self, event): if event.event_type in _EVENT_TYPES and ( event.src_path in self.__paths or event.dest_path in self.__paths ): self.__update_op() def destroy(self): if self.__observer: o = self.__observer self.__observer = None o.stop()