# Copyright (c) 2017 Mark D. Hill and David A. Wood
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Sean Wilson

'''
Handlers for the testlib Log.


'''
from __future__ import print_function

import multiprocessing
import os
import Queue
import sys
import threading
import time
import traceback

import helper
import log
import result
import state
import test
import terminal

from config import config, constants


class _TestStreamManager(object):
    '''
    Keeps one :class:`_TestStreams` (stdout/stderr file pair) open per test
    result, creating them lazily and closing them on demand.
    '''
    def __init__(self):
        # Maps a test result object -> its open _TestStreams instance.
        self._writers = {}

    def open_writer(self, test_result):
        '''
        Open a fresh stdout/stderr stream pair for ``test_result``.

        :raises ValueError: if a writer is already open for this test.
        '''
        if test_result in self._writers:
            raise ValueError('Cannot have multiple writers on a single test.')
        self._writers[test_result] = _TestStreams(test_result.stdout,
                test_result.stderr)

    def get_writer(self, test_result):
        '''Return the stream pair for ``test_result``, opening it if needed.'''
        if test_result not in self._writers:
            self.open_writer(test_result)
        return self._writers[test_result]

    def close_writer(self, test_result):
        '''Close and discard the stream pair for ``test_result``, if any.'''
        if test_result in self._writers:
            writer = self._writers.pop(test_result)
            writer.close()

    def close(self):
        '''Close all remaining stream pairs.'''
        for writer in self._writers.values():
            writer.close()
        self._writers.clear()

class _TestStreams(object):
    '''
    A pair of files used to persist a single test's stdout and stderr.
    '''
    def __init__(self, stdout, stderr):
        '''
        :param stdout: Path of the file to write test stdout into.
        :param stderr: Path of the file to write test stderr into.
        '''
        # Ensure the containing directories exist before opening.
        helper.mkdir_p(os.path.dirname(stdout))
        helper.mkdir_p(os.path.dirname(stderr))
        self.stdout = open(stdout, 'w')
        self.stderr = open(stderr, 'w')

    def close(self):
        self.stdout.close()
        self.stderr.close()

class ResultHandler(log.Handler):
    '''
    Log handler which listens for test results and output saving data as
    it is reported.

    When the handler is closed it writes out test results in the python pickle
    format.
    '''
    def __init__(self, schedule, directory):
        '''
        :param schedule: The entire schedule as a :class:`LoadedLibrary`
            object.

        :param directory: Directory to save test stdout/stderr and aggregate
            results to.
        '''
        self.directory = directory
        self.internal_results = result.InternalLibraryResults(schedule,
                directory)
        self.test_stream_manager = _TestStreamManager()
        # Once closed, all further records are ignored (see handle()).
        self._closed = False

        # Dispatch table: record type id -> handler method.
        self.mapping = {
            log.LibraryStatus.type_id: self.handle_library_status,

            log.SuiteResult.type_id: self.handle_suite_result,
            log.TestResult.type_id: self.handle_test_result,

            log.TestStderr.type_id: self.handle_stderr,
            log.TestStdout.type_id: self.handle_stdout,
        }

    def handle(self, record):
        '''Dispatch ``record`` to its handler; unknown types are ignored.'''
        if not self._closed:
            self.mapping.get(record.type_id, lambda _:None)(record)

    def handle_library_status(self, record):
        # Once the library is done running (or was skipped entirely), no
        # more test output can arrive, so release all open streams.
        if record['status'] in (state.Status.Complete, state.Status.Avoided):
            self.test_stream_manager.close()

    def handle_suite_result(self, record):
        suite_result = self.internal_results.get_suite_result(
                    record['metadata'].uid)
        suite_result.result = record['result']

    def handle_test_result(self, record):
        test_result = self._get_test_result(record)
        test_result.result = record['result']

    def handle_stderr(self, record):
        self.test_stream_manager.get_writer(
            self._get_test_result(record)
        ).stderr.write(record['buffer'])

    def handle_stdout(self, record):
        self.test_stream_manager.get_writer(
            self._get_test_result(record)
        ).stdout.write(record['buffer'])

    def _get_test_result(self, test_record):
        '''Look up the internal result object for a test record's metadata.'''
        return self.internal_results.get_test_result(
                    test_record['metadata'].uid,
                    test_record['metadata'].suite_uid)

    def _save(self):
        #FIXME Hardcoded path name
        result.InternalSavedResults.save(
            self.internal_results,
            os.path.join(self.directory, constants.pickle_filename))
        result.JUnitSavedResults.save(
            self.internal_results,
            os.path.join(self.directory, constants.xml_filename))

    def close(self):
        '''Persist results to disk. Idempotent; later calls are no-ops.'''
        if self._closed:
            return
        self._closed = True
        self._save()

    def unsuccessful(self):
        '''
        Performs an or reduce on all of the results.
        Returns true if at least one test is unsuccessful, false when all tests
        pass
        '''
        for suite_result in self.internal_results:
            if suite_result.unsuccessful:
                return True
        # If all are successful, then this wasn't "unsuccessful"
        return False


#TODO Change from a handler to an internal post processor so it can be used
# to reprint results
class SummaryHandler(log.Handler):
    '''
    A log handler which listens to the log for test results
    and reports the aggregate results when closed.
    '''
    color = terminal.get_termcap()
    reset = color.Normal
    colormap = {
            state.Result.Errored: color.Red,
            state.Result.Failed: color.Red,
            state.Result.Passed: color.Green,
            state.Result.Skipped: color.Cyan,
    }
    sep_fmtkey = 'separator'
    sep_fmtstr = '{%s}' % sep_fmtkey

    def __init__(self):
        # Dispatch table: record type id -> handler method.
        self.mapping = {
            log.TestResult.type_id: self.handle_testresult,
            log.LibraryStatus.type_id: self.handle_library_status,
        }
        self._timer = helper.Timer()
        # Flat list of result outcomes seen, in arrival order.
        self.results = []

    def handle_library_status(self, record):
        # Restart the wall-clock timer when the library starts building so
        # the summary reflects total build+run time.
        if record['status'] == state.Status.Building:
            self._timer.restart()

    def handle_testresult(self, record):
        outcome = record['result'].value
        # Only tally final outcomes; other transient values are ignored.
        if outcome in (state.Result.Skipped, state.Result.Failed,
                state.Result.Passed, state.Result.Errored):
            self.results.append(outcome)

    def handle(self, record):
        self.mapping.get(record.type_id, lambda _:None)(record)

    def close(self):
        print(self._display_summary())

    def _display_summary(self):
        '''
        Build the colored one-line summary string, e.g.
        `` Results: 3 Passed, 1 Failed in 12.34 seconds ``.
        The separator color reflects the most severe outcome seen.
        '''
        most_severe_outcome = None
        outcome_fmt = ' {count} {outcome}'
        strings = []

        # Count each outcome, indexed by its enum value.
        outcome_count = [0] * len(state.Result.enums)
        for outcome in self.results:
            outcome_count[outcome] += 1

        # Iterate over enums so they are in order of severity
        for outcome in state.Result.enums:
            outcome = getattr(state.Result, outcome)
            count = outcome_count[outcome]
            if count:
                strings.append(outcome_fmt.format(count=count,
                        outcome=state.Result.enums[outcome]))
                most_severe_outcome = outcome
        string = ','.join(strings)
        if most_severe_outcome is None:
            string = ' No testing done'
            most_severe_outcome = state.Result.Passed
        else:
            # NOTE '{:.2f}' gives two decimal places; the previous '{:.2}'
            # meant two *significant digits* (e.g. '1.2e+02' for 123.4s).
            string = ' Results:' + string + ' in {:.2f} seconds '.format(
                    self._timer.active_time())
        string += ' '
        return terminal.insert_separator(
                string,
                color=self.colormap[most_severe_outcome] + self.color.Bold)

class TerminalHandler(log.Handler):
    '''
    Log handler which echoes test progress, results, and (at high enough
    verbosity) live test stdout/stderr to the terminal, with color.
    '''
    color = terminal.get_termcap()
    verbosity_mapping = {
        log.LogLevel.Warn: color.Yellow,
        log.LogLevel.Error: color.Red,
    }
    default = color.Normal

    def __init__(self, verbosity=log.LogLevel.Info, machine_only=False):
        '''
        :param verbosity: Minimum :class:`log.LogLevel` worth printing;
            records above this level are dropped in :meth:`handle`.
        :param machine_only: When set, only print library messages flagged
            as machine readable.
        '''
        # Stream raw test stdout/stderr only at Trace verbosity or above.
        self.stream = verbosity >= log.LogLevel.Trace
        self.verbosity = verbosity
        self.machine_only = machine_only
        # Dispatch table: record type id -> handler method.
        self.mapping = {
            log.TestResult.type_id: self.handle_testresult,
            log.SuiteStatus.type_id: self.handle_suitestatus,
            log.TestStatus.type_id: self.handle_teststatus,
            log.TestStderr.type_id: self.handle_stderr,
            log.TestStdout.type_id: self.handle_stdout,
            log.TestMessage.type_id: self.handle_testmessage,
            log.LibraryMessage.type_id: self.handle_librarymessage,
        }

    def _display_outcome(self, name, outcome, reason=None):
        '''Print a bold, outcome-colored result line, plus an optional reason.'''
        print(self.color.Bold
                 + SummaryHandler.colormap[outcome]
                 + name
                 + ' '
                 + state.Result.enums[outcome]
                 + SummaryHandler.reset)

        if reason is not None:
            log.test_log.info('')
            log.test_log.info('Reason:')
            log.test_log.info(reason)
            log.test_log.info(terminal.separator('-'))

    def handle_teststatus(self, record):
        if record['status'] == state.Status.Running:
            log.test_log.debug('Starting Test Case: %s' %\
                    record['metadata'].name)

    def handle_testresult(self, record):
        self._display_outcome(
            'Test: %s'  % record['metadata'].name,
            record['result'].value)

    def handle_suitestatus(self, record):
        if record['status'] == state.Status.Running:
            log.test_log.debug('Starting Test Suite: %s ' %\
                    record['metadata'].name)

    def handle_stderr(self, record):
        if self.stream:
            print(record.data['buffer'], file=sys.stderr, end='')

    def handle_stdout(self, record):
        if self.stream:
            print(record.data['buffer'], file=sys.stdout, end='')

    def handle_testmessage(self, record):
        if self.stream:
            print(self._colorize(record['message'], record['level']))

    def handle_librarymessage(self, record):
        # In machine-only mode suppress anything not marked machine readable.
        if not self.machine_only or record.data.get('machine_readable', False):
            print(self._colorize(record['message'], record['level'],
                    record['bold']))

    def _colorize(self, message, level, bold=False):
        '''Wrap ``message`` in level-based color codes (and optional bold).'''
        return '%s%s%s%s' % (
            self.color.Bold if bold else '',
            self.verbosity_mapping.get(level, ''),
            message,
            self.default)

    def handle(self, record):
        # Drop records above the configured verbosity; records with no
        # 'level' field are always passed through.
        if record.data.get('level', self.verbosity) > self.verbosity:
            return
        self.mapping.get(record.type_id, lambda _:None)(record)

    def set_verbosity(self, verbosity):
        self.verbosity = verbosity


class PrintHandler(log.Handler):
    '''Trivial handler that prints every record's string form.'''
    def __init__(self):
        pass

    def handle(self, record):
        print(str(record).rstrip())

    def close(self):
        pass


class MultiprocessingHandlerWrapper(log.Handler):
    '''
    A handler class which forwards log records to subhandlers, enabling
    logging across multiprocessing python processes.

    The 'parent' side of the handler should execute either
    :func:`async_process` or :func:`process` to forward
    log records to subhandlers.
    '''
    def __init__(self, *subhandlers):
        # Create thread to spin handing recipt of messages
        # Create queue to push onto
        self.queue = multiprocessing.Queue()
        self.queue.cancel_join_thread()
        self._shutdown = threading.Event()

        # subhandlers should be accessed with the _handler_lock
        self._handler_lock = threading.Lock()
        self._subhandlers = subhandlers

    def add_handler(self, handler):
        '''Prepend ``handler`` to the subhandler chain (thread-safe).'''
        self._handler_lock.acquire()
        self._subhandlers = (handler, ) + self._subhandlers
        self._handler_lock.release()

    def _with_handlers(self, callback):
        '''Run ``callback(handler)`` on each subhandler under the lock.'''
        exception = None
        self._handler_lock.acquire()
        for handler in self._subhandlers:
            # Prevent deadlock when using this handler by delaying
            # exception raise until we get a chance to unlock.
            try:
                callback(handler)
            except Exception as e:
                exception = e
                break
        self._handler_lock.release()

        if exception is not None:
            raise exception

    def async_process(self):
        '''Start :meth:`process` on a daemon thread and return immediately.'''
        self.thread = threading.Thread(target=self.process)
        self.thread.daemon = True
        self.thread.start()

    def process(self):
        '''Pump records from the queue to subhandlers until shutdown.'''
        while not self._shutdown.is_set():
            try:
                item = self.queue.get(timeout=0.1)
                self._handle(item)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                return
            except Queue.Empty:
                continue

    def _drain(self):
        '''Forward any records still queued, without blocking.'''
        while True:
            try:
                item = self.queue.get(block=False)
                self._handle(item)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                return
            except Queue.Empty:
                return

    def _handle(self, record):
        self._with_handlers(lambda handler: handler.handle(record))

    def handle(self, record):
        # May be called from any process; the queue does the marshalling.
        self.queue.put(record)

    def _close(self):
        if hasattr(self, 'thread'):
            self.thread.join()
        _wrap(self._drain)
        self._with_handlers(lambda handler: _wrap(handler.close))

        # NOTE Python2 has a known bug which causes IOErrors to be raised
        # if this shutdown doesn't go cleanly on both ends.
        # This sleep adds some time for the sender threads on this process to
        # finish pickling the object and complete shutdown after the queue is
        # closed.
        time.sleep(.2)
        self.queue.close()
        time.sleep(.2)

    def close(self):
        '''Signal shutdown, drain the queue, and close all subhandlers.'''
        if not self._shutdown.is_set():
            self._shutdown.set()
            self._close()


def _wrap(callback, *args, **kwargs):
    '''Run ``callback``, printing (not propagating) any exception raised.'''
    try:
        callback(*args, **kwargs)
    except:
        traceback.print_exc()