diff --git a/wkmgr.py b/wkmgr.py index a6f389a21cfb30b08abcffa96fe93be56b12e485..0f81d4c18e78360451549fbff2c3aba0159ce9e8 100755 --- a/wkmgr.py +++ b/wkmgr.py @@ -2,7 +2,7 @@ # -*- coding: utf-8 -*- # # Workload Distributor -# Version 0.1 +# Version 0.2 # # Minimal python version 3.6 # on Mogon 2: run first @@ -13,18 +13,20 @@ import platform import multiprocessing #to get number of cores import argparse import subprocess -from time import sleep +import time import os import datetime import logging from threading import Thread -logging.basicConfig(level=logging.WARNING,format='%(asctime)-15s %(name)-5s %(levelname)-8s %(message)s') -logger = logging.getLogger('workload-manager') comm = MPI.COMM_WORLD size = comm.Get_size() rank = comm.Get_rank() + +logging.basicConfig(level=logging.WARNING,format='%(asctime)-15s %(name)-6s %(levelname)-8s %(message)s') +logger = logging.getLogger('rank'+str(rank)) + if size<2: logger.error("Minimum number of MPI ranks = 2, please run this script with the appropriate launcher. 
eg mpirun -n 4 ./wkmgr.py [execname]") exit(2) @@ -32,18 +34,17 @@ if size<2: parser = argparse.ArgumentParser(description='Workload distributor for trivial parallelism.') parser.add_argument('execname', help='name of executable to call') parser.add_argument("-v", "--verbose", help="increase output verbosity", default=False, action="store_true") -parser.add_argument("-V", "--version", help="Returns the actual program version", action="version", version='%(prog)s 0.1') +parser.add_argument("-V", "--version", help="Returns the actual program version", action="version", version='%(prog)s 0.2') parser.add_argument("-d", "--delay", help="time delay in ms between starts of consecutive workers", default=50, type=int) -parser.add_argument("inputdir", help="specifies the directory with files to process") +parser.add_argument("inputdir", help="specifies the directory with files to process", default=".") + +clients = {} if rank == 0: args = parser.parse_args() if args.verbose: logger.setLevel(logging.DEBUG) -clients = {} - -if rank == 0: logger.info("Verbosity turned on") logger.info("Input dir: "+args.inputdir) logger.info("Delay between files: "+str(args.delay)+" ms") @@ -66,56 +67,122 @@ if rank == 0: os.mkdir(outputdir) if args.verbose: logger.info("output directory created.") - logger.info('Total number of workers '+str(size-1) ) + logger.info('Total number of workers '+str(size) ) + +##### Send Data to master rank +# replace later with: MPI_Gathering +nodeinfo = {'machine':platform.machine(), 'hostname': platform.node(), 'Ncores': multiprocessing.cpu_count(), 'os': platform.system(), 'pythonVersion': platform.python_version() } - for i in range(1,size): - clients[i] = comm.recv(source=i) +h5 = comm.isend(nodeinfo, dest=0, tag=5) +if rank ==0: + h3 = {} + for i in range(0,size): + clients[i] = comm.recv(source=i, tag=5) +h5.wait() + +### Distribute Settings +# Replace later with MPI_Broadcasting +if rank ==0: + for i in range(0,size): #distribute settings - 
comm.send({"verbose":args.verbose},dest=i,tag=3) - if args.verbose: - logger.info("All clients active. Printing clients details:") - logger.info(clients) - - for i in range(len(worklist)): - if args.verbose: - logger.info("ToDo: "+worklist[i]) - logger.info("Wait for process...") - rec = comm.recv(tag=2) # get the rank of free worker - comm.send({"workstr":args.execname + " " + args.inputdir+"/"+worklist[i] + " "+outputdir+"/"+str(i)+"/outfile.txt" ,"jobid":i,"outputdir":outputdir}, dest=rec) - sleep(args.delay/1000) - for i in range(1,size): # End all workers by sending an empty task - comm.send({"workstr":""}, i) + h3[i]=comm.isend({"verbose":args.verbose},dest=i,tag=3) + +settings=comm.recv(source=0,tag=3) +if settings["verbose"]: + logger.setLevel(logging.DEBUG) +logger.info("Worker "+str(rank)+" initialised.") +if rank ==0: + for i in range(0,size): + h3[i].wait() + + logger.info("All clients active. Printing clients details:") + logger.info(clients) + + +#### Go into working loop + +WLPosition = 0 + +SenderFSM = 0 #0=Preparing for receive, 1=waiting between jobs, 2=waiting for rank to ask for task, 5=Send empty task to all +WorkerFSM = 0 #0=Send a request to Master, 1=wait for sending to complete, prepare for answer, 2=waiting for answer, start processing, 3=running, 4=completed and reset +SenderLastJobTimeStamp = 0 # to allow for delays between launched jobs + +while True: + #### + # Distribution stuff + + if (rank == 0): + if SenderFSM==0: + if (WLPosition < len(worklist)): # still some work to be distributed + r2 = comm.irecv(tag=2) # wait for a worker to connect and get the rank of the free worker + logger.info("Waiting for some worker to connect...") + SenderFSM = 1 + else: #no more work in the list + SenderFSM = 5 + for i in range(0,size): # End all workers by sending an empty task + logger.info("Sending an empty task to rank "+str(i)+"...") + comm.send({"workstr":""}, i, tag=4) + + if SenderFSM == 1: #delay time between jobs + if (time.time() - 
SenderLastJobTimeStamp > args.delay/1000): + SenderFSM = 2 + + if SenderFSM == 2: + if r2.Get_status(): + rec = r2.wait() + SenderFSM = 0 + + i = WLPosition + logger.info("ToDo: "+worklist[i]) + logger.info("Send a new job to rank "+str(rec)+"...") + workstr = args.execname + " " + args.inputdir+"/"+worklist[i] + " "+outputdir+"/"+str(i)+"/outfile.txt" +# workstr = args.execname + s4 = comm.isend({"workstr":workstr, "jobid":i,"outputdir":outputdir}, dest=rec, tag=4) + WLPosition += 1 + SenderLastJobTimeStamp = time.time() + + + ### + # Worker itself + if WorkerFSM==0: + h2 = comm.isend(rank, dest=0, tag=2) # send a data package asking for more work + WorkerFSM = 1 + if WorkerFSM==1: + if h2.Test(): + h2.wait() + r4 = comm.irecv(source=0, tag=4) + WorkerFSM = 2 + if WorkerFSM==2: + if r4.Get_status(): + dic = r4.wait() + workstr = dic["workstr"] + if len(workstr)==0: #check if no more to do from master + break + outputdir = dic["outputdir"] + jobid = dic["jobid"] + logger.info("Worker "+str(rank)+" recieved (internal job nr="+str(jobid)+"): "+workstr) + os.mkdir(outputdir+"/"+str(jobid)) + + bashCommand = workstr + " > "+outputdir+"/"+str(jobid)+"/std_out.txt 2> "+outputdir+"/"+str(jobid)+"/err_out.txt" + try: + p = subprocess.Popen(bashCommand, shell=True) + logger.info('Worker startet job. 
') + except subprocess.CalledProcessError as err: + logger.info('Worker '+str(rank)+' ERROR:', err) + + WorkerFSM = 3 + if WorkerFSM == 3: # Running + if p.poll() is not None: + logger.info("Process ended, ret code: "+str(p.returncode)) + WorkerFSM = 4 + + if WorkerFSM==4: + logger.info("Worker "+str(rank)+": Work completed and reset") + WorkerFSM=0 - if args.verbose: - logger.info("Controlling master ended.") - -else: - nodeinfo = {'machine':platform.machine(), 'hostname': platform.node(), 'Ncores': multiprocessing.cpu_count(), 'os': platform.system(), 'pythonVersion': platform.python_version() } - comm.send(nodeinfo, dest=0) - settings=comm.recv(source=0,tag=3) - if settings["verbose"]: - print ("Worker "+str(rank)+" started.") - - while True: - comm.send(rank, dest=0, tag=2) # send a data package asking for more work - dic = comm.recv(source=0) - workstr = dic["workstr"] - if len(workstr)==0: #check if no more to do from master - break - outputdir = dic["outputdir"] - jobid = dic["jobid"] - if settings["verbose"]: - print("Worker "+str(rank)+" recieved (internal job nr="+str(jobid)+"): "+workstr) - os.mkdir(outputdir+"/"+str(jobid)) - - bashCommand = workstr + " > "+outputdir+"/"+str(jobid)+"/std_out.txt 2> "+outputdir+"/"+str(jobid)+"/err_out.txt" - try: - completed = subprocess.run(bashCommand, shell=True) - if settings["verbose"]: - print('Worker '+str(rank)+' returncode: ',completed.returncode) - except subprocess.CalledProcessError as err: - if settings["verbose"]: - print('Worker '+str(rank)+' ERROR:', err) - - if settings["verbose"]: - print("Worker "+str(rank)+" ended.") + time.sleep(0.001) #to take some load off the node + +logger.info("Worker "+str(rank)+" ended.") + +if rank ==0: + logger.info("Controlling master ended.")