send.py revision 13540
1#!/usr/bin/env python2.7
2# Copyright (c) 2005 The Regents of The University of Michigan
3# All rights reserved.
4#
5# Redistribution and use in source and binary forms, with or without
6# modification, are permitted provided that the following conditions are
7# met: redistributions of source code must retain the above copyright
8# notice, this list of conditions and the following disclaimer;
9# redistributions in binary form must reproduce the above copyright
10# notice, this list of conditions and the following disclaimer in the
11# documentation and/or other materials provided with the distribution;
12# neither the name of the copyright holders nor the names of its
13# contributors may be used to endorse or promote products derived from
14# this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27#
28# Authors: Ali Saidi
29#          Nathan Binkert
30
31import os, os.path, re, socket, sys
32from os import environ as env, listdir
33from os.path import basename, isdir, isfile, islink, join as joinpath, normpath
34from filecmp import cmp as filecmp
35from shutil import copy
36
def nfspath(dir):
    """Return the '/n/...' form of the directory path *dir*.

    A leading '/.automount/' is replaced by '/n/'.  A path that already
    starts with '/n/' is returned unchanged.  Any other path is prefixed
    with '/n/<host>', where <host> is the first dot-separated component of
    this machine's hostname.
    """
    automount = '/.automount/'
    if dir.startswith(automount):
        return '/n/' + dir[len(automount):]
    if dir.startswith('/n/'):
        return dir
    shorthost = socket.gethostname().split('.')[0]
    return '/n/' + shorthost + dir
43
def syncdir(srcdir, destdir):
    """Copy new or changed files from the tree at srcdir into destdir.

    destdir must already exist; otherwise the program exits with an error
    message.  Sub-directories missing under destdir are created.  A file is
    copied when it is absent from the destination or when filecmp reports
    that it differs from the source.  Directories whose name starts with
    '.' or equals 'SCCS' are neither created nor descended into.  Every
    mkdir and copy performed is reported on stdout.
    """
    srcdir = normpath(srcdir)
    destdir = normpath(destdir)
    if not isdir(destdir):
        sys.exit('destination directory "%s" does not exist' % destdir)

    for root, dirs, files in os.walk(srcdir):
        # Reduce root to the path relative to srcdir.
        root = normpath(root)
        prefix = os.path.commonprefix([root, srcdir])
        root = root[len(prefix):]
        if root.startswith('/'):
            root = root[1:]

        # Prune in place so that os.walk does not descend into these.
        dirs[:] = [d for d in dirs
                   if not (d.startswith('.') or d == 'SCCS')]

        for entry in dirs:
            newdir = joinpath(destdir, root, entry)
            if not isdir(newdir):
                os.mkdir(newdir)
                print('mkdir %s' % newdir)

        # Symlinked directories are renamed to 'name/.' in the walk list;
        # presumably so that os.walk, which does not follow links by
        # default, no longer sees a link and still descends into them.
        for i, d in enumerate(dirs):
            if islink(joinpath(srcdir, root, d)):
                dirs[i] = joinpath(d, '.')

        for entry in files:
            dest = normpath(joinpath(destdir, root, entry))
            src = normpath(joinpath(srcdir, root, entry))
            if not isfile(dest) or not filecmp(src, dest):
                # Report in the same order as the operation: source first.
                print('copy %s %s' % (src, dest))
                copy(src, dest)
75
# Directory containing this script, rewritten by nfspath() into its '/n/...'
# form; job.py is looked up in this directory when the qsub command is built.
progpath = nfspath(sys.path[0])
progname = basename(sys.argv[0])

# Help text shown for -h and on a getopt error.  It lists every flag that the
# getopt option string accepts, including -n, -R and -t.
usage = """\
Usage:
    %(progname)s [-c] [-C] [-d] [-e] [-f] [-j <jobfile>] [-n] [-q <queue>]
        [-R] [-t <type>] [-v] <regexp>
    -c           clean directory if job can be run
    -C           submit the checkpointing runs
    -d           Make jobs be dependent on the completion of the checkpoint runs
    -e           only echo pbs command info, don't actually send the job
    -f           force the job to run regardless of state
    -n           don't update the base directory from the link directory
    -q <queue>   submit job to the named queue
    -R           with -C, also submit the regular runs
    -t <type>    node type to pass to qsub (default is FAST)
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -v           be verbose

    %(progname)s [-j <jobfile>] -l [-v] <regexp>
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -l           list job names, don't submit
    -v           be verbose (list job parameters)

    %(progname)s -h
    -h           display this help
""" % {'progname': progname}
98
import getopt

# Split the command line into flags and positional regular expressions; a
# getopt error prints the usage text and exits.
try:
    opts, args = getopt.getopt(sys.argv[1:], '-Ccdefhj:lnq:Rt:v')
except getopt.GetoptError:
    sys.exit(usage)

# Defaults for every setting the flags below can override.
clean = False
depend = False
docpts = False
doruns = True
force = False
jfile = 'Test.py'
listonly = False
node_type = 'FAST'
onlyecho = False
queue = ''
runflag = False
update = True
verbose = False

for opt, arg in opts:
    if opt == '-h':
        print(usage)
        sys.exit(0)
    elif opt == '-C':
        docpts = True
    elif opt == '-c':
        clean = True
    elif opt == '-d':
        depend = True
    elif opt == '-e':
        onlyecho = True
    elif opt == '-f':
        force = True
    elif opt == '-j':
        jfile = arg
    elif opt == '-l':
        listonly = True
    elif opt == '-n':
        update = False
    elif opt == '-q':
        queue = arg
    elif opt == '-R':
        runflag = True
    elif opt == '-t':
        node_type = arg
    elif opt == '-v':
        verbose = True

# With -C, the regular runs are only selected when -R was given as well.
if docpts:
    doruns = runflag

# Each positional argument is a regular expression matched against job names.
exprs = [re.compile(pattern) for pattern in args]
154
155import jobfile, pbs
156from job import JobDir, date
157
# Load the job file named by -j (Test.py unless overridden).
conf = jobfile.JobFile(jfile)

# Copy outdated files from the link directory into the base directory,
# unless -n was given or we are only listing (-l) or echoing (-e).
wants_sync = update and not (listonly or onlyecho)
if wants_sync and isdir(conf.linkdir):
    if verbose:
        print('Checking for outdated files in Link directory')
    if not isdir(conf.basedir):
        os.mkdir(conf.basedir)
    syncdir(conf.linkdir, conf.basedir)
166
# jobnames maps each job name already visited to its job, so that a name the
# generator yields more than once is only considered once.
jobnames = {}
joblist = []

# Choose which jobs to iterate over: checkpoint runs (-C), regular runs
# (the default), or both (-C together with -R).
if docpts and doruns:
    gen = conf.alljobs()
elif docpts:
    gen = conf.checkpoints()
elif doruns:
    gen = conf.jobs()

for job in gen:
    if job.name in jobnames:
        continue
    # Record the name; without this the duplicate check above never fires.
    jobnames[job.name] = job

    # With no expressions every job is selected; otherwise the job name must
    # match at least one of them.
    if not exprs or any(expr.match(job.name) for expr in exprs):
        joblist.append(job)
188
# -l: show the selected jobs instead of submitting them, then exit.  With -v
# each job prints its own info; otherwise only the name is printed.
if listonly:
    for job in joblist:
        if verbose:
            job.printinfo()
        else:
            print(job.name)
    sys.exit(0)
197
# Unless we are only echoing, drop jobs that should not be resubmitted and
# make sure every remaining job starts from a clean directory.
if not onlyecho:
    runnable = []
    for job in joblist:
        jobdir = JobDir(joinpath(conf.rootdir, job.name))

        # No directory yet: nothing to check or clean.
        if not jobdir.exists():
            runnable.append(job)
            continue

        # Without -f, jobs already queued, running or finished successfully
        # are skipped.
        if not force and jobdir.getstatus() in ('queued', 'running', 'success'):
            continue

        # An existing directory is only reused when -c allows cleaning it.
        if not clean:
            sys.exit('job directory %s not clean!' % jobdir)

        jobdir.clean()
        runnable.append(job)
    joblist = runnable
220
class NameHack(object):
    """Send "jobid jobname" pairs for long job names to a helper service.

    Names of 15 characters or less are left alone because pbs can handle
    them itself.  For longer names a "<jobid> <jobname>" line is sent over a
    TCP connection to host:port.  The connection is opened on first use and
    reused for later calls.
    """

    def __init__(self, host='pbs.pool', port=24465):
        self.host = host
        self.port = port
        # Opened lazily by setname() the first time a long name is seen.
        self.socket = None

    def setname(self, jobid, jobname):
        """Register jobname for jobid if the name exceeds 15 characters.

        jobid may be an integer or a string such as '1234.server'; in the
        latter case only the part before the first '.' is used.  Raises
        ValueError if no integer job id can be extracted.
        """
        try:
            jobid = int(jobid)
        except ValueError:
            jobid = int(jobid.strip().split('.')[0])

        jobname = jobname.strip()
        # since pbs can handle jobnames of 15 characters or less,
        # don't use the raj hack.
        if len(jobname) <= 15:
            return

        if self.socket is None:
            import socket
            # Connect to pbs.pool and send the jobid/jobname pair to port
            # 24465 (Raj didn't realize that there are only 64k ports and
            # setup inetd to point to port 90001)
            sock = socket.socket()
            sock.connect((self.host, self.port))
            # Cache the socket only once connected, so a failed connect is
            # retried on the next call instead of reusing a dead socket.
            self.socket = sock

        # sendall() keeps writing until the whole line has been sent;
        # send() may stop after a partial write.
        self.socket.sendall("%s %s\n" % (jobid, jobname))
248
namehack = NameHack()

# Build one qsub command per selected job and, unless -e was given, submit it
# and record the submission in the job directory.
for job in joblist:
    jobdir = JobDir(joinpath(conf.rootdir, job.name))

    if depend:
        # -d: look up the pbs job id recorded for this job's checkpoint run.
        cptdir = JobDir(joinpath(conf.rootdir, job.checkpoint.name))
        cptjob = cptdir.readval('.pbs_jobid')

    if not onlyecho:
        jobdir.create()

    print('Job name:       %s' % job.name)
    print('Job directory:  %s' % jobdir)

    qsub = pbs.qsub()
    qsub.pbshost = 'simpool.eecs.umich.edu'
    qsub.stdout = jobdir.file('jobout')
    # The name given to qsub is cut to 15 characters; the full name travels
    # in the JOBNAME environment variable.
    qsub.name = job.name[:15]
    qsub.join = True
    qsub.node_type = node_type
    qsub.env['ROOTDIR'] = conf.rootdir
    qsub.env['JOBNAME'] = job.name
    if depend:
        qsub.afterok = cptjob
    if queue:
        qsub.queue = queue
    qsub.build(joinpath(progpath, 'job.py'))

    if verbose:
        print('PBS Command:    %s' % qsub.command)

    # -e: the command has been shown; do not actually submit.
    if onlyecho:
        continue

    if qsub.do() != 0:
        print('PBS Failed')
        continue

    jobid = qsub.result
    print('PBS Jobid:      %s' % jobid)
    namehack.setname(jobid, job.name)
    queued = date()
    jobdir.echofile('.pbs_jobid', jobid)
    jobdir.echofile('.pbs_jobname', job.name)
    jobdir.echofile('.queued', queued)
    jobdir.setstatus('queued on %s' % queued)
293