From 217b487d09cd85552cf0efe0517d492a0c73d743 Mon Sep 17 00:00:00 2001 From: Peter-Bernd Otte Date: Wed, 1 May 2019 19:29:14 +0200 Subject: [PATCH] included all basic functionality --- wkmgr.py | 49 ++++++++++++++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 19 deletions(-) diff --git a/wkmgr.py b/wkmgr.py index f033b29..edeb7d8 100755 --- a/wkmgr.py +++ b/wkmgr.py @@ -31,11 +31,15 @@ if rank == 0: args = parser.parse_args() print("Delay: "+str(args.delay)+" ms") - clients = {} +worklist = ['file1.txt', 'file2.txt'] worklist = ['file1.txt', 'file2.txt', 'file3.txt'] +worklist = ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt'] + +if rank ==0: + print ("Number of files to process: "+str(len(worklist))) now = datetime.datetime.now() -outputdir = os.getcwd() + "output_{:04d}{:02d}{:02d}{:02d}{:02d}{:02d}".format(now.year,now.month,now.day,now.hour,now.minute,now.second) +outputdir = os.getcwd() + "/output_{:04d}{:02d}{:02d}{:02d}{:02d}{:02d}".format(now.year,now.month,now.day,now.hour,now.minute,now.second) if os.path.isdir("./"+outputdir): print("Output directory already exists. Exiting.") exit(1) @@ -52,32 +56,39 @@ if rank == 0: print('Total number of workers ',size-1) for i in range(1,size): clients[i] = comm.recv(source=i) + print("All clients active. Printing clients details:") print(clients) - for i in range(1,size): - comm.send(args.execname, dest=i) + for i in range(len(worklist)): + print("ToDo: "+worklist[i]) + print("Wait for process...") + rec = comm.recv(tag=2) # get the rank of free worker + comm.send({"workstr":args.execname,"jobid":i}, dest=rec) sleep(args.delay/1000) + for i in range(1,size): # End all workers + comm.send({"workstr":"","jobid":i}, i) else: print ("rank "+str(rank)) nodeinfo = {'machine':platform.machine(), 'hostname': platform.node(), 'Ncores': multiprocessing.cpu_count(), 'os': platform.system(), 'pythonVersion': platform.python_version() } comm.send(nodeinfo, dest=0) - workstr = comm.recv(source=0) - print("Rank "+str(rank)+" recieved: "+workstr) - - bashCommand = workstr + " > output"+str(rank)+".txt" - try: - completed = subprocess.run(bashCommand, shell=True) - print('Rank '+str(rank)+' returncode: ',completed.returncode) - except subprocess.CalledProcessError as err: - print('Rank '+str(rank)+' ERROR:', err) + while True: + comm.send(rank, dest=0, tag=2) # send a data package asking for more work + dic = comm.recv(source=0) + workstr = dic["workstr"] + if len(workstr)==0: #check if no more to do from master + break + jobid = dic["jobid"] + print("Rank "+str(rank)+" recieved (internal job nr="+str(jobid)+"): "+workstr) + os.mkdir(outputdir+"/"+str(jobid)) + + bashCommand = workstr + " > "+outputdir+"/"+str(jobid)+"/std_out.txt 2> "+outputdir+"/"+str(jobid)+"/err_out.txt" + try: + completed = subprocess.run(bashCommand, shell=True) + print('Rank '+str(rank)+' returncode: ',completed.returncode) + except subprocess.CalledProcessError as err: + print('Rank '+str(rank)+' ERROR:', err) if rank == 0: -# print(clients) print(worklist) - -# run commands - -#cd into subdirector -# catch exceptions \ No newline at end of file -- GitLab