1# Copyright (c) 2017 Mark D. Hill and David A. Wood
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met: redistributions of source code must retain the above copyright
7# notice, this list of conditions and the following disclaimer;
8# redistributions in binary form must reproduce the above copyright
9# notice, this list of conditions and the following disclaimer in the
10# documentation and/or other materials provided with the distribution;
11# neither the name of the copyright holders nor the names of its
12# contributors may be used to endorse or promote products derived from
13# this software without specific prior written permission.
14#
15# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
16# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
17# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
18# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
19# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
21# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
25# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26#
27# Authors: Sean Wilson
28
29'''
30Handlers for the testlib Log.
31
32
33'''
34from __future__ import print_function
35
36import multiprocessing
37import os
38import Queue
39import sys
40import threading
41import time
42import traceback
43
44import helper
45import log
46import result
47import state
48import test
49import terminal
50
51from config import config, constants
52
53
54class _TestStreamManager(object):
55    def __init__(self):
56        self._writers = {}
57
58    def open_writer(self, test_result):
59        if test_result in self._writers:
60            raise ValueError('Cannot have multiple writters on a single test.')
61        self._writers[test_result] = _TestStreams(test_result.stdout,
62                test_result.stderr)
63
64    def get_writer(self, test_result):
65        if test_result not in self._writers:
66            self.open_writer(test_result)
67        return self._writers[test_result]
68
69    def close_writer(self, test_result):
70        if test_result in self._writers:
71            writer = self._writers.pop(test_result)
72            writer.close()
73
74    def close(self):
75        for writer in self._writers.values():
76            writer.close()
77        self._writers.clear()
78
79class _TestStreams(object):
80    def __init__(self, stdout, stderr):
81        helper.mkdir_p(os.path.dirname(stdout))
82        helper.mkdir_p(os.path.dirname(stderr))
83        self.stdout = open(stdout, 'w')
84        self.stderr = open(stderr, 'w')
85
86    def close(self):
87        self.stdout.close()
88        self.stderr.close()
89
90class ResultHandler(log.Handler):
91    '''
92    Log handler which listens for test results and output saving data as
93    it is reported.
94
95    When the handler is closed it writes out test results in the python pickle
96    format.
97    '''
98    def __init__(self, schedule, directory):
99        '''
100        :param schedule: The entire schedule as a :class:`LoadedLibrary`
101            object.
102
103        :param directory: Directory to save test stdout/stderr and aggregate
104            results to.
105        '''
106        self.directory = directory
107        self.internal_results = result.InternalLibraryResults(schedule,
108                directory)
109        self.test_stream_manager = _TestStreamManager()
110        self._closed = False
111
112        self.mapping = {
113            log.LibraryStatus.type_id: self.handle_library_status,
114
115            log.SuiteResult.type_id: self.handle_suite_result,
116            log.TestResult.type_id: self.handle_test_result,
117
118            log.TestStderr.type_id: self.handle_stderr,
119            log.TestStdout.type_id: self.handle_stdout,
120        }
121
122    def handle(self, record):
123        if not self._closed:
124            self.mapping.get(record.type_id, lambda _:None)(record)
125
126    def handle_library_status(self, record):
127        if record['status'] in (state.Status.Complete, state.Status.Avoided):
128            self.test_stream_manager.close()
129
130    def handle_suite_result(self, record):
131        suite_result = self.internal_results.get_suite_result(
132                    record['metadata'].uid)
133        suite_result.result = record['result']
134
135    def handle_test_result(self, record):
136        test_result = self._get_test_result(record)
137        test_result.result = record['result']
138
139    def handle_stderr(self, record):
140        self.test_stream_manager.get_writer(
141            self._get_test_result(record)
142        ).stderr.write(record['buffer'])
143
144    def handle_stdout(self, record):
145        self.test_stream_manager.get_writer(
146            self._get_test_result(record)
147        ).stdout.write(record['buffer'])
148
149    def _get_test_result(self, test_record):
150        return self.internal_results.get_test_result(
151                    test_record['metadata'].uid,
152                    test_record['metadata'].suite_uid)
153
154    def _save(self):
155        #FIXME Hardcoded path name
156        result.InternalSavedResults.save(
157            self.internal_results,
158            os.path.join(self.directory, constants.pickle_filename))
159        result.JUnitSavedResults.save(
160            self.internal_results,
161            os.path.join(self.directory, constants.xml_filename))
162
163    def close(self):
164        if self._closed:
165            return
166        self._closed = True
167        self._save()
168
169    def unsuccessful(self):
170        '''
171        Performs an or reduce on all of the results.
172        Returns true if at least one test is unsuccessful, false when all tests
173        pass
174        '''
175        for suite_result in self.internal_results:
176            if suite_result.unsuccessful:
177                return True
178        # If all are successful, then this wasn't "unsuccessful"
179        return False
180
181
182#TODO Change from a handler to an internal post processor so it can be used
183# to reprint results
184class SummaryHandler(log.Handler):
185    '''
186    A log handler which listens to the log for test results
187    and reports the aggregate results when closed.
188    '''
189    color = terminal.get_termcap()
190    reset = color.Normal
191    colormap = {
192            state.Result.Errored: color.Red,
193            state.Result.Failed: color.Red,
194            state.Result.Passed: color.Green,
195            state.Result.Skipped: color.Cyan,
196    }
197    sep_fmtkey = 'separator'
198    sep_fmtstr = '{%s}' % sep_fmtkey
199
200    def __init__(self):
201        self.mapping = {
202            log.TestResult.type_id: self.handle_testresult,
203            log.LibraryStatus.type_id: self.handle_library_status,
204        }
205        self._timer = helper.Timer()
206        self.results = []
207
208    def handle_library_status(self, record):
209        if record['status'] == state.Status.Building:
210            self._timer.restart()
211
212    def handle_testresult(self, record):
213        result = record['result'].value
214        if result in (state.Result.Skipped, state.Result.Failed,
215                state.Result.Passed, state.Result.Errored):
216            self.results.append(result)
217
218    def handle(self, record):
219        self.mapping.get(record.type_id, lambda _:None)(record)
220
221    def close(self):
222        print(self._display_summary())
223
224    def _display_summary(self):
225        most_severe_outcome = None
226        outcome_fmt = ' {count} {outcome}'
227        strings = []
228
229        outcome_count = [0] * len(state.Result.enums)
230        for result in self.results:
231            outcome_count[result] += 1
232
233        # Iterate over enums so they are in order of severity
234        for outcome in state.Result.enums:
235            outcome = getattr(state.Result, outcome)
236            count  = outcome_count[outcome]
237            if count:
238                strings.append(outcome_fmt.format(count=count,
239                        outcome=state.Result.enums[outcome]))
240                most_severe_outcome = outcome
241        string = ','.join(strings)
242        if most_severe_outcome is None:
243            string = ' No testing done'
244            most_severe_outcome = state.Result.Passed
245        else:
246            string = ' Results:' + string + ' in {:.2} seconds '.format(
247                    self._timer.active_time())
248        string += ' '
249        return terminal.insert_separator(
250                string,
251                color=self.colormap[most_severe_outcome] + self.color.Bold)
252
253class TerminalHandler(log.Handler):
254    color = terminal.get_termcap()
255    verbosity_mapping = {
256        log.LogLevel.Warn: color.Yellow,
257        log.LogLevel.Error: color.Red,
258    }
259    default = color.Normal
260
261    def __init__(self, verbosity=log.LogLevel.Info, machine_only=False):
262        self.stream = verbosity >= log.LogLevel.Trace
263        self.verbosity = verbosity
264        self.machine_only = machine_only
265        self.mapping = {
266            log.TestResult.type_id: self.handle_testresult,
267            log.SuiteStatus.type_id: self.handle_suitestatus,
268            log.TestStatus.type_id: self.handle_teststatus,
269            log.TestStderr.type_id: self.handle_stderr,
270            log.TestStdout.type_id: self.handle_stdout,
271            log.TestMessage.type_id: self.handle_testmessage,
272            log.LibraryMessage.type_id: self.handle_librarymessage,
273        }
274
275    def _display_outcome(self, name, outcome, reason=None):
276        print(self.color.Bold
277                 + SummaryHandler.colormap[outcome]
278                 + name
279                 + ' '
280                 + state.Result.enums[outcome]
281                 + SummaryHandler.reset)
282
283        if reason is not None:
284            log.test_log.info('')
285            log.test_log.info('Reason:')
286            log.test_log.info(reason)
287            log.test_log.info(terminal.separator('-'))
288
289    def handle_teststatus(self, record):
290        if record['status'] == state.Status.Running:
291            log.test_log.debug('Starting Test Case: %s' %\
292                    record['metadata'].name)
293
294    def handle_testresult(self, record):
295        self._display_outcome(
296            'Test: %s'  % record['metadata'].name,
297            record['result'].value)
298
299    def handle_suitestatus(self, record):
300        if record['status'] == state.Status.Running:
301              log.test_log.debug('Starting Test Suite: %s ' %\
302                    record['metadata'].name)
303
304    def handle_stderr(self, record):
305        if self.stream:
306            print(record.data['buffer'], file=sys.stderr, end='')
307
308    def handle_stdout(self, record):
309        if self.stream:
310            print(record.data['buffer'], file=sys.stdout, end='')
311
312    def handle_testmessage(self, record):
313        if self.stream:
314            print(self._colorize(record['message'], record['level']))
315
316    def handle_librarymessage(self, record):
317        if not self.machine_only or record.data.get('machine_readable', False):
318            print(self._colorize(record['message'], record['level'],
319                    record['bold']))
320
321    def _colorize(self, message, level, bold=False):
322        return '%s%s%s%s' % (
323                self.color.Bold if bold else '',
324                self.verbosity_mapping.get(level, ''),
325                message,
326                self.default)
327
328    def handle(self, record):
329        if record.data.get('level', self.verbosity) > self.verbosity:
330            return
331        self.mapping.get(record.type_id, lambda _:None)(record)
332
333    def set_verbosity(self, verbosity):
334        self.verbosity = verbosity
335
336
337class PrintHandler(log.Handler):
338    def __init__(self):
339        pass
340
341    def handle(self, record):
342        print(str(record).rstrip())
343
344    def close(self):
345        pass
346
347
348class MultiprocessingHandlerWrapper(log.Handler):
349    '''
350    A handler class which forwards log records to subhandlers, enabling
351    logging across multiprocessing python processes.
352
353    The 'parent' side of the handler should execute either
354    :func:`async_process` or :func:`process` to forward
355    log records to subhandlers.
356    '''
357    def __init__(self, *subhandlers):
358        # Create thread to spin handing recipt of messages
359        # Create queue to push onto
360        self.queue = multiprocessing.Queue()
361        self.queue.cancel_join_thread()
362        self._shutdown = threading.Event()
363
364        # subhandlers should be accessed with the _handler_lock
365        self._handler_lock = threading.Lock()
366        self._subhandlers = subhandlers
367
368    def add_handler(self, handler):
369        self._handler_lock.acquire()
370        self._subhandlers = (handler, ) + self._subhandlers
371        self._handler_lock.release()
372
373    def _with_handlers(self, callback):
374        exception = None
375        self._handler_lock.acquire()
376        for handler in self._subhandlers:
377            # Prevent deadlock when using this handler by delaying
378            # exception raise until we get a chance to unlock.
379            try:
380                callback(handler)
381            except Exception as e:
382                exception = e
383                break
384        self._handler_lock.release()
385
386        if exception is not None:
387            raise exception
388
389    def async_process(self):
390        self.thread = threading.Thread(target=self.process)
391        self.thread.daemon = True
392        self.thread.start()
393
394    def process(self):
395        while not self._shutdown.is_set():
396            try:
397                item = self.queue.get(timeout=0.1)
398                self._handle(item)
399            except (KeyboardInterrupt, SystemExit):
400                raise
401            except EOFError:
402                return
403            except Queue.Empty:
404                continue
405
406    def _drain(self):
407        while True:
408            try:
409                item = self.queue.get(block=False)
410                self._handle(item)
411            except (KeyboardInterrupt, SystemExit):
412                raise
413            except EOFError:
414                return
415            except Queue.Empty:
416                return
417
418    def _handle(self, record):
419        self._with_handlers(lambda handler: handler.handle(record))
420
421    def handle(self, record):
422        self.queue.put(record)
423
424    def _close(self):
425        if hasattr(self, 'thread'):
426            self.thread.join()
427        _wrap(self._drain)
428        self._with_handlers(lambda handler: _wrap(handler.close))
429
430        # NOTE Python2 has an known bug which causes IOErrors to be raised
431        # if this shutdown doesn't go cleanly on both ends.
432        # This sleep adds some time for the sender threads on this process to
433        # finish pickling the object and complete shutdown after the queue is
434        # closed.
435        time.sleep(.2)
436        self.queue.close()
437        time.sleep(.2)
438
439    def close(self):
440        if not self._shutdown.is_set():
441            self._shutdown.set()
442            self._close()
443
444
445def _wrap(callback, *args, **kwargs):
446    try:
447        callback(*args, **kwargs)
448    except:
449        traceback.print_exc()
450