Source code for PvMail.pvMail

#!/usr/bin/env python

########### SVN repository information ###################
# $Date: 2013-10-18 11:27:26 -0500 (Fri, 18 Oct 2013) $
# $Author: jemian $
# $Revision: 1448 $
# $URL: https://subversion.xray.aps.anl.gov/bcdaext/pvMail/src/PvMail/pvMail.py $
# $Id: pvMail.py 1448 2013-10-18 16:27:26Z jemian $
########### SVN repository information ###################

'''
===================================
pvMail: combined CLI and GUI
===================================

Functionally based on pvMail UNIX shell script written in 1999.

Summary
--------

Watches an EPICS PV and sends email when it changes from 0 to 1.
PV value can be either integer or float.

.. note::
   When "running", wait for trigger PV to go from 0 to 1.  When that
   happens, fetch mail message from message PV.  Then, send that
   message out to each of the email addresses.  The message 
   content is prioritized for view on a small-screen device such 
   as a pager or a PDA or smartphone.

:author: Kurt Goetze (original version)
:author: Pete Jemian (this version)
:organization: AES/BCDA, Advanced Photon Source, Argonne National Laboratory

:license: Copyright (c) 2011, UChicago Argonne, LLC
:license: Copyright (c) 2009, The University of Chicago, as Operator of Argonne
     National Laboratory.
:license: Copyright (c) 2009 The Regents of the University of California, as
     Operator of Los Alamos National Laboratory.
:license: This file is distributed subject to a Software License Agreement found
     in the file LICENSE that is included with this distribution. 

:note: Version History:
:note: 05.09.07  kag  Initial alpha version.  Needs testing.
:note: 2009-12-02 prj: converted to use wxPython (no Tkinter or Pmw)
:note: 2011-11-23 prj: complete rewrite using PyEpics 
    and combined GUI (Traits) and CLI
    
:requires: EPICS system (http://www.aps.anl.gov/epics) 
    with at least two process variables (PVs)
    where the "Trigger PV" toggles between values of 0 and 1
    and the "SendMessage PV" contains a string to send as part of 
    the email message.
:requires: PyEpics (http://cars9.uchicago.edu/software/python/pyepics3/)
:requires: Traits (http://code.enthought.com/projects/traits/)
'''


import argparse
import datetime
import epics
import logging
import os
import socket
import sys
import threading
import time
import traceback


__yyyymmdd__ = str(datetime.datetime.now()).split()[0]
__project_name__ = "PvMail"
__description__ = "Watch an EPICS PV. Send email when it changes from 0 to 1."
__svnid__ = "$Id: pvMail.py 1448 2013-10-18 16:27:26Z jemian $"
__version__ = "3"
__minor_version__ = "0.2"
__revision__ = __svnid__.split(" ")[2]
#__full_version__ = "%s.%s-r%s" % (__version__, __minor_version__, __revision__) 
__full_version__ = "v%s.%s, %s" % (__version__, __minor_version__, __yyyymmdd__) 
__author__ = "Pete Jemian"
__institution__ = "Advanced Photon Source, Argonne National Laboratory"
__author_email__= "jemian@anl.gov"
__url__ = "http://subversion.xray.aps.anl.gov/admin_bcdaext/pvMail"
__license__ = "(c) 2009-2012, UChicago Argonne, LLC"
__license__ += " (see LICENSE file for details)"
# create & install console_scripts in <python>/bin
__console_scripts__ = ['pvMail = PvMail.pvMail:main', ]

LOG_FILE = "pvMail-%d.log" % os.getpid()
RETRY_INTERVAL_S = 0.2
CHECKPOINT_INTERVAL_S = 5 * 60.0

gui_object = None


[docs]class PvMail(threading.Thread): ''' Watch an EPICS PV (using PyEpics interface) and send an email when the PV changes from 0 to 1. ''' def __init__(self): self.trigger = False self.message = "default message" self.subject = "pvMail.py" self.triggerPV = "" self.messagePV = "" self.recipients = [] self.old_value = None self.monitoredPVs = [] self.ca_timestamp = None
[docs] def basicChecks(self): ''' check for valid inputs, raise exceptions as discovered, otherwise no return result ''' if len(self.recipients) == 0: msg = "need at least one email address for list of recipients" raise RuntimeWarning, msg fmt = "could not connect to %s PV: %s" parts = {'message': self.messagePV, 'trigger': self.triggerPV} for name, pv in parts.items(): if len(pv) == 0: raise RuntimeWarning, "no name for the %s PV" % name if pv not in self.monitoredPVs: if self.testConnect(pv, timeout=0.5) is False: raise RuntimeWarning, fmt % (name, pv)
[docs] def testConnect(self, pvname, timeout=5.0): ''' create PV, wait for connection, return connection state (True | False) adapted from PyEpics __createPV() method ''' logger("test connect with %s" % pvname) retry_interval_s = 0.0001 start_time = time.time() thispv = epics.PV(pvname) thispv.connect() while not thispv.connected: time.sleep(retry_interval_s) epics.ca.poll() if time.time()-start_time > timeout: break return thispv.connected
[docs] def do_start(self): '''start watching for triggers''' logger("do_start") if len(self.monitoredPVs) == 0: self.basicChecks() logger("passed basicChecks(), starting monitors") epics.camonitor(self.messagePV, callback=self.receiveMessageMonitor) epics.camonitor(self.triggerPV, callback=self.receiveTriggerMonitor) self.old_value = epics.caget(self.triggerPV) self.message = epics.caget(self.messagePV) # What happens if either self.messagePV or self.triggerPV # are changed after self.do_start() is called? # Keep a local list of what was initiated. self.monitoredPVs.append(self.messagePV) self.monitoredPVs.append(self.triggerPV)
[docs] def do_stop(self): '''stop watching for triggers''' logger("do_stop") for pv in self.monitoredPVs: epics.camonitor_clear(pv) # no problem if pv was not monitored self.monitoredPVs = []
[docs] def do_restart(self): '''restart watching for triggers''' self.do_stop() self.do_start()
[docs] def receiveMessageMonitor(self, value, **kw): '''respond to EPICS CA monitors on message PV''' logger("%s = %s" % (self.messagePV, value)) self.message = value
[docs] def receiveTriggerMonitor(self, value, **kw): '''respond to EPICS CA monitors on trigger PV''' logger("%s = %s" % (self.triggerPV, value)) # print self.old_value, type(self.old_value), value, type(value) if self.old_value == 0: if value == 1: self.ca_timestamp = None # Cannot use this definition: # self.trigger = (value == 1) # since the trigger PV just may transition back # to zero before SendMessage() runs. self.trigger = True pv = epics._MONITORS_[self.triggerPV] self.ca_timestamp = pv.timestamp # or epics.ca.get_timestamp(pv.chid) SendMessage(self) self.old_value = value
[docs] def send_test_message(self): ''' sends a test message, used for development only ''' logger("send_test_message") self.recipients = ["jemian", "prjemian"] message = '' message += 'host: %s\n' % socket.gethostname() message += 'date: %s (UNIX, not PV)\n' % datetime.datetime.now() message += 'program: %s\n' % sys.argv[0] message += 'trigger PV: %s\n' % self.triggerPV message += 'message PV: %s\n' % self.messagePV message += 'recipients: %s\n' % ", ".join(self.recipients) self.subject = "pvMail development test" sendMail(self.subject, self.message, self.recipients)
[docs]class SendMessage(threading.Thread): ''' initiate sending the message in a separate thread :param obj pvm: instance of PvMail object on which to report ''' def __init__(self, pvm): logger("SendMessage") pvm.trigger = False # triggered event received try: pvm.basicChecks() pvm.subject = "pvMail.py: " + pvm.triggerPV msg = '' # start with a new message msg += "\n\n" msg += pvm.message msg += "\n\n" msg += 'user: %s\n' % os.environ['LOGNAME'] msg += 'host: %s\n' % socket.gethostname() msg += 'date: %s (UNIX, not PV)\n' % datetime.datetime.now() try: msg += 'CA_timestamp: %d\n' % pvm.ca_timestamp except: msg += 'CA_timestamp: not available\n' msg += 'program: %s\n' % sys.argv[0] msg += 'PID: %d\n' % os.getpid() msg += 'trigger PV: %s\n' % pvm.triggerPV msg += 'message PV: %s\n' % pvm.messagePV msg += 'recipients: %s\n' % ", ".join(pvm.recipients) pvm.message = msg sendMail(pvm.subject, msg, pvm.recipients) logger("message(s) sent") except: err_msg = traceback.format_exc() final_msg = "pvm.subject = %s\nmsg = %s\ntraceback: %s" % (pvm.subject, str(msg), err_msg) logger(final_msg)
[docs]def sendMail(subject, message, recipients): ''' send an email message using sendmail :param str subject: short text for email subject :param str message: full text of email body :param [str] recipients: list of email addresses to receive the message ''' global gui_object from_addr = sys.argv[0] to_addr = str(" ".join(recipients)) cmd = 'the mail configuration has not been set yet' if 'el' in str(os.uname()): # RHEL uses postfix email_program = '/usr/lib/sendmail' mail_command = "%s -F %s -t %s" % (email_program, from_addr, to_addr) mail_message = [mail_command, "Subject: "+subject, message] cmd = '''cat << +++ | %s\n+++''' % "\n".join(mail_message) if 'Ubuntu' in str(os.uname()): # some Ubuntu (11) uses exim, some (10) postfix email_program = '/usr/bin/mail' mail_command = "%s %s" % (email_program, to_addr) content = '%s\n%s' % (subject, message) # TODO: needs to do THIS ''' cat /tmp/message.txt | mail jemian@anl.gov ''' cmd = '''echo %s | %s''' % (content, mail_command) msg = "sending email to: %s" % to_addr logger(msg) if gui_object is not None: gui_object.SetStatus(msg) # FIXME: get the GUI status line to update try: logger( "email command:\n" + cmd ) if os.path.exists(email_program): os.popen(cmd) # send the message else: logger( 'email program (%s) does not exist' % email_program ) except: err_msg = traceback.format_exc() final_msg = "cmd = %s\ntraceback: %s" % (cmd, err_msg) logger(final_msg)
[docs]def logger(message): ''' log a report from this class. :param str message: words to be logged ''' now = datetime.datetime.now() name = sys.argv[0] name = os.path.basename(name) text = "(%s,%s) %s" % (name, now, message) logging.info(text)
[docs]def basicStartTest(): '''simple test of the PvMail class''' logging.basicConfig(filename=LOG_FILE,level=logging.INFO) logger("startup") pvm = PvMail() pvm.recipients = ['prjemian@gmail.com'] pvm.triggerPV = "pvMail:trigger" pvm.messagePV = "pvMail:message" retry_interval_s = 0.05 end_time = time.time() + 60 report_time = time.time() + 5.0 pvm.do_start() while time.time() < end_time: if time.time() > report_time: report_time = time.time() + 5.0 logger("time remaining: %.1f seconds ..." % (end_time - time.time())) time.sleep(retry_interval_s) pvm.do_stop()
[docs]def basicMailTest(): '''simple test sending mail using the PvMail class''' pvm = PvMail() pvm.send_test_message()
[docs]def cli(results): ''' command-line interface to the PvMail class :param obj results: default parameters from argparse, see main() ''' logging_interval = min(60*60, max(5.0, results.logging_interval)) sleep_duration = min(5.0, max(0.0001, results.sleep_duration)) pvm = PvMail() pvm.triggerPV = results.trigger_PV pvm.messagePV = results.message_PV pvm.recipients = results.email_addresses.strip().split(",") checkpoint_time = time.time() pvm.do_start() while True: # endless loop, kill with ^C or equal if time.time() > checkpoint_time: checkpoint_time += logging_interval logger("checkpoint") if pvm.trigger: SendMessage(pvm) time.sleep(sleep_duration) pvm.do_stop() # this will never be called
[docs]def gui(results): ''' graphical user interface to the PvMail class :param obj results: default parameters from argparse, see main() ''' import traits_gui global gui_object gui_object = traits_gui.PvMail_GUI(results.trigger_PV, results.message_PV, results.email_addresses.strip().split(","), results.log_file, ) gui_object.configure_traits()
[docs]def main(): '''parse command-line arguments and choose which interface to use''' parser = argparse.ArgumentParser(description=__description__) # positional arguments # not required if GUI option is selected parser.add_argument('trigger_PV', action='store', nargs='?', help="EPICS trigger PV name", default="") parser.add_argument('message_PV', action='store', nargs='?', help="EPICS message PV name", default="") parser.add_argument('email_addresses', action='store', nargs='?', help="email address(es), comma-separated if more than one", default="") # optional arguments parser.add_argument('-l', action='store', dest='log_file', help="for logging program progress and comments", default=LOG_FILE) parser.add_argument('-i', action='store', dest='logging_interval', type=float, help="checkpoint reporting interval (s) in log file", default=CHECKPOINT_INTERVAL_S) parser.add_argument('-r', action='store', dest='sleep_duration', type=float, help="sleep duration (s) in main event loop", default=RETRY_INTERVAL_S) parser.add_argument('-g', '--gui', action='store_true', default=False, dest='interface', help='Use the graphical rather than command-line interface') parser.add_argument('-v', '--version', action='version', version=__full_version__) results = parser.parse_args() addresses = results.email_addresses.strip().split(",") interface = {False: 'command-line', True: 'GUI'}[results.interface] logging.basicConfig(filename=results.log_file, level=logging.INFO) logger("#"*60) logger("startup") logger("trigger PV = " + results.trigger_PV) logger("message PV = " + results.message_PV) logger("email list = " + str(addresses) ) logger("log file = " + results.log_file) logger("logging interval = " + str( results.logging_interval ) ) logger("sleep duration = " + str( results.sleep_duration ) ) logger("interface = " + interface) logger("user = " + os.environ['LOGNAME']) logger("host = " + socket.gethostname() ) logger("program = " + sys.argv[0] ) logger("PID = " + str(os.getpid()) ) if results.interface is False: # When the GUI is not selected, # ensure the positional arguments are given tests = [ len(results.trigger_PV), len(results.message_PV), len(" ".join(addresses)), ] if 0 in tests: parser.print_usage() sys.exit() # call the interface {False: cli, True: gui}[results.interface](results)
if __name__ == '__main__': # ./pvMail.py pvMail:trigger pvMail:message jemian if False: # code development only sys.argv.append('pvMail:trigger') sys.argv.append('pvMail:message') sys.argv.append('jemian') sys.argv.append('-l mylog.log') main()