# Copyright (c) 2006 The Regents of The University of Michigan
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26# 27# Authors: Kevin Lim 28 29import os, popen2, re, sys 30 31class MyPOpen(object): 32 def __init__(self, cmd, input = None, output = None, bufsize = -1): 33 self.status = -1 34 35 if input is None: 36 p2c_read, p2c_write = os.pipe() 37 self.tochild = os.fdopen(p2c_write, 'w', bufsize) 38 else: 39 p2c_write = None 40 if isinstance(input, file): 41 p2c_read = input.fileno() 42 elif isinstance(input, str): 43 input = file(input, 'r') 44 p2c_read = input.fileno() 45 elif isinstance(input, int): 46 p2c_read = input 47 else: 48 raise AttributeError 49 50 if output is None: 51 c2p_read, c2p_write = os.pipe() 52 self.fromchild = os.fdopen(c2p_read, 'r', bufsize) 53 else: 54 c2p_read = None 55 if isinstance(output, file): 56 c2p_write = output.fileno() 57 elif isinstance(output, str): 58 output = file(output, 'w') 59 c2p_write = output.fileno() 60 elif isinstance(output, int): 61 c2p_write = output 62 else: 63 raise AttributeError 64 65 self.pid = os.fork() 66 if self.pid == 0: 67 os.dup2(p2c_read, sys.stdin.fileno()) 68 os.dup2(c2p_write, sys.stdout.fileno()) 69 os.dup2(c2p_write, sys.stderr.fileno()) 70 try: 71 os.execvp(cmd[0], cmd) 72 finally: 73 os._exit(1) 74 75 os.close(p2c_read) 76 os.close(c2p_write) 77 78 def poll(self): 79 if self.status < 0: 80 pid, status = os.waitpid(self.pid, os.WNOHANG) 81 if pid == self.pid: 82 self.status = status 83 return self.status 84 85 def wait(self): 86 if self.status < 0: 87 pid, status = os.waitpid(self.pid, 0) 88 if pid == self.pid: 89 self.status = status 90 return self.status 91 92 93class oarsub: 94 def __init__(self): 95 self.walltime = None 96 self.queue = None 97 self.properties = None 98 99 # OAR 2.0 parameters only! 
100 self.name = None 101 self.afterok = None 102 self.notify = None 103 self.stderr = None 104 self.stdout = None 105 106 107 self.oarhost = None 108 self.oarsub = 'oarsub' 109 110 self.jobid = re.compile('IdJob = (\S+)') 111 #self.outfile = open("jobnames.dat", "a+") 112 113 def build(self, script, args = []): 114 self.cmd = [ self.oarsub ] 115 116 print "args:", args 117 print "script:", script 118 if self.properties: 119 self.cmd.append('-p"%s"' % self.properties ) 120 121 if self.queue: 122 self.cmd.append('-q "%s"' % self.queue) 123 124 if self.walltime: 125 self.cmd.append('-l walltime=%s' % self.walltime) 126 127 if script[0] != "/": 128 self.script = os.getcwd() 129 else: 130 self.script = script 131 132 self.cmd.extend(args) 133 self.cmd.append(self.script) 134 #cmd = [ 'ssh', '-x', self.oarhost, '"cd %s; %s"' % (os.getcwd(), self.command) ] 135 self.command = ' '.join(self.cmd) 136 137 print "command: [%s]" % self.command 138 139 def do(self): 140 oar = MyPOpen(self.cmd) 141 self.result = oar.fromchild.read() 142 ec = oar.wait() 143 144 if ec != 0 and self.oarhost: 145 pstdin, pstdout = os.popen4(self.command) 146 self.result = pstdout.read() 147 148 jobid = self.jobid.match(self.result) 149 if jobid == None: 150 print "Couldn't get jobid from [%s]" % self.result 151 sys.exit(1) 152 else: 153 #self.outfile.write("%d %s\n" %(int(jobid.group(1)), self.name)); 154 #self.outfile.flush() 155 self.result = jobid.group(1) 156 157 return 0 158 159class qsub: 160 def __init__(self): 161 self.afterok = None 162 self.hold = False 163 self.join = False 164 self.keep_stdout = False 165 self.keep_stderr = False 166 self.node_type = None 167 self.mail_abort = False 168 self.mail_begin = False 169 self.mail_end = False 170 self.name = None 171 self.stdout = None 172 self.priority = None 173 self.queue = None 174 self.pbshost = None 175 self.qsub = 'qsub' 176 self.env = {} 177 178 def build(self, script, args = []): 179 self.cmd = [ self.qsub ] 180 181 if self.env: 182 
arg = '-v' 183 arg += ','.join([ '%s=%s' % i for i in self.env.iteritems() ]) 184 self.cmd.append(arg) 185 186 if self.hold: 187 self.cmd.append('-h') 188 189 if self.stdout: 190 self.cmd.append('-olocalhost:' + self.stdout) 191 192 if self.keep_stdout and self.keep_stderr: 193 self.cmd.append('-koe') 194 elif self.keep_stdout: 195 self.cmd.append('-ko') 196 elif self.keep_stderr: 197 self.cmd.append('-ke') 198 else: 199 self.cmd.append('-kn') 200 201 if self.join: 202 self.cmd.append('-joe') 203 204 if self.node_type: 205 self.cmd.append('-lnodes=' + self.node_type) 206 207 if self.mail_abort or self.mail_begin or self.mail_end: 208 flags = '' 209 if self.mail_abort: 210 flags.append('a') 211 if self.mail_begin: 212 flags.append('b') 213 if self.mail_end: 214 flags.append('e') 215 if len(flags): 216 self.cmd.append('-m ' + flags) 217 else: 218 self.cmd.append('-mn') 219 220 if self.name: 221 self.cmd.append("-N%s" % self.name) 222 223 if self.priority: 224 self.cmd.append('-p' + self.priority) 225 226 if self.queue: 227 self.cmd.append('-q' + self.queue) 228 229 if self.afterok: 230 self.cmd.append('-Wdepend=afterok:%s' % self.afterok) 231 232 self.cmd.extend(args) 233 self.script = script 234 self.command = ' '.join(self.cmd + [ self.script ]) 235 236 def do(self): 237 pbs = MyPOpen(self.cmd + [ self.script ]) 238 self.result = pbs.fromchild.read() 239 ec = pbs.wait() 240 241 if ec != 0 and self.pbshost: 242 cmd = ' '.join(self.cmd + [ '-' ]) 243 cmd = [ 'ssh', '-x', self.pbshost, cmd ] 244 self.command = ' '.join(cmd) 245 ssh = MyPOpen(cmd, input = self.script) 246 self.result = ssh.fromchild.read() 247 ec = ssh.wait() 248 249 return ec 250