send.py revision 1908
1#!/usr/bin/env python
2# Copyright (c) 2005 The Regents of The University of Michigan
3# All rights reserved.
4#
5# Redistribution and use in source and binary forms, with or without
6# modification, are permitted provided that the following conditions are
7# met: redistributions of source code must retain the above copyright
8# notice, this list of conditions and the following disclaimer;
9# redistributions in binary form must reproduce the above copyright
10# notice, this list of conditions and the following disclaimer in the
11# documentation and/or other materials provided with the distribution;
12# neither the name of the copyright holders nor the names of its
13# contributors may be used to endorse or promote products derived from
14# this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27#
28# Authors: Ali Saidi
29#          Nathan Binkert
30
31import os, os.path, re, socket, sys
32from os import environ as env, listdir
33from os.path import basename, isdir, isfile, islink, join as joinpath, normpath
34from filecmp import cmp as filecmp
35from shutil import copy
36
def nfspath(dir):
    """Translate a directory path into its '/n/...' form.

    Three cases are handled:
      * a path under '/.automount/' has that prefix replaced by '/n/';
      * a path that already starts with '/n/' is returned unchanged;
      * any other path is prefixed with '/n/<host>', where <host> is the
        part of socket.gethostname() before the first dot.
    """
    automount = '/.automount/'
    if dir.startswith(automount):
        return '/n/' + dir[len(automount):]
    if dir.startswith('/n/'):
        return dir
    shorthost = socket.gethostname().split('.')[0]
    return '/n/' + shorthost + dir
43
def syncdir(srcdir, destdir):
    """Bring destdir up to date with the directory tree rooted at srcdir.

    The source tree is walked top-down.  Each sub-directory is created
    under destdir when missing, and each file that is absent from destdir
    or whose contents differ (as judged by filecmp.cmp) is copied across.
    Directories whose name starts with '.' or equals 'SCCS' are skipped
    together with everything below them.  Files that exist only in
    destdir are left untouched.

    Arguments:
      srcdir  -- root of the tree to copy from
      destdir -- directory to copy into; it must already exist

    Exits the program through sys.exit() when destdir is not a directory.
    Prints one 'mkdir <dir>' or 'copy <src> <dest>' line per action taken.
    """
    srcdir = normpath(srcdir)
    destdir = normpath(destdir)
    if not isdir(destdir):
        sys.exit('destination directory "%s" does not exist' % destdir)

    for root, dirs, files in os.walk(srcdir):
        # Reduce root to a path relative to srcdir so that it can be
        # re-rooted under destdir.
        root = normpath(root)
        prefix = os.path.commonprefix([root, srcdir])
        root = root[len(prefix):]
        if root.startswith('/'):
            root = root[1:]

        # Prune in place: os.walk only descends into the names that are
        # still in this very list object.
        dirs[:] = [d for d in dirs
                   if not (d.startswith('.') or d == 'SCCS')]

        for entry in dirs:
            newdir = joinpath(destdir, root, entry)
            if not isdir(newdir):
                os.mkdir(newdir)
                print('mkdir %s' % newdir)

        # Symlinked directories are renamed to '<name>/.'; presumably this
        # is what makes os.walk descend into them, since by default it
        # does not follow entries that islink() reports as symlinks.
        for i, d in enumerate(dirs):
            if islink(joinpath(srcdir, root, d)):
                dirs[i] = joinpath(d, '.')

        for entry in files:
            dest = normpath(joinpath(destdir, root, entry))
            src = normpath(joinpath(srcdir, root, entry))
            if not isfile(dest) or not filecmp(src, dest):
                # Source first, then destination, matching copy() below.
                print('copy %s %s' % (src, dest))
                copy(src, dest)
75
# Directory this script was started from (sys.path[0]), in '/n/...' form;
# job.py is looked up relative to it when jobs are submitted.
progpath = nfspath(sys.path[0])

# Name the script was invoked under, substituted into the help text.
progname = basename(sys.argv[0])

# Help text shown for -h and on command-line errors.
usage = """\
Usage:
    %(progname)s [-c] [-e] [-f] [-j <jobfile>] [-q queue] [-v] <regexp>
    -c           clean directory if job can be run
    -e           only echo pbs command info, don't actually send the job
    -f           force the job to run regardless of state
    -q <queue>   submit job to the named queue
    -j <jobfile> specify the jobfile (default is <basedir>/test.py)
    -v           be verbose

    %(progname)s [-j <jobfile>] -l [-v] <regexp>
    -j <jobfile> specify the jobfile (default is <basedir>/test.py)
    -l           list job names, don't submit
    -v           be verbose (list job parameters)

    %(progname)s -h
    -h           display this help
""" % {'progname': progname}
96
# Parse the command line.  An unknown option or a missing option argument
# aborts with the usage text as the exit message.
import getopt
try:
    opts, args = getopt.getopt(sys.argv[1:], '-CRcd:efhj:lq:v')
except getopt.GetoptError:
    sys.exit(usage)

# Fold the (option, argument) pairs into a mapping.  When an option is
# repeated the last occurrence wins, the same as assigning in sequence.
given = dict(opts)

if '-h' in given:
    print(usage)
    sys.exit(0)

# Plain on/off switches.
docpts = '-C' in given
runflag = '-R' in given
clean = '-c' in given
onlyecho = '-e' in given
force = '-f' in given
listonly = '-l' in given
verbose = '-v' in given

# Options that carry a value.  '-d <arg>' is accepted by the option string
# but its value is not used anywhere in this script.
jfile = given.get('-j', 'Base/test.py')
queue = given.get('-q', '')

# Regular runs are on by default; once -C is given they stay on only if
# -R was given as well.
doruns = True
if docpts:
    doruns = runflag

# Every positional argument is a regular expression applied to job names.
exprs = []
for pattern in args:
    exprs.append(re.compile(pattern))
143
144import jobfile, pbs
145from job import JobDir, date
146
# Load the job description from the file selected with -j (or the default).
conf = jobfile.JobFile(jfile)

# Only when jobs are really going to be submitted (neither -l nor -e) and
# the configured link directory exists, refresh the base directory from it.
will_submit = not (listonly or onlyecho)
if will_submit and isdir(conf.linkdir):
    if verbose:
        print('Checking for outdated files in Link directory')
    syncdir(conf.linkdir, conf.basedir)
153
# Job names already examined, keyed by name; used to skip repeated names.
jobnames = {}
# Jobs selected for listing or submission, in the order they were generated.
joblist = []

# Choose the job source from the -C / -R switches: both kinds, checkpoints
# only, or (the default) regular runs only.
if docpts and doruns:
    gen = conf.alljobs()
elif docpts:
    gen = conf.checkpoints()
elif doruns:
    gen = conf.jobs()

for job in gen:
    if job.name in jobnames:
        continue
    # Record the name; without this the membership test above can never
    # be true and a repeated job name would be selected twice.
    jobnames[job.name] = job

    if exprs:
        # Keep the job when any expression matches its name.  match()
        # anchors at the start of the name, not anywhere inside it.
        for expr in exprs:
            if expr.match(job.name):
                joblist.append(job)
                break
    else:
        # No expressions on the command line: every job is selected.
        joblist.append(job)
175
# -l: report the selected jobs and stop without submitting anything.
# With -v each job prints its own details; otherwise only the name is shown.
if listonly:
    for job in joblist:
        if verbose:
            job.printinfo()
        else:
            print(job.name)
    sys.exit(0)
184
# Unless this is an echo-only run, drop the jobs that must not be
# (re)submitted and prepare the job directories of the remaining ones.
if not onlyecho:
    runnable = []
    for job in joblist:
        jobdir = JobDir(joinpath(conf.rootdir, job.name))

        # A job without a directory has never been set up: keep it as is.
        if not jobdir.exists():
            runnable.append(job)
            continue

        # Without -f, jobs that are queued, running or already finished
        # successfully are silently skipped.
        if not force and jobdir.getstatus() in ('queued', 'running', 'success'):
            continue

        # Any other existing directory has to be wiped first, which is
        # only done when -c was given.
        if not clean:
            sys.exit('job directory %s not clean!' % jobdir)

        jobdir.clean()
        runnable.append(job)
    joblist = runnable
207
class NameHack(object):
    """Report full job names to a helper service next to the PBS server.

    For job names longer than 15 characters, setname() sends a
    '<jobid> <jobname>' line to host:port.  What the service does with
    the pair is not visible from this file.

    Arguments:
      host -- name of the machine running the service
      port -- TCP port of the service
    """

    def __init__(self, host='pbs.pool', port=24465):
        self.host = host
        self.port = port
        # Connected socket; created lazily by the first setname() call
        # that actually has something to send.
        self.socket = None

    def setname(self, jobid, jobname):
        """Send the jobid/jobname pair to the service if the name is long.

        Arguments:
          jobid   -- integer job id, or a string whose first dot-separated
                     field is the integer id (for example '123.host')
          jobname -- full job name; surrounding whitespace is ignored

        Returns None.  Raises ValueError when no integer id can be
        extracted from jobid, and socket.error when connecting or sending
        fails.
        """
        try:
            jobid = int(jobid)
        except ValueError:
            jobid = int(jobid.strip().split('.')[0])

        jobname = jobname.strip()
        # since pbs can handle jobnames of 15 characters or less,
        # don't use the raj hack.
        if len(jobname) <= 15:
            return

        if self.socket is None:
            import socket
            # Connect to pbs.pool and send the jobid/jobname pair to port
            # 24465 (Raj didn't realize that there are only 64k ports and
            # set up inetd to point to port 90001, which wraps around to
            # 90001 - 65536 = 24465).
            sock = socket.socket()
            try:
                sock.connect((self.host, self.port))
            except socket.error:
                # Do not cache a socket that never connected; otherwise
                # every later call would skip the connect and fail on send.
                sock.close()
                raise
            self.socket = sock

        # sendall() keeps writing until the whole line is out; a plain
        # send() may transmit only part of it.
        self.socket.sendall("%s %s\n" % (jobid, jobname))
235
# A single NameHack instance is shared by all submissions below.
namehack = NameHack()

# Build a qsub request for every selected job; with -e the request is only
# displayed, otherwise it is submitted and the outcome is recorded in the
# job directory.
for job in joblist:
    jobdir = JobDir(joinpath(conf.rootdir, job.name))
    submit = not onlyecho

    if submit:
        jobdir.create()

    print('Job name:       %s' % job.name)
    print('Job directory:  %s' % jobdir)

    # The attributes are assigned in the same order as before, in case
    # pbs.qsub is sensitive to it.
    qsub = pbs.qsub()
    qsub.pbshost = 'simpool.eecs.umich.edu'
    qsub.stdout = jobdir.file('jobout')
    # Only the first 15 characters of the name are handed to qsub; the
    # full name travels in the JOBNAME environment entry.
    qsub.name = job.name[:15]
    qsub.join = True
    qsub.node_type = 'FAST'
    qsub.env['ROOTDIR'] = conf.rootdir
    qsub.env['JOBNAME'] = job.name
    if queue:
        qsub.queue = queue
    qsub.build(joinpath(progpath, 'job.py'))

    if verbose:
        print('PBS Command:    %s' % qsub.command)

    if not submit:
        continue

    if qsub.do() != 0:
        print('PBS Failed')
        continue

    # Submission succeeded: announce the id and leave bookkeeping files
    # and the queued status in the job directory.
    jobid = qsub.result
    print('PBS Jobid:      %s' % jobid)
    namehack.setname(jobid, job.name)
    queued = date()
    jobdir.echofile('.pbs_jobid', jobid)
    jobdir.echofile('.pbs_jobname', job.name)
    jobdir.echofile('.queued', queued)
    jobdir.setstatus('queued on %s' % queued)
274            print 'PBS Failed'
275