send.py revision 2357:add41108b549
1#!/usr/bin/env python
2# Copyright (c) 2006 The Regents of The University of Michigan
3# All rights reserved.
4#
5# Redistribution and use in source and binary forms, with or without
6# modification, are permitted provided that the following conditions are
7# met: redistributions of source code must retain the above copyright
8# notice, this list of conditions and the following disclaimer;
9# redistributions in binary form must reproduce the above copyright
10# notice, this list of conditions and the following disclaimer in the
11# documentation and/or other materials provided with the distribution;
12# neither the name of the copyright holders nor the names of its
13# contributors may be used to endorse or promote products derived from
14# this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27#
28# Authors: Kevin Lim
29
30import os, os.path, re, socket, sys
31from os import environ as env, listdir
32from os.path import basename, isdir, isfile, islink, join as joinpath, normpath
33from filecmp import cmp as filecmp
34from shutil import copy
35
def nfspath(dir):
    """Rewrite a path into its '/n/...' form.

    * A path under '/.automount/' has that prefix replaced by '/n/'.
    * A path already under '/n/' is returned unchanged.
    * Any other path is prefixed with '/n/<host>', where <host> is the
      first dot-separated component of this machine's hostname.
    """
    automount = '/.automount/'
    if dir.startswith(automount):
        return '/n/' + dir[len(automount):]
    if dir.startswith('/n/'):
        return dir
    short_host = socket.gethostname().split('.')[0]
    return '/n/' + short_host + dir
42
def syncdir(srcdir, destdir):
    """Copy new or changed files from srcdir into an existing destdir.

    Walks srcdir top-down.  Sub-directories whose name starts with '.' or
    equals 'SCCS' are skipped entirely.  Every remaining sub-directory that
    is missing under destdir is created, and every file that is missing
    under destdir or differs from its counterpart (according to
    filecmp.cmp) is copied.  Nothing is removed from destdir.

    Each mkdir and copy is reported on stdout.  Calls sys.exit() with a
    message if destdir is not an existing directory.
    """
    srcdir = normpath(srcdir)
    destdir = normpath(destdir)
    if not isdir(destdir):
        sys.exit('destination directory "%s" does not exist' % destdir)

    for root, dirs, files in os.walk(srcdir):
        # Reduce root to a path relative to srcdir ('' for srcdir itself).
        root = normpath(root)
        prefix = os.path.commonprefix([root, srcdir])
        root = root[len(prefix):]
        if root.startswith('/'):
            root = root[1:]

        # Prune in place so that os.walk does not descend into these.
        dirs[:] = [d for d in dirs
                   if not (d.startswith('.') or d == 'SCCS')]

        for entry in dirs:
            newdir = joinpath(destdir, root, entry)
            if not isdir(newdir):
                os.mkdir(newdir)
                print('mkdir %s' % newdir)

        # os.walk does not descend into symlinked directories.  Renaming
        # the entry to '<name>/.' makes it no longer look like a link, so
        # the walk follows it; normpath() above strips the '/.' again.
        for i, d in enumerate(dirs):
            if islink(joinpath(srcdir, root, d)):
                dirs[i] = joinpath(d, '.')

        for entry in files:
            dest = normpath(joinpath(destdir, root, entry))
            src = normpath(joinpath(srcdir, root, entry))
            if not isfile(dest) or not filecmp(src, dest):
                # Report in the same order as the copy itself: source
                # first, destination second.
                print('copy %s %s' % (src, dest))
                copy(src, dest)
74
# Directory containing this script, rewritten by nfspath(), and the name
# the script was invoked under (used in the help text below).
progpath = nfspath(sys.path[0])
progname = basename(sys.argv[0])

# Help text shown for -h and on a command-line parsing error.  Only
# %(progname)s is substituted.
usage = """\
Usage:
    %(progname)s [-c] [-e] [-f] [-j <jobfile>] [-q queue] [-v] <regexp>
    -c           clean directory if job can be run
    -C           submit the checkpointing runs
    -d           Make jobs be dependent on the completion of the checkpoint runs
    -e           only echo pbs command info, don't actually send the job
    -f           force the job to run regardless of state
    -q <queue>   submit job to the named queue
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -v           be verbose

    %(progname)s [-j <jobfile>] -l [-v] <regexp>
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -l           list job names, don't submit
    -v           be verbose (list job parameters)

    %(progname)s -h
    -h           display this help
""" % {'progname': progname}
97
import getopt

# Parse the command line; any unknown option prints the usage and exits.
try:
    opts, args = getopt.getopt(sys.argv[1:], '-Ccdefhj:lnq:Rt:v')
except getopt.GetoptError:
    sys.exit(usage)

# Defaults for everything the options below can change.
clean = False         # -c
docpts = False        # -C
depend = False        # -d
onlyecho = False      # -e
force = False         # -f
jfile = 'Test.py'     # -j <jobfile>
listonly = False      # -l
update = True         # cleared by -n
queue = ''            # -q <queue>
runflag = False       # -R
node_type = 'FAST'    # -t <type>
verbose = False       # -v
doruns = True

for opt, arg in opts:
    if opt == '-C':
        docpts = True
    elif opt == '-c':
        clean = True
    elif opt == '-d':
        depend = True
    elif opt == '-e':
        onlyecho = True
    elif opt == '-f':
        force = True
    elif opt == '-h':
        print(usage)
        sys.exit(0)
    elif opt == '-j':
        jfile = arg
    elif opt == '-l':
        listonly = True
    elif opt == '-n':
        update = False
    elif opt == '-q':
        queue = arg
    elif opt == '-R':
        runflag = True
    elif opt == '-t':
        node_type = arg
    elif opt == '-v':
        verbose = True

# When checkpoint runs are requested (-C), the regular runs are only
# selected if -R was given as well.
if docpts:
    doruns = runflag

# Every positional argument is a regular expression matched against job
# names.
exprs = [re.compile(pattern) for pattern in args]
153
# Project modules are imported here, after the command line has been
# parsed.
import jobfile
import batch
from job import JobDir, date

conf = jobfile.JobFile(jfile)

# Unless disabled (-n) or running in list-only / echo-only mode, bring the
# base directory up to date with the link directory, creating the base
# directory first if it is missing.
sync_wanted = update and not (listonly or onlyecho)
if sync_wanted and isdir(conf.linkdir):
    if verbose:
        print('Checking for outdated files in Link directory')
    if not isdir(conf.basedir):
        os.mkdir(conf.basedir)
    syncdir(conf.linkdir, conf.basedir)
165
# Job names already seen (used to drop duplicates) and the jobs selected
# for listing or submission.
jobnames = {}
joblist = []

# Pick the job generator: checkpoints and runs, checkpoints only, or runs
# only.
if docpts and doruns:
    gen = conf.alljobs()
elif docpts:
    gen = conf.checkpoints()
elif doruns:
    gen = conf.jobs()

for job in gen:
    # Consider each job name only once.  The name must be recorded here,
    # otherwise this duplicate check can never trigger.
    if job.name in jobnames:
        continue
    jobnames[job.name] = job

    if exprs:
        # Keep the job if any of the given regular expressions matches the
        # start of its name.
        for expr in exprs:
            if expr.match(job.name):
                joblist.append(job)
                break
    else:
        # No expressions given: every job is selected.
        joblist.append(job)
187
# -l: only show the selected jobs (full info when verbose, otherwise just
# the name) and stop without submitting anything.
if listonly:
    for job in joblist:
        if verbose:
            job.printinfo()
        else:
            print(job.name)
    sys.exit(0)
196
# Unless we are only echoing, weed out jobs that must not be resubmitted
# and make sure the directories of the remaining ones are clean.
if not onlyecho:
    runnable = []
    for job in joblist:
        jobdir = JobDir(joinpath(conf.rootdir, job.name))
        if jobdir.exists():
            # Without -f, jobs that are queued, running or already
            # successful are silently skipped.
            if not force and \
                    jobdir.getstatus() in ('queued', 'running', 'success'):
                continue

            # An existing directory is only reused when -c was given.
            if not clean:
                sys.exit('job directory %s not clean!' % jobdir)

            jobdir.clean()
        runnable.append(job)
    joblist = runnable
219
class NameHack(object):
    """Report long job names to a helper service over TCP.

    Names of 15 characters or fewer are handled by pbs itself, so only
    longer names are sent, as "<jobid> <jobname>" lines, to host:port.
    The connection is opened on first use and kept for later calls.
    """

    def __init__(self, host='pbs.pool', port=24465):
        """Remember the service address; no connection is made yet."""
        self.host = host
        self.port = port
        self.socket = None

    def setname(self, jobid, jobname):
        """Send the jobid/jobname pair if jobname exceeds 15 characters.

        jobid may be an integer or a string such as '123.server', in
        which case the part before the first '.' is used.  Raises
        ValueError if no integer id can be extracted, and socket.error if
        the service cannot be reached.
        """
        try:
            jobid = int(jobid)
        except ValueError:
            jobid = int(jobid.strip().split('.')[0])

        jobname = jobname.strip()
        # since pbs can handle jobnames of 15 characters or less,
        # don't use the raj hack.
        if len(jobname) <= 15:
            return

        if self.socket is None:
            import socket
            # Connect to pbs.pool and send the jobid/jobname pair to port
            # 24465 (Raj didn't realize that there are only 64k ports and
            # setup inetd to point to port 90001)
            sock = socket.socket()
            try:
                sock.connect((self.host, self.port))
            except socket.error:
                # Do not cache a socket that never connected; a later
                # call must be able to retry from scratch.
                sock.close()
                raise
            self.socket = sock

        # sendall() keeps writing until the whole line has been sent,
        # whereas send() may transmit only part of it.
        self.socket.sendall("%s %s\n" % (jobid, jobname))
247
namehack = NameHack()

rootdir = conf.rootdir
# Script handed to the batch system for every job.
script = joinpath(rootdir, 'Base', 'job.py')

# Submit (or, with -e, just describe) every selected job.
for job in joblist:
    jobdir = JobDir(joinpath(rootdir, job.name))
    if depend:
        # With -d, a job is only submitted once its checkpoint job
        # directory exists and contains a '.success' marker.
        cptdir = JobDir(joinpath(rootdir, job.checkpoint.name))
        path = str(cptdir)
        if not isdir(path) or not isfile(joinpath(path, '.success')):
            continue

        # Only needed by the disabled 'afterok' dependency below.
        cptjob = cptdir.readval('.batch_jobid')

    if not onlyecho:
        jobdir.create()
        os.chdir(str(jobdir))
        os.environ['PWD'] = str(jobdir)

    print('Job name:       %s' % job.name)
    print('Job directory:  %s' % jobdir)


    qsub = batch.oarsub()
    qsub.oarhost = 'poolfs.eecs.umich.edu'
    #qsub.stdout = jobdir.file('jobout')
    qsub.name = job.name
    qsub.walltime = '50'
    #qsub.join = True
    #qsub.node_type = node_type
    #qsub.env['ROOTDIR'] = conf.rootdir
    #qsub.env['JOBNAME'] = job.name
    #if depend:
    #    qsub.afterok = cptjob
    #if queue:
    #    qsub.queue = queue
    qsub.properties = "64bit = 'Yes' or 64bit = 'No'"
    qsub.build(script)

    if verbose:
        # Show the actual working directory rather than repeating the
        # submit command under the 'cwd' label.
        print('cwd:    %s' % os.getcwd())
        print('PBS Command:    %s' % qsub.command)

    if not onlyecho:
        ec = qsub.do()
        if ec == 0:
            jobid = qsub.result
            print('OAR Jobid:      %s' % jobid)
            #namehack.setname(jobid, job.name)
            # Record the submission in the job directory.
            queued = date()
            jobdir.echofile('.batch_jobid', jobid)
            jobdir.echofile('.batch_jobname', job.name)
            jobdir.echofile('.queued', queued)
            jobdir.setstatus('queued on %s' % queued)
        else:
            print('OAR Failed')
    # Two blank lines between jobs.
    print('')
    print('')
307