Last week there was a temporary error with email sending that led to GitLab notification emails not being delivered. The problem should be solved now; however, failed notification emails are not being resent!

Commit 5c9b304c authored by Peter-Bernd Otte

change job argument list

parent 925a11a4
......@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*-
#
# Workload Distributor
# Version 0.2
# Version 0.3
#
# Minimal python version 3.6
# on Mogon 2: run first
......@@ -25,51 +25,65 @@ rank = comm.Get_rank()
logging.basicConfig(level=logging.WARNING, format='%(asctime)-15s %(name)-6s %(levelname)-8s %(message)s')
logger = logging.getLogger('rank' + str(rank))
if size < 2:
logger.error(
"Minimum number of MPI ranks = 2, please run this script with the appropriate launcher. eg mpirun -n 4 "
"./wkmgr.py [execname]")
exit(2)
now = datetime.datetime.now()
parser = argparse.ArgumentParser(description='Workload distributor for trivial parallelism.')
parser.add_argument('execname', help='name of executable to call')
parser.add_argument("-v", "--verbose", help="increase output verbosity", default=False, action="store_true")
parser.add_argument("-V", "--version", help="Returns the actual program version", action="version",
version='%(prog)s 0.2')
parser.add_argument("-d", "--delay", help="time delay in ms between starts of consecutive workers", default=50,
type=int)
parser.add_argument("inputdir", help="specifies the directory with files to process", default=".")
version='%(prog)s 0.3')
parser.add_argument("-d", "--delay", help="time delay in ms between starts of consecutive jobs to help "
"distributing load", default=50, type=int)
parser.add_argument("-i", "--input-dir", help="specifies the directory with files to process", default=".")
parser.add_argument("-o", "--output-dir", help="output directory, default = output[datetime]", default=os.getcwd() +
"/output_{:04d}{:02d}{:02d}{:02d}{:02d}{:02d}".format(now.year, now.month, now.day,
now.hour, now.minute, now.second) )
parser.add_argument("-a", "--argument", help="line called for each job. Default = "
"'{execname} {inputdir}{inputfilename} {outputdir}{jobid}/outfile.txt'",
default='{execname} {inputdir}{inputfilename} {outputdir}{jobid}/outfile.txt')
clients = {}
if rank == 0:
if size < 2:
logger.warning(
"Number of workers (MPI ranks) = 1, to fully unleash computer resources, please run this script with the "
"appropriate launcher and a higher number of MPI ranks. eg mpirun -n 4 ./wkmgr.py [execname]")
args = parser.parse_args()
if args.verbose:
logger.setLevel(logging.DEBUG)
logger.info("Input dir: " + args.inputdir)
inputdir = args.input_dir+"/"
if not os.path.isdir(inputdir):
logger.error("Input directory does not exist. Exiting.")
exit(1)
logger.info("Input dir: " + inputdir)
logger.info("Delay between files: " + str(args.delay) + " ms")
logger.info("Executing: " + str(args.execname))
# prepare list of files to process
worklist = [f for f in os.listdir(args.inputdir) if os.path.isfile(os.path.join(args.inputdir, f))]
logger.info("Number of files to process: " + str(len(worklist)))
logger.info("List of files to process:")
logger.info(worklist)
inputfilelist = [f for f in os.listdir(inputdir) if os.path.isfile(os.path.join(inputdir, f))]
logger.info("List of files to process (total: "+str(len(inputfilelist))+"):")
logger.info(inputfilelist)
# prepare output directory
now = datetime.datetime.now()
outputdir = os.getcwd() + "/output_{:04d}{:02d}{:02d}{:02d}{:02d}{:02d}/".format(now.year, now.month, now.day,
now.hour, now.minute, now.second)
if args.verbose:
logger.info("Output dir: " + outputdir)
outputdir = args.output_dir+"/"
logger.info("Output dir: " + outputdir)
if os.path.isdir("./" + outputdir):
logger.error("Output directory already exists. Exiting.")
exit(1)
exit(2)
os.mkdir(outputdir)
if args.verbose:
logger.info("output directory created.")
logger.info('Total number of workers ' + str(size))
logger.info("output directory created.")
#prepare worklist
worklist = []
for i, v in enumerate(inputfilelist):
data = {'execname':args.execname, 'inputdir':inputdir, 'jobid': i, 'inputfilename':v, 'outputdir':outputdir}
# more hints: see https://pyformat.info
# worklist.append( ('{execname} {inputdir}{inputfilename} {outputdir}{jobid}/outfile.txt').format(**data) )
worklist.append( args.argument.format(**data) )
logger.info("List if jobs:")
logger.info(worklist)
# --- Send machine data to master rank
data = {'rank': rank, 'machine': platform.machine(), 'hostname': platform.node(), 'Ncores': multiprocessing.cpu_count(),
......@@ -86,7 +100,7 @@ if settings["verbose"]:
logger.setLevel(logging.DEBUG)
logger.info("Worker fully initialised.")
if rank == 0:
logger.info("All clients active. Printing clients details:")
logger.info("All clients (total "+str(size)+") active. Printing clients details:")
logger.info(clients)
# ---
......@@ -124,7 +138,8 @@ while True:
i = WLPosition
logger.info("Send a new job ("+worklist[i]+") to rank " + str(rec) + "...")
workstr = args.execname + " " + args.inputdir + "/" + worklist[i] + " " + outputdir + str(i) + "/outfile.txt"
#workstr = args.execname + " " + inputdir + "/" + worklist[i] + " " + outputdir + str(i) + "/outfile.txt"
workstr = worklist[i]
s4 = comm.isend({"workstr": workstr, "jobid": i, "outputdir": outputdir}, dest=rec, tag=4)
WLPosition += 1
SenderLastJobTimeStamp = time.time()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment