handlers.py revision 12882:dd87d7f2f3e5
1# Copyright (c) 2017 Mark D. Hill and David A. Wood
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met: redistributions of source code must retain the above copyright
7# notice, this list of conditions and the following disclaimer;
8# redistributions in binary form must reproduce the above copyright
9# notice, this list of conditions and the following disclaimer in the
10# documentation and/or other materials provided with the distribution;
11# neither the name of the copyright holders nor the names of its
12# contributors may be used to endorse or promote products derived from
13# this software without specific prior written permission.
14#
15# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
16# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
17# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
18# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
19# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
21# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
25# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26#
27# Authors: Sean Wilson
28
29'''
30Handlers for the testlib Log.
31
32
33'''
34from __future__ import print_function
35
36import multiprocessing
37import os
38import Queue
39import sys
40import threading
41import time
42import traceback
43
44import helper
45import log
46import result
47import state
48import test
49import terminal
50
51from config import config, constants
52
53
54class _TestStreamManager(object):
55    def __init__(self):
56        self._writers = {}
57
58    def open_writer(self, test_result):
59        if test_result in self._writers:
60            raise ValueError('Cannot have multiple writters on a single test.')
61        self._writers[test_result] = _TestStreams(test_result.stdout,
62                test_result.stderr)
63
64    def get_writer(self, test_result):
65        if test_result not in self._writers:
66            self.open_writer(test_result)
67        return self._writers[test_result]
68
69    def close_writer(self, test_result):
70        if test_result in self._writers:
71            writer = self._writers.pop(test_result)
72            writer.close()
73
74    def close(self):
75        for writer in self._writers.values():
76            writer.close()
77        self._writers.clear()
78
79class _TestStreams(object):
80    def __init__(self, stdout, stderr):
81        helper.mkdir_p(os.path.dirname(stdout))
82        helper.mkdir_p(os.path.dirname(stderr))
83        self.stdout = open(stdout, 'w')
84        self.stderr = open(stderr, 'w')
85
86    def close(self):
87        self.stdout.close()
88        self.stderr.close()
89
90class ResultHandler(log.Handler):
91    '''
92    Log handler which listens for test results and output saving data as
93    it is reported.
94
95    When the handler is closed it writes out test results in the python pickle
96    format.
97    '''
98    def __init__(self, schedule, directory):
99        '''
100        :param schedule: The entire schedule as a :class:`LoadedLibrary`
101            object.
102
103        :param directory: Directory to save test stdout/stderr and aggregate
104            results to.
105        '''
106        self.directory = directory
107        self.internal_results = result.InternalLibraryResults(schedule,
108                directory)
109        self.test_stream_manager = _TestStreamManager()
110        self._closed = False
111
112        self.mapping = {
113            log.LibraryStatus.type_id: self.handle_library_status,
114
115            log.SuiteResult.type_id: self.handle_suite_result,
116            log.TestResult.type_id: self.handle_test_result,
117
118            log.TestStderr.type_id: self.handle_stderr,
119            log.TestStdout.type_id: self.handle_stdout,
120        }
121
122    def handle(self, record):
123        if not self._closed:
124            self.mapping.get(record.type_id, lambda _:None)(record)
125
126    def handle_library_status(self, record):
127        if record['status'] in (state.Status.Complete, state.Status.Avoided):
128            self.test_stream_manager.close()
129
130    def handle_suite_result(self, record):
131        suite_result = self.internal_results.get_suite_result(
132                    record['metadata'].uid)
133        suite_result.result = record['result']
134
135    def handle_test_result(self, record):
136        test_result = self._get_test_result(record)
137        test_result.result = record['result']
138
139    def handle_stderr(self, record):
140        self.test_stream_manager.get_writer(
141            self._get_test_result(record)
142        ).stderr.write(record['buffer'])
143
144    def handle_stdout(self, record):
145        self.test_stream_manager.get_writer(
146            self._get_test_result(record)
147        ).stdout.write(record['buffer'])
148
149    def _get_test_result(self, test_record):
150        return self.internal_results.get_test_result(
151                    test_record['metadata'].uid,
152                    test_record['metadata'].suite_uid)
153
154    def _save(self):
155        #FIXME Hardcoded path name
156        result.InternalSavedResults.save(
157            self.internal_results,
158            os.path.join(self.directory, constants.pickle_filename))
159        result.JUnitSavedResults.save(
160            self.internal_results,
161            os.path.join(self.directory, constants.xml_filename))
162
163    def close(self):
164        if self._closed:
165            return
166        self._closed = True
167        self._save()
168
169
170#TODO Change from a handler to an internal post processor so it can be used
171# to reprint results
172class SummaryHandler(log.Handler):
173    '''
174    A log handler which listens to the log for test results
175    and reports the aggregate results when closed.
176    '''
177    color = terminal.get_termcap()
178    reset = color.Normal
179    colormap = {
180            state.Result.Errored: color.Red,
181            state.Result.Failed: color.Red,
182            state.Result.Passed: color.Green,
183            state.Result.Skipped: color.Cyan,
184    }
185    sep_fmtkey = 'separator'
186    sep_fmtstr = '{%s}' % sep_fmtkey
187
188    def __init__(self):
189        self.mapping = {
190            log.TestResult.type_id: self.handle_testresult,
191            log.LibraryStatus.type_id: self.handle_library_status,
192        }
193        self._timer = helper.Timer()
194        self.results = []
195
196    def handle_library_status(self, record):
197        if record['status'] == state.Status.Building:
198            self._timer.restart()
199
200    def handle_testresult(self, record):
201        result = record['result'].value
202        if result in (state.Result.Skipped, state.Result.Failed,
203                state.Result.Passed, state.Result.Errored):
204            self.results.append(result)
205
206    def handle(self, record):
207        self.mapping.get(record.type_id, lambda _:None)(record)
208
209    def close(self):
210        print(self._display_summary())
211
212    def _display_summary(self):
213        most_severe_outcome = None
214        outcome_fmt = ' {count} {outcome}'
215        strings = []
216
217        outcome_count = [0] * len(state.Result.enums)
218        for result in self.results:
219            outcome_count[result] += 1
220
221        # Iterate over enums so they are in order of severity
222        for outcome in state.Result.enums:
223            outcome = getattr(state.Result, outcome)
224            count  = outcome_count[outcome]
225            if count:
226                strings.append(outcome_fmt.format(count=count,
227                        outcome=state.Result.enums[outcome]))
228                most_severe_outcome = outcome
229        string = ','.join(strings)
230        if most_severe_outcome is None:
231            string = ' No testing done'
232            most_severe_outcome = state.Result.Passed
233        else:
234            string = ' Results:' + string + ' in {:.2} seconds '.format(
235                    self._timer.active_time())
236        string += ' '
237        return terminal.insert_separator(
238                string,
239                color=self.colormap[most_severe_outcome] + self.color.Bold)
240
241class TerminalHandler(log.Handler):
242    color = terminal.get_termcap()
243    verbosity_mapping = {
244        log.LogLevel.Warn: color.Yellow,
245        log.LogLevel.Error: color.Red,
246    }
247    default = color.Normal
248
249    def __init__(self, verbosity=log.LogLevel.Info, machine_only=False):
250        self.stream = verbosity >= log.LogLevel.Trace
251        self.verbosity = verbosity
252        self.machine_only = machine_only
253        self.mapping = {
254            log.TestResult.type_id: self.handle_testresult,
255            log.SuiteStatus.type_id: self.handle_suitestatus,
256            log.TestStatus.type_id: self.handle_teststatus,
257            log.TestStderr.type_id: self.handle_stderr,
258            log.TestStdout.type_id: self.handle_stdout,
259            log.TestMessage.type_id: self.handle_testmessage,
260            log.LibraryMessage.type_id: self.handle_librarymessage,
261        }
262
263    def _display_outcome(self, name, outcome, reason=None):
264        print(self.color.Bold
265                 + SummaryHandler.colormap[outcome]
266                 + name
267                 + ' '
268                 + state.Result.enums[outcome]
269                 + SummaryHandler.reset)
270
271        if reason is not None:
272            log.test_log.info('')
273            log.test_log.info('Reason:')
274            log.test_log.info(reason)
275            log.test_log.info(terminal.separator('-'))
276
277    def handle_teststatus(self, record):
278        if record['status'] == state.Status.Running:
279            log.test_log.debug('Starting Test Case: %s' %\
280                    record['metadata'].name)
281
282    def handle_testresult(self, record):
283        self._display_outcome(
284            'Test: %s'  % record['metadata'].name,
285            record['result'].value)
286
287    def handle_suitestatus(self, record):
288        if record['status'] == state.Status.Running:
289              log.test_log.debug('Starting Test Suite: %s ' %\
290                    record['metadata'].name)
291
292    def handle_stderr(self, record):
293        if self.stream:
294            print(record.data['buffer'], file=sys.stderr, end='')
295
296    def handle_stdout(self, record):
297        if self.stream:
298            print(record.data['buffer'], file=sys.stdout, end='')
299
300    def handle_testmessage(self, record):
301        if self.stream:
302            print(self._colorize(record['message'], record['level']))
303
304    def handle_librarymessage(self, record):
305        if not self.machine_only or record.data.get('machine_readable', False):
306            print(self._colorize(record['message'], record['level'],
307                    record['bold']))
308
309    def _colorize(self, message, level, bold=False):
310        return '%s%s%s%s' % (
311                self.color.Bold if bold else '',
312                self.verbosity_mapping.get(level, ''),
313                message,
314                self.default)
315
316    def handle(self, record):
317        if record.data.get('level', self.verbosity) > self.verbosity:
318            return
319        self.mapping.get(record.type_id, lambda _:None)(record)
320
321    def set_verbosity(self, verbosity):
322        self.verbosity = verbosity
323
324
325class PrintHandler(log.Handler):
326    def __init__(self):
327        pass
328
329    def handle(self, record):
330        print(str(record).rstrip())
331
332    def close(self):
333        pass
334
335
336class MultiprocessingHandlerWrapper(log.Handler):
337    '''
338    A handler class which forwards log records to subhandlers, enabling
339    logging across multiprocessing python processes.
340
341    The 'parent' side of the handler should execute either
342    :func:`async_process` or :func:`process` to forward
343    log records to subhandlers.
344    '''
345    def __init__(self, *subhandlers):
346        # Create thread to spin handing recipt of messages
347        # Create queue to push onto
348        self.queue = multiprocessing.Queue()
349        self.queue.cancel_join_thread()
350        self._shutdown = threading.Event()
351
352        # subhandlers should be accessed with the _handler_lock
353        self._handler_lock = threading.Lock()
354        self._subhandlers = subhandlers
355
356    def add_handler(self, handler):
357        self._handler_lock.acquire()
358        self._subhandlers = (handler, ) + self._subhandlers
359        self._handler_lock.release()
360
361    def _with_handlers(self, callback):
362        exception = None
363        self._handler_lock.acquire()
364        for handler in self._subhandlers:
365            # Prevent deadlock when using this handler by delaying
366            # exception raise until we get a chance to unlock.
367            try:
368                callback(handler)
369            except Exception as e:
370                exception = e
371                break
372        self._handler_lock.release()
373
374        if exception is not None:
375            raise exception
376
377    def async_process(self):
378        self.thread = threading.Thread(target=self.process)
379        self.thread.daemon = True
380        self.thread.start()
381
382    def process(self):
383        while not self._shutdown.is_set():
384            try:
385                item = self.queue.get(timeout=0.1)
386                self._handle(item)
387            except (KeyboardInterrupt, SystemExit):
388                raise
389            except EOFError:
390                return
391            except Queue.Empty:
392                continue
393
394    def _drain(self):
395        while True:
396            try:
397                item = self.queue.get(block=False)
398                self._handle(item)
399            except (KeyboardInterrupt, SystemExit):
400                raise
401            except EOFError:
402                return
403            except Queue.Empty:
404                return
405
406    def _handle(self, record):
407        self._with_handlers(lambda handler: handler.handle(record))
408
409    def handle(self, record):
410        self.queue.put(record)
411
412    def _close(self):
413        if hasattr(self, 'thread'):
414            self.thread.join()
415        _wrap(self._drain)
416        self._with_handlers(lambda handler: _wrap(handler.close))
417
418        # NOTE Python2 has an known bug which causes IOErrors to be raised
419        # if this shutdown doesn't go cleanly on both ends.
420        # This sleep adds some time for the sender threads on this process to
421        # finish pickling the object and complete shutdown after the queue is
422        # closed.
423        time.sleep(.2)
424        self.queue.close()
425        time.sleep(.2)
426
427    def close(self):
428        if not self._shutdown.is_set():
429            self._shutdown.set()
430            self._close()
431
432
433def _wrap(callback, *args, **kwargs):
434    try:
435        callback(*args, **kwargs)
436    except:
437        traceback.print_exc()
438