send.py revision 1908
1#!/usr/bin/env python 2# Copyright (c) 2005 The Regents of The University of Michigan 3# All rights reserved. 4# 5# Redistribution and use in source and binary forms, with or without 6# modification, are permitted provided that the following conditions are 7# met: redistributions of source code must retain the above copyright 8# notice, this list of conditions and the following disclaimer; 9# redistributions in binary form must reproduce the above copyright 10# notice, this list of conditions and the following disclaimer in the 11# documentation and/or other materials provided with the distribution; 12# neither the name of the copyright holders nor the names of its 13# contributors may be used to endorse or promote products derived from 14# this software without specific prior written permission. 15# 16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 17# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 18# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 19# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 20# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 21# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 22# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 26# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27# 28# Authors: Ali Saidi 29# Nathan Binkert 30 31import os, os.path, re, socket, sys 32from os import environ as env, listdir 33from os.path import basename, isdir, isfile, islink, join as joinpath, normpath 34from filecmp import cmp as filecmp 35from shutil import copy 36 37def nfspath(dir): 38 if dir.startswith('/.automount/'): 39 dir = '/n/%s' % dir[12:] 40 elif not dir.startswith('/n/'): 41 dir = '/n/%s%s' % (socket.gethostname().split('.')[0], dir) 42 return dir 43 44def syncdir(srcdir, destdir): 45 srcdir = normpath(srcdir) 46 destdir = normpath(destdir) 47 if not isdir(destdir): 48 sys.exit('destination directory "%s" does not exist' % destdir) 49 50 for root, dirs, files in os.walk(srcdir): 51 root = normpath(root) 52 prefix = os.path.commonprefix([root, srcdir]) 53 root = root[len(prefix):] 54 if root.startswith('/'): 55 root = root[1:] 56 for rem in [ d for d in dirs if d.startswith('.') or d == 'SCCS']: 57 dirs.remove(rem) 58 59 for entry in dirs: 60 newdir = joinpath(destdir, root, entry) 61 if not isdir(newdir): 62 os.mkdir(newdir) 63 print 'mkdir', newdir 64 65 for i,d in enumerate(dirs): 66 if islink(joinpath(srcdir, root, d)): 67 dirs[i] = joinpath(d, '.') 68 69 for entry in files: 70 dest = normpath(joinpath(destdir, root, entry)) 71 src = normpath(joinpath(srcdir, root, entry)) 72 if not isfile(dest) or not filecmp(src, dest): 73 print 'copy %s %s' % (dest, src) 74 copy(src, dest) 75 76progpath = nfspath(sys.path[0]) 77progname = basename(sys.argv[0]) 78usage = """\ 79Usage: 80 %(progname)s [-c] [-e] [-f] [-j <jobfile>] [-q queue] [-v] <regexp> 81 -c clean directory if job can be run 82 -e only echo pbs command info, don't actually send the job 83 -f force the job to run regardless of state 84 -q <queue> submit job to the named queue 85 -j <jobfile> specify the jobfile (default is <basedir>/test.py) 86 -v be verbose 87 88 %(progname)s [-j <jobfile>] -l [-v] <regexp> 89 -j <jobfile> specify the jobfile (default is <basedir>/test.py) 90 -l list job names, don't submit 91 -v be verbose (list job parameters) 92 93 %(progname)s -h 94 -h display this help 95""" % locals() 96 97try: 98 import getopt 99 opts, args = getopt.getopt(sys.argv[1:], '-CRcd:efhj:lq:v') 100except getopt.GetoptError: 101 sys.exit(usage) 102 103clean = False 104onlyecho = False 105exprs = [] 106force = False 107listonly = False 108queue = '' 109verbose = False 110jfile = 'Base/test.py' 111docpts = False 112doruns = True 113runflag = False 114 115for opt,arg in opts: 116 if opt == '-C': 117 docpts = True 118 if opt == '-R': 119 runflag = True 120 if opt == '-c': 121 clean = True 122 if opt == '-e': 123 onlyecho = True 124 if opt == '-f': 125 force = True 126 if opt == '-h': 127 print usage 128 sys.exit(0) 129 if opt == '-j': 130 jfile = arg 131 if opt == '-l': 132 listonly = True 133 if opt == '-q': 134 queue = arg 135 if opt == '-v': 136 verbose = True 137 138if docpts: 139 doruns = runflag 140 141for arg in args: 142 exprs.append(re.compile(arg)) 143 144import jobfile, pbs 145from job import JobDir, date 146 147conf = jobfile.JobFile(jfile) 148 149if not listonly and not onlyecho and isdir(conf.linkdir): 150 if verbose: 151 print 'Checking for outdated files in Link directory' 152 syncdir(conf.linkdir, conf.basedir) 153 154jobnames = {} 155joblist = [] 156 157if docpts and doruns: 158 gen = conf.alljobs() 159elif docpts: 160 gen = conf.checkpoints() 161elif doruns: 162 gen = conf.jobs() 163 164for job in gen: 165 if job.name in jobnames: 166 continue 167 168 if exprs: 169 for expr in exprs: 170 if expr.match(job.name): 171 joblist.append(job) 172 break 173 else: 174 joblist.append(job) 175 176if listonly: 177 if verbose: 178 for job in joblist: 179 job.printinfo() 180 else: 181 for job in joblist: 182 print job.name 183 sys.exit(0) 184 185if not onlyecho: 186 newlist = [] 187 for job in joblist: 188 jobdir = JobDir(joinpath(conf.rootdir, job.name)) 189 if jobdir.exists(): 190 if not force: 191 status = jobdir.getstatus() 192 if status == 'queued': 193 continue 194 195 if status == 'running': 196 continue 197 198 if status == 'success': 199 continue 200 201 if not clean: 202 sys.exit('job directory %s not clean!' % jobdir) 203 204 jobdir.clean() 205 newlist.append(job) 206 joblist = newlist 207 208class NameHack(object): 209 def __init__(self, host='pbs.pool', port=24465): 210 self.host = host 211 self.port = port 212 self.socket = None 213 214 def setname(self, jobid, jobname): 215 try: 216 jobid = int(jobid) 217 except ValueError: 218 jobid = int(jobid.strip().split('.')[0]) 219 220 jobname = jobname.strip() 221 # since pbs can handle jobnames of 15 characters or less, 222 # don't use the raj hack. 223 if len(jobname) <= 15: 224 return 225 226 if self.socket is None: 227 import socket 228 self.socket = socket.socket() 229 # Connect to pbs.pool and send the jobid/jobname pair to port 230 # 24465 (Raj didn't realize that there are only 64k ports and 231 # setup inetd to point to port 90001) 232 self.socket.connect((self.host, self.port)) 233 234 self.socket.send("%s %s\n" % (jobid, jobname)) 235 236namehack = NameHack() 237 238for job in joblist: 239 jobdir = JobDir(joinpath(conf.rootdir, job.name)) 240 241 if not onlyecho: 242 jobdir.create() 243 244 print 'Job name: %s' % job.name 245 print 'Job directory: %s' % jobdir 246 247 qsub = pbs.qsub() 248 qsub.pbshost = 'simpool.eecs.umich.edu' 249 qsub.stdout = jobdir.file('jobout') 250 qsub.name = job.name[:15] 251 qsub.join = True 252 qsub.node_type = 'FAST' 253 qsub.env['ROOTDIR'] = conf.rootdir 254 qsub.env['JOBNAME'] = job.name 255 if len(queue): 256 qsub.queue = queue 257 qsub.build(joinpath(progpath, 'job.py')) 258 259 if verbose: 260 print 'PBS Command: %s' % qsub.command 261 262 if not onlyecho: 263 ec = qsub.do() 264 if ec == 0: 265 jobid = qsub.result 266 print 'PBS Jobid: %s' % jobid 267 namehack.setname(jobid, job.name) 268 queued = date() 269 jobdir.echofile('.pbs_jobid', jobid) 270 jobdir.echofile('.pbs_jobname', job.name) 271 jobdir.echofile('.queued', queued) 272 jobdir.setstatus('queued on %s' % queued) 273 else: 274 print 'PBS Failed' 275