Commit 13300436 authored by Peter-Bernd Otte's avatar Peter-Bernd Otte

inputdir functionality added

parent e7a08f72
......@@ -20,58 +20,76 @@ import datetime
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
if size<2:
print("Minimum number of MPI ranks = 2, please run this script with the appropriate launcher. eg mpirun -n 4 ./wkmgr.py [execname|]")
exit(2)
parser = argparse.ArgumentParser(description='Workload distributor for trivial parallelism.')
parser.add_argument('execname', help='name of executable to call')
parser.add_argument("-v", "--verbose", help="increase output verbosity", default=False, action="store_true")
parser.add_argument("-V", "--version", help="Returns the actual program version", action="version", version='%(prog)s 0.1')
parser.add_argument("-d", "--delay", help="time delay in ms between starts of consecutive workers", default=50, type=int)
parser.add_argument("inputdir", help="specifies the directory with files to process")
if rank == 0:
args = parser.parse_args()
print("Delay: "+str(args.delay)+" ms")
clients = {}
worklist = ['file1.txt', 'file2.txt']
worklist = ['file1.txt', 'file2.txt', 'file3.txt']
worklist = ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt']
if rank ==0:
print ("Number of files to process: "+str(len(worklist)))
now = datetime.datetime.now()
outputdir = os.getcwd() + "/output_{:04d}{:02d}{:02d}{:02d}{:02d}{:02d}".format(now.year,now.month,now.day,now.hour,now.minute,now.second)
if os.path.isdir("./"+outputdir):
print("Output directory already exists. Exiting.")
exit(1)
if rank == 0:
if args.verbose:
print("Verbosity turned on")
print("Input dir: "+args.inputdir)
print("Delay between files: "+str(args.delay)+" ms")
print("Executing: ",args.execname)
#prepare list of files to process
worklist = [f for f in os.listdir(args.inputdir) if os.path.isfile(os.path.join(args.inputdir, f))]
if args.verbose:
print("verbosity turned on")
print("Executing: ",args.execname)
print("Output dir: ",outputdir)
print("Number of files to process: "+str(len(worklist)))
print("List of files to process:")
print(worklist)
#prepare output directory
now = datetime.datetime.now()
outputdir = os.getcwd() + "/output_{:04d}{:02d}{:02d}{:02d}{:02d}{:02d}".format(now.year,now.month,now.day,now.hour,now.minute,now.second)
if args.verbose:
print("Output dir: ",outputdir)
if os.path.isdir("./"+outputdir):
print("Output directory already exists. Exiting.")
exit(1)
os.mkdir(outputdir)
if args.verbose:
print("output directory created.")
print('Total number of workers ',size-1)
print('Total number of workers ',size-1)
for i in range(1,size):
clients[i] = comm.recv(source=i)
print("All clients active. Printing clients details:")
print(clients)
#distribute settings
comm.send({"verbose":args.verbose},dest=i,tag=3)
if args.verbose:
print("All clients active. Printing clients details:")
print(clients)
for i in range(len(worklist)):
print("ToDo: "+worklist[i])
print("Wait for process...")
if args.verbose:
print("ToDo: "+worklist[i])
print("Wait for process...")
rec = comm.recv(tag=2) # get the rank of free worker
comm.send({"workstr":args.execname,"jobid":i}, dest=rec)
comm.send({"workstr":args.execname + " " + worklist[i] ,"jobid":i,"outputdir":outputdir}, dest=rec)
sleep(args.delay/1000)
for i in range(1,size): # End all workers
comm.send({"workstr":"","jobid":i}, i)
for i in range(1,size): # End all workers by sending an empty task
comm.send({"workstr":""}, i)
if args.verbose:
print("Controlling master ended.")
else:
print ("rank "+str(rank))
nodeinfo = {'machine':platform.machine(), 'hostname': platform.node(), 'Ncores': multiprocessing.cpu_count(), 'os': platform.system(), 'pythonVersion': platform.python_version() }
comm.send(nodeinfo, dest=0)
settings=comm.recv(source=0,tag=3)
if settings["verbose"]:
print ("Worker "+str(rank)+" started.")
while True:
comm.send(rank, dest=0, tag=2) # send a data package asking for more work
......@@ -79,16 +97,20 @@ else:
workstr = dic["workstr"]
if len(workstr)==0: #check if no more to do from master
break
outputdir = dic["outputdir"]
jobid = dic["jobid"]
print("Rank "+str(rank)+" recieved (internal job nr="+str(jobid)+"): "+workstr)
if settings["verbose"]:
print("Worker "+str(rank)+" recieved (internal job nr="+str(jobid)+"): "+workstr)
os.mkdir(outputdir+"/"+str(jobid))
bashCommand = workstr + " > "+outputdir+"/"+str(jobid)+"/std_out.txt 2> "+outputdir+"/"+str(jobid)+"/err_out.txt"
try:
completed = subprocess.run(bashCommand, shell=True)
print('Rank '+str(rank)+' returncode: ',completed.returncode)
if settings["verbose"]:
print('Worker '+str(rank)+' returncode: ',completed.returncode)
except subprocess.CalledProcessError as err:
print('Rank '+str(rank)+' ERROR:', err)
if settings["verbose"]:
print('Worker '+str(rank)+' ERROR:', err)
if rank == 0:
print(worklist)
if settings["verbose"]:
print("Worker "+str(rank)+" ended.")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment