# send.py revision 2357:add41108b549
#!/usr/bin/env python
# Copyright (c) 2006 The Regents of The University of Michigan
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Kevin Lim

# Batch job submission script.
#
# Reads a job description file, selects the jobs whose names match the
# regular expressions given on the command line, prepares a job directory
# for each of them and submits them through batch.oarsub().
#
# NOTE: print is always called with a single parenthesised argument so
# the output is identical under Python 2 (the dialect this script was
# written in) while the file also parses under Python 3.

import os, os.path, re, socket, sys
from os import environ as env, listdir
from os.path import basename, isdir, isfile, islink, join as joinpath, normpath
from filecmp import cmp as filecmp
from shutil import copy

def nfspath(dir):
    """Rewrite a local path into its '/n/...' form.

    A path under '/.automount/' has that prefix replaced by '/n/'.  Any
    other path that is not already under '/n/' is prefixed with
    '/n/<short hostname>'.  Paths already under '/n/' are returned
    unchanged.
    """
    if dir.startswith('/.automount/'):
        # 12 == len('/.automount/')
        dir = '/n/%s' % dir[12:]
    elif not dir.startswith('/n/'):
        dir = '/n/%s%s' % (socket.gethostname().split('.')[0], dir)
    return dir

def syncdir(srcdir, destdir):
    """Bring destdir up to date with the tree rooted at srcdir.

    Walks srcdir, creates every missing sub-directory under destdir and
    copies each file that is missing from destdir or whose contents
    differ (as judged by filecmp.cmp).  Directories whose name starts
    with '.' and directories named 'SCCS' are skipped.  Files that exist
    only in destdir are left alone.

    Exits the program via sys.exit() if destdir does not exist.
    """
    srcdir = normpath(srcdir)
    destdir = normpath(destdir)
    if not isdir(destdir):
        sys.exit('destination directory "%s" does not exist' % destdir)

    for root, dirs, files in os.walk(srcdir):
        # Reduce root to the path relative to srcdir.
        root = normpath(root)
        prefix = os.path.commonprefix([root, srcdir])
        root = root[len(prefix):]
        if root.startswith('/'):
            root = root[1:]

        # Prune in place so that os.walk does not descend into them.
        for rem in [d for d in dirs if d.startswith('.') or d == 'SCCS']:
            dirs.remove(rem)

        for entry in dirs:
            newdir = joinpath(destdir, root, entry)
            if not isdir(newdir):
                os.mkdir(newdir)
                print('mkdir %s' % newdir)

        # Replace symlinked directories by '<name>/.' in the list that
        # os.walk recurses over -- presumably so that the walk follows
        # the link instead of skipping it (normpath() above strips the
        # trailing '/.' again on the next iteration).
        for i, d in enumerate(dirs):
            if islink(joinpath(srcdir, root, d)):
                dirs[i] = joinpath(d, '.')

        for entry in files:
            dest = normpath(joinpath(destdir, root, entry))
            src = normpath(joinpath(srcdir, root, entry))
            if not isfile(dest) or not filecmp(src, dest):
                # Report in the same order as the copy itself (source
                # first); the message used to list the two swapped.
                print('copy %s %s' % (src, dest))
                copy(src, dest)

progpath = nfspath(sys.path[0])
progname = basename(sys.argv[0])
usage = """\
Usage:
    %(progname)s [-c] [-e] [-f] [-j <jobfile>] [-q queue] [-v] <regexp>
    -c           clean directory if job can be run
    -C           submit the checkpointing runs
    -d           Make jobs be dependent on the completion of the checkpoint runs
    -e           only echo pbs command info, don't actually send the job
    -f           force the job to run regardless of state
    -n           don't update the base directory from the Link directory
    -q <queue>   submit job to the named queue
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -R           together with -C, also submit the regular runs
    -t <type>    node type (default is FAST; currently unused)
    -v           be verbose

    %(progname)s [-j <jobfile>] -l [-v] <regexp>
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -l           list job names, don't submit
    -v           be verbose (list job parameters)

    %(progname)s -h
    -h           display this help
""" % locals()

try:
    import getopt
    opts, args = getopt.getopt(sys.argv[1:], '-Ccdefhj:lnq:Rt:v')
except getopt.GetoptError:
    sys.exit(usage)

# Option defaults.
depend = False
clean = False
onlyecho = False
exprs = []
force = False
listonly = False
queue = ''           # only consumed by the commented-out qsub.queue below
verbose = False
jfile = 'Test.py'
docpts = False
doruns = True
runflag = False
node_type = 'FAST'   # only consumed by the commented-out qsub.node_type below
update = True

for opt,arg in opts:
    if opt == '-C':
        docpts = True
    if opt == '-c':
        clean = True
    if opt == '-d':
        depend = True
    if opt == '-e':
        onlyecho = True
    if opt == '-f':
        force = True
    if opt == '-h':
        print(usage)
        sys.exit(0)
    if opt == '-j':
        jfile = arg
    if opt == '-l':
        listonly = True
    if opt == '-n':
        update = False
    if opt == '-q':
        queue = arg
    if opt == '-R':
        runflag = True
    if opt == '-t':
        node_type = arg
    if opt == '-v':
        verbose = True

# With -C the regular runs are submitted only if -R was given as well.
if docpts:
    doruns = runflag

# Every positional argument is a regular expression on the job name.
for arg in args:
    exprs.append(re.compile(arg))

import jobfile, batch
from job import JobDir, date

conf = jobfile.JobFile(jfile)

# Refresh conf.basedir from conf.linkdir unless this is a dry run (-l or
# -e) or the update was disabled with -n.
if update and not listonly and not onlyecho and isdir(conf.linkdir):
    if verbose:
        print('Checking for outdated files in Link directory')
    if not isdir(conf.basedir):
        os.mkdir(conf.basedir)
    syncdir(conf.linkdir, conf.basedir)

jobnames = {}
joblist = []

# docpts and doruns are never both False (see above), so gen is always
# bound by one of these branches.
if docpts and doruns:
    gen = conf.alljobs()
elif docpts:
    gen = conf.checkpoints()
elif doruns:
    gen = conf.jobs()

for job in gen:
    if job.name in jobnames:
        continue
    # Remember the name so that a later job with the same name is
    # skipped; the table used to stay empty, which made the check above
    # a no-op.
    jobnames[job.name] = job

    if exprs:
        # expr.match() anchors at the start of the job name.
        for expr in exprs:
            if expr.match(job.name):
                joblist.append(job)
                break
    else:
        joblist.append(job)

if listonly:
    if verbose:
        for job in joblist:
            job.printinfo()
    else:
        for job in joblist:
            print(job.name)
    sys.exit(0)

if not onlyecho:
    # Drop jobs that are already queued, running or finished (unless -f
    # was given) and clean out stale job directories (requires -c).
    newlist = []
    for job in joblist:
        jobdir = JobDir(joinpath(conf.rootdir, job.name))
        if jobdir.exists():
            if not force:
                status = jobdir.getstatus()
                if status == 'queued':
                    continue

                if status == 'running':
                    continue

                if status == 'success':
                    continue

            if not clean:
                sys.exit('job directory %s not clean!' % jobdir)

            jobdir.clean()
        newlist.append(job)
    joblist = newlist

class NameHack(object):
    """Send long job names to a helper service as 'jobid jobname' lines.

    The connection is opened on first use and then reused; it is never
    closed explicitly.

    NOTE(review): the only call site below is commented out, so this
    class is currently unused.
    """
    def __init__(self, host='pbs.pool', port=24465):
        self.host = host
        self.port = port
        self.socket = None

    def setname(self, jobid, jobname):
        """Send the (jobid, jobname) pair if the name exceeds 15 chars.

        jobid may be an integer or a string; for a string that is not a
        plain integer, the part before the first '.' is used.
        """
        try:
            jobid = int(jobid)
        except ValueError:
            jobid = int(jobid.strip().split('.')[0])

        jobname = jobname.strip()
        # since pbs can handle jobnames of 15 characters or less,
        # don't use the raj hack.
        if len(jobname) <= 15:
            return

        if self.socket is None:
            import socket
            self.socket = socket.socket()
            # Connect to pbs.pool and send the jobid/jobname pair to port
            # 24465 (Raj didn't realize that there are only 64k ports and
            # setup inetd to point to port 90001)
            self.socket.connect((self.host, self.port))

        self.socket.send("%s %s\n" % (jobid, jobname))

namehack = NameHack()

rootdir = conf.rootdir
script = joinpath(rootdir, 'Base', 'job.py')

for job in joblist:
    jobdir = JobDir(joinpath(rootdir, job.name))
    if depend:
        # With -d, skip jobs whose checkpoint run has not left a
        # '.success' marker in its job directory.
        cptdir = JobDir(joinpath(rootdir, job.checkpoint.name))
        path = str(cptdir)
        if not isdir(path) or not isfile(joinpath(path, '.success')):
            continue

        # Only used by the commented-out qsub.afterok below.
        cptjob = cptdir.readval('.batch_jobid')

    if not onlyecho:
        jobdir.create()
        os.chdir(str(jobdir))
        os.environ['PWD'] = str(jobdir)

    print('Job name: %s' % job.name)
    print('Job directory: %s' % jobdir)


    qsub = batch.oarsub()
    qsub.oarhost = 'poolfs.eecs.umich.edu'
    #qsub.stdout = jobdir.file('jobout')
    qsub.name = job.name
    qsub.walltime = '50'
    #qsub.join = True
    #qsub.node_type = node_type
    #qsub.env['ROOTDIR'] = conf.rootdir
    #qsub.env['JOBNAME'] = job.name
    #if depend:
    #    qsub.afterok = cptjob
    #if queue:
    #    qsub.queue = queue
    qsub.properties = "64bit = 'Yes' or 64bit = 'No'"
    qsub.build(script)

    if verbose:
        # NOTE(review): the 'cwd' line prints the command, not the
        # working directory -- confirm what was intended.
        print('cwd: %s' % qsub.command)
        print('PBS Command: %s' % qsub.command)

    if not onlyecho:
        ec = qsub.do()
        if ec == 0:
            jobid = qsub.result
            print('OAR Jobid: %s' % jobid)
            #namehack.setname(jobid, job.name)
            queued = date()
            jobdir.echofile('.batch_jobid', jobid)
            jobdir.echofile('.batch_jobname', job.name)
            jobdir.echofile('.queued', queued)
            jobdir.setstatus('queued on %s' % queued)
        else:
            print('OAR Failed')
    # Two blank lines between jobs.
    print('')
    print('')