send.py revision 13540
1#!/usr/bin/env python2.7 2# Copyright (c) 2005 The Regents of The University of Michigan 3# All rights reserved. 4# 5# Redistribution and use in source and binary forms, with or without 6# modification, are permitted provided that the following conditions are 7# met: redistributions of source code must retain the above copyright 8# notice, this list of conditions and the following disclaimer; 9# redistributions in binary form must reproduce the above copyright 10# notice, this list of conditions and the following disclaimer in the 11# documentation and/or other materials provided with the distribution; 12# neither the name of the copyright holders nor the names of its 13# contributors may be used to endorse or promote products derived from 14# this software without specific prior written permission. 15# 16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 17# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 18# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 19# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 20# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 21# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 22# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 26# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27# 28# Authors: Ali Saidi 29# Nathan Binkert 30 31import os, os.path, re, socket, sys 32from os import environ as env, listdir 33from os.path import basename, isdir, isfile, islink, join as joinpath, normpath 34from filecmp import cmp as filecmp 35from shutil import copy 36 37def nfspath(dir): 38 if dir.startswith('/.automount/'): 39 dir = '/n/%s' % dir[12:] 40 elif not dir.startswith('/n/'): 41 dir = '/n/%s%s' % (socket.gethostname().split('.')[0], dir) 42 return dir 43 44def syncdir(srcdir, destdir): 45 srcdir = normpath(srcdir) 46 destdir = normpath(destdir) 47 if not isdir(destdir): 48 sys.exit('destination directory "%s" does not exist' % destdir) 49 50 for root, dirs, files in os.walk(srcdir): 51 root = normpath(root) 52 prefix = os.path.commonprefix([root, srcdir]) 53 root = root[len(prefix):] 54 if root.startswith('/'): 55 root = root[1:] 56 for rem in [ d for d in dirs if d.startswith('.') or d == 'SCCS']: 57 dirs.remove(rem) 58 59 for entry in dirs: 60 newdir = joinpath(destdir, root, entry) 61 if not isdir(newdir): 62 os.mkdir(newdir) 63 print 'mkdir', newdir 64 65 for i,d in enumerate(dirs): 66 if islink(joinpath(srcdir, root, d)): 67 dirs[i] = joinpath(d, '.') 68 69 for entry in files: 70 dest = normpath(joinpath(destdir, root, entry)) 71 src = normpath(joinpath(srcdir, root, entry)) 72 if not isfile(dest) or not filecmp(src, dest): 73 print 'copy %s %s' % (dest, src) 74 copy(src, dest) 75 76progpath = nfspath(sys.path[0]) 77progname = basename(sys.argv[0]) 78usage = """\ 79Usage: 80 %(progname)s [-c] [-e] [-f] [-j <jobfile>] [-q queue] [-v] <regexp> 81 -c clean directory if job can be run 82 -C submit the checkpointing runs 83 -d Make jobs be dependent on the completion of the checkpoint runs 84 -e only echo pbs command info, don't actually send the job 85 -f force the job to run regardless of state 86 -q <queue> submit job to the named queue 87 -j <jobfile> specify the jobfile (default is <rootdir>/Test.py) 88 -v be verbose 89 90 %(progname)s [-j <jobfile>] -l [-v] <regexp> 91 -j <jobfile> specify the jobfile (default is <rootdir>/Test.py) 92 -l list job names, don't submit 93 -v be verbose (list job parameters) 94 95 %(progname)s -h 96 -h display this help 97""" % locals() 98 99try: 100 import getopt 101 opts, args = getopt.getopt(sys.argv[1:], '-Ccdefhj:lnq:Rt:v') 102except getopt.GetoptError: 103 sys.exit(usage) 104 105depend = False 106clean = False 107onlyecho = False 108exprs = [] 109force = False 110listonly = False 111queue = '' 112verbose = False 113jfile = 'Test.py' 114docpts = False 115doruns = True 116runflag = False 117node_type = 'FAST' 118update = True 119 120for opt,arg in opts: 121 if opt == '-C': 122 docpts = True 123 if opt == '-c': 124 clean = True 125 if opt == '-d': 126 depend = True 127 if opt == '-e': 128 onlyecho = True 129 if opt == '-f': 130 force = True 131 if opt == '-h': 132 print usage 133 sys.exit(0) 134 if opt == '-j': 135 jfile = arg 136 if opt == '-l': 137 listonly = True 138 if opt == '-n': 139 update = False 140 if opt == '-q': 141 queue = arg 142 if opt == '-R': 143 runflag = True 144 if opt == '-t': 145 node_type = arg 146 if opt == '-v': 147 verbose = True 148 149if docpts: 150 doruns = runflag 151 152for arg in args: 153 exprs.append(re.compile(arg)) 154 155import jobfile, pbs 156from job import JobDir, date 157 158conf = jobfile.JobFile(jfile) 159 160if update and not listonly and not onlyecho and isdir(conf.linkdir): 161 if verbose: 162 print 'Checking for outdated files in Link directory' 163 if not isdir(conf.basedir): 164 os.mkdir(conf.basedir) 165 syncdir(conf.linkdir, conf.basedir) 166 167jobnames = {} 168joblist = [] 169 170if docpts and doruns: 171 gen = conf.alljobs() 172elif docpts: 173 gen = conf.checkpoints() 174elif doruns: 175 gen = conf.jobs() 176 177for job in gen: 178 if job.name in jobnames: 179 continue 180 181 if exprs: 182 for expr in exprs: 183 if expr.match(job.name): 184 joblist.append(job) 185 break 186 else: 187 joblist.append(job) 188 189if listonly: 190 if verbose: 191 for job in joblist: 192 job.printinfo() 193 else: 194 for job in joblist: 195 print job.name 196 sys.exit(0) 197 198if not onlyecho: 199 newlist = [] 200 for job in joblist: 201 jobdir = JobDir(joinpath(conf.rootdir, job.name)) 202 if jobdir.exists(): 203 if not force: 204 status = jobdir.getstatus() 205 if status == 'queued': 206 continue 207 208 if status == 'running': 209 continue 210 211 if status == 'success': 212 continue 213 214 if not clean: 215 sys.exit('job directory %s not clean!' % jobdir) 216 217 jobdir.clean() 218 newlist.append(job) 219 joblist = newlist 220 221class NameHack(object): 222 def __init__(self, host='pbs.pool', port=24465): 223 self.host = host 224 self.port = port 225 self.socket = None 226 227 def setname(self, jobid, jobname): 228 try: 229 jobid = int(jobid) 230 except ValueError: 231 jobid = int(jobid.strip().split('.')[0]) 232 233 jobname = jobname.strip() 234 # since pbs can handle jobnames of 15 characters or less, 235 # don't use the raj hack. 236 if len(jobname) <= 15: 237 return 238 239 if self.socket is None: 240 import socket 241 self.socket = socket.socket() 242 # Connect to pbs.pool and send the jobid/jobname pair to port 243 # 24465 (Raj didn't realize that there are only 64k ports and 244 # setup inetd to point to port 90001) 245 self.socket.connect((self.host, self.port)) 246 247 self.socket.send("%s %s\n" % (jobid, jobname)) 248 249namehack = NameHack() 250 251for job in joblist: 252 jobdir = JobDir(joinpath(conf.rootdir, job.name)) 253 if depend: 254 cptdir = JobDir(joinpath(conf.rootdir, job.checkpoint.name)) 255 cptjob = cptdir.readval('.pbs_jobid') 256 257 if not onlyecho: 258 jobdir.create() 259 260 print 'Job name: %s' % job.name 261 print 'Job directory: %s' % jobdir 262 263 qsub = pbs.qsub() 264 qsub.pbshost = 'simpool.eecs.umich.edu' 265 qsub.stdout = jobdir.file('jobout') 266 qsub.name = job.name[:15] 267 qsub.join = True 268 qsub.node_type = node_type 269 qsub.env['ROOTDIR'] = conf.rootdir 270 qsub.env['JOBNAME'] = job.name 271 if depend: 272 qsub.afterok = cptjob 273 if queue: 274 qsub.queue = queue 275 qsub.build(joinpath(progpath, 'job.py')) 276 277 if verbose: 278 print 'PBS Command: %s' % qsub.command 279 280 if not onlyecho: 281 ec = qsub.do() 282 if ec == 0: 283 jobid = qsub.result 284 print 'PBS Jobid: %s' % jobid 285 namehack.setname(jobid, job.name) 286 queued = date() 287 jobdir.echofile('.pbs_jobid', jobid) 288 jobdir.echofile('.pbs_jobname', job.name) 289 jobdir.echofile('.queued', queued) 290 jobdir.setstatus('queued on %s' % queued) 291 else: 292 print 'PBS Failed' 293