1# Copyright (c) 2017 Mark D. Hill and David A. Wood
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met: redistributions of source code must retain the above copyright
7# notice, this list of conditions and the following disclaimer;
8# redistributions in binary form must reproduce the above copyright
9# notice, this list of conditions and the following disclaimer in the
10# documentation and/or other materials provided with the distribution;
11# neither the name of the copyright holders nor the names of its
12# contributors may be used to endorse or promote products derived from
13# this software without specific prior written permission.
14#
15# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
16# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
17# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
18# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
19# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
20# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
21# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
25# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26#
27# Authors: Sean Wilson
28
29import multiprocessing
30import pdb
31import os
32import sys
33import threading
34import traceback
35
36import log
37
# Keep a handle on the stock debugger class, because IoManager.fixup_pdb()
# rebinds pdb.Pdb to ForkedPdb. Record it only once so that importing this
# module again after that rebinding cannot register ForkedPdb as the
# "original" class.
if not hasattr(pdb, '_Pdb'):
    pdb._Pdb = pdb.Pdb
class ForkedPdb(pdb._Pdb):
    '''
    A Pdb subclass that may be used from a forked multiprocessing child

    While the debugger prompt is active the output streams captured by the
    IoManager are handed back (restore_pipes) and stdin is reopened from
    /dev/stdin; once the prompt returns, the redirection is put back in
    place (replace_pipes).
    '''
    # Set by IoManager.fixup_pdb() to the IoManager that owns the pipes.
    io_manager = None
    def interaction(self, *args, **kwargs):
        '''
        Run the regular Pdb interaction with the original output streams
        and a freshly opened stdin.

        All arguments are forwarded unchanged to Pdb.interaction.
        '''
        saved_stdin = sys.stdin
        self.io_manager.restore_pipes()
        try:
            # Open stdin explicitly for the prompt. The context manager
            # closes it again afterwards, so repeated breakpoints do not
            # leak one file descriptor each.
            with open('/dev/stdin') as debugger_stdin:
                sys.stdin = debugger_stdin
                pdb._Pdb.interaction(self, *args, **kwargs)
        finally:
            # Always resume capturing, even if the debugger raised.
            sys.stdin = saved_stdin
            self.io_manager.replace_pipes()
53
54
55#TODO Refactor duplicate stdout, stderr logic
class IoManager(object):
    '''
    Capture the stdout and stderr of a test through a pair of pipes and
    forward every line read from them to the test log.

    The read ends are drained by logger threads (start_loggers); the write
    ends are installed over stdout/stderr by setup().
    '''
    def __init__(self, test, suite):
        '''
        :param test: Test object handed to the log callbacks together with
            each captured line.
        :param suite: Suite object handed to the log callbacks together
            with each captured line.
        '''
        self.test = test
        self.suite = suite
        self.log = log.test_log
        self._init_pipes()

    def _init_pipes(self):
        '''Create one (read, write) pipe each for stdout and stderr.'''
        self.stdout_rp, self.stdout_wp = os.pipe()
        self.stderr_rp, self.stderr_wp = os.pipe()

    def close_parent_pipes(self):
        '''Close this process's copies of the pipe write ends.'''
        os.close(self.stdout_wp)
        os.close(self.stderr_wp)

    def setup(self):
        '''Redirect stdout/stderr into the pipes and install ForkedPdb.'''
        self.replace_pipes()
        self.fixup_pdb()

    def fixup_pdb(self):
        '''
        Make pdb.Pdb resolve to ForkedPdb so that breakpoints temporarily
        undo the redirection done by this manager.
        '''
        ForkedPdb.io_manager = self
        pdb.Pdb = ForkedPdb

    def replace_pipes(self):
        '''
        Point stdout and stderr at the pipe write ends.

        Duplicates of the current descriptors are kept in old_stdout and
        old_stderr so that restore_pipes() can bring them back.
        '''
        self.old_stderr = os.dup(sys.stderr.fileno())
        self.old_stdout = os.dup(sys.stdout.fileno())

        # dup2 redirects the descriptor behind the current stream object;
        # the sys-level stream is then rebound to an unbuffered file on the
        # pipe itself.
        # NOTE(review): buffering=0 with text mode 'w' is Python 2
        # behaviour; Python 3 rejects unbuffered text I/O. Revisit when
        # porting.
        os.dup2(self.stderr_wp, sys.stderr.fileno())
        sys.stderr = os.fdopen(self.stderr_wp, 'w', 0)
        os.dup2(self.stdout_wp, sys.stdout.fileno())
        sys.stdout = os.fdopen(self.stdout_wp, 'w', 0)

    def restore_pipes(self):
        '''
        Undo replace_pipes(): switch stdout and stderr back to the saved
        descriptors, keeping duplicates of the pipe write ends so a later
        replace_pipes() can reinstall them.
        '''
        self.stderr_wp = os.dup(sys.stderr.fileno())
        self.stdout_wp = os.dup(sys.stdout.fileno())

        os.dup2(self.old_stderr, sys.stderr.fileno())
        sys.stderr = os.fdopen(self.old_stderr, 'w', 0)
        os.dup2(self.old_stdout, sys.stdout.fileno())
        sys.stdout = os.fdopen(self.old_stdout, 'w', 0)

    def start_loggers(self):
        '''Start the threads that forward captured output to the log.'''
        self.log_output()

    def log_output(self):
        '''
        Start one daemon thread per pipe that reads lines from the pipe's
        read end and passes each line to log.test_stdout or
        log.test_stderr along with the test and suite.
        '''
        def _drain(pipe_fd, log_callback):
            with os.fdopen(pipe_fd, 'r') as pipe:
                # Read iteratively, don't allow input to fill the pipe.
                for line in iter(pipe.readline, ''):
                    log_callback(line)

        def _start_reader(pipe_fd, log_callback):
            reader = threading.Thread(target=_drain,
                                      args=(pipe_fd, log_callback))
            # Daemon + Join to not lock up main thread if something breaks
            # but provide consistent execution if nothing goes wrong.
            reader.daemon = True
            reader.start()
            return reader

        # Don't keep a backpointer to self in the thread.
        log = self.log
        test = self.test
        suite = self.suite

        self.stdout_thread = _start_reader(
                self.stdout_rp,
                lambda buf: log.test_stdout(test, suite, buf))
        self.stderr_thread = _start_reader(
                self.stderr_rp,
                lambda buf: log.test_stderr(test, suite, buf))

    # Original (misspelled) name, kept so existing callers keep working.
    log_ouput = log_output

    def join_loggers(self):
        '''Wait until both logger threads have finished reading.'''
        self.stdout_thread.join()
        self.stderr_thread.join()
133
134
class SubprocessException(Exception):
    '''
    Raised by Sandbox when the test subprocess finishes with a non-zero
    exit code.
    '''
    def __init__(self, trace):
        '''
        :param trace: Traceback text reported by the subprocess, or None
            if it did not report one.
        '''
        Exception.__init__(self, trace)
138
class ExceptionProcess(multiprocessing.Process):
    '''
    A Process that sends the outcome of run() through a pipe: None when
    the target returned normally, or a 1-tuple holding the formatted
    traceback when it raised an Exception.
    '''
    class Status(object):
        '''
        Snapshot of a process: its exit code and, if one was reported, the
        traceback text (otherwise None).
        '''
        def __init__(self, exitcode, exception_tuple):
            self.exitcode = exitcode
            self.trace = (None if exception_tuple is None
                          else exception_tuple[0])

    def __init__(self, *args, **kwargs):
        '''Accepts the same arguments as multiprocessing.Process.'''
        super(ExceptionProcess, self).__init__(*args, **kwargs)
        # _cconn is written by run(); _pconn is read by the status property.
        self._pconn, self._cconn = multiprocessing.Pipe()
        self._exception = None

    def run(self):
        '''
        Run the process target and report the result over the pipe.

        An Exception raised by the target is reported as a formatted
        traceback and then re-raised.
        '''
        try:
            multiprocessing.Process.run(self)
            self._cconn.send(None)
        except Exception:
            formatted = traceback.format_exc()
            self._cconn.send((formatted, ))
            raise

    @property
    def status(self):
        '''
        Status built from the current exit code and the most recent
        message received from run(), if any.
        '''
        receiver = self._pconn
        if receiver.poll():
            # Remember the message so later reads still see it.
            self._exception = receiver.recv()

        return self.Status(self.exitcode, self._exception)
168
169
class Sandbox(object):
    '''
    Run a test inside an ExceptionProcess with its output captured by an
    IoManager. Constructing the object runs the test to completion.
    '''
    def __init__(self, test_parameters):
        '''
        :param test_parameters: Object providing ``test`` and ``suite``
            attributes; it is also passed to ``test.test`` in the child.

        :raises SubprocessException: If the child exits with a non-zero
            exit code; carries the traceback the child reported, if any.
        '''
        self.params = test_parameters
        self.io_manager = IoManager(test_parameters.test,
                                    test_parameters.suite)
        self.p = ExceptionProcess(target=self.entrypoint)
        # Daemon + Join to not lock up main thread if something breaks
        self.p.daemon = True

        manager = self.io_manager
        child = self.p

        # Readers are started before the child so the pipes are drained
        # from the beginning.
        manager.start_loggers()
        child.start()
        manager.close_parent_pipes()
        child.join()
        manager.join_loggers()

        outcome = child.status
        if outcome.exitcode:
            raise SubprocessException(outcome.trace)

    def entrypoint(self):
        '''
        Target of the child process: redirect output, then run the test.
        '''
        self.io_manager.setup()
        parameters = self.params
        parameters.test.test(parameters)
192