source: moxy/trunk/src/moxy/model/pv.py @ 835

Last change on this file since 835 was 835, checked in by jemian, 12 years ago

refactoring

  • Property svn:eol-style set to native
  • Property svn:executable set to *
  • Property svn:keywords set to Author Date Id Rev Url
File size: 5.0 KB
Line 
1#!/usr/bin/env python
2
3########### SVN repository information ###################
4# $Date: 2012-04-26 15:54:14 +0000 (Thu, 26 Apr 2012) $
5# $Author: jemian $
6# $Revision: 835 $
7# $URL$
8# $Id: pv.py 835 2012-04-26 15:54:14Z jemian $
9########### SVN repository information ###################
10
11'''
12details about a model single PV connection
13
14Copyright (c) 2009 - 2011, UChicago Argonne, LLC.
15See LICENSE file for details.
16'''
17
18
19# - - - - - - - - - - - - - - - - - - Imports
20
21import epics
22from threading import Thread
23from time import sleep
24
25
26# - - - - - - - - - - - - - - - - - - Global
27
28
29__svnid__ = "$Id: pv.py 835 2012-04-26 15:54:14Z jemian $"
30
31
32# - - - - - - - - - - - - - - - - - - class
33
34class PvConnection(Thread):
35    '''
36    A single EPICS Process Variable connection
37   
38    :param str pvname: full EPICS Process Variable name
39    :param channel: PV connection objection
40    :type channel: epics.PV() object
41    :param int cb_index: callback index number from PyEpics :meth:`add_callback()`
42    :param bool connected: Is PV connected with EPICS now?
43    :param str connectedText: describes connection state for humans
44    '''
45   
46    def __init__(self, pv = ''):
47        Thread.__init__(self)
48        self.pvname = pv
49        self.channel = None
50        self.cb_index = 0
51        self.connected = False
52        self.connectedText = 'not connected'
53        self.ext_callback = None
54   
55    def connect(self, callback = None, suppliedname = None, **kw):
56        '''
57        connect to the EPICS PV
58       
59        The name given in ``self.pvname`` will override a name supplied to this method.
60        If ``self.pvname`` is not given, then use ``suppliedname`` if it is given.
61        '''
62        pvname = self.pvname.strip()
63        if len(pvname) == 0 and suppliedname is not None:
64            pvname = suppliedname
65
66        if callback is None:
67            def callback(pvname=None, value=None, char_value=None, **kwds):
68                "generic monitor callback"
69                if char_value is None:
70                    char_value = repr(value)
71                print("%.32s %s %s" % (pvname, epics.pv.fmt_time(), char_value))
72
73        if len(pvname) > 0:    # PV name must not be an empty string
74            self.channel = epics.PV(pvname, connection_callback=self.connection_cb)
75            if callback is not None:
76                self.ext_callback = callback
77                self.cb_index = self.channel.add_callback(callback, **kw)
78   
79    def disconnect(self):
80        '''stop monitoring the PV for CA callbacks and drop the channel'''
81        if self.channel is not None:
82            if self.cb_index != 0:
83                self.channel.remove_callback(self.cb_index)
84                self.cb_index = 0
85            self.channel.disconnect()
86            self.connected = False
87            self.channel = None
88            self.ext_callback = None
89
90    def connection_cb(self, **metadata):
91        "PV connection callback"
92        self.connected = metadata['conn']       # connection state
93        self.connectedText = {True: 'connected', 
94                              False: 'not connected'}[self.connected]
95        if self.ext_callback is not None:
96            self.ext_callback(**metadata)
97
98
99# - - - - - - - - - - - - - - - - - - methods
100
101
102def pvIsAvailable( pv, timeout = None ):
103    '''
104    Is this PV available?
105   
106    :param str pv:  EPICS process variable name
107    :param float timeout:  seconds to wait before giving up
108    :return:  Is this PV available?
109    :rtype: bool
110    '''
111    ch = epics.PV( pv )
112    ch.connect( timeout )
113    return ch.wait_for_connection( timeout )
114
115
116
117def GetBasePv( pv ):
118    '''
119    Returns the record type of "pv"
120   
121    :param str pv:  EPICS process variable name
122    :return: EPICS record type or None if cannot connect
123    :rtype: str
124    '''
125    if len(pv) == 0:
126        return None
127    base = pv.split('.')[0]
128    return base
129
130
131def GetRTYP( pv ):
132    '''
133    Returns the record type of "pv"
134   
135    :param str pv:  EPICS process variable name
136    :return: EPICS record type or None if cannot connect
137    :rtype: str
138    '''
139    base = GetBasePv(pv)
140    if pvIsAvailable(base):
141        return epics.caget(base + '.RTYP')
142    else:
143        return None
144
145
146def pvIsMotorRec( pv ):
147    '''
148    test if the given PV is a motor record
149   
150    :param pv: [string] EPICS Process Variable name
151    '''
152    return GetRTYP(pv) == 'motor'
153
154
155def __demo_waitmove(pv, position, seconds):
156    '''
157    Move this motor pv and wait for a fixed time.
158   
159    .. note: Only use this method for demonstration purposes.
160    '''
161    print "moving", pv, " to ", position
162    epics.caput(pv, position)
163    for i in range(seconds, 0, -1):
164        print i, " seconds remain"
165        sleep(1)
166    print "done waiting"
167
168
169def _demo_():
170    '''demonstrate use of this module'''
171    pv = "prj:m1"
172    if not pvIsAvailable(pv):
173        print "Cannot connect to PV = ", pv
174        exit(0)
175   
176    print pv, " is a motor record: ", str( pvIsMotorRec(pv) )
177    conn = PvConnection(pv+".RBV")
178    conn.connect()
179
180    # now try a couple moves
181    __demo_waitmove(pv, position=5,   seconds=5)
182    sleep(2.5)
183    __demo_waitmove(pv, position=1.1, seconds=5)
184
185# - - - - - - - - - - - - - - - - - - main
186
187
188if __name__ == '__main__':
189    _demo_()
Note: See TracBrowser for help on using the repository browser.