Jump to content
  • Announcements

    • admin

      PBS Forum Has Closed   06/12/17

      The PBS Works Support Forum is no longer active.  For PBS community-oriented questions and support, please join the discussion at http://community.pbspro.org.  Any new security advisories related to commercially-licensed products will be posted in the PBS User Area (https://secure.altair.com/UserArea/). 
Sign in to follow this  
adiaz

Splitting chunks across nodes

Recommended Posts

This is a core PBS hook that will split up a single chunk resource request based on the value of an environment variable MPROC_SPLIT.

So if you did:


qsub -vMPROC_SPLIT=4 -l select=1:ncpus=8:mem=512mb

You end up with a select statement of


-l select=2:ncpus=4:mem=256mb:mpiprocs=4

once the job actually gets in the queue. This is handy for use in conjunction with AIF Application Definitions or anywhere you would like to manipulate the submission of "incorrect" resource requests by a user. In the case where you don't have any resources that contain 8 CPUs in one machine, this script would prevent a user from queuing a job that would never run by adjusting the selection directive to one that would.


#!/usr/bin/python
'''
$Revision: 1.1 $

The goal of this script is to take any chunk with an ncpus value over 'mproc'
and make sure that the ncpus value and chunks values get divided correctly.

'''

import pbs
import sys
import re

# Name this hook is running under; it is looked up in the server's
# debug_hooks list below.
my_name = pbs.event().hook_name

# Debugging is switched on only when the server defines
# resources_available.debug_hooks and that comma-separated list contains
# the name of this hook.
debug_me = bool(
    "debug_hooks" in pbs.server().resources_available
    and my_name in str(pbs.server().resources_available['debug_hooks']).split(',')
)

def dbg_svr_log(string):
    '''Write a banner-framed debug message to the server log.

    The message is emitted through pbs.logmsg() at pbs.LOG_ERROR level,
    preceded by a "DEBUG***..." header line and followed by a line of
    asterisks. Nothing is logged unless the module-level debug_me flag is
    set.
    '''
    if not debug_me:
        return

    banner_top = "DEBUG" + "*" * 19
    banner_bottom = "*" * 79
    pbs.logmsg(pbs.LOG_ERROR, "%s\n%s\n%s" % (banner_top, string, banner_bottom))

# Size prefixes from smallest to largest; a size unit is one of these
# followed by 'b' (bytes) or 'w' (words), e.g. "mb" or "gw".
_SIZE_PREFIXES = ("", "k", "m", "g", "t", "p")
_SIZE_RE = re.compile(r'^(\d+)([a-zA-Z]*)$')


def _divide_size(size, parts):
    '''Divide a size string such as "512mb" evenly between `parts` chunks.

    When the amount is not a multiple of `parts` it is first converted to
    the next smaller unit (x1024), repeatedly if needed, so that for
    example "1gb" split two ways becomes "512mb" instead of "0gb". Any
    remainder left at the smallest unit is truncated.

    Returns the per-chunk size string. A value that is not recognised as
    "<integer>[unit]" is returned unchanged.
    '''
    match = _SIZE_RE.match(size)
    if match is None:
        return size

    amount = int(match.group(1))
    unit = match.group(2).lower()
    if not unit:
        # A bare number carries no unit to step down through.
        return "%d" % (amount // parts)

    prefix, base = unit[:-1], unit[-1]
    if base not in ("b", "w") or prefix not in _SIZE_PREFIXES:
        return size

    level = _SIZE_PREFIXES.index(prefix)
    while amount % parts and level > 0:
        amount *= 1024
        level -= 1
    return "%d%s%s" % (amount // parts, _SIZE_PREFIXES[level], base)


def _split_select(select, mproc):
    '''Build the replacement select string for a single-chunk request.

    select -- the job's select statement, e.g. "1:ncpus=8:mem=512mb"
    mproc  -- maximum ncpus allowed per chunk (positive int)

    Returns the new select string, or None when the statement is not the
    kind this hook handles: more than one chunkset, a chunk count other
    than 1, no integer ncpus value, or a field that is not "name=value".

    The new statement always carries mpiprocs equal to ncpus on every
    chunk. When ncpus exceeds mproc the request becomes ncpus // mproc
    chunks of mproc cpus plus, if there is a remainder, one extra chunk
    holding the leftover cpus; "mem" is divided by the total chunk count.
    All other resources are copied onto every chunk unchanged.
    '''
    chunksets = select.split("+")
    if len(chunksets) != 1:
        return None

    fields = chunksets[0].split(":")
    qty = 1
    if fields[0].isdigit():
        qty = int(fields[0])
        fields = fields[1:]
    if qty != 1:
        return None

    ncpus = None
    carried = []
    for field in fields:
        name, sep, value = field.partition("=")
        if not (name and sep and value):
            return None
        if name == "ncpus":
            if not value.isdigit():
                return None
            ncpus = int(value)
        elif name != "mpiprocs":
            # mpiprocs is always regenerated below; copying a user-supplied
            # value as well would list the resource twice in one chunk.
            carried.append((name, value))

    if ncpus is None:
        return None

    splitting = ncpus > mproc
    if splitting:
        new_qty, rem = divmod(ncpus, mproc)
        new_ncpus = mproc
        total_chunks = new_qty + (1 if rem else 0)
    else:
        new_qty, rem, new_ncpus, total_chunks = qty, 0, ncpus, qty

    pieces = []
    for name, value in carried:
        if splitting and name == "mem":
            value = _divide_size(value, total_chunks)
        pieces.append(":%s=%s" % (name, value))
    common_res_string = "".join(pieces)

    new_chunks = "%d:ncpus=%d:mpiprocs=%d%s" % (
        new_qty, new_ncpus, new_ncpus, common_res_string)
    if rem > 0:
        new_chunks += "+1:ncpus=%d:mpiprocs=%d%s" % (rem, rem, common_res_string)
    return new_chunks


myjob = pbs.event().job
errstr = None

# If there is no MPROC_SPLIT environment variable designated for the job, we
# don't need to do anything for this job. accept() ends hook execution.
if "MPROC_SPLIT" not in myjob.Variable_List:
    pbs.event().accept()

# MPROC_SPLIT arrives as text; compare it numerically so that e.g. 16 > 4.
raw_mproc = str(myjob.Variable_List['MPROC_SPLIT']).strip()
try:
    mproc = int(raw_mproc)
except ValueError:
    mproc = 0
if mproc < 1:
    errstr = "MPROC_SPLIT must be a positive integer (got '%s')" % raw_mproc

if errstr is None:
    if myjob.Resource_List["select"] is None:
        myjob.Resource_List["select"] = pbs.select("1:ncpus=1")

    select = str(myjob.Resource_List["select"])
    new_chunks = _split_select(select, mproc)

    if new_chunks is None:
        # Not the kind of select statement this hook is intended for;
        # leave the job exactly as submitted.
        dbg_svr_log("Leaving select untouched: %s" % select)
        pbs.event().accept()

    dbg_svr_log("new select string: %s" % new_chunks)
    myjob.Resource_List['select'] = pbs.select(new_chunks)

if errstr is not None:
    pbs.event().reject("You have the following errors, seek help: \n" + errstr)

Share this post


Link to post
Share on other sites

Please sign in to comment

You will be able to leave a comment after signing in



Sign In Now
Sign in to follow this  

×