1#!/usr/bin/env python
2# Copyright (c) 2005 The Regents of The University of Michigan
3# All rights reserved.
4#
5# Redistribution and use in source and binary forms, with or without
6# modification, are permitted provided that the following conditions are
7# met: redistributions of source code must retain the above copyright
8# notice, this list of conditions and the following disclaimer;
9# redistributions in binary form must reproduce the above copyright
10# notice, this list of conditions and the following disclaimer in the
11# documentation and/or other materials provided with the distribution;
12# neither the name of the copyright holders nor the names of its
13# contributors may be used to endorse or promote products derived from
14# this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27#
28# Authors: Ali Saidi
29# Nathan Binkert
30
31import os, os.path, re, socket, sys
32from os import environ as env, listdir
33from os.path import basename, isdir, isfile, islink, join as joinpath, normpath
34from filecmp import cmp as filecmp
35from shutil import copy
36
def nfspath(dir):
    """Translate a directory path into its '/n/...' form.

    * A path under '/.automount/' has that prefix replaced by '/n/'.
    * A path already under '/n/' is returned unchanged.
    * Any other path is prefixed with '/n/<host>', where <host> is the
      part of socket.gethostname() before the first dot.
    """
    automount = '/.automount/'
    if dir.startswith(automount):
        return '/n/%s' % dir[len(automount):]
    if dir.startswith('/n/'):
        return dir
    shorthost = socket.gethostname().split('.')[0]
    return '/n/%s%s' % (shorthost, dir)
43
def syncdir(srcdir, destdir):
    """Copy new or changed files from srcdir into destdir.

    Walks srcdir, skipping directories whose name starts with '.' or is
    'SCCS'.  Every remaining subdirectory that is missing under destdir is
    created, and every file that is missing from destdir, or that
    filecmp.cmp() reports as different, is copied over.  Each mkdir and
    copy is reported on stdout.  Files that exist only in destdir are left
    untouched.

    Exits the program through sys.exit() when destdir is not an existing
    directory.
    """
    srcdir = normpath(srcdir)
    destdir = normpath(destdir)
    if not isdir(destdir):
        sys.exit('destination directory "%s" does not exist' % destdir)

    for root, dirs, files in os.walk(srcdir):
        # Reduce root to a path relative to srcdir ('' for srcdir itself).
        root = normpath(root)
        prefix = os.path.commonprefix([root, srcdir])
        root = root[len(prefix):]
        if root.startswith('/'):
            root = root[1:]

        # Prune in place so os.walk does not descend into these directories.
        dirs[:] = [d for d in dirs if not (d.startswith('.') or d == 'SCCS')]

        for entry in dirs:
            newdir = joinpath(destdir, root, entry)
            if not isdir(newdir):
                os.mkdir(newdir)
                print('mkdir %s' % newdir)

        # Rewrite symlinked directories as '<name>/.'.  Presumably this makes
        # os.walk, which does not follow directory symlinks by default,
        # descend into them, since '<name>/.' is not itself a link.
        for i, d in enumerate(dirs):
            if islink(joinpath(srcdir, root, d)):
                dirs[i] = joinpath(d, '.')

        for entry in files:
            dest = normpath(joinpath(destdir, root, entry))
            src = normpath(joinpath(srcdir, root, entry))
            if not isfile(dest) or not filecmp(src, dest):
                # Report in the same order as the copy itself: source first.
                print('copy %s %s' % (src, dest))
                copy(src, dest)
75
# Directory holding this script (sys.path[0]) in its '/n/...' form; used
# further down to locate job.py when building the qsub command.
progpath = nfspath(sys.path[0])
progname = basename(sys.argv[0])

# Help text printed for -h and on a command-line parsing error.  Keep the
# option list in sync with the option string handed to getopt.
usage = """\
Usage:
 %(progname)s [-c] [-C] [-d] [-e] [-f] [-j <jobfile>] [-n] [-q queue] [-R] [-t <type>] [-v] <regexp>
 -c clean directory if job can be run
 -C submit the checkpointing runs
 -d Make jobs be dependent on the completion of the checkpoint runs
 -e only echo pbs command info, don't actually send the job
 -f force the job to run regardless of state
 -n don't sync the link directory into the base directory
 -q <queue> submit job to the named queue
 -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
 -R together with -C, also submit the regular runs
 -t <type> node type to request from pbs (default is FAST)
 -v be verbose

 %(progname)s [-j <jobfile>] -l [-v] <regexp>
 -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
 -l list job names, don't submit
 -v be verbose (list job parameters)

 %(progname)s -h
 -h display this help
""" % locals()
98
# Parse the command line.  An unknown option or a missing option argument
# prints the usage text and exits with a failure status.
try:
    import getopt
    # Single call only: a leftover earlier call whose option string lacked
    # 'n' used to raise GetoptError for -n before this one could accept it.
    opts, args = getopt.getopt(sys.argv[1:], '-Ccdefhj:lnq:Rt:v')
except getopt.GetoptError:
    sys.exit(usage)
104
# Option defaults; the loop below overrides them from the command line.
clean = False
depend = False
docpts = False
doruns = True
force = False
jfile = 'Test.py'
listonly = False
node_type = 'FAST'
onlyecho = False
queue = ''
runflag = False
update = True
verbose = False

for opt, arg in opts:
    if opt == '-h':
        print(usage)
        sys.exit(0)
    elif opt == '-C':
        docpts = True
    elif opt == '-c':
        clean = True
    elif opt == '-d':
        depend = True
    elif opt == '-e':
        onlyecho = True
    elif opt == '-f':
        force = True
    elif opt == '-j':
        jfile = arg
    elif opt == '-l':
        listonly = True
    elif opt == '-n':
        update = False
    elif opt == '-q':
        queue = arg
    elif opt == '-R':
        runflag = True
    elif opt == '-t':
        node_type = arg
    elif opt == '-v':
        verbose = True

# With -C, the regular runs are submitted only when -R was given as well.
if docpts:
    doruns = runflag

# Each remaining argument is a regular expression matched against job names.
exprs = [re.compile(pattern) for pattern in args]
154
155import jobfile, pbs
156from job import JobDir, date
157
# Load the job file; it provides the directories (rootdir, linkdir, basedir)
# and the job generators used by the rest of this script.
conf = jobfile.JobFile(jfile)

# Refresh the base directory from the link directory before submitting.
# Skipped with -n, when only listing or echoing, and when there is no link
# directory.  (A leftover older form of this condition without the `update`
# check has been dropped.)
if update and not listonly and not onlyecho and isdir(conf.linkdir):
    if verbose:
        print('Checking for outdated files in Link directory')
    if not isdir(conf.basedir):
        os.mkdir(conf.basedir)
    syncdir(conf.linkdir, conf.basedir)
166
# jobnames records every job name already seen; joblist collects the jobs
# selected for listing or submission, in generator order.
jobnames = {}
joblist = []

# Pick the job generator: checkpoints only (-C), checkpoints plus regular
# runs (-C -R), or regular runs only (the default).
if docpts and doruns:
    gen = conf.alljobs()
elif docpts:
    gen = conf.checkpoints()
elif doruns:
    gen = conf.jobs()

for job in gen:
    # Skip repeated names.  Each job lives in <rootdir>/<job.name>, so a
    # second job with the same name would collide with the first one.  The
    # name has to be recorded here for this check to ever trigger.
    if job.name in jobnames:
        continue
    jobnames[job.name] = job

    if exprs:
        # Keep the job when at least one expression matches its name.
        for expr in exprs:
            if expr.match(job.name):
                joblist.append(job)
                break
    else:
        # No expressions given: keep every job.
        joblist.append(job)
188
# -l: report the selected jobs and stop without submitting anything.
# With -v each job prints its full info, otherwise just its name.
if listonly:
    for job in joblist:
        if verbose:
            job.printinfo()
        else:
            print(job.name)
    sys.exit(0)
197
# Before a real submission, drop jobs that are already queued, running or
# finished successfully, and clean (or refuse to reuse) leftover job
# directories.
if not onlyecho:
    skip_states = ('queued', 'running', 'success')
    runnable = []
    for job in joblist:
        jobdir = JobDir(joinpath(conf.rootdir, job.name))
        if jobdir.exists():
            # -f bypasses the status check but not the -c requirement below.
            if not force and jobdir.getstatus() in skip_states:
                continue

            if not clean:
                sys.exit('job directory %s not clean!' % jobdir)

            jobdir.clean()
        runnable.append(job)
    joblist = runnable
220
class NameHack(object):
    """Send jobid/jobname pairs for long job names to a helper service.

    pbs can handle job names of 15 characters or less on its own; for
    longer names the pair is sent as the text line "<jobid> <jobname>" to
    host:port.  The connection is opened lazily on the first long name and
    reused for later calls.
    """

    def __init__(self, host='pbs.pool', port=24465):
        self.host = host
        self.port = port
        # Opened on demand by setname().
        self.socket = None

    def setname(self, jobid, jobname):
        """Register jobname for jobid if the name is longer than 15 chars.

        jobid may be an integer or a string such as '1234.server'; only the
        leading integer part is used.  jobname is stripped of surrounding
        whitespace first.  Returns None in every case.

        Raises ValueError if no integer can be extracted from jobid, and
        lets socket errors from connecting or sending propagate.
        """
        try:
            jobid = int(jobid)
        except ValueError:
            jobid = int(jobid.strip().split('.')[0])

        jobname = jobname.strip()
        # since pbs can handle jobnames of 15 characters or less,
        # don't use the raj hack.
        if len(jobname) <= 15:
            return

        if self.socket is None:
            # Connect to pbs.pool and send the jobid/jobname pair to port
            # 24465 (the service was set up in inetd for port 90001, but
            # there are only 64k ports: 90001 - 65536 = 24465).
            # Cache the socket only once connected, so a failed connect
            # is retried on the next call instead of leaving a dead socket.
            conn = socket.socket()
            conn.connect((self.host, self.port))
            self.socket = conn

        # sendall() keeps sending until the whole line is out; send() may
        # transmit only part of it.
        self.socket.sendall("%s %s\n" % (jobid, jobname))
248
namehack = NameHack()

# Build a qsub command for every selected job and, unless -e was given,
# submit it and record the result in the job directory.
for job in joblist:
    jobdir = JobDir(joinpath(conf.rootdir, job.name))
    if depend:
        # -d: look up the pbs job id stored in the checkpoint job's directory.
        cptdir = JobDir(joinpath(conf.rootdir, job.checkpoint.name))
        cptjob = cptdir.readval('.pbs_jobid')

    if not onlyecho:
        jobdir.create()

    print('Job name: %s' % job.name)
    print('Job directory: %s' % jobdir)

    qsub = pbs.qsub()
    qsub.pbshost = 'simpool.eecs.umich.edu'
    qsub.stdout = jobdir.file('jobout')
    # The pbs-visible name is cut to 15 characters; the full name is
    # handed to namehack after a successful submission.
    qsub.name = job.name[:15]
    qsub.join = True
    qsub.node_type = node_type
    qsub.env['ROOTDIR'] = conf.rootdir
    qsub.env['JOBNAME'] = job.name
    if depend:
        qsub.afterok = cptjob
    if queue:
        qsub.queue = queue
    qsub.build(joinpath(progpath, 'job.py'))

    if verbose:
        print('PBS Command: %s' % qsub.command)

    # -e: show the command only, never submit.
    if onlyecho:
        continue

    if qsub.do() != 0:
        print('PBS Failed')
        continue

    jobid = qsub.result
    print('PBS Jobid: %s' % jobid)
    namehack.setname(jobid, job.name)
    queued = date()
    jobdir.echofile('.pbs_jobid', jobid)
    jobdir.echofile('.pbs_jobname', job.name)
    jobdir.echofile('.queued', queued)
    jobdir.setstatus('queued on %s' % queued)