send.py revision 1916
#!/usr/bin/env python
# Copyright (c) 2005 The Regents of The University of Michigan
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Ali Saidi
#          Nathan Binkert

import os, os.path, re, socket, sys
from os import environ as env, listdir
from os.path import basename, isdir, isfile, islink, join as joinpath, normpath
from filecmp import cmp as filecmp
from shutil import copy

# NOTE: this is a Python 2 script.  Every print below is written as
# print(<one string expression>), which produces exactly the same output
# under the Python 2 print statement and also parses as a function call.

def nfspath(dir):
    """Return `dir` rewritten under the '/n/' path prefix.

    * '/.automount/<rest>' becomes '/n/<rest>' (the 12-character
      '/.automount/' prefix is dropped).
    * a path already starting with '/n/' is returned unchanged.
    * any other path is prefixed with '/n/<short hostname>', where the
      short hostname is the local hostname up to its first '.'.

    `dir` is expected to start with '/'; it is concatenated directly
    after the hostname.
    """
    if dir.startswith('/.automount/'):
        dir = '/n/%s' % dir[12:]
    elif not dir.startswith('/n/'):
        dir = '/n/%s%s' % (socket.gethostname().split('.')[0], dir)
    return dir

def syncdir(srcdir, destdir):
    """Copy new or changed files from `srcdir` into `destdir`.

    Walks `srcdir`, creating any sub-directory that is missing under
    `destdir` and copying every file that is absent from `destdir` or
    that filecmp.cmp reports as different.  Directories whose name
    starts with '.' and directories named 'SCCS' are skipped.  Nothing
    is ever removed from `destdir`.

    Exits the program (sys.exit) if `destdir` is not an existing
    directory.
    """
    srcdir = normpath(srcdir)
    destdir = normpath(destdir)
    if not isdir(destdir):
        sys.exit('destination directory "%s" does not exist' % destdir)

    for root, dirs, files in os.walk(srcdir):
        # Reduce root to the path relative to srcdir.
        root = normpath(root)
        prefix = os.path.commonprefix([root, srcdir])
        root = root[len(prefix):]
        if root.startswith('/'):
            root = root[1:]

        # Prune in place so that os.walk does not descend into hidden
        # or SCCS directories.
        dirs[:] = [d for d in dirs if not (d.startswith('.') or d == 'SCCS')]

        for entry in dirs:
            newdir = joinpath(destdir, root, entry)
            if not isdir(newdir):
                os.mkdir(newdir)
                print('mkdir %s' % newdir)

        # Rewrite symlinked directories as '<name>/.'.  Presumably this
        # is so the entry no longer tests as a symlink and os.walk
        # descends into it; normpath() above strips the '/.' again on
        # the next iteration.
        for i, d in enumerate(dirs):
            if islink(joinpath(srcdir, root, d)):
                dirs[i] = joinpath(d, '.')

        for entry in files:
            dest = normpath(joinpath(destdir, root, entry))
            src = normpath(joinpath(srcdir, root, entry))
            if not isfile(dest) or not filecmp(src, dest):
                # Report source first, destination second, matching the
                # argument order of the copy that is actually performed.
                print('copy %s %s' % (src, dest))
                copy(src, dest)

progpath = nfspath(sys.path[0])
progname = basename(sys.argv[0])
usage = """\
Usage:
    %(progname)s [-c] [-e] [-f] [-j <jobfile>] [-q queue] [-v] <regexp>
    -c           clean directory if job can be run
    -e           only echo pbs command info, don't actually send the job
    -f           force the job to run regardless of state
    -q <queue>   submit job to the named queue
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -v           be verbose

    %(progname)s [-j <jobfile>] -l [-v] <regexp>
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -l           list job names, don't submit
    -v           be verbose (list job parameters)

    %(progname)s -h
    -h           display this help
""" % locals()

# Imported outside the try block: the except clause below needs the
# name `getopt` to be bound in order to be evaluated at all.
import getopt
try:
    opts, args = getopt.getopt(sys.argv[1:], '-Ccdefhj:lq:Rt:v')
except getopt.GetoptError:
    sys.exit(usage)

# Option defaults.
depend = False       # -d: make each job depend on its checkpoint job
clean = False        # -c: clean an existing job directory before reuse
onlyecho = False     # -e: show what would be submitted, submit nothing
exprs = []           # compiled <regexp> arguments used to select jobs
force = False        # -f: ignore the recorded status of existing jobs
listonly = False     # -l: list the selected jobs and exit
queue = ''           # -q: PBS queue name ('' leaves qsub.queue unset)
verbose = False      # -v
jfile = 'Test.py'    # -j: job file
docpts = False       # -C: select checkpoint jobs
doruns = True        # select regular jobs (see -C / -R below)
runflag = False      # -R: together with -C, select regular jobs as well
node_type = 'FAST'   # -t: value handed to qsub.node_type

for opt, arg in opts:
    if opt == '-C':
        docpts = True
    elif opt == '-c':
        clean = True
    elif opt == '-d':
        depend = True
    elif opt == '-e':
        onlyecho = True
    elif opt == '-f':
        force = True
    elif opt == '-h':
        print(usage)
        sys.exit(0)
    elif opt == '-j':
        jfile = arg
    elif opt == '-l':
        listonly = True
    elif opt == '-q':
        queue = arg
    elif opt == '-R':
        runflag = True
    elif opt == '-t':
        node_type = arg
    elif opt == '-v':
        verbose = True

# With -C, regular jobs are only included when -R was given too.
if docpts:
    doruns = runflag

for arg in args:
    exprs.append(re.compile(arg))

import jobfile, pbs
from job import JobDir, date

conf = jobfile.JobFile(jfile)

# Refresh the base directory from the link directory before any real
# submission.
if not listonly and not onlyecho and isdir(conf.linkdir):
    if verbose:
        print('Checking for outdated files in Link directory')
    if not isdir(conf.basedir):
        os.mkdir(conf.basedir)
    syncdir(conf.linkdir, conf.basedir)

jobnames = {}
joblist = []

# docpts is False only while doruns is still True, so one of these
# branches always binds `gen`.
if docpts and doruns:
    gen = conf.alljobs()
elif docpts:
    gen = conf.checkpoints()
elif doruns:
    gen = conf.jobs()

for job in gen:
    # Skip jobs whose name was already seen.  The name has to be
    # recorded here; without that the check above could never trigger.
    if job.name in jobnames:
        continue
    jobnames[job.name] = job

    if exprs:
        # Keep the job if any expression matches at the start of its
        # name (re.match semantics).
        for expr in exprs:
            if expr.match(job.name):
                joblist.append(job)
                break
    else:
        joblist.append(job)

if listonly:
    if verbose:
        for job in joblist:
            job.printinfo()
    else:
        for job in joblist:
            print(job.name)
    sys.exit(0)

if not onlyecho:
    # Drop jobs that are already queued, running or finished, and make
    # sure every remaining job directory is clean.
    newlist = []
    for job in joblist:
        jobdir = JobDir(joinpath(conf.rootdir, job.name))
        if jobdir.exists():
            if not force:
                status = jobdir.getstatus()
                if status in ('queued', 'running', 'success'):
                    continue

            # The directory exists and the job is going to be (re)run:
            # refuse to continue unless -c allows cleaning it.
            if not clean:
                sys.exit('job directory %s not clean!' % jobdir)

            jobdir.clean()
        newlist.append(job)
    joblist = newlist

class NameHack(object):
    """Report the full name of jobs whose name exceeds 15 characters.

    PBS handles job names of 15 characters or less; for longer names a
    'jobid jobname' line is sent to a helper service listening on
    `host`:`port`.  The connection is opened lazily on first use and
    then reused for every later call.
    """
    def __init__(self, host='pbs.pool', port=24465):
        self.host = host
        self.port = port
        self.socket = None

    def setname(self, jobid, jobname):
        """Send the (jobid, jobname) pair if `jobname` is too long for PBS.

        `jobid` may be an integer or a string such as '1234.server';
        only the numeric part before the first '.' is used.  Names of
        15 characters or less (after stripping whitespace) are ignored.
        """
        try:
            jobid = int(jobid)
        except ValueError:
            jobid = int(jobid.strip().split('.')[0])

        jobname = jobname.strip()
        # since pbs can handle jobnames of 15 characters or less,
        # don't use the raj hack.
        if len(jobname) <= 15:
            return

        if self.socket is None:
            self.socket = socket.socket()
            # Connect to pbs.pool and send the jobid/jobname pair to port
            # 24465 (Raj didn't realize that there are only 64k ports and
            # setup inetd to point to port 90001)
            self.socket.connect((self.host, self.port))

        # sendall keeps writing until the whole line has been handed to
        # the kernel; a plain send may transmit only part of it.
        self.socket.sendall("%s %s\n" % (jobid, jobname))

namehack = NameHack()

for job in joblist:
    jobdir = JobDir(joinpath(conf.rootdir, job.name))
    if depend:
        # Look up the PBS job id recorded for this job's checkpoint job.
        cptdir = JobDir(joinpath(conf.rootdir, job.checkpoint.name))
        cptjob = cptdir.readval('.pbs_jobid')

    if not onlyecho:
        jobdir.create()

    print('Job name:       %s' % job.name)
    print('Job directory:  %s' % jobdir)

    qsub = pbs.qsub()
    qsub.pbshost = 'simpool.eecs.umich.edu'
    qsub.stdout = jobdir.file('jobout')
    # PBS only gets the first 15 characters; the full name is passed in
    # the environment and, after submission, through namehack.
    qsub.name = job.name[:15]
    qsub.join = True
    qsub.node_type = node_type
    qsub.env['ROOTDIR'] = conf.rootdir
    qsub.env['JOBNAME'] = job.name
    if depend:
        qsub.afterok = cptjob
    if queue:
        qsub.queue = queue
    qsub.build(joinpath(progpath, 'job.py'))

    if verbose:
        print('PBS Command:    %s' % qsub.command)

    if not onlyecho:
        ec = qsub.do()
        if ec == 0:
            jobid = qsub.result
            print('PBS Jobid:      %s' % jobid)
            namehack.setname(jobid, job.name)
            queued = date()
            # Record the submission in the job directory.
            jobdir.echofile('.pbs_jobid', jobid)
            jobdir.echofile('.pbs_jobname', job.name)
            jobdir.echofile('.queued', queued)
            jobdir.setstatus('queued on %s' % queued)
        else:
            print('PBS Failed')