send.py revision 1956:e9447a75c009
1#!/usr/bin/env python
2# Copyright (c) 2005 The Regents of The University of Michigan
3# All rights reserved.
4#
5# Redistribution and use in source and binary forms, with or without
6# modification, are permitted provided that the following conditions are
7# met: redistributions of source code must retain the above copyright
8# notice, this list of conditions and the following disclaimer;
9# redistributions in binary form must reproduce the above copyright
10# notice, this list of conditions and the following disclaimer in the
11# documentation and/or other materials provided with the distribution;
12# neither the name of the copyright holders nor the names of its
13# contributors may be used to endorse or promote products derived from
14# this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27#
28# Authors: Ali Saidi
29#          Nathan Binkert
30
31import os, os.path, re, socket, sys
32from os import environ as env, listdir
33from os.path import basename, isdir, isfile, islink, join as joinpath, normpath
34from filecmp import cmp as filecmp
35from shutil import copy
36
def nfspath(dir):
    """Return the name of a directory under the '/n' namespace.

    A path below '/.automount/' has that prefix replaced by '/n/'.  A
    path already below '/n/' is returned untouched.  Any other path is
    prefixed with '/n/' followed by the first component of this machine's
    host name.
    """
    automount = '/.automount/'
    if dir.startswith(automount):
        return '/n/' + dir[len(automount):]
    if dir.startswith('/n/'):
        return dir
    shorthost = socket.gethostname().split('.')[0]
    return '/n/' + shorthost + dir
43
def syncdir(srcdir, destdir):
    """Bring destdir up to date with the tree rooted at srcdir.

    Every sub-directory found below srcdir is created under destdir if
    it is missing, and every file that is absent from destdir, or that
    differs from its counterpart there, is copied across.  Directories
    whose name starts with '.' or equals 'SCCS' are skipped entirely.
    Nothing is ever removed from destdir.

    Each directory creation and file copy is reported on stdout.  The
    program exits (sys.exit) if destdir is not an existing directory.
    """
    srcdir = normpath(srcdir)
    destdir = normpath(destdir)
    if not isdir(destdir):
        sys.exit('destination directory "%s" does not exist' % destdir)

    for root, dirs, files in os.walk(srcdir):
        # Reduce root to a path relative to srcdir so it can be re-rooted
        # under destdir.
        root = normpath(root)
        prefix = os.path.commonprefix([root, srcdir])
        root = root[len(prefix):]
        if root.startswith('/'):
            root = root[1:]

        # Prune in place: os.walk only honours changes made to the list
        # object it handed out.
        dirs[:] = [d for d in dirs
                   if not (d.startswith('.') or d == 'SCCS')]

        for entry in dirs:
            newdir = joinpath(destdir, root, entry)
            if not isdir(newdir):
                os.mkdir(newdir)
                print('mkdir %s' % newdir)

        # os.walk will not descend into a symlinked directory; 'link/.'
        # names the same directory but is not itself a link, so the walk
        # follows it.  The normpath() above strips the '/.' again.
        for i, d in enumerate(dirs):
            if islink(joinpath(srcdir, root, d)):
                dirs[i] = joinpath(d, '.')

        for entry in files:
            dest = normpath(joinpath(destdir, root, entry))
            src = normpath(joinpath(srcdir, root, entry))
            if not isfile(dest) or not filecmp(src, dest):
                # Report source first, destination second, matching the
                # direction of the copy that follows.
                print('copy %s %s' % (src, dest))
                copy(src, dest)
75
# Directory containing this script, expressed as an NFS path; job.py is
# looked up relative to it when the qsub command is built.
progpath = nfspath(sys.path[0])
# Name this script was invoked as, substituted into the usage text.
progname = basename(sys.argv[0])
# Help text shown for -h and on a command-line error.  It lists every
# option accepted by the getopt call that follows.
usage = """\
Usage:
    %(progname)s [-c] [-C] [-d] [-e] [-f] [-j <jobfile>] [-q queue] [-R]
        [-t <type>] [-v] <regexp>
    -c           clean directory if job can be run
    -C           submit the checkpointing runs
    -d           Make jobs be dependent on the completion of the checkpoint runs
    -e           only echo pbs command info, don't actually send the job
    -f           force the job to run regardless of state
    -q <queue>   submit job to the named queue
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -R           with -C, submit the regular runs as well
    -t <type>    set the pbs node type (default is FAST)
    -v           be verbose

    %(progname)s [-j <jobfile>] -l [-v] <regexp>
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -l           list job names, don't submit
    -v           be verbose (list job parameters)

    %(progname)s -h
    -h           display this help
""" % locals()
98
# Parse the command line; an unrecognised option aborts with the usage text.
try:
    import getopt
    opts, args = getopt.getopt(sys.argv[1:], '-Ccdefhj:lq:Rt:v')
except getopt.GetoptError:
    sys.exit(usage)

# Defaults for everything the options below can change.
clean = False        # -c
docpts = False       # -C
depend = False       # -d
onlyecho = False     # -e
force = False        # -f
jfile = 'Test.py'    # -j <jobfile>
listonly = False     # -l
queue = ''           # -q <queue>
runflag = False      # -R
node_type = 'FAST'   # -t <type>
verbose = False      # -v
doruns = True

for opt, arg in opts:
    if opt == '-C':
        docpts = True
    elif opt == '-c':
        clean = True
    elif opt == '-d':
        depend = True
    elif opt == '-e':
        onlyecho = True
    elif opt == '-f':
        force = True
    elif opt == '-h':
        print(usage)
        sys.exit(0)
    elif opt == '-j':
        jfile = arg
    elif opt == '-l':
        listonly = True
    elif opt == '-q':
        queue = arg
    elif opt == '-R':
        runflag = True
    elif opt == '-t':
        node_type = arg
    elif opt == '-v':
        verbose = True

# Once checkpoint runs are requested, the regular runs are only submitted
# if -R was given as well.
if docpts:
    doruns = runflag

# Every positional argument is a regular expression selecting job names.
exprs = [re.compile(arg) for arg in args]
151
152import jobfile, pbs
153from job import JobDir, date
154
# Load the job description from the job file selected with -j.
conf = jobfile.JobFile(jfile)

# For a real submission (neither -l nor -e), refresh the base directory
# from the link directory before any job is queued.
if not (listonly or onlyecho) and isdir(conf.linkdir):
    if verbose:
        print('Checking for outdated files in Link directory')
    if not isdir(conf.basedir):
        os.mkdir(conf.basedir)
    syncdir(conf.linkdir, conf.basedir)
163
# Build the list of jobs to act on.
jobnames = {}   # job name -> first job seen with that name
joblist = []

# Pick the job generator matching what was requested on the command line.
if docpts and doruns:
    gen = conf.alljobs()
elif docpts:
    gen = conf.checkpoints()
elif doruns:
    gen = conf.jobs()

for job in gen:
    # Only the first job with a given name is considered.  The name is
    # recorded here so that this check can actually trigger.
    if job.name in jobnames:
        continue
    jobnames[job.name] = job

    if exprs:
        # Keep the job if any of the regular expressions matches the
        # start of its name.
        for expr in exprs:
            if expr.match(job.name):
                joblist.append(job)
                break
    else:
        # No expression given: every job is selected.
        joblist.append(job)
185
# -l: report the selected jobs and stop without submitting anything.
if listonly:
    for job in joblist:
        if verbose:
            # -v asks for the job's full information rather than its name.
            job.printinfo()
        else:
            print(job.name)
    sys.exit(0)
194
# Unless we are only echoing commands, drop jobs that must not be
# resubmitted and prepare the directories of those that will be.
if not onlyecho:
    pending = []
    for job in joblist:
        jobdir = JobDir(joinpath(conf.rootdir, job.name))
        if jobdir.exists():
            if not force:
                # Without -f, a job that is queued, running or already
                # successful is silently left alone.
                status = jobdir.getstatus()
                if status in ('queued', 'running', 'success'):
                    continue

            # The directory has leftovers; refuse to go on unless -c
            # allows wiping it.
            if not clean:
                sys.exit('job directory %s not clean!' % jobdir)

            jobdir.clean()
        pending.append(job)
    joblist = pending
217
class NameHack(object):
    """Sends "<jobid> <jobname>" pairs to a helper service for long names.

    Job names of 15 characters or less are handled by pbs itself, so
    nothing is sent for them.  For longer names the pair is written to a
    TCP connection to (host, port), which is opened on first use and then
    reused for all later calls.
    """

    def __init__(self, host='pbs.pool', port=24465):
        """Remember where to connect; no connection is made yet."""
        self.host = host
        self.port = port
        self.socket = None

    def setname(self, jobid, jobname):
        """Register jobname for jobid if the name is longer than 15 chars.

        jobid may be an integer or a string such as '1234.server'; in the
        latter case only the part before the first '.' is used.  Raises
        ValueError if no integer job id can be extracted.
        """
        try:
            jobid = int(jobid)
        except ValueError:
            jobid = int(jobid.strip().split('.')[0])

        jobname = jobname.strip()
        # since pbs can handle jobnames of 15 characters or less,
        # don't use the raj hack.
        if len(jobname) <= 15:
            return

        if self.socket is None:
            import socket
            self.socket = socket.socket()
            # Connect to pbs.pool and send the jobid/jobname pair to port
            # 24465 (Raj didn't realize that there are only 64k ports and
            # setup inetd to point to port 90001)
            self.socket.connect((self.host, self.port))

        # sendall() keeps writing until the whole message has been handed
        # to the kernel; a bare send() may transmit only part of it.
        self.socket.sendall("%s %s\n" % (jobid, jobname))
245
namehack = NameHack()

# Submit (or, with -e, just describe) every selected job.
for job in joblist:
    jobdir = JobDir(joinpath(conf.rootdir, job.name))
    if depend:
        # -d: look up the pbs job id recorded in the checkpoint job's
        # directory so this job can be made to wait for it.
        cptdir = JobDir(joinpath(conf.rootdir, job.checkpoint.name))
        cptjob = cptdir.readval('.pbs_jobid')

    if not onlyecho:
        jobdir.create()

    print('Job name:       %s' % job.name)
    print('Job directory:  %s' % jobdir)

    # Describe the submission.
    qsub = pbs.qsub()
    qsub.pbshost = 'simpool.eecs.umich.edu'
    qsub.stdout = jobdir.file('jobout')
    # The pbs job name is cut to 15 characters; the full name travels in
    # the JOBNAME environment variable.
    qsub.name = job.name[:15]
    qsub.join = True
    qsub.node_type = node_type
    qsub.env['ROOTDIR'] = conf.rootdir
    qsub.env['JOBNAME'] = job.name
    if depend:
        qsub.afterok = cptjob
    if queue:
        qsub.queue = queue
    qsub.build(joinpath(progpath, 'job.py'))

    if verbose:
        print('PBS Command:    %s' % qsub.command)

    # -e stops here: the command has been shown, nothing is sent.
    if onlyecho:
        continue

    exitcode = qsub.do()
    if exitcode != 0:
        print('PBS Failed')
        continue

    # Submission succeeded: record the job id, name and queue time in the
    # job directory and mark the job as queued.
    jobid = qsub.result
    print('PBS Jobid:      %s' % jobid)
    namehack.setname(jobid, job.name)
    queued = date()
    jobdir.echofile('.pbs_jobid', jobid)
    jobdir.echofile('.pbs_jobname', job.name)
    jobdir.echofile('.queued', queued)
    jobdir.setstatus('queued on %s' % queued)
290