1#!/usr/bin/env python
2# Copyright (c) 2005 The Regents of The University of Michigan
3# All rights reserved.
4#
5# Redistribution and use in source and binary forms, with or without
6# modification, are permitted provided that the following conditions are
7# met: redistributions of source code must retain the above copyright
8# notice, this list of conditions and the following disclaimer;
9# redistributions in binary form must reproduce the above copyright
10# notice, this list of conditions and the following disclaimer in the
11# documentation and/or other materials provided with the distribution;
12# neither the name of the copyright holders nor the names of its
13# contributors may be used to endorse or promote products derived from
14# this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27#
28# Authors: Ali Saidi
29# Nathan Binkert
30
31import os, os.path, re, socket, sys
32from os import environ as env, listdir
33from os.path import basename, isdir, isfile, islink, join as joinpath, normpath
34from filecmp import cmp as filecmp
35from shutil import copy
36
def nfspath(dir):
    """Rewrite a directory path so that it lives under ``/n/``.

    * ``/.automount/<rest>`` becomes ``/n/<rest>``.
    * A path already starting with ``/n/`` is returned unchanged.
    * Any other path is prefixed with ``/n/<host>``, where ``<host>`` is
      the local hostname up to (not including) its first dot.
    """
    automount = '/.automount/'
    if dir.startswith(automount):
        return '/n/%s' % dir[len(automount):]

    if dir.startswith('/n/'):
        return dir

    short_host = socket.gethostname().split('.')[0]
    return '/n/%s%s' % (short_host, dir)
43
def syncdir(srcdir, destdir):
    """Bring ``destdir`` up to date with the files found under ``srcdir``.

    Walks ``srcdir`` top-down, creating any missing sub-directory in
    ``destdir`` and copying every file that is absent from the destination
    or whose contents differ from the source.  Directories whose name
    starts with '.' and directories named 'SCCS' are skipped entirely.
    Nothing is ever removed from ``destdir``.

    Each directory created and each file copied is reported on stdout.

    Exits the program (``sys.exit``) if ``destdir`` does not exist.
    """
    srcdir = normpath(srcdir)
    destdir = normpath(destdir)
    if not isdir(destdir):
        sys.exit('destination directory "%s" does not exist' % destdir)

    for root, dirs, files in os.walk(srcdir):
        # Reduce root to the part of the path below srcdir.
        root = normpath(root)
        prefix = os.path.commonprefix([root, srcdir])
        root = root[len(prefix):]
        if root.startswith('/'):
            root = root[1:]

        # Prune in place: os.walk only descends into what is left in dirs.
        dirs[:] = [d for d in dirs
                   if not (d.startswith('.') or d == 'SCCS')]

        for entry in dirs:
            newdir = joinpath(destdir, root, entry)
            if not isdir(newdir):
                os.mkdir(newdir)
                print('mkdir %s' % newdir)

        # A symlinked directory is replaced by '<name>/.', which no longer
        # tests as a link, so the walk descends into its target.
        for i, d in enumerate(dirs):
            if islink(joinpath(srcdir, root, d)):
                dirs[i] = joinpath(d, '.')

        for entry in files:
            dest = normpath(joinpath(destdir, root, entry))
            src = normpath(joinpath(srcdir, root, entry))
            if not isfile(dest) or not filecmp(src, dest):
                # Report in the same source-then-destination order that
                # the copy itself uses.
                print('copy %s %s' % (src, dest))
                copy(src, dest)
75
# Directory containing this script, rewritten to its /n/ form; the job
# runner script (job.py) is located relative to it when building the
# qsub command.
progpath = nfspath(sys.path[0])
progname = basename(sys.argv[0])

# Help text.  Keep this in step with the getopt option string and the
# option-handling loop: every accepted flag should be listed here.
usage = """\
Usage:
    %(progname)s [-C] [-R] [-c] [-d] [-e] [-f] [-j <jobfile>] [-n]
            [-q <queue>] [-t <type>] [-v] <regexp>
        -C           submit checkpoint jobs instead of regular jobs
        -R           with -C, submit the regular jobs as well
        -c           clean directory if job can be run
        -d           make each job wait for its checkpoint's pbs job
        -e           only echo pbs command info, don't actually send the job
        -f           force the job to run regardless of state
        -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
        -n           don't sync the link directory before submitting
        -q <queue>   submit job to the named queue
        -t <type>    node type to request (default is FAST)
        -v           be verbose

    %(progname)s [-C] [-R] [-j <jobfile>] -l [-v] <regexp>
        -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
        -l           list job names, don't submit
        -v           be verbose (list job parameters)

    %(progname)s -h
        -h           display this help
""" % {'progname': progname}
96
import getopt

try:
    # Single parse of the command line.  The option string must contain
    # every flag the option loop handles (including -n); flags followed
    # by ':' take an argument.
    opts, args = getopt.getopt(sys.argv[1:], '-Ccdefhj:lnq:Rt:v')
except getopt.GetoptError:
    # Unknown flag or missing argument: show the help text and exit
    # with a failure status.
    sys.exit(usage)
102
# Option defaults; each may be overridden by a command-line flag below.
clean = False         # -c
depend = False        # -d
docpts = False        # -C
doruns = True         # recomputed from runflag when -C is given
force = False         # -f
jfile = 'Test.py'     # -j <jobfile>
listonly = False      # -l
node_type = 'FAST'    # -t <type>
onlyecho = False      # -e
queue = ''            # -q <queue>
runflag = False       # -R
update = True         # cleared by -n
verbose = False       # -v

for opt, arg in opts:
    if opt == '-C':
        docpts = True
    elif opt == '-c':
        clean = True
    elif opt == '-d':
        depend = True
    elif opt == '-e':
        onlyecho = True
    elif opt == '-f':
        force = True
    elif opt == '-h':
        print(usage)
        sys.exit(0)
    elif opt == '-j':
        jfile = arg
    elif opt == '-l':
        listonly = True
    elif opt == '-n':
        update = False
    elif opt == '-q':
        queue = arg
    elif opt == '-R':
        runflag = True
    elif opt == '-t':
        node_type = arg
    elif opt == '-v':
        verbose = True

# Once checkpoints are requested, regular runs happen only if -R was
# given as well.
if docpts:
    doruns = runflag

# Every positional argument is a regular expression used to select jobs
# by name.
exprs = [re.compile(arg) for arg in args]
152
153import jobfile, pbs
154from job import JobDir, date
155
conf = jobfile.JobFile(jfile)

# Refresh conf.basedir from conf.linkdir before submitting.  Skipped when
# updating was disabled (-n), when only listing or echoing, or when the
# link directory does not exist.
if update and not listonly and not onlyecho and isdir(conf.linkdir):
    if verbose:
        print('Checking for outdated files in Link directory')
    if not isdir(conf.basedir):
        os.mkdir(conf.basedir)
    syncdir(conf.linkdir, conf.basedir)
164
# Jobs seen so far, keyed by name, so that a job name yielded more than
# once is only considered the first time.
jobnames = {}
# Jobs selected for listing/submission, in the order they were yielded.
joblist = []

# Pick the job source: checkpoints, regular runs, or both.
if docpts and doruns:
    gen = conf.alljobs()
elif docpts:
    gen = conf.checkpoints()
elif doruns:
    gen = conf.jobs()

for job in gen:
    if job.name in jobnames:
        continue
    # Record the name; without this the duplicate check above could
    # never trigger.
    jobnames[job.name] = job

    if exprs:
        # Keep the job if any pattern matches at the start of its name.
        for expr in exprs:
            if expr.match(job.name):
                joblist.append(job)
                break
    else:
        # No patterns given: every job is selected.
        joblist.append(job)
186
# -l: show the selected jobs and stop; nothing is submitted.
if listonly:
    for job in joblist:
        if verbose:
            # Verbose listing delegates to the job's own printinfo().
            job.printinfo()
        else:
            print(job.name)
    sys.exit(0)
195
# Unless only echoing, drop jobs that must not be resubmitted and make
# sure every remaining job starts from a clean directory.
if not onlyecho:
    newlist = []
    for job in joblist:
        jobdir = JobDir(joinpath(conf.rootdir, job.name))
        if jobdir.exists():
            if not force:
                # Without -f, jobs that are queued, running or already
                # successful are silently skipped.
                status = jobdir.getstatus()
                if status in ('queued', 'running', 'success'):
                    continue

            # An existing directory is only reused when -c was given.
            if not clean:
                sys.exit('job directory %s not clean!' % jobdir)

            jobdir.clean()
        newlist.append(job)
    joblist = newlist
218
class NameHack(object):
    """Report full job names to a helper service for long-named jobs.

    For a job whose name is longer than 15 characters, a line of the form
    ``"<jobid> <jobname>\\n"`` is sent over a TCP connection to
    ``host:port``.  The connection is opened on first use and then reused
    for later calls.
    """

    def __init__(self, host='pbs.pool', port=24465):
        """Remember the service address; no connection is made yet."""
        self.host = host
        self.port = port
        self.socket = None

    def setname(self, jobid, jobname):
        """Send the (jobid, jobname) pair if the name exceeds 15 chars.

        ``jobid`` may be an integer or a string; for a string that is not
        a plain integer, the part before the first '.' is used.
        ``jobname`` is stripped of surrounding whitespace before use.
        Connection and send errors propagate to the caller.
        """
        try:
            jobid = int(jobid)
        except ValueError:
            jobid = int(jobid.strip().split('.')[0])

        jobname = jobname.strip()
        # since pbs can handle jobnames of 15 characters or less,
        # don't use the raj hack.
        if len(jobname) <= 15:
            return

        if self.socket is None:
            import socket
            # Connect to pbs.pool and send the jobid/jobname pair to port
            # 24465 (Raj didn't realize that there are only 64k ports and
            # setup inetd to point to port 90001)
            sock = socket.socket()
            sock.connect((self.host, self.port))
            # Only cache the socket once it is connected, so a failed
            # connect is retried on the next call instead of leaving an
            # unusable socket behind.
            self.socket = sock

        # sendall keeps writing until the whole line has been handed to
        # the kernel; plain send may transmit only part of it.
        self.socket.sendall("%s %s\n" % (jobid, jobname))
246
namehack = NameHack()

# Build (and, unless -e was given, submit) one qsub request per job.
for job in joblist:
    jobdir = JobDir(joinpath(conf.rootdir, job.name))
    if depend:
        # Look up the pbs job id recorded in the checkpoint's directory.
        cptdir = JobDir(joinpath(conf.rootdir, job.checkpoint.name))
        cptjob = cptdir.readval('.pbs_jobid')

    if not onlyecho:
        jobdir.create()

    print('Job name: %s' % job.name)
    print('Job directory: %s' % jobdir)

    qsub = pbs.qsub()
    qsub.pbshost = 'simpool.eecs.umich.edu'
    qsub.stdout = jobdir.file('jobout')
    # Only the first 15 characters are used as the pbs job name; the
    # full name travels in the JOBNAME environment variable.
    qsub.name = job.name[:15]
    qsub.join = True
    qsub.node_type = node_type
    qsub.env['ROOTDIR'] = conf.rootdir
    qsub.env['JOBNAME'] = job.name
    if depend:
        qsub.afterok = cptjob
    if queue:
        qsub.queue = queue
    qsub.build(joinpath(progpath, 'job.py'))

    if verbose:
        print('PBS Command: %s' % qsub.command)

    # -e: the request is only displayed, never submitted.
    if onlyecho:
        continue

    ec = qsub.do()
    if ec != 0:
        print('PBS Failed')
        continue

    # Submission succeeded: report the id and record it in the job
    # directory along with the queue time.
    jobid = qsub.result
    print('PBS Jobid: %s' % jobid)
    namehack.setname(jobid, job.name)
    queued = date()
    jobdir.echofile('.pbs_jobid', jobid)
    jobdir.echofile('.pbs_jobname', job.name)
    jobdir.echofile('.queued', queued)
    jobdir.setstatus('queued on %s' % queued)