# Source code for pvWebMonitor.pvwatch

'''
pvWebMonitor.pvwatch
'''

# Copyright (c) 2005-2020, UChicago Argonne, LLC.
# See LICENSE file for details.


import datetime
import epics
import fnmatch
from lxml import etree
import numpy
import os
import time
import utils


XML_SCHEMA_FILE = 'pvlist.xsd'
XML_RAWDATA_FILE_NAME = 'rawdata.xml'
XSL_PVLIST_FILE_NAME = 'pvlist.xsl'
XSL_RAWDATA_FILE_NAME = 'rawdata.xsl'
XSL_INDEX_FILE_NAME = 'index.xsl'


class PvNotRegistered(Exception):
    '''Raised when a PV name is looked up but is not a key of the ``pvdb`` cache.'''
class CouldNotParseXml(Exception):
    '''Raised when an XML file could not be parsed.'''
def _xslt_(xslt_file, source_xml_file):
    '''
    convenience routine for XSLT transformations

    For a given XSLT file *abcdefg.xsl*, will produce a file *abcdefg.html*::

        abcdefg.xsl + xml_data --> abcdefg.html

    The output name is the stylesheet name with its extension replaced
    by ``html``; the transformation itself is delegated to
    ``utils.xslt_transformation()``.

    :param str xslt_file: name of the XSLT stylesheet file
    :param str source_xml_file: name of the XML file given to the stylesheet
    '''
    # swap the stylesheet's extension for "html" to name the output file
    output_xml_file = os.path.splitext(xslt_file)[0] + os.extsep + 'html'
    utils.xslt_transformation(xslt_file, source_xml_file, output_xml_file)
class PvWatch(object):
    '''
    Core function of the pvWebMonitor package

    To call this code, first define ``configuration=dict()`` with terms
    as defined in :meth:`read_config.read_xml`, then statements such as:

    .. code-block:: python
        :linenos:

        watcher = PvWatch(configuration)
        watcher.start()

    These ``configuration`` keys are read by this class:
    ``PATTERNS``, ``PVLIST_FILE``, ``REPORT_INTERVAL_S``,
    ``LOG_INTERVAL_S``, ``MAINLOOP_COUNTER_TRIGGER``,
    ``SLEEP_INTERVAL_S`` and ``LOCAL_WWW_LIVEDATA_DIR``.
    '''

    # handle some non SI unit names (applied to the 'units' control variable)
    _UNIT_RENAMES = {
        # old        new
        'millime':   'mm',
        'millira':   'mr',
        'degrees':   'deg',
        'Volts':     'V',
        'VDC':       'V',
        'eng':       '',
    }

    # text of the ``as_string`` XML attribute (lower case) that means "off"
    _FALSE_TEXT = ('', '0', 'false')

    def __init__(self, configuration):
        '''
        read the list of PVs, connect to each, and log how many connected

        :param dict configuration: terms from the XML configuration file
        '''
        self.configuration = configuration  # from XML configuration file
        self.pvdb = {}          # cache of last known good values
        self.xref = {}          # cross-reference between mnemonics and PV names: {mne:pvname}
        self.monitor_counter = 0
        # same list object as in the configuration: add_file_pattern() appends to it
        self.upload_patterns = configuration['PATTERNS']

        self.get_pvlist()
        utils.logMessage('read list of PVs to monitor')

        pv_conn = [pv['ch'].connected for pv in self.pvdb.values()]
        numConnected = numpy.count_nonzero(pv_conn)
        utils.logMessage(
            "Connected %d of total %d EPICS PVs" % (numConnected, len(self.pvdb))
        )

    def start(self):
        '''
        begin receiving PV updates and posting new web content

        Loops forever: polls EPICS Channel Access, calls :meth:`report`
        every ``REPORT_INTERVAL_S`` seconds, writes a checkpoint log
        message every ``LOG_INTERVAL_S`` seconds, and sleeps
        ``SLEEP_INTERVAL_S`` seconds between passes.
        An exception raised by :meth:`report` is logged and the loop continues.
        '''
        nextReport = utils.getTime()
        nextLog = nextReport
        delta_report = datetime.timedelta(seconds=self.configuration['REPORT_INTERVAL_S'])
        delta_log = datetime.timedelta(seconds=self.configuration['LOG_INTERVAL_S'])
        mainLoopCount = 0
        while True:
            # counter wraps to zero every MAINLOOP_COUNTER_TRIGGER passes
            mainLoopCount = (mainLoopCount + 1) % self.configuration['MAINLOOP_COUNTER_TRIGGER']
            dt = utils.getTime()
            epics.ca.poll()

            if mainLoopCount == 0:
                utils.logMessage(
                    " %s times through main loop"
                    % self.configuration['MAINLOOP_COUNTER_TRIGGER']
                )

            if dt >= nextReport:
                nextReport = dt + delta_report
                try:
                    self.report()       # write contents of pvdb to a file
                except Exception:
                    # a failed report must not stop the monitoring loop
                    utils.logException("report()")

            if dt >= nextLog:
                nextLog = dt + delta_log
                msg = "checkpoint, %d EPICS monitor events received" % self.monitor_counter
                utils.logMessage(msg)
                self.monitor_counter = 0    # reset

            time.sleep(self.configuration['SLEEP_INTERVAL_S'])

    def get_pvlist(self):
        '''
        get the PVs from the XML file

        Reads the file named by ``configuration['PVLIST_FILE']``,
        validates it against the ``pvlist.xsd`` schema and calls
        :meth:`add_pv` for each ``EPICS_PV`` element that is not marked
        ``_ignore_``.  A missing file is logged and nothing is added.
        A failure while adding one PV is logged and the remaining PVs
        are still processed.

        :raises CouldNotParseXml: if the file exists but cannot be parsed
        '''
        pvlist_file = self.configuration['PVLIST_FILE']
        if not os.path.exists(pvlist_file):
            utils.logMessage('could not find file: ' + pvlist_file)
            return

        try:
            tree = etree.parse(pvlist_file)
        except Exception as exc:
            msg = 'could not parse file: ' + pvlist_file + ", " + str(exc)
            utils.logMessage(msg)
            raise CouldNotParseXml(msg)

        utils.validate(tree, XML_SCHEMA_FILE)
        msg = 'validated file: ' + pvlist_file
        utils.logMessage(msg)

        for key in tree.findall(".//EPICS_PV"):
            if key.get("_ignore_", "false").lower() == "false":
                mne = key.get("mne")
                pv = key.get("PV")
                desc = key.get("description")
                fmt = key.get("display_format", "%s")   # default format
                # XML attributes are text: any non-empty string (even "false")
                # is truthy, so convert the text to a real boolean here
                as_string_text = key.get("as_string", "false")
                as_string = as_string_text.strip().lower() not in self._FALSE_TEXT
                # :see: http://cars9.uchicago.edu/software/python/pyepics3/pv.html?highlight=as_string#pv.get
                try:
                    self.add_pv(mne, pv, desc, fmt, as_string)
                except Exception:
                    # log and continue with the next PV
                    # (KeyboardInterrupt and SystemExit are not intercepted)
                    msg = "%s: problem connecting: %s" % (pvlist_file, etree.tostring(key))
                    utils.logException(msg)

        utils.logMessage('all PVs added')

    def add_pv(self, mne, pv, desc, fmt, as_string):
        '''
        Connect to a EPICS (PyEpics) process variable

        Creates the ``pvdb`` cache entry and the ``xref`` entry for this
        PV, subscribes :meth:`EPICS_monitor_receiver` to its monitor
        events, fills in units and record type, then stores the current
        value in the cache.

        :param str mne: local mnemonic (symbolic name) for the PV
        :param str pv: name of EPICS PV
        :param str desc: text description for humans
        :param str fmt: format for display, applied with the ``%`` operator
        :param bool as_string: use the string representation as the value
        :raises KeyError: if ``pv`` is already in ``pvdb``
        '''
        if pv in self.pvdb:
            msg = "key '%s' already defined by id=%s" % (pv, self.pvdb[pv]['id'])
            raise KeyError(msg)

        ch = epics.PV(pv)
        entry = {
            'name': pv,             # EPICS PV name
            'id': mne,              # symbolic name used in the python code
            'description': desc,    # text description for humans
            'timestamp': None,      # client time last monitor was received
            'counter': 0,           # number of monitor events received
            'units': "",            # engineering units
            'ch': ch,               # EPICS PV channel
            'format': fmt,          # format for display
            'value': None,          # formatted value
            'raw_value': None,      # unformatted value
            'char_value': None,     # string value
            'as_string': as_string, # whether to return the string representation of the value
            # buildReport() reads this key from every entry; seed it now so
            # an exception later in this method cannot leave the registered
            # entry without it (replaced below once the RTYP field is read)
            'record_type': 'unknown',
        }
        self.pvdb[pv] = entry
        self.xref[mne] = pv     # mne is local mnemonic, define actual PV in pvlist.xml
        ch.add_callback(self.EPICS_monitor_receiver)    # start callbacks now

        cv = ch.get_ctrlvars()
        if cv is not None and 'units' in cv:
            units = cv['units']
            entry['units'] = self._UNIT_RENAMES.get(units, units)

        # report the RTYP (record type, if available)
        basename = pv.split('.')[0]
        field = pv[len(basename):]      # "" or ".FIELD"
        rtyp_pv = epics.PV(basename + '.RTYP')
        rtyp = rtyp_pv.get() or 'unknown'
        if basename == pv or field == '.VAL':
            entry['record_type'] = rtyp
        else:
            # field of record
            entry['record_type'] = rtyp + field

        # FIXME: what to do if PV did not connect? (ch.connected == False)
        if not ch.connected:
            utils.logMessage('PV not connected yet: ' + pv)

        self.update_pvdb(pv, ch.get())      # initialize the cache

    def add_file_pattern(self, pattern):
        '''
        add ``pattern`` as an additional file extension pattern

        Any file with extension matching any of the patterns in
        ``self.upload_patterns`` will be copied to the WWW directory,
        if they are newer.

        :param str pattern: file name pattern, used with ``fnmatch.filter()``
        '''
        self.upload_patterns.append(pattern)

    def update_pvdb(self, pv, raw_value):
        '''
        log PV value to the cache in pvdb

        :param str pv: name of EPICS PV
        :param obj raw_value: could be str, float, int, or ...
        :raises PvNotRegistered: if ``pv`` is not a key of ``pvdb``
        '''
        if pv not in self.pvdb:
            # format the PV name into the message (was passed as a second
            # exception argument, leaving "%s" in the text)
            raise PvNotRegistered('!!!ERROR!!! PV %s was not found in pvdb!' % pv)

        entry = self.pvdb[pv]
        ch = entry['ch']
        entry['timestamp'] = utils.getTime()
        entry['counter'] += 1
        entry['raw_value'] = raw_value
        entry['char_value'] = ch.char_value
        if entry['as_string']:
            entry['value'] = ch.char_value
        else:
            entry['value'] = entry['format'] % raw_value

    def EPICS_monitor_receiver(self, *args, **kws):
        '''
        Response to an EPICS (PyEpics) monitor on the channel

        Reads ``pvname`` and ``value`` from the keyword arguments,
        caches the value and counts the event.

        :raises PvNotRegistered: if the PV is not a key of ``pvdb``
        '''
        pv = kws['pvname']
        if pv not in self.pvdb:
            msg = '!!!ERROR!!! %s was not found in pvdb!' % pv
            # call form of raise: valid on both Python 2 and Python 3
            raise PvNotRegistered(msg)
        self.update_pvdb(pv, kws['value'])  # cache the last known good value
        self.monitor_counter += 1

    def buildReport(self):
        '''
        build the report

        :return: XML document text with one ``pv`` element per monitored
            PV (sorted by mnemonic), preceded by an XML declaration and
            an ``xml-stylesheet`` processing instruction
        '''
        root = etree.Element("pvWebMonitor")
        root.set("version", "1")
        node = etree.SubElement(root, "written_by")
        node.text = 'pvWebMonitor/PvWatch'
        node = etree.SubElement(root, "datetime")
        # drop everything after the first "." of the time text
        node.text = str(utils.getTime()).split('.')[0]

        sorted_id_list = sorted(self.xref)
        fields = ("name", "id", "description", "timestamp", "record_type",
                  "counter", "units", "value", "char_value", "raw_value", "format")

        for mne in sorted_id_list:
            pv = self.xref[mne]
            entry = self.pvdb[pv]

            node = etree.SubElement(root, "pv")
            node.set("id", mne)
            node.set("name", pv)

            for item in fields:
                subnode = etree.SubElement(node, item)
                subnode.text = str(entry[item])

        try:
            pi_xml = etree.ProcessingInstruction('xml', 'version="1.0"')
            xmlText = etree.tostring(pi_xml, pretty_print=True)
        except ValueError:
            # some instances of lxml raise a ValueError saying that 'xml' is not allowed
            # bytes literal: same type that etree.tostring() returns, so the
            # concatenations below work on Python 3 as well as Python 2
            xmlText = b'<?xml version="1.0" ?>\n'
        # NOTE(review): the stylesheet named here is pvlist.xsl although
        # report() transforms this document with rawdata.xsl -- confirm
        pi_xsl = etree.ProcessingInstruction('xml-stylesheet', 'type="text/xsl" href="pvlist.xsl"')
        xmlText += etree.tostring(pi_xsl, pretty_print=True)
        xmlText += etree.tostring(root, pretty_print=True)

        return xmlText

    def report(self):
        '''
        write the values out to files

        The values of the monitored EPICS PVs (the "raw data")
        is written to an XML file.  This file is then used
        with one or more XSLT stylesheets to create HTML pages.
        An overall "home page" (index.html) is created to provide
        a table of contents of this static web site.

        Files are copied to ``configuration['LOCAL_WWW_LIVEDATA_DIR']``
        only when that directory is not the current working directory.
        '''
        xmlText = self.buildReport()
        utils.writeFile(XML_RAWDATA_FILE_NAME, xmlText)

        # accumulate list of each file written below
        # (the raw data file is listed once here; duplicates would be
        # removed by the set() below anyway)
        www_site_file_list = [XML_RAWDATA_FILE_NAME, ]
        xslt_file_list_used = [XSL_INDEX_FILE_NAME, ]   # do the index.xsl file last

        # add pvlist.xml to file list
        pvlist_xml_file_name = self.configuration['PVLIST_FILE']
        www_site_file_list.append(pvlist_xml_file_name)

        # apply pvlist.xsl to the PV list file
        xslt_file_name = XSL_PVLIST_FILE_NAME
        if os.path.exists(xslt_file_name):
            _xslt_(xslt_file_name, pvlist_xml_file_name)
            xslt_file_list_used.append(xslt_file_name)

        # apply rawdata.xsl to the raw data file : values of monitored EPICS PVs
        report_xml_file_name = XML_RAWDATA_FILE_NAME
        if os.path.exists(report_xml_file_name):
            xslt_file_name = XSL_RAWDATA_FILE_NAME
            if os.path.exists(xslt_file_name):
                _xslt_(xslt_file_name, report_xml_file_name)
                xslt_file_list_used.append(xslt_file_name)

        # convert all .xsl files not handled above, using the raw data file
        xslt_files = fnmatch.filter(os.listdir('.'), '*.xsl')
        for xslt_file_name in xslt_files:
            if xslt_file_name not in xslt_file_list_used:
                _xslt_(xslt_file_name, report_xml_file_name)

        # finally, write index.html from file list, table of files and descriptions as provided
        xslt_file_name = XSL_INDEX_FILE_NAME
        if os.path.exists(xslt_file_name):
            # TODO: each XSLT file has a "description" attribute
            # This could be used when building "index.html" file
            # For now, this is manually copied from .xsl file to the table in index.xsl
            # To automate this process, a new, temporary XML document will need to be
            # created with the names and descriptions of all HTML pages.
            # Then use that XML in the following XSLT.
            # Also should add a time stamp string.
            _xslt_(xslt_file_name, report_xml_file_name)

        # include any other useful files from the project directory
        local_files = os.listdir('.')
        for file_pattern in self.upload_patterns:
            www_site_file_list += fnmatch.filter(local_files, file_pattern)
            www_site_file_list += fnmatch.filter(local_files, file_pattern.upper())

        # only copy files if web_site_path is not the current dir
        www_site_file_list = sorted(set(www_site_file_list))
        www_site_path = os.path.abspath(self.configuration['LOCAL_WWW_LIVEDATA_DIR'])
        if www_site_path != os.path.abspath(os.getcwd()):
            for fname in www_site_file_list:
                utils.copyToWebServer(fname, www_site_path)