send.py revision 1948
1#!/usr/bin/env python
2# Copyright (c) 2005 The Regents of The University of Michigan
3# All rights reserved.
4#
5# Redistribution and use in source and binary forms, with or without
6# modification, are permitted provided that the following conditions are
7# met: redistributions of source code must retain the above copyright
8# notice, this list of conditions and the following disclaimer;
9# redistributions in binary form must reproduce the above copyright
10# notice, this list of conditions and the following disclaimer in the
11# documentation and/or other materials provided with the distribution;
12# neither the name of the copyright holders nor the names of its
13# contributors may be used to endorse or promote products derived from
14# this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27#
28# Authors: Ali Saidi
29#          Nathan Binkert
30
31import os, os.path, re, socket, sys
32from os import environ as env, listdir
33from os.path import basename, isdir, isfile, islink, join as joinpath, normpath
34from filecmp import cmp as filecmp
35from shutil import copy
36
def nfspath(dir):
    """Rewrite a directory path into its '/n/...' form.

    - A path under '/.automount/' has that prefix replaced by '/n/'.
    - A path already under '/n/' is returned unchanged.
    - Any other path is prefixed with '/n/<host>', where <host> is the
      local hostname up to its first dot.
    """
    automount = '/.automount/'
    if dir.startswith(automount):
        return '/n/' + dir[len(automount):]
    if dir.startswith('/n/'):
        return dir
    shorthost = socket.gethostname().split('.')[0]
    return '/n/' + shorthost + dir
43
def syncdir(srcdir, destdir):
    """Copy new or changed files from the srcdir tree into destdir.

    The tree under srcdir is walked top-down.  Directories whose name
    starts with '.' and directories named 'SCCS' are skipped entirely.
    Every other directory is created under destdir if it is missing, and
    every file is copied when the destination is missing, is not a
    regular file, or differs from the source according to filecmp.cmp.
    Files that exist only in destdir are left alone.

    A line is printed for each directory created ('mkdir <dir>') and each
    file copied ('copy <src> <dest>').

    Exits the program via sys.exit if destdir is not an existing directory.
    """
    srcdir = normpath(srcdir)
    destdir = normpath(destdir)
    if not isdir(destdir):
        sys.exit('destination directory "%s" does not exist' % destdir)

    for root, dirs, files in os.walk(srcdir):
        # Reduce root to the path of the visited directory relative to
        # srcdir ('' for srcdir itself).
        root = normpath(root)
        prefix = os.path.commonprefix([root, srcdir])
        root = root[len(prefix):]
        if root.startswith('/'):
            root = root[1:]

        # Prune in place so that os.walk does not descend into hidden or
        # SCCS directories.
        dirs[:] = [d for d in dirs if not d.startswith('.') and d != 'SCCS']

        for i, entry in enumerate(dirs):
            newdir = joinpath(destdir, root, entry)
            if not isdir(newdir):
                os.mkdir(newdir)
                print('mkdir %s' % newdir)

            # os.walk does not descend into symlinked directories, but
            # '<link>/.' is not itself a link, so rewriting the entry
            # makes the walk follow it.
            if islink(joinpath(srcdir, root, entry)):
                dirs[i] = joinpath(entry, '.')

        for entry in files:
            dest = normpath(joinpath(destdir, root, entry))
            src = normpath(joinpath(srcdir, root, entry))
            if not isfile(dest) or not filecmp(src, dest):
                # Report in the same order as the copy itself: source
                # first, destination second.
                print('copy %s %s' % (src, dest))
                copy(src, dest)
75
# Directory containing this script, in '/n/...' form (see nfspath); it is
# used later to locate job.py.
progpath = nfspath(sys.path[0])
# Name this script was invoked as, for the help text.
progname = basename(sys.argv[0])

# Help text.  It lists every option accepted by the getopt string used
# below, including -C, -d, -n, -R and -t.
usage = """\
Usage:
    %(progname)s [-C [-R]] [-c] [-d] [-e] [-f] [-j <jobfile>] [-n]
        [-q <queue>] [-t <type>] [-v] [<regexp> ...]
    -C           submit checkpoint jobs instead of regular jobs
    -c           clean directory if job can be run
    -d           make each job run after its checkpoint job succeeds
    -e           only echo pbs command info, don't actually send the job
    -f           force the job to run regardless of state
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -n           don't update the base directory from the link directory
    -q <queue>   submit job to the named queue
    -R           with -C, submit the regular jobs as well
    -t <type>    node type to request (default is FAST)
    -v           be verbose

    %(progname)s [-C [-R]] [-j <jobfile>] -l [-v] [<regexp> ...]
    -C           list checkpoint jobs instead of regular jobs
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -l           list job names, don't submit
    -R           with -C, list the regular jobs as well
    -v           be verbose (list job parameters)

    %(progname)s -h
    -h           display this help
""" % {'progname': progname}
96
import getopt

# Parse the command line; any unrecognised option prints the usage text
# and exits with an error.
try:
    opts, args = getopt.getopt(sys.argv[1:], '-Ccdefhj:lnq:Rt:v')
except getopt.GetoptError:
    sys.exit(usage)

# getopt returns (flag, value) pairs.  As a dictionary, a repeated flag
# keeps its last value, the same result as assigning in order.
given = dict(opts)

if '-h' in given:
    print(usage)
    sys.exit(0)

# Boolean switches.
docpts = '-C' in given
clean = '-c' in given
depend = '-d' in given
onlyecho = '-e' in given
force = '-f' in given
listonly = '-l' in given
update = '-n' not in given
runflag = '-R' in given
verbose = '-v' in given

# Options that take a value, with their defaults.
jfile = given.get('-j', 'Test.py')
queue = given.get('-q', '')
node_type = given.get('-t', 'FAST')

# Regular jobs are selected by default; once -C is given they are only
# selected when -R is given too.
doruns = True
if docpts:
    doruns = runflag

# Every positional argument is a regular expression used to pick jobs
# by name.
exprs = [re.compile(arg) for arg in args]
152
153import jobfile, pbs
154from job import JobDir, date
155
# Load the job description file.
conf = jobfile.JobFile(jfile)

# When really submitting (not just listing or echoing) and updates are
# enabled, copy new or changed files from the link directory into the
# base directory, creating the base directory first if needed.
if update and not (listonly or onlyecho) and isdir(conf.linkdir):
    if verbose:
        print('Checking for outdated files in Link directory')
    if not isdir(conf.basedir):
        os.mkdir(conf.basedir)
    syncdir(conf.linkdir, conf.basedir)
164
# Jobs already seen, keyed by name, so that a name is only taken once.
jobnames = {}
# Jobs selected for listing or submission, in the order they were produced.
joblist = []

# Choose which jobs the job file should produce.
if docpts and doruns:
    gen = conf.alljobs()
elif docpts:
    gen = conf.checkpoints()
elif doruns:
    gen = conf.jobs()
else:
    # Not reachable with the option handling above (doruns is only ever
    # cleared when docpts is set); kept so that gen is always bound.
    gen = []

for job in gen:
    # Skip a job whose name has been seen before.  The name must be
    # recorded here, otherwise this check can never trigger.
    if job.name in jobnames:
        continue
    jobnames[job.name] = job

    # Without any expressions every job is selected.
    if not exprs:
        joblist.append(job)
        continue

    # Otherwise the job is selected if any expression matches the start
    # of its name.
    for expr in exprs:
        if expr.match(job.name):
            joblist.append(job)
            break
186
# List mode: show the selected jobs and stop without submitting anything.
# With -v the full job information is printed, otherwise only the name.
if listonly:
    for job in joblist:
        if verbose:
            job.printinfo()
        else:
            print(job.name)
    sys.exit(0)
195
# Before really submitting, drop jobs that should not be sent again and
# make sure any existing job directory may be reused.
if not onlyecho:
    newlist = []
    for job in joblist:
        jobdir = JobDir(joinpath(conf.rootdir, job.name))
        if jobdir.exists():
            # Unless forced, leave alone jobs that are queued, running or
            # already finished successfully.
            if not force:
                status = jobdir.getstatus()
                if status in ('queued', 'running', 'success'):
                    continue

            # An existing directory is only reused when -c was given.
            if not clean:
                sys.exit('job directory %s not clean!' % jobdir)

            jobdir.clean()
        newlist.append(job)
    joblist = newlist
218
class NameHack(object):
    """Sends 'jobid jobname' pairs for long job names to a helper service.

    Names of 15 characters or fewer are handled by pbs itself and are not
    sent.  The connection is opened on first use and then reused for
    every later call.
    """

    def __init__(self, host='pbs.pool', port=24465):
        """Remember the service address; no connection is made yet."""
        self.host = host
        self.port = port
        self.socket = None

    def setname(self, jobid, jobname):
        """Send the full name of job `jobid` if it exceeds 15 characters.

        jobid may be an integer or a string such as '1234.host'; in the
        latter case only the part before the first dot is used.
        Surrounding whitespace is stripped from jobname.

        Raises ValueError if no integer job id can be extracted, and
        socket.error if connecting or sending fails.
        """
        try:
            jobid = int(jobid)
        except ValueError:
            jobid = int(jobid.strip().split('.')[0])

        jobname = jobname.strip()
        # since pbs can handle jobnames of 15 characters or less,
        # don't use the raj hack.
        if len(jobname) <= 15:
            return

        if self.socket is None:
            import socket
            # Connect to pbs.pool and send the jobid/jobname pair to port
            # 24465 (Raj didn't realize that there are only 64k ports and
            # set up inetd to point to port 90001; 90001 mod 65536 is
            # 24465).
            conn = socket.socket()
            try:
                conn.connect((self.host, self.port))
            except socket.error:
                conn.close()
                raise
            # Only keep the socket once it is connected, so that a failed
            # attempt is retried on the next call instead of reusing an
            # unconnected socket.
            self.socket = conn

        # sendall keeps writing until the whole line has gone out; send
        # may stop after only part of it.
        self.socket.sendall("%s %s\n" % (jobid, jobname))
246
namehack = NameHack()

# Build, and unless -e was given submit, one qsub request per selected job.
for job in joblist:
    jobdir = JobDir(joinpath(conf.rootdir, job.name))
    if depend:
        # Look up the pbs job id recorded in the checkpoint job's directory.
        cptdir = JobDir(joinpath(conf.rootdir, job.checkpoint.name))
        cptjob = cptdir.readval('.pbs_jobid')

    if not onlyecho:
        jobdir.create()

    print('Job name:       %s' % job.name)
    print('Job directory:  %s' % jobdir)

    qsub = pbs.qsub()
    qsub.pbshost = 'simpool.eecs.umich.edu'
    qsub.stdout = jobdir.file('jobout')
    # Only the first 15 characters are used as the pbs job name; the full
    # name is passed in the environment and, after submission, to namehack.
    qsub.name = job.name[:15]
    qsub.join = True
    qsub.node_type = node_type
    qsub.env['ROOTDIR'] = conf.rootdir
    qsub.env['JOBNAME'] = job.name
    if depend:
        qsub.afterok = cptjob
    if queue:
        qsub.queue = queue
    qsub.build(joinpath(progpath, 'job.py'))

    if verbose:
        print('PBS Command:    %s' % qsub.command)

    # Echo-only mode stops here, before anything is submitted.
    if onlyecho:
        continue

    if qsub.do() != 0:
        print('PBS Failed')
        continue

    # Submission succeeded: record the job id, name and queue time in the
    # job directory.
    jobid = qsub.result
    print('PBS Jobid:      %s' % jobid)
    namehack.setname(jobid, job.name)
    queued = date()
    jobdir.echofile('.pbs_jobid', jobid)
    jobdir.echofile('.pbs_jobname', job.name)
    jobdir.echofile('.queued', queued)
    jobdir.setstatus('queued on %s' % queued)
291