send.py (2357:add41108b549) send.py (11828:36b064696175)
1#!/usr/bin/env python
1#!/usr/bin/env python2
2# Copyright (c) 2006 The Regents of The University of Michigan
3# All rights reserved.
4#
5# Redistribution and use in source and binary forms, with or without
6# modification, are permitted provided that the following conditions are
7# met: redistributions of source code must retain the above copyright
8# notice, this list of conditions and the following disclaimer;
9# redistributions in binary form must reproduce the above copyright
10# notice, this list of conditions and the following disclaimer in the
11# documentation and/or other materials provided with the distribution;
12# neither the name of the copyright holders nor the names of its
13# contributors may be used to endorse or promote products derived from
14# this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27#
28# Authors: Kevin Lim
29
30import os, os.path, re, socket, sys
31from os import environ as env, listdir
32from os.path import basename, isdir, isfile, islink, join as joinpath, normpath
33from filecmp import cmp as filecmp
34from shutil import copy
35
def nfspath(dir):
    """Return ``dir`` rewritten to start with ``/n/``.

    Three cases, checked in order:

    * ``/.automount/<rest>`` becomes ``/n/<rest>``.
    * A path already starting with ``/n/`` is returned unchanged.
    * Anything else is prefixed with ``/n/<host>``, where ``<host>`` is the
      first dot-separated component of ``socket.gethostname()``.

    ``dir`` is expected to be a string; it is concatenated directly after
    the host name, so an absolute path is presumably intended.
    """
    automount = '/.automount/'
    if dir.startswith(automount):
        return '/n/%s' % dir[len(automount):]
    if dir.startswith('/n/'):
        return dir
    host = socket.gethostname().split('.')[0]
    return '/n/%s%s' % (host, dir)
42
def syncdir(srcdir, destdir):
    """Bring ``destdir`` up to date with the tree under ``srcdir``.

    Walks ``srcdir`` (descending into symlinked directories as well),
    skipping every directory whose name starts with '.' or equals 'SCCS'.
    Directories missing under ``destdir`` are created, and each file that is
    absent from ``destdir`` or compares different to its counterpart is
    copied over.  Files that exist only in ``destdir`` are left untouched.
    Every mkdir and copy is reported on stdout.

    Exits via ``sys.exit`` when ``destdir`` is not an existing directory.
    """
    srcdir = normpath(srcdir)
    destdir = normpath(destdir)
    if not isdir(destdir):
        sys.exit('destination directory "%s" does not exist' % destdir)

    for root, dirs, files in os.walk(srcdir, followlinks=True):
        # Path of the directory being visited, relative to srcdir.  relpath
        # works on whole path components; the top directory maps to ''.
        rel = os.path.relpath(root, srcdir)
        if rel == os.curdir:
            rel = ''

        # Prune in place so os.walk does not descend into these.
        dirs[:] = [d for d in dirs
                   if not (d.startswith('.') or d == 'SCCS')]

        for entry in dirs:
            newdir = joinpath(destdir, rel, entry)
            if not isdir(newdir):
                os.mkdir(newdir)
                print('mkdir %s' % newdir)

        for entry in files:
            src = normpath(joinpath(srcdir, rel, entry))
            dest = normpath(joinpath(destdir, rel, entry))
            if not isfile(dest) or not filecmp(src, dest):
                # Report in the same order the copy is performed.
                print('copy %s %s' % (src, dest))
                copy(src, dest)
74
# Directory containing this script (sys.path[0]), rewritten by nfspath().
progpath = nfspath(sys.path[0])
# Name the script was invoked as; substituted into the usage text.
progname = basename(sys.argv[0])
# Help text printed for -h and on option-parsing errors.  It lists every
# flag accepted by the getopt string used by this script.
usage = """\
Usage:
    %(progname)s [-c] [-C] [-d] [-e] [-f] [-j <jobfile>] [-n] [-q queue]
                 [-R] [-t <type>] [-v] <regexp>
    -c           clean directory if job can be run
    -C           submit the checkpointing runs
    -d           Make jobs be dependent on the completion of the checkpoint runs
    -e           only echo pbs command info, don't actually send the job
    -f           force the job to run regardless of state
    -n           don't sync the Link directory into the base directory first
    -q <queue>   submit job to the named queue
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -R           with -C, also submit the non-checkpoint runs
    -t <type>    node type (accepted, currently not passed to the batch system)
    -v           be verbose

    %(progname)s [-j <jobfile>] -l [-v] <regexp>
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -l           list job names, don't submit
    -v           be verbose (list job parameters)

    %(progname)s -h
    -h           display this help
""" % {'progname': progname}
97
# Imported outside the try block so that the except clause below can always
# resolve getopt.GetoptError.
import getopt

# NOTE(review): the leading '-' in the option string is kept from the
# original; Python's getopt.getopt does not appear to give it any special
# meaning -- confirm before removing.
try:
    opts, args = getopt.getopt(sys.argv[1:], '-Ccdefhj:lnq:Rt:v')
except getopt.GetoptError:
    sys.exit(usage)

# Defaults for every command-line switch.
depend = False      # -d
clean = False       # -c
onlyecho = False    # -e
exprs = []          # compiled job-name patterns (positional arguments)
force = False       # -f
listonly = False    # -l
queue = ''          # -q <queue>
verbose = False     # -v
jfile = 'Test.py'   # -j <jobfile>
docpts = False      # -C
doruns = True       # derived below from -C / -R
runflag = False     # -R
node_type = 'FAST'  # -t <type>
update = True       # cleared by -n

for opt, arg in opts:
    if opt == '-C':
        docpts = True
    elif opt == '-c':
        clean = True
    elif opt == '-d':
        depend = True
    elif opt == '-e':
        onlyecho = True
    elif opt == '-f':
        force = True
    elif opt == '-h':
        print(usage)
        sys.exit(0)
    elif opt == '-j':
        jfile = arg
    elif opt == '-l':
        listonly = True
    elif opt == '-n':
        update = False
    elif opt == '-q':
        queue = arg
    elif opt == '-R':
        runflag = True
    elif opt == '-t':
        node_type = arg
    elif opt == '-v':
        verbose = True

# With -C, the regular runs are only included when -R was given as well.
if docpts:
    doruns = runflag

# Each positional argument is a regular expression matched against job names.
exprs.extend(re.compile(pattern) for pattern in args)
153
import jobfile, batch
from job import JobDir, date

conf = jobfile.JobFile(jfile)

# Refresh the base directory from the Link directory unless that was
# disabled (-n) or nothing is going to be submitted (-l / -e).
if update and not listonly and not onlyecho and isdir(conf.linkdir):
    if verbose:
        print('Checking for outdated files in Link directory')
    if not isdir(conf.basedir):
        os.mkdir(conf.basedir)
    syncdir(conf.linkdir, conf.basedir)

jobnames = {}   # job name -> job, used to skip duplicate names
joblist = []    # jobs selected for listing / submission

if docpts and doruns:
    gen = conf.alljobs()
elif docpts:
    gen = conf.checkpoints()
elif doruns:
    gen = conf.jobs()
else:
    # Not reachable with the flag handling in this script, but keeps `gen`
    # defined instead of failing with a NameError.
    gen = ()

for job in gen:
    if job.name in jobnames:
        continue
    # Record the name so a later job with the same name is skipped; the
    # original never filled this dict, so the check above could not fire.
    jobnames[job.name] = job

    if exprs:
        # Keep the job when any pattern matches at the start of its name.
        for expr in exprs:
            if expr.match(job.name):
                joblist.append(job)
                break
    else:
        joblist.append(job)

if listonly:
    if verbose:
        for job in joblist:
            job.printinfo()
    else:
        for job in joblist:
            print(job.name)
    sys.exit(0)

if not onlyecho:
    # Drop jobs that are already queued, running or finished, and make sure
    # any other existing job directory is cleaned before resubmission.
    newlist = []
    for job in joblist:
        jobdir = JobDir(joinpath(conf.rootdir, job.name))
        if jobdir.exists():
            if not force:
                status = jobdir.getstatus()
                if status in ('queued', 'running', 'success'):
                    continue

            # NOTE(review): the indentation of this check was lost in the
            # source this was recovered from.  It is placed so that an
            # existing directory is never cleaned without -c, even with -f;
            # confirm against the authoritative copy.
            if not clean:
                sys.exit('job directory %s not clean!' % jobdir)

            jobdir.clean()
        newlist.append(job)
    joblist = newlist
219
class NameHack(object):
    """Send ``<jobid> <jobname>`` pairs for long job names to a helper service.

    The connection to ``host``/``port`` is opened lazily on the first name
    that actually needs to be sent and is then reused for later calls.
    """

    def __init__(self, host='pbs.pool', port=24465):
        self.host = host
        self.port = port
        # Connected socket, or None until setname() first needs one.
        self.socket = None

    def setname(self, jobid, jobname):
        """Register ``jobname`` for ``jobid`` if the name exceeds 15 characters.

        ``jobid`` may be an int or a string such as ``'123.server'``; only the
        part before the first '.' is used.  Names of 15 characters or fewer
        (after stripping whitespace) are ignored and nothing is sent.

        Raises ValueError if no integer job id can be extracted, and whatever
        the socket layer raises on connection or send failure.
        """
        try:
            jobid = int(jobid)
        except ValueError:
            jobid = int(jobid.strip().split('.')[0])

        jobname = jobname.strip()
        # since pbs can handle jobnames of 15 characters or less,
        # don't use the raj hack.
        if len(jobname) <= 15:
            return

        if self.socket is None:
            import socket
            # Connect to pbs.pool and send the jobid/jobname pair to port
            # 24465 (Raj didn't realize that there are only 64k ports and
            # setup inetd to point to port 90001; 90001 - 65536 = 24465).
            conn = socket.socket()
            conn.connect((self.host, self.port))
            # Only remember the socket once it is connected, so a failed
            # connect is retried on the next call rather than leaving an
            # unusable socket behind.
            self.socket = conn

        # sendall() keeps writing until the whole message has gone out;
        # send() may transmit only part of it.
        self.socket.sendall("%s %s\n" % (jobid, jobname))
247
# Client for the long-job-name side channel.  The call that would use it is
# disabled in the loop below.
namehack = NameHack()

rootdir = conf.rootdir
# Script handed to the batch system for every job.
script = joinpath(rootdir, 'Base', 'job.py')

for job in joblist:
    jobdir = JobDir(joinpath(rootdir, job.name))
    if depend:
        # Only submit jobs whose checkpoint run left a '.success' marker.
        cptdir = JobDir(joinpath(rootdir, job.checkpoint.name))
        path = str(cptdir)
        if not isdir(path) or not isfile(joinpath(path, '.success')):
            continue

        # Batch id of the checkpoint job; only referenced by the disabled
        # 'afterok' setting below.
        cptjob = cptdir.readval('.batch_jobid')

    if not onlyecho:
        jobdir.create()
        os.chdir(str(jobdir))
        os.environ['PWD'] = str(jobdir)

    print('Job name: %s' % job.name)
    print('Job directory: %s' % jobdir)

    qsub = batch.oarsub()
    qsub.oarhost = 'poolfs.eecs.umich.edu'
    #qsub.stdout = jobdir.file('jobout')
    qsub.name = job.name
    qsub.walltime = '50'
    # The settings below were left disabled in the original script.
    #qsub.join = True
    #qsub.node_type = node_type
    #qsub.env['ROOTDIR'] = conf.rootdir
    #qsub.env['JOBNAME'] = job.name
    #if depend:
    #    qsub.afterok = cptjob
    #if queue:
    #    qsub.queue = queue
    qsub.properties = "64bit = 'Yes' or 64bit = 'No'"
    qsub.build(script)

    if verbose:
        # The original printed qsub.command under the 'cwd' label as well.
        print('cwd: %s' % os.getcwd())
        print('PBS Command: %s' % qsub.command)

    if not onlyecho:
        ec = qsub.do()
        if ec == 0:
            jobid = qsub.result
            print('OAR Jobid: %s' % jobid)
            #namehack.setname(jobid, job.name)
            queued = date()
            # Record the submission in the job directory.
            jobdir.echofile('.batch_jobid', jobid)
            jobdir.echofile('.batch_jobname', job.name)
            jobdir.echofile('.queued', queued)
            jobdir.setstatus('queued on %s' % queued)
        else:
            print('OAR Failed')
    # Two blank lines between jobs.  print('') rather than print(), which
    # would output "()" under Python 2.
    print('')
    print('')
2# Copyright (c) 2006 The Regents of The University of Michigan
3# All rights reserved.
4#
5# Redistribution and use in source and binary forms, with or without
6# modification, are permitted provided that the following conditions are
7# met: redistributions of source code must retain the above copyright
8# notice, this list of conditions and the following disclaimer;
9# redistributions in binary form must reproduce the above copyright
10# notice, this list of conditions and the following disclaimer in the
11# documentation and/or other materials provided with the distribution;
12# neither the name of the copyright holders nor the names of its
13# contributors may be used to endorse or promote products derived from
14# this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
17# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
18# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
19# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
20# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
26# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27#
28# Authors: Kevin Lim
29
30import os, os.path, re, socket, sys
31from os import environ as env, listdir
32from os.path import basename, isdir, isfile, islink, join as joinpath, normpath
33from filecmp import cmp as filecmp
34from shutil import copy
35
def nfspath(dir):
    """Return ``dir`` rewritten to start with ``/n/``.

    Three cases, checked in order:

    * ``/.automount/<rest>`` becomes ``/n/<rest>``.
    * A path already starting with ``/n/`` is returned unchanged.
    * Anything else is prefixed with ``/n/<host>``, where ``<host>`` is the
      first dot-separated component of ``socket.gethostname()``.

    ``dir`` is expected to be a string; it is concatenated directly after
    the host name, so an absolute path is presumably intended.
    """
    automount = '/.automount/'
    if dir.startswith(automount):
        return '/n/%s' % dir[len(automount):]
    if dir.startswith('/n/'):
        return dir
    host = socket.gethostname().split('.')[0]
    return '/n/%s%s' % (host, dir)
42
def syncdir(srcdir, destdir):
    """Bring ``destdir`` up to date with the tree under ``srcdir``.

    Walks ``srcdir`` (descending into symlinked directories as well),
    skipping every directory whose name starts with '.' or equals 'SCCS'.
    Directories missing under ``destdir`` are created, and each file that is
    absent from ``destdir`` or compares different to its counterpart is
    copied over.  Files that exist only in ``destdir`` are left untouched.
    Every mkdir and copy is reported on stdout.

    Exits via ``sys.exit`` when ``destdir`` is not an existing directory.
    """
    srcdir = normpath(srcdir)
    destdir = normpath(destdir)
    if not isdir(destdir):
        sys.exit('destination directory "%s" does not exist' % destdir)

    for root, dirs, files in os.walk(srcdir, followlinks=True):
        # Path of the directory being visited, relative to srcdir.  relpath
        # works on whole path components; the top directory maps to ''.
        rel = os.path.relpath(root, srcdir)
        if rel == os.curdir:
            rel = ''

        # Prune in place so os.walk does not descend into these.
        dirs[:] = [d for d in dirs
                   if not (d.startswith('.') or d == 'SCCS')]

        for entry in dirs:
            newdir = joinpath(destdir, rel, entry)
            if not isdir(newdir):
                os.mkdir(newdir)
                print('mkdir %s' % newdir)

        for entry in files:
            src = normpath(joinpath(srcdir, rel, entry))
            dest = normpath(joinpath(destdir, rel, entry))
            if not isfile(dest) or not filecmp(src, dest):
                # Report in the same order the copy is performed.
                print('copy %s %s' % (src, dest))
                copy(src, dest)
74
# Directory containing this script (sys.path[0]), rewritten by nfspath().
progpath = nfspath(sys.path[0])
# Name the script was invoked as; substituted into the usage text.
progname = basename(sys.argv[0])
# Help text printed for -h and on option-parsing errors.  It lists every
# flag accepted by the getopt string used by this script.
usage = """\
Usage:
    %(progname)s [-c] [-C] [-d] [-e] [-f] [-j <jobfile>] [-n] [-q queue]
                 [-R] [-t <type>] [-v] <regexp>
    -c           clean directory if job can be run
    -C           submit the checkpointing runs
    -d           Make jobs be dependent on the completion of the checkpoint runs
    -e           only echo pbs command info, don't actually send the job
    -f           force the job to run regardless of state
    -n           don't sync the Link directory into the base directory first
    -q <queue>   submit job to the named queue
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -R           with -C, also submit the non-checkpoint runs
    -t <type>    node type (accepted, currently not passed to the batch system)
    -v           be verbose

    %(progname)s [-j <jobfile>] -l [-v] <regexp>
    -j <jobfile> specify the jobfile (default is <rootdir>/Test.py)
    -l           list job names, don't submit
    -v           be verbose (list job parameters)

    %(progname)s -h
    -h           display this help
""" % {'progname': progname}
97
# Imported outside the try block so that the except clause below can always
# resolve getopt.GetoptError.
import getopt

# NOTE(review): the leading '-' in the option string is kept from the
# original; Python's getopt.getopt does not appear to give it any special
# meaning -- confirm before removing.
try:
    opts, args = getopt.getopt(sys.argv[1:], '-Ccdefhj:lnq:Rt:v')
except getopt.GetoptError:
    sys.exit(usage)

# Defaults for every command-line switch.
depend = False      # -d
clean = False       # -c
onlyecho = False    # -e
exprs = []          # compiled job-name patterns (positional arguments)
force = False       # -f
listonly = False    # -l
queue = ''          # -q <queue>
verbose = False     # -v
jfile = 'Test.py'   # -j <jobfile>
docpts = False      # -C
doruns = True       # derived below from -C / -R
runflag = False     # -R
node_type = 'FAST'  # -t <type>
update = True       # cleared by -n

for opt, arg in opts:
    if opt == '-C':
        docpts = True
    elif opt == '-c':
        clean = True
    elif opt == '-d':
        depend = True
    elif opt == '-e':
        onlyecho = True
    elif opt == '-f':
        force = True
    elif opt == '-h':
        print(usage)
        sys.exit(0)
    elif opt == '-j':
        jfile = arg
    elif opt == '-l':
        listonly = True
    elif opt == '-n':
        update = False
    elif opt == '-q':
        queue = arg
    elif opt == '-R':
        runflag = True
    elif opt == '-t':
        node_type = arg
    elif opt == '-v':
        verbose = True

# With -C, the regular runs are only included when -R was given as well.
if docpts:
    doruns = runflag

# Each positional argument is a regular expression matched against job names.
exprs.extend(re.compile(pattern) for pattern in args)
153
import jobfile, batch
from job import JobDir, date

conf = jobfile.JobFile(jfile)

# Refresh the base directory from the Link directory unless that was
# disabled (-n) or nothing is going to be submitted (-l / -e).
if update and not listonly and not onlyecho and isdir(conf.linkdir):
    if verbose:
        print('Checking for outdated files in Link directory')
    if not isdir(conf.basedir):
        os.mkdir(conf.basedir)
    syncdir(conf.linkdir, conf.basedir)

jobnames = {}   # job name -> job, used to skip duplicate names
joblist = []    # jobs selected for listing / submission

if docpts and doruns:
    gen = conf.alljobs()
elif docpts:
    gen = conf.checkpoints()
elif doruns:
    gen = conf.jobs()
else:
    # Not reachable with the flag handling in this script, but keeps `gen`
    # defined instead of failing with a NameError.
    gen = ()

for job in gen:
    if job.name in jobnames:
        continue
    # Record the name so a later job with the same name is skipped; the
    # original never filled this dict, so the check above could not fire.
    jobnames[job.name] = job

    if exprs:
        # Keep the job when any pattern matches at the start of its name.
        for expr in exprs:
            if expr.match(job.name):
                joblist.append(job)
                break
    else:
        joblist.append(job)

if listonly:
    if verbose:
        for job in joblist:
            job.printinfo()
    else:
        for job in joblist:
            print(job.name)
    sys.exit(0)

if not onlyecho:
    # Drop jobs that are already queued, running or finished, and make sure
    # any other existing job directory is cleaned before resubmission.
    newlist = []
    for job in joblist:
        jobdir = JobDir(joinpath(conf.rootdir, job.name))
        if jobdir.exists():
            if not force:
                status = jobdir.getstatus()
                if status in ('queued', 'running', 'success'):
                    continue

            # NOTE(review): the indentation of this check was lost in the
            # source this was recovered from.  It is placed so that an
            # existing directory is never cleaned without -c, even with -f;
            # confirm against the authoritative copy.
            if not clean:
                sys.exit('job directory %s not clean!' % jobdir)

            jobdir.clean()
        newlist.append(job)
    joblist = newlist
219
class NameHack(object):
    """Send ``<jobid> <jobname>`` pairs for long job names to a helper service.

    The connection to ``host``/``port`` is opened lazily on the first name
    that actually needs to be sent and is then reused for later calls.
    """

    def __init__(self, host='pbs.pool', port=24465):
        self.host = host
        self.port = port
        # Connected socket, or None until setname() first needs one.
        self.socket = None

    def setname(self, jobid, jobname):
        """Register ``jobname`` for ``jobid`` if the name exceeds 15 characters.

        ``jobid`` may be an int or a string such as ``'123.server'``; only the
        part before the first '.' is used.  Names of 15 characters or fewer
        (after stripping whitespace) are ignored and nothing is sent.

        Raises ValueError if no integer job id can be extracted, and whatever
        the socket layer raises on connection or send failure.
        """
        try:
            jobid = int(jobid)
        except ValueError:
            jobid = int(jobid.strip().split('.')[0])

        jobname = jobname.strip()
        # since pbs can handle jobnames of 15 characters or less,
        # don't use the raj hack.
        if len(jobname) <= 15:
            return

        if self.socket is None:
            import socket
            # Connect to pbs.pool and send the jobid/jobname pair to port
            # 24465 (Raj didn't realize that there are only 64k ports and
            # setup inetd to point to port 90001; 90001 - 65536 = 24465).
            conn = socket.socket()
            conn.connect((self.host, self.port))
            # Only remember the socket once it is connected, so a failed
            # connect is retried on the next call rather than leaving an
            # unusable socket behind.
            self.socket = conn

        # sendall() keeps writing until the whole message has gone out;
        # send() may transmit only part of it.
        self.socket.sendall("%s %s\n" % (jobid, jobname))
247
# Client for the long-job-name side channel.  The call that would use it is
# disabled in the loop below.
namehack = NameHack()

rootdir = conf.rootdir
# Script handed to the batch system for every job.
script = joinpath(rootdir, 'Base', 'job.py')

for job in joblist:
    jobdir = JobDir(joinpath(rootdir, job.name))
    if depend:
        # Only submit jobs whose checkpoint run left a '.success' marker.
        cptdir = JobDir(joinpath(rootdir, job.checkpoint.name))
        path = str(cptdir)
        if not isdir(path) or not isfile(joinpath(path, '.success')):
            continue

        # Batch id of the checkpoint job; only referenced by the disabled
        # 'afterok' setting below.
        cptjob = cptdir.readval('.batch_jobid')

    if not onlyecho:
        jobdir.create()
        os.chdir(str(jobdir))
        os.environ['PWD'] = str(jobdir)

    print('Job name: %s' % job.name)
    print('Job directory: %s' % jobdir)

    qsub = batch.oarsub()
    qsub.oarhost = 'poolfs.eecs.umich.edu'
    #qsub.stdout = jobdir.file('jobout')
    qsub.name = job.name
    qsub.walltime = '50'
    # The settings below were left disabled in the original script.
    #qsub.join = True
    #qsub.node_type = node_type
    #qsub.env['ROOTDIR'] = conf.rootdir
    #qsub.env['JOBNAME'] = job.name
    #if depend:
    #    qsub.afterok = cptjob
    #if queue:
    #    qsub.queue = queue
    qsub.properties = "64bit = 'Yes' or 64bit = 'No'"
    qsub.build(script)

    if verbose:
        # The original printed qsub.command under the 'cwd' label as well.
        print('cwd: %s' % os.getcwd())
        print('PBS Command: %s' % qsub.command)

    if not onlyecho:
        ec = qsub.do()
        if ec == 0:
            jobid = qsub.result
            print('OAR Jobid: %s' % jobid)
            #namehack.setname(jobid, job.name)
            queued = date()
            # Record the submission in the job directory.
            jobdir.echofile('.batch_jobid', jobid)
            jobdir.echofile('.batch_jobname', job.name)
            jobdir.echofile('.queued', queued)
            jobdir.setstatus('queued on %s' % queued)
        else:
            print('OAR Failed')
    # Two blank lines between jobs.  print('') rather than print(), which
    # would output "()" under Python 2.
    print('')
    print('')