Package Camelot :: Package camelot :: Package view :: Package model_thread :: Module signal_slot_model_thread
[frames] | no frames]

Source Code for Module Camelot.camelot.view.model_thread.signal_slot_model_thread

  1  ''' 
  2  Created on Sep 9, 2009 
  3   
  4  @author: tw55413 
  5  ''' 
  6  import logging 
  7  logger = logging.getLogger('camelot.view.model_thread.signal_slot_model_thread') 
  8   
  9  from PyQt4 import QtCore 
 10   
 11  from camelot.view.model_thread import AbstractModelThread, gui_function, setup_model 
class Task(QtCore.QObject):
    """A single request to be executed, wrapped in a QObject so that the
    outcome can be reported through signals.

    When :meth:`execute` is called, the request is invoked and either the
    ``finished`` signal is emitted with the result of the request, or the
    ``exception`` signal is emitted with a tuple of the form
    ``(exception, formatted_traceback)``.
    """

    finished = QtCore.SIGNAL('finished')
    exception = QtCore.SIGNAL('exception')

    def __init__(self, request, name=''):
        """
        :param request: a callable that is invoked without arguments when
            the task is executed
        :param name: a label for this task, only used in log messages
        """
        QtCore.QObject.__init__(self)
        self._request = request
        self._name = name

    def clear(self):
        """clear this tasks references to other objects"""
        self._request = None
        self._name = None

    def execute(self):
        """Run the request and emit ``finished`` or ``exception``.

        No exception is propagated to the caller: everything raised by the
        request is logged here.
        """
        logger.debug('executing %s', self._name)
        try:
            result = self._request()
            self.emit(QtCore.SIGNAL('finished'), result)
        except Exception as e:
            logger.error('exception caught in model thread while executing %s',
                         self._name, exc_info=e)
            # imported here to keep the block self contained
            import traceback
            # format_exc() yields the same text print_exc() would write
            exception_info = (e, traceback.format_exc())
            self.emit(QtCore.SIGNAL('exception'), exception_info)
        except:
            # deliberately broad: anything that is not an Exception subclass
            # ends up here.  Neither signal is emitted in this case, so at
            # least record the traceback to make the failure diagnosable.
            logger.error('unhandled exception in model thread', exc_info=True)
48
def synchronized(original_function):
    """Decorator intended to give synchronized access to an object.

    The object on which the decorated method is called should have an
    attribute ``_mutex`` which is of type QMutex; a QMutexLocker is created
    on that mutex before the decorated method is invoked.
    """
    import functools

    def locked_call(self, *args, **kwargs):
        # NOTE(review): the locker is created but never bound to a name, so
        # it is presumably destroyed (releasing the mutex again) before
        # original_function runs -- confirm whether the lock is meant to be
        # held for the duration of the call.  Holding it would change the
        # locking behaviour of every decorated method, so this is kept as is.
        QtCore.QMutexLocker(self._mutex)
        return original_function(self, *args, **kwargs)

    # copy name, docstring, ... of the decorated function onto the wrapper
    return functools.wraps(original_function)(locked_call)
class TaskHandler(QtCore.QObject):
    """Object that works through the tasks waiting in a queue.

    Each call to :meth:`handle_task` pops tasks from the queue one after
    the other and executes them, until the queue has no more tasks to give.
    """

    def __init__(self, queue):
        """:param queue: object with a ``pop`` method, from which the tasks
        are taken when handle_task is called"""
        QtCore.QObject.__init__(self)
        self._queue = queue
        self._busy = False
        self._tasks_done = []
        self._mutex = QtCore.QMutex()
        logger.debug("TaskHandler created.")

    @synchronized
    def busy(self):
        """:return True/False: the busy flag, which is set while handle_task
        is running"""
        return self._busy

    @synchronized
    def handle_task(self):
        """Execute every task that can be popped from the queue.

        The thread busy signal is emitted with True before the first task
        and with False after the last one.
        """
        busy_signal = AbstractModelThread.thread_busy_signal
        self._busy = True
        self.emit(busy_signal, True)
        while True:
            task = self._queue.pop()
            if not task:
                # the queue has nothing left to hand out
                break
            task.execute()
            # Executed tasks are cleared and then kept in a list to prevent
            # them from being garbage collected: apparently, once collected,
            # they are recycled while their signal slot connections seem to
            # survive the recycling.
            # @todo: investigate this in more detail, as this is a deliberate
            # memory leak
            task.clear()
            self._tasks_done.append(task)
        self.emit(busy_signal, False)
        self._busy = False
102
class SignalSlotModelThread(QtCore.QThread, AbstractModelThread):
    """A model thread implementation that uses signals and slots
    to communicate between the model thread and the gui thread

    there is no explicit model thread verification on these methods,
    since this model thread might not be THE model thread.
    """

    task_available = QtCore.SIGNAL('task_available')

    def __init__(self, setup_thread=setup_model):
        """
        @param setup_thread: function to be called at startup of the thread to initialize
        everything, by default this will setup the model. set to None if nothing should
        be done.
        """
        QtCore.QThread.__init__(self)
        AbstractModelThread.__init__(self, setup_thread)
        # created in run(), so it stays None until the thread has started
        self._task_handler = None
        self._mutex = QtCore.QMutex()
        self._request_queue = []
        # True once task_available has been connected to the task handler
        self._connected = False
        # True until run() has finished the setup and the first round of tasks
        self._setup_busy = True

    def run(self):
        """Set up the thread, handle the tasks that were posted in the
        meantime and then enter the event loop of the thread."""
        self.logger.debug('model thread started')
        self._task_handler = TaskHandler(self)
        self.connect(self._task_handler, self.thread_busy_signal,
                     self._thread_busy, QtCore.Qt.QueuedConnection)
        self._thread_busy(True)
        try:
            self._setup_thread()
        except Exception as e:
            # a failing setup is logged, the thread keeps going regardless
            self.logger.error('thread setup incomplete', exc_info=e)
        self._thread_busy(False)
        self.logger.debug('thread setup finished')
        # Some tasks might have been posted before the signals were connected to the task handler,
        # so once force the handling of tasks
        self._task_handler.handle_task()
        self._setup_busy = False
        self.exec_()
        self.logger.debug('model thread stopped')

    def _thread_busy(self, busy_state):
        """Re-emit the thread busy signal with the given state."""
        self.emit(self.thread_busy_signal, busy_state)

    @synchronized
    def post(self, request, response=None, exception=None):
        """Wrap request in a task, put the task in the request queue and
        emit the task_available signal.

        :param request: callable without arguments, the work to be done
        :param response: optional slot method of a QObject, connected to the
            finished signal of the task
        :param exception: optional slot, connected to the exception signal
            of the task
        """
        if not self._connected and self._task_handler:
            # creating this connection in the model thread throws QT exceptions
            self.connect(self, self.task_available,
                         self._task_handler.handle_task,
                         QtCore.Qt.QueuedConnection)
            self._connected = True
        # The name is only a label for the log messages, so a request that
        # has no __name__ (a functools.partial, a callable instance, ...)
        # should not prevent the task from being posted.
        try:
            request_name = request.__name__
        except AttributeError:
            request_name = repr(request)
        # response should be a slot method of a QObject
        if response:
            name = '%s -> %s.%s' % (request_name,
                                    response.im_self.__class__.__name__,
                                    response.__name__)
        else:
            name = request_name
        task = Task(request, name=name)
        # QObject::connect is a thread safe function
        if response:
            assert response.im_self
            assert isinstance(response.im_self, QtCore.QObject)
            self.connect(task, QtCore.SIGNAL('finished'), response,
                         QtCore.Qt.QueuedConnection)
        if exception:
            self.connect(task, QtCore.SIGNAL('exception'), exception,
                         QtCore.Qt.QueuedConnection)
        task.moveToThread(self)
        # only put the task in the queue when it is completely set up
        self._request_queue.append(task)
        self.emit(self.task_available)

    @synchronized
    def pop(self):
        """Pop a task from the queue, return None if the queue is empty"""
        if self._request_queue:
            return self._request_queue.pop(0)
        return None

    @synchronized
    def busy(self):
        """Return True or False indicating whether either the model or the
        gui thread is doing something.

        As long as run() has not created the task handler, this call blocks
        and checks again once per second.
        """
        import time
        while not self._task_handler:
            time.sleep(1)
        app = QtCore.QCoreApplication.instance()
        # bool() because the queue length would otherwise leak out as the
        # return value instead of the documented True/False
        return bool(app.hasPendingEvents()
                    or self._request_queue
                    or self._task_handler.busy()
                    or self._setup_busy)

    @gui_function
    def wait_on_work(self):
        """Wait for all work to be finished, this function should only be used
        to do unit testing and such, since it will block the calling thread until
        all work is done"""
        app = QtCore.QCoreApplication.instance()
        while self.busy():
            app.processEvents()