#!/usr/bin/env python
# send.py revision 1916
# Copyright (c) 2005 The Regents of The University of Michigan
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Ali Saidi
#          Nathan Binkert

# Submit the jobs described by a jobfile to PBS via qsub.
#
# NOTE: every print below is given one pre-formatted string in
# parentheses, so the output is the same whether print is the Python 2
# statement or the Python 3 function.

import getopt
import os
import os.path
import re
import socket
import sys
from os import environ as env, listdir
from os.path import basename, isdir, isfile, islink, join as joinpath, normpath
from filecmp import cmp as filecmp
from shutil import copy


def nfspath(dir):
    """Return `dir` rewritten to live under the '/n/' prefix.

    A leading '/.automount/' is replaced by '/n/'.  Any other path that
    is not already under '/n/' gets '/n/<short hostname>' prepended,
    where the short hostname is socket.gethostname() up to the first
    dot.  Paths already under '/n/' are returned unchanged.

    NOTE(review): presumably '/n/<host>/...' is the site's
    network-visible path layout -- confirm.
    """
    if dir.startswith('/.automount/'):
        # 12 == len('/.automount/')
        dir = '/n/%s' % dir[12:]
    elif not dir.startswith('/n/'):
        dir = '/n/%s%s' % (socket.gethostname().split('.')[0], dir)
    return dir


def syncdir(srcdir, destdir):
    """Bring `destdir` up to date with the tree rooted at `srcdir`.

    Sub-directories missing from `destdir` are created and every file
    that is missing from `destdir`, or that filecmp.cmp reports as
    different, is copied over.  Directories whose name starts with '.'
    or equals 'SCCS' are skipped.  Nothing is ever deleted from
    `destdir`.

    Exits the program (sys.exit) if `destdir` is not an existing
    directory.
    """
    srcdir = normpath(srcdir)
    destdir = normpath(destdir)
    if not isdir(destdir):
        sys.exit('destination directory "%s" does not exist' % destdir)

    for root, dirs, files in os.walk(srcdir):
        # Reduce root to a path relative to srcdir.
        root = normpath(root)
        prefix = os.path.commonprefix([root, srcdir])
        root = root[len(prefix):]
        if root.startswith('/'):
            root = root[1:]

        # Prune in place so that os.walk does not descend into hidden
        # or SCCS directories.
        dirs[:] = [d for d in dirs
                   if not (d.startswith('.') or d == 'SCCS')]

        for entry in dirs:
            newdir = joinpath(destdir, root, entry)
            if not isdir(newdir):
                os.mkdir(newdir)
                print('mkdir %s' % newdir)

        # Symlinked directories are renamed to '<name>/.' in the list
        # handed back to os.walk.
        # NOTE(review): apparently so that os.walk no longer sees a
        # symlink and descends into the target -- confirm.
        for i, d in enumerate(dirs):
            if islink(joinpath(srcdir, root, d)):
                dirs[i] = joinpath(d, '.')

        for entry in files:
            dest = normpath(joinpath(destdir, root, entry))
            src = normpath(joinpath(srcdir, root, entry))
            if not isfile(dest) or not filecmp(src, dest):
                print('copy %s %s' % (dest, src))
                copy(src, dest)


progpath = nfspath(sys.path[0])
progname = basename(sys.argv[0])
usage = """\
Usage:
    %(progname)s [-c] [-e] [-f] [-j <jobfile>] [-q queue] [-v] <regexp>
    -c           clean directory if job can be run
    -e           only echo pbs command info, don't actually send the job
    -f           force the job to run regardless of state
    -q <queue>   submit job to the named queue
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -v           be verbose

    %(progname)s [-j <jobfile>] -l [-v] <regexp>
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -l           list job names, don't submit
    -v           be verbose (list job parameters)

    %(progname)s -h
    -h           display this help
""" % locals()

try:
    opts, args = getopt.getopt(sys.argv[1:], '-Ccdefhj:lq:Rt:v')
except getopt.GetoptError:
    sys.exit(usage)

depend = False
clean = False
onlyecho = False
exprs = []
force = False
listonly = False
queue = ''
verbose = False
jfile = 'Test.py'
docpts = False
doruns = True
runflag = False
node_type = 'FAST'

for opt, arg in opts:
    if opt == '-C':
        docpts = True
    elif opt == '-c':
        clean = True
    elif opt == '-d':
        depend = True
    elif opt == '-e':
        onlyecho = True
    elif opt == '-f':
        force = True
    elif opt == '-h':
        print(usage)
        sys.exit(0)
    elif opt == '-j':
        jfile = arg
    elif opt == '-l':
        listonly = True
    elif opt == '-q':
        queue = arg
    elif opt == '-R':
        runflag = True
    elif opt == '-t':
        node_type = arg
    elif opt == '-v':
        verbose = True

# With -C, regular runs are only included when -R is given as well.
if docpts:
    doruns = runflag

for arg in args:
    exprs.append(re.compile(arg))

# Project-local modules are imported only after option parsing, so that
# -h and usage errors are handled before they are loaded.
import jobfile, pbs
from job import JobDir, date

conf = jobfile.JobFile(jfile)

if not listonly and not onlyecho and isdir(conf.linkdir):
    if verbose:
        print('Checking for outdated files in Link directory')
    if not isdir(conf.basedir):
        os.mkdir(conf.basedir)
    syncdir(conf.linkdir, conf.basedir)

jobnames = {}
joblist = []

if docpts and doruns:
    gen = conf.alljobs()
elif docpts:
    gen = conf.checkpoints()
else:
    # doruns is always true when docpts is false.
    gen = conf.jobs()

for job in gen:
    if job.name in jobnames:
        continue
    # Remember every name seen so that a job yielded twice is only
    # considered once.
    jobnames[job.name] = job

    if exprs:
        # Keep the job if any of the regexps matches its name.
        for expr in exprs:
            if expr.match(job.name):
                joblist.append(job)
                break
    else:
        joblist.append(job)

if listonly:
    if verbose:
        for job in joblist:
            job.printinfo()
    else:
        for job in joblist:
            print(job.name)
    sys.exit(0)

if not onlyecho:
    # Drop jobs that are already queued, running or done (unless -f),
    # and make sure every remaining job directory is clean.
    newlist = []
    for job in joblist:
        jobdir = JobDir(joinpath(conf.rootdir, job.name))
        if jobdir.exists():
            if not force:
                status = jobdir.getstatus()
                if status in ('queued', 'running', 'success'):
                    continue

            if not clean:
                sys.exit('job directory %s not clean!' % jobdir)

            jobdir.clean()
        newlist.append(job)
    joblist = newlist


class NameHack(object):
    """Send full job names to a name service for jobs with long names.

    The connection to (host, port) is opened on first use and then kept
    for all later calls.
    """

    def __init__(self, host='pbs.pool', port=24465):
        self.host = host
        self.port = port
        # Connected socket, created by the first setname() that needs it.
        self.socket = None

    def setname(self, jobid, jobname):
        """Send the "<jobid> <jobname>" pair if the name exceeds 15 chars.

        `jobid` may be an integer or a string such as "1234.host"; only
        the part before the first dot is used.  Raises ValueError if
        that part is not an integer.
        """
        try:
            jobid = int(jobid)
        except ValueError:
            jobid = int(jobid.strip().split('.')[0])

        jobname = jobname.strip()
        # since pbs can handle jobnames of 15 characters or less,
        # don't use the raj hack.
        if len(jobname) <= 15:
            return

        if self.socket is None:
            self.socket = socket.socket()
            # Connect to pbs.pool and send the jobid/jobname pair to port
            # 24465 (Raj didn't realize that there are only 64k ports and
            # setup inetd to point to port 90001)
            self.socket.connect((self.host, self.port))

        # sendall() keeps writing until the whole message is out;
        # send() may stop after a partial write.
        self.socket.sendall("%s %s\n" % (jobid, jobname))


namehack = NameHack()

for job in joblist:
    jobdir = JobDir(joinpath(conf.rootdir, job.name))
    if depend:
        # -d: make the job wait for the PBS job of its checkpoint.
        cptdir = JobDir(joinpath(conf.rootdir, job.checkpoint.name))
        cptjob = cptdir.readval('.pbs_jobid')

    if not onlyecho:
        jobdir.create()

    print('Job name: %s' % job.name)
    print('Job directory: %s' % jobdir)

    qsub = pbs.qsub()
    qsub.pbshost = 'simpool.eecs.umich.edu'
    qsub.stdout = jobdir.file('jobout')
    # The name given to PBS is limited to the first 15 characters.
    qsub.name = job.name[:15]
    qsub.join = True
    qsub.node_type = node_type
    qsub.env['ROOTDIR'] = conf.rootdir
    qsub.env['JOBNAME'] = job.name
    if depend:
        qsub.afterok = cptjob
    if queue:
        qsub.queue = queue
    qsub.build(joinpath(progpath, 'job.py'))

    if verbose:
        print('PBS Command: %s' % qsub.command)

    if not onlyecho:
        ec = qsub.do()
        if ec == 0:
            jobid = qsub.result
            print('PBS Jobid: %s' % jobid)
            namehack.setname(jobid, job.name)
            queued = date()
            jobdir.echofile('.pbs_jobid', jobid)
            jobdir.echofile('.pbs_jobname', job.name)
            jobdir.echofile('.queued', queued)
            jobdir.setstatus('queued on %s' % queued)
        else:
            print('PBS Failed')