handlers.py revision 12882:dd87d7f2f3e5
1# Copyright (c) 2017 Mark D. Hill and David A. Wood 2# All rights reserved. 3# 4# Redistribution and use in source and binary forms, with or without 5# modification, are permitted provided that the following conditions are 6# met: redistributions of source code must retain the above copyright 7# notice, this list of conditions and the following disclaimer; 8# redistributions in binary form must reproduce the above copyright 9# notice, this list of conditions and the following disclaimer in the 10# documentation and/or other materials provided with the distribution; 11# neither the name of the copyright holders nor the names of its 12# contributors may be used to endorse or promote products derived from 13# this software without specific prior written permission. 14# 15# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 16# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 17# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 18# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 19# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 21# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 22# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 23# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 24# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 25# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26# 27# Authors: Sean Wilson 28 29''' 30Handlers for the testlib Log. 31 32 33''' 34from __future__ import print_function 35 36import multiprocessing 37import os 38import Queue 39import sys 40import threading 41import time 42import traceback 43 44import helper 45import log 46import result 47import state 48import test 49import terminal 50 51from config import config, constants 52 53 54class _TestStreamManager(object): 55 def __init__(self): 56 self._writers = {} 57 58 def open_writer(self, test_result): 59 if test_result in self._writers: 60 raise ValueError('Cannot have multiple writters on a single test.') 61 self._writers[test_result] = _TestStreams(test_result.stdout, 62 test_result.stderr) 63 64 def get_writer(self, test_result): 65 if test_result not in self._writers: 66 self.open_writer(test_result) 67 return self._writers[test_result] 68 69 def close_writer(self, test_result): 70 if test_result in self._writers: 71 writer = self._writers.pop(test_result) 72 writer.close() 73 74 def close(self): 75 for writer in self._writers.values(): 76 writer.close() 77 self._writers.clear() 78 79class _TestStreams(object): 80 def __init__(self, stdout, stderr): 81 helper.mkdir_p(os.path.dirname(stdout)) 82 helper.mkdir_p(os.path.dirname(stderr)) 83 self.stdout = open(stdout, 'w') 84 self.stderr = open(stderr, 'w') 85 86 def close(self): 87 self.stdout.close() 88 self.stderr.close() 89 90class ResultHandler(log.Handler): 91 ''' 92 Log handler which listens for test results and output saving data as 93 it is reported. 94 95 When the handler is closed it writes out test results in the python pickle 96 format. 97 ''' 98 def __init__(self, schedule, directory): 99 ''' 100 :param schedule: The entire schedule as a :class:`LoadedLibrary` 101 object. 102 103 :param directory: Directory to save test stdout/stderr and aggregate 104 results to. 105 ''' 106 self.directory = directory 107 self.internal_results = result.InternalLibraryResults(schedule, 108 directory) 109 self.test_stream_manager = _TestStreamManager() 110 self._closed = False 111 112 self.mapping = { 113 log.LibraryStatus.type_id: self.handle_library_status, 114 115 log.SuiteResult.type_id: self.handle_suite_result, 116 log.TestResult.type_id: self.handle_test_result, 117 118 log.TestStderr.type_id: self.handle_stderr, 119 log.TestStdout.type_id: self.handle_stdout, 120 } 121 122 def handle(self, record): 123 if not self._closed: 124 self.mapping.get(record.type_id, lambda _:None)(record) 125 126 def handle_library_status(self, record): 127 if record['status'] in (state.Status.Complete, state.Status.Avoided): 128 self.test_stream_manager.close() 129 130 def handle_suite_result(self, record): 131 suite_result = self.internal_results.get_suite_result( 132 record['metadata'].uid) 133 suite_result.result = record['result'] 134 135 def handle_test_result(self, record): 136 test_result = self._get_test_result(record) 137 test_result.result = record['result'] 138 139 def handle_stderr(self, record): 140 self.test_stream_manager.get_writer( 141 self._get_test_result(record) 142 ).stderr.write(record['buffer']) 143 144 def handle_stdout(self, record): 145 self.test_stream_manager.get_writer( 146 self._get_test_result(record) 147 ).stdout.write(record['buffer']) 148 149 def _get_test_result(self, test_record): 150 return self.internal_results.get_test_result( 151 test_record['metadata'].uid, 152 test_record['metadata'].suite_uid) 153 154 def _save(self): 155 #FIXME Hardcoded path name 156 result.InternalSavedResults.save( 157 self.internal_results, 158 os.path.join(self.directory, constants.pickle_filename)) 159 result.JUnitSavedResults.save( 160 self.internal_results, 161 os.path.join(self.directory, constants.xml_filename)) 162 163 def close(self): 164 if self._closed: 165 return 166 self._closed = True 167 self._save() 168 169 170#TODO Change from a handler to an internal post processor so it can be used 171# to reprint results 172class SummaryHandler(log.Handler): 173 ''' 174 A log handler which listens to the log for test results 175 and reports the aggregate results when closed. 176 ''' 177 color = terminal.get_termcap() 178 reset = color.Normal 179 colormap = { 180 state.Result.Errored: color.Red, 181 state.Result.Failed: color.Red, 182 state.Result.Passed: color.Green, 183 state.Result.Skipped: color.Cyan, 184 } 185 sep_fmtkey = 'separator' 186 sep_fmtstr = '{%s}' % sep_fmtkey 187 188 def __init__(self): 189 self.mapping = { 190 log.TestResult.type_id: self.handle_testresult, 191 log.LibraryStatus.type_id: self.handle_library_status, 192 } 193 self._timer = helper.Timer() 194 self.results = [] 195 196 def handle_library_status(self, record): 197 if record['status'] == state.Status.Building: 198 self._timer.restart() 199 200 def handle_testresult(self, record): 201 result = record['result'].value 202 if result in (state.Result.Skipped, state.Result.Failed, 203 state.Result.Passed, state.Result.Errored): 204 self.results.append(result) 205 206 def handle(self, record): 207 self.mapping.get(record.type_id, lambda _:None)(record) 208 209 def close(self): 210 print(self._display_summary()) 211 212 def _display_summary(self): 213 most_severe_outcome = None 214 outcome_fmt = ' {count} {outcome}' 215 strings = [] 216 217 outcome_count = [0] * len(state.Result.enums) 218 for result in self.results: 219 outcome_count[result] += 1 220 221 # Iterate over enums so they are in order of severity 222 for outcome in state.Result.enums: 223 outcome = getattr(state.Result, outcome) 224 count = outcome_count[outcome] 225 if count: 226 strings.append(outcome_fmt.format(count=count, 227 outcome=state.Result.enums[outcome])) 228 most_severe_outcome = outcome 229 string = ','.join(strings) 230 if most_severe_outcome is None: 231 string = ' No testing done' 232 most_severe_outcome = state.Result.Passed 233 else: 234 string = ' Results:' + string + ' in {:.2} seconds '.format( 235 self._timer.active_time()) 236 string += ' ' 237 return terminal.insert_separator( 238 string, 239 color=self.colormap[most_severe_outcome] + self.color.Bold) 240 241class TerminalHandler(log.Handler): 242 color = terminal.get_termcap() 243 verbosity_mapping = { 244 log.LogLevel.Warn: color.Yellow, 245 log.LogLevel.Error: color.Red, 246 } 247 default = color.Normal 248 249 def __init__(self, verbosity=log.LogLevel.Info, machine_only=False): 250 self.stream = verbosity >= log.LogLevel.Trace 251 self.verbosity = verbosity 252 self.machine_only = machine_only 253 self.mapping = { 254 log.TestResult.type_id: self.handle_testresult, 255 log.SuiteStatus.type_id: self.handle_suitestatus, 256 log.TestStatus.type_id: self.handle_teststatus, 257 log.TestStderr.type_id: self.handle_stderr, 258 log.TestStdout.type_id: self.handle_stdout, 259 log.TestMessage.type_id: self.handle_testmessage, 260 log.LibraryMessage.type_id: self.handle_librarymessage, 261 } 262 263 def _display_outcome(self, name, outcome, reason=None): 264 print(self.color.Bold 265 + SummaryHandler.colormap[outcome] 266 + name 267 + ' ' 268 + state.Result.enums[outcome] 269 + SummaryHandler.reset) 270 271 if reason is not None: 272 log.test_log.info('') 273 log.test_log.info('Reason:') 274 log.test_log.info(reason) 275 log.test_log.info(terminal.separator('-')) 276 277 def handle_teststatus(self, record): 278 if record['status'] == state.Status.Running: 279 log.test_log.debug('Starting Test Case: %s' %\ 280 record['metadata'].name) 281 282 def handle_testresult(self, record): 283 self._display_outcome( 284 'Test: %s' % record['metadata'].name, 285 record['result'].value) 286 287 def handle_suitestatus(self, record): 288 if record['status'] == state.Status.Running: 289 log.test_log.debug('Starting Test Suite: %s ' %\ 290 record['metadata'].name) 291 292 def handle_stderr(self, record): 293 if self.stream: 294 print(record.data['buffer'], file=sys.stderr, end='') 295 296 def handle_stdout(self, record): 297 if self.stream: 298 print(record.data['buffer'], file=sys.stdout, end='') 299 300 def handle_testmessage(self, record): 301 if self.stream: 302 print(self._colorize(record['message'], record['level'])) 303 304 def handle_librarymessage(self, record): 305 if not self.machine_only or record.data.get('machine_readable', False): 306 print(self._colorize(record['message'], record['level'], 307 record['bold'])) 308 309 def _colorize(self, message, level, bold=False): 310 return '%s%s%s%s' % ( 311 self.color.Bold if bold else '', 312 self.verbosity_mapping.get(level, ''), 313 message, 314 self.default) 315 316 def handle(self, record): 317 if record.data.get('level', self.verbosity) > self.verbosity: 318 return 319 self.mapping.get(record.type_id, lambda _:None)(record) 320 321 def set_verbosity(self, verbosity): 322 self.verbosity = verbosity 323 324 325class PrintHandler(log.Handler): 326 def __init__(self): 327 pass 328 329 def handle(self, record): 330 print(str(record).rstrip()) 331 332 def close(self): 333 pass 334 335 336class MultiprocessingHandlerWrapper(log.Handler): 337 ''' 338 A handler class which forwards log records to subhandlers, enabling 339 logging across multiprocessing python processes. 340 341 The 'parent' side of the handler should execute either 342 :func:`async_process` or :func:`process` to forward 343 log records to subhandlers. 344 ''' 345 def __init__(self, *subhandlers): 346 # Create thread to spin handing recipt of messages 347 # Create queue to push onto 348 self.queue = multiprocessing.Queue() 349 self.queue.cancel_join_thread() 350 self._shutdown = threading.Event() 351 352 # subhandlers should be accessed with the _handler_lock 353 self._handler_lock = threading.Lock() 354 self._subhandlers = subhandlers 355 356 def add_handler(self, handler): 357 self._handler_lock.acquire() 358 self._subhandlers = (handler, ) + self._subhandlers 359 self._handler_lock.release() 360 361 def _with_handlers(self, callback): 362 exception = None 363 self._handler_lock.acquire() 364 for handler in self._subhandlers: 365 # Prevent deadlock when using this handler by delaying 366 # exception raise until we get a chance to unlock. 367 try: 368 callback(handler) 369 except Exception as e: 370 exception = e 371 break 372 self._handler_lock.release() 373 374 if exception is not None: 375 raise exception 376 377 def async_process(self): 378 self.thread = threading.Thread(target=self.process) 379 self.thread.daemon = True 380 self.thread.start() 381 382 def process(self): 383 while not self._shutdown.is_set(): 384 try: 385 item = self.queue.get(timeout=0.1) 386 self._handle(item) 387 except (KeyboardInterrupt, SystemExit): 388 raise 389 except EOFError: 390 return 391 except Queue.Empty: 392 continue 393 394 def _drain(self): 395 while True: 396 try: 397 item = self.queue.get(block=False) 398 self._handle(item) 399 except (KeyboardInterrupt, SystemExit): 400 raise 401 except EOFError: 402 return 403 except Queue.Empty: 404 return 405 406 def _handle(self, record): 407 self._with_handlers(lambda handler: handler.handle(record)) 408 409 def handle(self, record): 410 self.queue.put(record) 411 412 def _close(self): 413 if hasattr(self, 'thread'): 414 self.thread.join() 415 _wrap(self._drain) 416 self._with_handlers(lambda handler: _wrap(handler.close)) 417 418 # NOTE Python2 has an known bug which causes IOErrors to be raised 419 # if this shutdown doesn't go cleanly on both ends. 420 # This sleep adds some time for the sender threads on this process to 421 # finish pickling the object and complete shutdown after the queue is 422 # closed. 423 time.sleep(.2) 424 self.queue.close() 425 time.sleep(.2) 426 427 def close(self): 428 if not self._shutdown.is_set(): 429 self._shutdown.set() 430 self._close() 431 432 433def _wrap(callback, *args, **kwargs): 434 try: 435 callback(*args, **kwargs) 436 except: 437 traceback.print_exc() 438