Commit 1a96dba2 authored by Peter-Bernd Otte's avatar Peter-Bernd Otte

Added functionality for mutiple parameters.

parent 0a03d70a
......@@ -20,6 +20,7 @@ import time
import os
import datetime
import logging
#from numpy import arange #not possible to use due to conflicts with sundials module
comm = MPI.COMM_WORLD
size = comm.Get_size()
......@@ -34,13 +35,16 @@ parser.add_argument("-v", "--verbosity", help="increase output verbosity", defau
parser.add_argument("-V", "--version", help="Returns the actual program version", action="version",
version='%(prog)s 0.4')
parser.add_argument("-d", "--delay", help="time delay in ms between starts of consecutive jobs to help "
"distributing load", default=50, type=int)
"distributing load", default=100, type=int)
parser.add_argument('-a', "--arg-variable", help="add variables to use as execname argument.", action="append", nargs='?')
parser.add_argument("-i", "--input-dir", help="specifies the directory with files to process", default=".")
parser.add_argument("-o", "--output-dir", help="output directory, default = output[datetime]", default=os.getcwd() +
"/output_{:04d}{:02d}{:02d}{:02d}{:02d}{:02d}".format(now.year, now.month, now.day,
now.hour, now.minute, now.second) )
parser.add_argument("-s", "--single-argument", help="do not add the default placeholders to the execname", action="store_true")
parser.add_argument('execname', help='name of executable to call OR complete shell command line. If no'
parser.add_argument("-ni", "--no-inputfiles", help="set this flag if no iteration over input files requested.", action="store_true", default=False)
parser.add_argument("-n", "--dry-run", help="Do not launch workers, just build test settings.", action="store_true", default=False)
parser.add_argument('execname', help='name of executable to call OR complete shell command line. If no '
'placeholders are provided, " {inputdir}{inputfilename} {outputdir}{jobid}/outfile.txt" '
'is added.')
parser.add_argument('remainderargs', nargs=argparse.REMAINDER) #the remainder shall be added to the args.execname
......@@ -56,32 +60,51 @@ def join_l(l, sep):
if rank == 0:
if size < 2:
logger.warning(
"Number of workers (MPI ranks) = 1, to fully unleash computer resources, please run this script with the "
"Number of workers (MPI ranks) = 1, to fully unleash computer resources, run with the "
"appropriate launcher and a higher number of MPI ranks. eg mpirun -n 4 ./wkmgr.py [execname]")
args = parser.parse_args()
if args.verbosity > 1:
logger.setLevel(logging.INFO)
if args.verbosity > 0:
logger.setLevel(logging.DEBUG)
logger.setLevel(logging.WARNING-(args.verbosity*10 if args.verbosity <=2 else 20) )
inputdir = args.input_dir+"/"
if not os.path.isdir(inputdir):
logger.error("Input directory does not exist. Exiting.")
exit(1)
# perpare arg-variable
logger.debug("Arg_Variable: ")
logger.debug(args.arg_variable)
argvariables = []
if args.arg_variable:
for v in args.arg_variable:
argvariables.append(v.split(','))
for i, v in enumerate(argvariables):
if len(v)==1: argvariables[i] = [v[0], 0, 1, 1]
if len(v)==2: argvariables[i] = [v[0], 0, float(v[1]), 1]
if len(v)==3: argvariables[i] = [v[0], float(v[1]), float(v[2]), 1]
if len(v)==4: argvariables[i] = [v[0], float(v[1]), float(v[2]), float(v[3])]
logger.debug("Interpreted as:")
logger.debug(argvariables)
else:
logger.debug("No Arg_Variables provided.")
# prepare execname
execname = args.execname+join_l(args.remainderargs," ")
if execname.find('{') == -1:
if args.single_argument:
logger.info("Single-Argument option: no placeholders are added to the execname.")
logger.debug("Single-Argument option: no placeholders are added to the execname.")
else:
logger.info("No placeholders are provided with the execname. Default will be added.")
execname += " {inputdir}{inputfilename} {outputdir}{jobid}/outfile.txt"
for v2 in argvariables:
execname += " {"+str(v2[0])+"}"
if not args.no_inputfiles:
execname += " {inputdir}{inputfilename}"
execname += " {outputdir}{jobid}/outfile.txt"
logger.debug("execname: "+execname)
logger.info("Input dir: " + inputdir)
logger.info("Delay between files: " + str(args.delay) + " ms")
logger.info("Executing: \"" + execname +"\"")
logger.debug("Delay between files: " + str(args.delay) + " ms")
logger.debug("Executing: \"" + execname +"\"")
# prepare list of files to process
inputfilelist = [f for f in os.listdir(inputdir) if os.path.isfile(os.path.join(inputdir, f))]
......@@ -94,18 +117,49 @@ if rank == 0:
if os.path.isdir("./" + outputdir):
logger.error("Output directory already exists. Exiting.")
exit(2)
os.mkdir(outputdir)
logger.info("output directory created.")
if not args.dry_run:
os.mkdir(outputdir)
logger.debug("output directory created.")
#prepare worklist
worklist = []
for i, v in enumerate(inputfilelist):
data = {'execname':args.execname, 'inputdir':inputdir, 'jobid': i, 'inputfilename':v, 'outputdir':outputdir}
# more hints: see https://pyformat.info
# worklist.append( ('{execname} {inputdir}{inputfilename} {outputdir}{jobid}/outfile.txt').format(**data) )
worklist.append( execname.format(**data) )
logger.info("List if jobs:")
logger.info(worklist)
jobid = 0
varData = {}
def myarange(start, stop, step):
tv = []
i = start
while i < stop:
tv.append(i)
i += step
return tv
def buildWorklist(NArgVar, fvarData):
global worklist
global jobid
if NArgVar<len(argvariables):
v = argvariables[NArgVar]
for i3 in myarange(v[1], v[2], v[3]):
fvarData.update({v[0]:i3})
buildWorklist(NArgVar+1, fvarData)
else:
data = fvarData
data.update({'execname':args.execname, 'inputdir':inputdir, 'jobid': jobid, 'outputdir':outputdir})
if args.no_inputfiles:
logger.debug("Flag --no-inputfiles provided.")
logger.debug(data)
worklist.append( execname.format(**data) )
jobid += 1
else:
for i, v in enumerate(inputfilelist):
data.update({'jobid': jobid, 'inputfilename':v})
logger.debug(data)
worklist.append( execname.format(**data) )
jobid += 1
buildWorklist(0, varData)
logger.debug("List of jobs (total={}):".format(len(worklist)))
logger.debug(worklist)
# --- Send machine data to master rank
data = {'rank': rank, 'machine': platform.machine(), 'hostname': platform.node(), 'Ncores': multiprocessing.cpu_count(),
......@@ -118,10 +172,8 @@ if rank == 0:
data = {"verbosity": args.verbosity}
settings = comm.bcast(data, root=0)
if settings["verbosity"] >1:
logger.setLevel(logging.INFO)
if settings["verbosity"] >0:
logger.setLevel(logging.DEBUG)
logger.setLevel(logging.WARNING-(settings["verbosity"]*10 if settings["verbosity"] <=2 else 20) )
logger.info("Worker fully initialised.")
if rank == 0:
logger.info("All clients (total "+str(size)+") active. Printing clients details:")
......@@ -131,6 +183,10 @@ if rank == 0:
# Start of main loop
WLPosition = 0 #Position in list of files
SenderLastJobTimeStamp = 0 # to allow for delays between launched jobs
if rank == 0:
if args.dry_run:
logger.info("Skipping execution loop, dry-run activated.")
WLPosition = len(worklist)
# --- Variables for FSM of sender and worker
SenderFSM = 0
......@@ -207,10 +263,13 @@ while True:
logger.info("Job ended, ret code: " + str(p.returncode))
WorkerFSM = 0
time.sleep(0.001) # to take some load off the node
if rank == 0:
time.sleep(0.01) # to take some load off the node
else:
time.sleep(1)
logger.info("Worker ended.")
if rank == 0:
logger.info("Controlling master ended.")
logger.info("Controlling master ended. It might take some time until the last worker (MPI rank) completed too.")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment