send.py revision 1916:fe8d4e92c0a7
1#!/usr/bin/env python
2# Copyright (c) 2005 The Regents of The University of Michigan
3# All rights reserved.
4#
5# Redistribution and use in source and binary forms, with or without
6# modification, are permitted provided that the following conditions are
7# met: redistributions of source code must retain the above copyright
8# notice, this list of conditions and the following disclaimer;
9# redistributions in binary form must reproduce the above copyright
10# notice, this list of conditions and the following disclaimer in the
11# documentation and/or other materials provided with the distribution;
12# neither the name of the copyright holders nor the names of its
13# contributors may be used to endorse or promote products derived from
14# this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27#
28# Authors: Ali Saidi
29#          Nathan Binkert
30
31import os, os.path, re, socket, sys
32from os import environ as env, listdir
33from os.path import basename, isdir, isfile, islink, join as joinpath, normpath
34from filecmp import cmp as filecmp
35from shutil import copy
36
def nfspath(dir):
    """Return *dir* rewritten so that it lives under the '/n/' hierarchy.

    - A path under '/.automount/' has that prefix replaced by '/n/'.
    - A path already under '/n/' is returned unchanged.
    - Any other path is prefixed with '/n/<host>', where <host> is the
      first dot-separated component of socket.gethostname().
    """
    automount = '/.automount/'
    if dir.startswith(automount):
        return '/n/' + dir[len(automount):]

    if dir.startswith('/n/'):
        return dir

    host = socket.gethostname().split('.')[0]
    return '/n/' + host + dir
43
def syncdir(srcdir, destdir):
    """Bring *destdir* up to date with the tree rooted at *srcdir*.

    Walks srcdir, creating any missing sub-directories under destdir and
    copying every file that is missing from destdir or whose contents
    differ (as judged by filecmp.cmp).  Directories whose name starts with
    '.' and directories named 'SCCS' are skipped entirely.  Nothing is ever
    removed from destdir.

    Each mkdir and copy is reported on stdout.  The process exits via
    sys.exit() if destdir is not an existing directory.
    """
    srcdir = normpath(srcdir)
    destdir = normpath(destdir)
    if not isdir(destdir):
        sys.exit('destination directory "%s" does not exist' % destdir)

    for root, dirs, files in os.walk(srcdir):
        # Reduce root to the part of the path below srcdir.
        root = normpath(root)
        prefix = os.path.commonprefix([root, srcdir])
        root = root[len(prefix):]
        if root.startswith('/'):
            root = root[1:]

        # Prune in place: os.walk only honours changes made to the very
        # list object it handed us.
        dirs[:] = [d for d in dirs
                   if not (d.startswith('.') or d == 'SCCS')]

        for i, entry in enumerate(dirs):
            newdir = joinpath(destdir, root, entry)
            if not isdir(newdir):
                os.mkdir(newdir)
                print('mkdir %s' % newdir)

            # A symlinked directory is handed back to os.walk as 'name/.'
            # -- presumably so the walk descends into it instead of
            # treating it as a link; confirm against os.walk's
            # symlink handling.
            if islink(joinpath(srcdir, root, entry)):
                dirs[i] = joinpath(entry, '.')

        for entry in files:
            dest = normpath(joinpath(destdir, root, entry))
            src = normpath(joinpath(srcdir, root, entry))
            if not isfile(dest) or not filecmp(src, dest):
                # Report in the same direction as the copy: source first.
                print('copy %s %s' % (src, dest))
                copy(src, dest)
75
# '/n/'-style path of the directory in sys.path[0]; the submission loop
# further down joins it with 'job.py'.
progpath = nfspath(sys.path[0])

# Name this script was invoked as, substituted into the help text.
progname = basename(sys.argv[0])

usage = """\
Usage:
    %(progname)s [-c] [-e] [-f] [-j <jobfile>] [-q queue] [-v] <regexp>
    -c           clean directory if job can be run
    -e           only echo pbs command info, don't actually send the job
    -f           force the job to run regardless of state
    -q <queue>   submit job to the named queue
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -v           be verbose

    %(progname)s [-j <jobfile>] -l [-v] <regexp>
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -l           list job names, don't submit
    -v           be verbose (list job parameters)

    %(progname)s -h
    -h           display this help
""" % {'progname': progname}
96
import getopt

# Defaults for every switch; the option loop below overrides them.
clean = False          # -c
depend = False         # -d
docpts = False         # -C
doruns = True          # replaced by runflag when -C is given
force = False          # -f
jfile = 'Test.py'      # -j <jobfile>
listonly = False       # -l
node_type = 'FAST'     # -t <type>
onlyecho = False       # -e
queue = ''             # -q <queue>
runflag = False        # -R
verbose = False        # -v

# NOTE(review): the option string starts with '-', which is not one of the
# switches handled below -- confirm whether that is intentional.
try:
    opts, args = getopt.getopt(sys.argv[1:], '-Ccdefhj:lq:Rt:v')
except getopt.GetoptError:
    sys.exit(usage)

for opt, arg in opts:
    if opt == '-h':
        print(usage)
        sys.exit(0)
    elif opt == '-C':
        docpts = True
    elif opt == '-c':
        clean = True
    elif opt == '-d':
        depend = True
    elif opt == '-e':
        onlyecho = True
    elif opt == '-f':
        force = True
    elif opt == '-j':
        jfile = arg
    elif opt == '-l':
        listonly = True
    elif opt == '-q':
        queue = arg
    elif opt == '-R':
        runflag = True
    elif opt == '-t':
        node_type = arg
    elif opt == '-v':
        verbose = True

# With -C, runs are only included when -R was given as well.
if docpts:
    doruns = runflag

# Every remaining argument is a regular expression matched against job names.
exprs = [re.compile(arg) for arg in args]
149
150import jobfile, pbs
151from job import JobDir, date
152
# Load the job description file selected with -j (default 'Test.py').
conf = jobfile.JobFile(jfile)

# Only a real submission touches the file system: refresh the base
# directory from the link directory, creating the former if necessary.
will_submit = not (listonly or onlyecho)
if will_submit and isdir(conf.linkdir):
    if verbose:
        print('Checking for outdated files in Link directory')
    if not isdir(conf.basedir):
        os.mkdir(conf.basedir)
    syncdir(conf.linkdir, conf.basedir)
161
# Names of the jobs already seen, used to drop duplicates from the generator.
jobnames = {}
# Jobs selected for listing / submission, in generator order.
joblist = []

# Pick the job source: -C together with -R yields everything, -C alone
# yields only checkpoints, and the default is only the regular jobs.
if docpts and doruns:
    gen = conf.alljobs()
elif docpts:
    gen = conf.checkpoints()
elif doruns:
    gen = conf.jobs()

for job in gen:
    if job.name in jobnames:
        continue
    # Record the name so that a later job with the same name is skipped;
    # without this the duplicate check above could never fire.
    jobnames[job.name] = job

    if exprs:
        # Keep the job if any of the command-line patterns matches its name.
        for expr in exprs:
            if expr.match(job.name):
                joblist.append(job)
                break
    else:
        # No patterns given: every job is selected.
        joblist.append(job)
183
# -l: print the selected jobs and stop without submitting anything.
if listonly:
    for job in joblist:
        if verbose:
            # -v asks for the full parameter dump of each job.
            job.printinfo()
        else:
            print(job.name)
    sys.exit(0)
192
# For a real submission, weed out jobs whose directory shows they are
# already queued, running or finished, and clean (or refuse to reuse)
# any other pre-existing job directory.
if not onlyecho:
    newlist = []
    for job in joblist:
        jobdir = JobDir(joinpath(conf.rootdir, job.name))
        if jobdir.exists():
            # Unless -f was given, leave jobs in these states alone.
            if not force and \
                   jobdir.getstatus() in ('queued', 'running', 'success'):
                continue

            # Reusing an existing directory requires -c.
            if not clean:
                sys.exit('job directory %s not clean!' % jobdir)

            jobdir.clean()
        newlist.append(job)
    joblist = newlist
215
class NameHack(object):
    """Send full job names to a name service for jobs PBS would truncate.

    The connection to (host, port) is opened lazily on the first job name
    longer than 15 characters and then reused for later names.
    """

    def __init__(self, host='pbs.pool', port=24465):
        self.host = host
        self.port = port
        # Connected socket, created on first use by setname().
        self.socket = None

    def setname(self, jobid, jobname):
        """Send the "<jobid> <jobname>" pair if jobname exceeds 15 chars.

        jobid may be an integer or a string such as '123.server'; only the
        leading numeric component is sent.  jobname is stripped of
        surrounding whitespace first.
        """
        try:
            jobid = int(jobid)
        except ValueError:
            jobid = int(jobid.strip().split('.')[0])

        jobname = jobname.strip()
        # since pbs can handle jobnames of 15 characters or less,
        # don't use the raj hack.
        if len(jobname) <= 15:
            return

        if self.socket is None:
            import socket
            # Connect to pbs.pool and send the jobid/jobname pair to port
            # 24465 (Raj didn't realize that there are only 64k ports and
            # setup inetd to point to port 90001)
            sock = socket.socket()
            sock.connect((self.host, self.port))
            # Only remember the socket once it is actually connected, so a
            # failed connect is retried on the next call.
            self.socket = sock

        # sendall keeps writing until the whole message is out; plain
        # send() may transmit only part of it.
        self.socket.sendall("%s %s\n" % (jobid, jobname))
243
namehack = NameHack()

# Build (and, unless -e was given, submit) one qsub request per job.
for job in joblist:
    jobdir = JobDir(joinpath(conf.rootdir, job.name))

    if depend:
        # -d: look up the PBS job id recorded in the checkpoint job's
        # directory so this job can be made to run after it.
        cptdir = JobDir(joinpath(conf.rootdir, job.checkpoint.name))
        cptjob = cptdir.readval('.pbs_jobid')

    if not onlyecho:
        jobdir.create()

    print('Job name:       %s' % job.name)
    print('Job directory:  %s' % jobdir)

    qsub = pbs.qsub()
    qsub.pbshost = 'simpool.eecs.umich.edu'
    qsub.stdout = jobdir.file('jobout')
    # The qsub name is cut to 15 characters; the full name is passed in
    # the JOBNAME environment variable and to namehack below.
    qsub.name = job.name[:15]
    qsub.join = True
    qsub.node_type = node_type
    qsub.env['ROOTDIR'] = conf.rootdir
    qsub.env['JOBNAME'] = job.name
    if depend:
        qsub.afterok = cptjob
    if queue:
        qsub.queue = queue
    qsub.build(joinpath(progpath, 'job.py'))

    if verbose:
        print('PBS Command:    %s' % qsub.command)

    # -e: echo only, nothing is submitted.
    if onlyecho:
        continue

    ec = qsub.do()
    if ec != 0:
        print('PBS Failed')
        continue

    jobid = qsub.result
    print('PBS Jobid:      %s' % jobid)
    namehack.setname(jobid, job.name)

    # Record the submission in the job directory.
    queued = date()
    jobdir.echofile('.pbs_jobid', jobid)
    jobdir.echofile('.pbs_jobname', job.name)
    jobdir.echofile('.queued', queued)
    jobdir.setstatus('queued on %s' % queued)
288