send.py revision 1948:04aebfd9acf3
#!/usr/bin/env python
# Copyright (c) 2005 The Regents of The University of Michigan
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met: redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer;
# redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution;
# neither the name of the copyright holders nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
# Authors: Ali Saidi
#          Nathan Binkert
302SN/A
312SN/Aimport os, os.path, re, socket, sys
322SN/Afrom os import environ as env, listdir
332SN/Afrom os.path import basename, isdir, isfile, islink, join as joinpath, normpath
342SN/Afrom filecmp import cmp as filecmp
352SN/Afrom shutil import copy
362SN/A
def nfspath(dir):
    """Return dir rewritten as a path under '/n/'.

    - A path starting with '/.automount/' has that prefix replaced by '/n/'.
    - A path already starting with '/n/' is returned unchanged.
    - Any other path is prefixed with '/n/<host>', where <host> is the part
      of socket.gethostname() before the first dot.
    """
    automount = '/.automount/'
    if dir.startswith(automount):
        return '/n/' + dir[len(automount):]
    if dir.startswith('/n/'):
        return dir
    short_host = socket.gethostname().split('.')[0]
    return '/n/' + short_host + dir
432SN/A
def syncdir(srcdir, destdir):
    """Copy new or changed files from the tree under srcdir into destdir.

    destdir must already exist; otherwise the process exits via sys.exit
    with an error message.  Sub-directories whose name starts with '.' or
    equals 'SCCS' are skipped entirely.  Missing sub-directories are
    created in destdir, and a file is copied when it does not exist in the
    destination or when filecmp reports that it differs from the source.
    Each mkdir/copy performed is reported on stdout.
    """
    srcdir = normpath(srcdir)
    destdir = normpath(destdir)
    if not isdir(destdir):
        sys.exit('destination directory "%s" does not exist' % destdir)

    for root, dirs, files in os.walk(srcdir):
        # Reduce root to its path relative to srcdir ('' at the top level).
        root = normpath(root)
        prefix = os.path.commonprefix([root, srcdir])
        root = root[len(prefix):]
        if root.startswith('/'):
            root = root[1:]

        # Prune in place so that os.walk does not descend into hidden or
        # SCCS directories.
        dirs[:] = [d for d in dirs if not (d.startswith('.') or d == 'SCCS')]

        for entry in dirs:
            newdir = joinpath(destdir, root, entry)
            if not isdir(newdir):
                os.mkdir(newdir)
                print('mkdir %s' % newdir)

        # os.walk does not follow symlinked directories by default; renaming
        # the entry to 'name/.' makes it no longer look like a link, so the
        # walk descends into it (normpath above strips the '/.' again).
        for i, d in enumerate(dirs):
            if islink(joinpath(srcdir, root, d)):
                dirs[i] = joinpath(d, '.')

        for entry in files:
            dest = normpath(joinpath(destdir, root, entry))
            src = normpath(joinpath(srcdir, root, entry))
            if not isfile(dest) or not filecmp(src, dest):
                # Report the operands in the same order as the copy itself
                # (source first, then destination).
                print('copy %s %s' % (src, dest))
                copy(src, dest)
75
# Directory containing this script, rewritten by nfspath() into its '/n/...'
# form; it is used later to locate job.py for submission.
progpath = nfspath(sys.path[0])
progname = basename(sys.argv[0])

# Help text.  It lists every option accepted by the getopt specification
# used below, so that the documented and the accepted options agree.
usage = """\
Usage:
    %(progname)s [-C] [-c] [-d] [-e] [-f] [-j <jobfile>] [-n] [-q <queue>]
        [-R] [-t <type>] [-v] <regexp>
    -C           submit checkpoint jobs (without -R, instead of the runs)
    -c           clean directory if job can be run
    -d           make each job depend (afterok) on its checkpoint's pbs job
    -e           only echo pbs command info, don't actually send the job
    -f           force the job to run regardless of state
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -n           don't update the base directory from the link directory
    -q <queue>   submit job to the named queue
    -R           with -C, submit the regular runs as well
    -t <type>    node type to request (default is FAST)
    -v           be verbose

    %(progname)s [-j <jobfile>] -l [-v] <regexp>
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -l           list job names, don't submit
    -v           be verbose (list job parameters)

    %(progname)s -h
    -h           display this help
""" % {'progname': progname}
96
import getopt

# Parse the command line; an unrecognised option aborts with the usage text.
try:
    opts, args = getopt.getopt(sys.argv[1:], '-Ccdefhj:lnq:Rt:v')
except getopt.GetoptError:
    sys.exit(usage)

# Option defaults.
clean = depend = force = listonly = onlyecho = verbose = False
docpts = runflag = False
doruns = update = True
queue = ''
jfile = 'Test.py'
node_type = 'FAST'

for opt, arg in opts:
    if opt == '-C':
        docpts = True
    elif opt == '-c':
        clean = True
    elif opt == '-d':
        depend = True
    elif opt == '-e':
        onlyecho = True
    elif opt == '-f':
        force = True
    elif opt == '-h':
        # Help is handled immediately; later options are not examined.
        print(usage)
        sys.exit(0)
    elif opt == '-j':
        jfile = arg
    elif opt == '-l':
        listonly = True
    elif opt == '-n':
        update = False
    elif opt == '-q':
        queue = arg
    elif opt == '-R':
        runflag = True
    elif opt == '-t':
        node_type = arg
    elif opt == '-v':
        verbose = True

# With -C, regular runs are only done when -R was given as well.
if docpts:
    doruns = runflag

# Every remaining argument is a regular expression used to select job names.
exprs = [re.compile(pattern) for pattern in args]
152
import jobfile
import pbs
from job import JobDir, date

# Build the job configuration from the job file selected with -j.
conf = jobfile.JobFile(jfile)

# Bring the base directory up to date from the link directory, unless that
# was disabled (-n) or nothing is going to be submitted (-l / -e).
wants_sync = update and not (listonly or onlyecho)
if wants_sync and isdir(conf.linkdir):
    if verbose:
        print('Checking for outdated files in Link directory')
    if not isdir(conf.basedir):
        os.mkdir(conf.basedir)
    syncdir(conf.linkdir, conf.basedir)
164
# jobnames maps each job name already seen to its job, so that a name
# produced more than once by the generator is only considered once.
jobnames = {}
joblist = []

# Choose which jobs to iterate over: checkpoints and runs, checkpoints only,
# or runs only.  (doruns is only False when docpts is True, so one branch
# is always taken.)
if docpts and doruns:
    gen = conf.alljobs()
elif docpts:
    gen = conf.checkpoints()
elif doruns:
    gen = conf.jobs()

for job in gen:
    if job.name in jobnames:
        continue
    # Record the name; without this the duplicate check above can never
    # trigger.
    jobnames[job.name] = job

    if exprs:
        # Keep the job if its name matches any of the given expressions.
        for expr in exprs:
            if expr.match(job.name):
                joblist.append(job)
                break
    else:
        # No expressions given: every job is selected.
        joblist.append(job)
186
# -l: only list the selected jobs (full info with -v, otherwise just the
# names) and exit without submitting anything.
if listonly:
    for job in joblist:
        if verbose:
            job.printinfo()
        else:
            print(job.name)
    sys.exit(0)
195
# Unless only echoing, drop jobs that must not be resubmitted and clean the
# directories of jobs that are going to be rerun.
if not onlyecho:
    skip_states = ('queued', 'running', 'success')
    runnable = []
    for job in joblist:
        jobdir = JobDir(joinpath(conf.rootdir, job.name))
        if not jobdir.exists():
            runnable.append(job)
            continue

        # Without -f, a job that is queued, running or already successful
        # is silently skipped.
        if not force and jobdir.getstatus() in skip_states:
            continue

        # The directory exists and the job will be rerun: that requires -c.
        if not clean:
            sys.exit('job directory %s not clean!' % jobdir)

        jobdir.clean()
        runnable.append(job)
    joblist = runnable
218
class NameHack(object):
    """Report full job names to a helper service for names pbs truncates.

    For a job name longer than 15 characters, setname() sends the line
    "<jobid> <jobname>" to host:port.  The connection is opened on first
    use and then reused for subsequent calls.
    """

    def __init__(self, host='pbs.pool', port=24465):
        self.host = host
        self.port = port
        # Connected lazily by setname() the first time it is needed.
        self.socket = None

    def setname(self, jobid, jobname):
        """Send the jobid/jobname pair if jobname is longer than 15 chars.

        jobid may be an integer or a string such as '1234.server'; in the
        latter case only the part before the first dot is used.  Raises
        ValueError if no integer job id can be extracted.
        """
        try:
            jobid = int(jobid)
        except ValueError:
            jobid = int(jobid.strip().split('.')[0])

        jobname = jobname.strip()
        # since pbs can handle jobnames of 15 characters or less,
        # don't use the raj hack.
        if len(jobname) <= 15:
            return

        if self.socket is None:
            import socket
            self.socket = socket.socket()
            # Connect to pbs.pool and send the jobid/jobname pair to port
            # 24465 (Raj didn't realize that there are only 64k ports and
            # setup inetd to point to port 90001)
            self.socket.connect((self.host, self.port))

        message = "%s %s\n" % (jobid, jobname)
        # Sockets carry bytes: encode when the formatted text is not
        # already a byte string.
        if not isinstance(message, bytes):
            message = message.encode('utf-8')
        # sendall() keeps writing until the whole message has been sent,
        # whereas send() may transmit only part of it.
        self.socket.sendall(message)
246
namehack = NameHack()

# Submit (or, with -e, only describe) every selected job.
for job in joblist:
    jobdir = JobDir(joinpath(conf.rootdir, job.name))
    if depend:
        # Look up the pbs job id recorded in the checkpoint's job directory.
        cptdir = JobDir(joinpath(conf.rootdir, job.checkpoint.name))
        cptjob = cptdir.readval('.pbs_jobid')

    if not onlyecho:
        jobdir.create()

    print('Job name:       %s' % job.name)
    print('Job directory:  %s' % jobdir)

    qsub = pbs.qsub()
    qsub.pbshost = 'simpool.eecs.umich.edu'
    qsub.stdout = jobdir.file('jobout')
    # Only the first 15 characters are used as the pbs job name; the full
    # name is passed along through the JOBNAME environment variable.
    qsub.name = job.name[:15]
    qsub.join = True
    qsub.node_type = node_type
    qsub.env['ROOTDIR'] = conf.rootdir
    qsub.env['JOBNAME'] = job.name
    if depend:
        qsub.afterok = cptjob
    if queue:
        qsub.queue = queue
    qsub.build(joinpath(progpath, 'job.py'))

    if verbose:
        print('PBS Command:    %s' % qsub.command)

    if onlyecho:
        continue

    if qsub.do() != 0:
        print('PBS Failed')
        continue

    # Submission succeeded: record the job id, name and queue time in the
    # job directory and mark the job as queued.
    jobid = qsub.result
    print('PBS Jobid:      %s' % jobid)
    namehack.setname(jobid, job.name)
    queued = date()
    jobdir.echofile('.pbs_jobid', jobid)
    jobdir.echofile('.pbs_jobname', job.name)
    jobdir.echofile('.queued', queued)
    jobdir.setstatus('queued on %s' % queued)
290            print 'PBS Failed'
291