Commit 5728586c authored by Peter-Bernd Otte's avatar Peter-Bernd Otte

Changed for logging module

parent 652248bd
......@@ -16,12 +16,17 @@ import subprocess
from time import sleep
import os
import datetime
import logging
from threading import Thread
logging.basicConfig(level=logging.WARNING,format='%(asctime)-15s %(name)-5s %(levelname)-8s %(message)s')
logger = logging.getLogger('workload-manager')
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
if size<2:
print("Minimum number of MPI ranks = 2, please run this script with the appropriate launcher. eg mpirun -n 4 ./wkmgr.py [execname|]")
logger.error("Minimum number of MPI ranks = 2, please run this script with the appropriate launcher. eg mpirun -n 4 ./wkmgr.py [execname]")
exit(2)
parser = argparse.ArgumentParser(description='Workload distributor for trivial parallelism.')
......@@ -33,48 +38,48 @@ parser.add_argument("inputdir", help="specifies the directory with files to proc
if rank == 0:
args = parser.parse_args()
if args.verbose:
logger.setLevel(logging.DEBUG)
clients = {}
if rank == 0:
if args.verbose:
print("Verbosity turned on")
print("Input dir: "+args.inputdir)
print("Delay between files: "+str(args.delay)+" ms")
print("Executing: ",args.execname)
logger.info("Verbosity turned on")
logger.info("Input dir: "+args.inputdir)
logger.info("Delay between files: "+str(args.delay)+" ms")
logger.info("Executing: "+str(args.execname))
#prepare list of files to process
worklist = [f for f in os.listdir(args.inputdir) if os.path.isfile(os.path.join(args.inputdir, f))]
if args.verbose:
print("Number of files to process: "+str(len(worklist)))
print("List of files to process:")
print(worklist)
logger.info("Number of files to process: "+str(len(worklist)))
logger.info("List of files to process:")
logger.info(worklist)
#prepare output directory
now = datetime.datetime.now()
outputdir = os.getcwd() + "/output_{:04d}{:02d}{:02d}{:02d}{:02d}{:02d}".format(now.year,now.month,now.day,now.hour,now.minute,now.second)
if args.verbose:
print("Output dir: ",outputdir)
logger.info("Output dir: "+outputdir )
if os.path.isdir("./"+outputdir):
print("Output directory already exists. Exiting.")
logger.error("Output directory already exists. Exiting.")
exit(1)
os.mkdir(outputdir)
if args.verbose:
print("output directory created.")
print('Total number of workers ',size-1)
logger.info("output directory created.")
logger.info('Total number of workers '+str(size-1) )
for i in range(1,size):
clients[i] = comm.recv(source=i)
#distribute settings
comm.send({"verbose":args.verbose},dest=i,tag=3)
if args.verbose:
print("All clients active. Printing clients details:")
print(clients)
logger.info("All clients active. Printing clients details:")
logger.info(clients)
for i in range(len(worklist)):
if args.verbose:
print("ToDo: "+worklist[i])
print("Wait for process...")
logger.info("ToDo: "+worklist[i])
logger.info("Wait for process...")
rec = comm.recv(tag=2) # get the rank of free worker
comm.send({"workstr":args.execname + " " + args.inputdir+"/"+worklist[i] + " "+outputdir+"/"+str(i)+"/outfile.txt" ,"jobid":i,"outputdir":outputdir}, dest=rec)
sleep(args.delay/1000)
......@@ -82,7 +87,7 @@ if rank == 0:
comm.send({"workstr":""}, i)
if args.verbose:
print("Controlling master ended.")
logger.info("Controlling master ended.")
else:
nodeinfo = {'machine':platform.machine(), 'hostname': platform.node(), 'Ncores': multiprocessing.cpu_count(), 'os': platform.system(), 'pythonVersion': platform.python_version() }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment