#!/usr/bin/env python
# send.py revision 1948
# Copyright (c) 2005 The Regents of The University of Michigan
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Ali Saidi
#          Nathan Binkert

import os, os.path, re, socket, sys
from os import environ as env, listdir
from os.path import basename, isdir, isfile, islink, join as joinpath, normpath
from filecmp import cmp as filecmp
from shutil import copy

# NOTE: every print below is written as a single-argument print(...) call.
# Under Python 2 (which this script targets) that is the ordinary print
# statement applied to a parenthesised expression, so the output is
# unchanged; it also keeps the file parseable by Python 3.


def nfspath(dir):
    """Return `dir` rewritten so that it lives under the '/n/' namespace.

    * A path starting with '/.automount/' has that prefix replaced by '/n/'.
    * A path that does not already start with '/n/' gets
      '/n/<short hostname>' prepended, where the short hostname is the local
      hostname up to its first '.'.
    * A path already under '/n/' is returned unchanged.
    """
    automount = '/.automount/'
    if dir.startswith(automount):
        dir = '/n/%s' % dir[len(automount):]
    elif not dir.startswith('/n/'):
        dir = '/n/%s%s' % (socket.gethostname().split('.')[0], dir)
    return dir


def syncdir(srcdir, destdir):
    """Bring `destdir` up to date with the tree rooted at `srcdir`.

    Walks `srcdir`, creating any missing sub-directory in `destdir` and
    copying every file that is absent from `destdir` or that filecmp reports
    as different.  Directories whose name starts with '.' or equals 'SCCS'
    are skipped entirely.  Nothing is ever deleted from `destdir`.

    Exits the process (sys.exit with a message) if `destdir` is not an
    existing directory.
    """
    srcdir = normpath(srcdir)
    destdir = normpath(destdir)
    if not isdir(destdir):
        sys.exit('destination directory "%s" does not exist' % destdir)

    for root, dirs, files in os.walk(srcdir):
        # Reduce `root` to a path relative to srcdir.
        root = normpath(root)
        prefix = os.path.commonprefix([root, srcdir])
        root = root[len(prefix):]
        if root.startswith('/'):
            root = root[1:]

        # Prune in place so os.walk does not descend into skipped dirs.
        dirs[:] = [d for d in dirs
                   if not (d.startswith('.') or d == 'SCCS')]

        for entry in dirs:
            newdir = joinpath(destdir, root, entry)
            if not isdir(newdir):
                os.mkdir(newdir)
                print('mkdir %s' % newdir)

        # Rewrite symlinked directories as '<name>/.' -- presumably so that
        # os.walk, which does not descend into entries that test as
        # symlinks, still walks into them (the '/.' form is not a link).
        for i, d in enumerate(dirs):
            if islink(joinpath(srcdir, root, d)):
                dirs[i] = joinpath(d, '.')

        for entry in files:
            dest = normpath(joinpath(destdir, root, entry))
            src = normpath(joinpath(srcdir, root, entry))
            if not isfile(dest) or not filecmp(src, dest):
                # Fixed: the message used to list dest before src, the
                # reverse of what copy(src, dest) actually does.
                print('copy %s %s' % (src, dest))
                copy(src, dest)


progpath = nfspath(sys.path[0])
progname = basename(sys.argv[0])
usage = """\
Usage:
    %(progname)s [-c] [-e] [-f] [-j <jobfile>] [-q queue] [-v] <regexp>
    -c           clean directory if job can be run
    -e           only echo pbs command info, don't actually send the job
    -f           force the job to run regardless of state
    -q <queue>   submit job to the named queue
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -v           be verbose

    %(progname)s [-j <jobfile>] -l [-v] <regexp>
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -l           list job names, don't submit
    -v           be verbose (list job parameters)

    %(progname)s -h
    -h           display this help
""" % locals()


class NameHack(object):
    """Forward full job names to a helper service for long-named jobs.

    Names of 15 characters or fewer are handled by PBS itself (the job is
    submitted with name[:15]), so nothing is sent for them.  For longer
    names a "<jobid> <jobname>" line is written to `host`:`port`.  The
    connection is opened lazily on first use and reused afterwards.
    """

    def __init__(self, host='pbs.pool', port=24465):
        self.host = host
        self.port = port
        self.socket = None

    def setname(self, jobid, jobname):
        """Report the (jobid, jobname) pair if the name is over 15 chars.

        `jobid` may be an integer, a numeric string, or a string of the
        form '<number>.<anything>'; only the leading number is used.
        """
        try:
            jobid = int(jobid)
        except ValueError:
            jobid = int(jobid.strip().split('.')[0])

        jobname = jobname.strip()
        # since pbs can handle jobnames of 15 characters or less,
        # don't use the raj hack.
        if len(jobname) <= 15:
            return

        if self.socket is None:
            # Connect to pbs.pool and send the jobid/jobname pair to port
            # 24465 (Raj didn't realize that there are only 64k ports and
            # setup inetd to point to port 90001)
            sock = socket.socket()
            sock.connect((self.host, self.port))
            # Only remember the socket once it is actually connected, so a
            # failed connect is not mistaken for a live connection later.
            self.socket = sock

        # sendall, not send: send() may write only part of the buffer.
        self.socket.sendall("%s %s\n" % (jobid, jobname))

    def close(self):
        """Close the connection if one was opened; safe to call repeatedly."""
        if self.socket is not None:
            self.socket.close()
            self.socket = None


def _select_jobs(gen, exprs):
    """Return the jobs from `gen` whose name matches any regexp in `exprs`.

    With no expressions every job is selected.  Each job name is taken at
    most once.
    """
    jobnames = {}
    joblist = []
    for job in gen:
        if job.name in jobnames:
            continue
        # Fixed: names were never recorded, so the duplicate test above
        # could never trigger.
        jobnames[job.name] = job

        if exprs:
            for expr in exprs:
                if expr.match(job.name):
                    joblist.append(job)
                    break
        else:
            joblist.append(job)
    return joblist


def main():
    """Parse the command line, pick the matching jobs and submit them to PBS.

    Options accepted (see `usage` for the documented subset):
      -C  work on checkpoints (conf.checkpoints()) instead of regular jobs
      -R  together with -C, also include regular jobs (conf.alljobs())
      -c  clean an existing job directory before resubmitting
      -d  make each job depend (afterok) on its checkpoint's PBS job id
      -e  echo only: do not create directories or submit anything
      -f  skip the queued/running/success status check
      -h  print usage and exit
      -j  job file name (default 'Test.py')
      -l  list the selected jobs and exit
      -n  do not sync the Link directory into the base directory
      -q  PBS queue name
      -t  node type (default 'FAST')
      -v  verbose
    """
    import getopt

    try:
        opts, args = getopt.getopt(sys.argv[1:], '-Ccdefhj:lnq:Rt:v')
    except getopt.GetoptError:
        sys.exit(usage)

    depend = False
    clean = False
    onlyecho = False
    force = False
    listonly = False
    queue = ''
    verbose = False
    jfile = 'Test.py'
    docpts = False
    doruns = True
    runflag = False
    node_type = 'FAST'
    update = True

    for opt, arg in opts:
        if opt == '-C':
            docpts = True
        elif opt == '-c':
            clean = True
        elif opt == '-d':
            depend = True
        elif opt == '-e':
            onlyecho = True
        elif opt == '-f':
            force = True
        elif opt == '-h':
            print(usage)
            sys.exit(0)
        elif opt == '-j':
            jfile = arg
        elif opt == '-l':
            listonly = True
        elif opt == '-n':
            update = False
        elif opt == '-q':
            queue = arg
        elif opt == '-R':
            runflag = True
        elif opt == '-t':
            node_type = arg
        elif opt == '-v':
            verbose = True

    # With -C, regular runs are only included when -R was also given.
    if docpts:
        doruns = runflag

    exprs = [re.compile(arg) for arg in args]

    # Project-local modules; imported only after option parsing so that
    # usage errors and -h do not depend on them.
    import jobfile, pbs
    from job import JobDir, date

    conf = jobfile.JobFile(jfile)

    if update and not listonly and not onlyecho and isdir(conf.linkdir):
        if verbose:
            print('Checking for outdated files in Link directory')
        if not isdir(conf.basedir):
            os.mkdir(conf.basedir)
        syncdir(conf.linkdir, conf.basedir)

    # doruns is always True when docpts is False, so these three branches
    # cover every combination.
    if docpts and doruns:
        gen = conf.alljobs()
    elif docpts:
        gen = conf.checkpoints()
    else:
        gen = conf.jobs()

    joblist = _select_jobs(gen, exprs)

    if listonly:
        if verbose:
            for job in joblist:
                job.printinfo()
        else:
            for job in joblist:
                print(job.name)
        sys.exit(0)

    if not onlyecho:
        # Drop jobs that are already queued/running/done and make sure any
        # pre-existing job directory is cleaned before reuse.
        newlist = []
        for job in joblist:
            jobdir = JobDir(joinpath(conf.rootdir, job.name))
            if jobdir.exists():
                if not force:
                    status = jobdir.getstatus()
                    if status in ('queued', 'running', 'success'):
                        continue

                if not clean:
                    sys.exit('job directory %s not clean!' % jobdir)

                jobdir.clean()
            newlist.append(job)
        joblist = newlist

    namehack = NameHack()
    try:
        for job in joblist:
            jobdir = JobDir(joinpath(conf.rootdir, job.name))
            if depend:
                cptdir = JobDir(joinpath(conf.rootdir, job.checkpoint.name))
                cptjob = cptdir.readval('.pbs_jobid')

            if not onlyecho:
                jobdir.create()

            print('Job name: %s' % job.name)
            print('Job directory: %s' % jobdir)

            qsub = pbs.qsub()
            qsub.pbshost = 'simpool.eecs.umich.edu'
            qsub.stdout = jobdir.file('jobout')
            # PBS job names are limited to 15 characters; the full name is
            # passed through the environment and via NameHack below.
            qsub.name = job.name[:15]
            qsub.join = True
            qsub.node_type = node_type
            qsub.env['ROOTDIR'] = conf.rootdir
            qsub.env['JOBNAME'] = job.name
            if depend:
                qsub.afterok = cptjob
            if queue:
                qsub.queue = queue
            qsub.build(joinpath(progpath, 'job.py'))

            if verbose:
                print('PBS Command: %s' % qsub.command)

            if not onlyecho:
                ec = qsub.do()
                if ec == 0:
                    jobid = qsub.result
                    print('PBS Jobid: %s' % jobid)
                    namehack.setname(jobid, job.name)
                    queued = date()
                    jobdir.echofile('.pbs_jobid', jobid)
                    jobdir.echofile('.pbs_jobname', job.name)
                    jobdir.echofile('.queued', queued)
                    jobdir.setstatus('queued on %s' % queued)
                else:
                    print('PBS Failed')
    finally:
        # Release the name-service connection, if one was opened.
        namehack.close()


if __name__ == '__main__':
    main()