Commit 5a749421 authored by Peter-Bernd Otte's avatar Peter-Bernd Otte

rank 0 participates as worker

parent 83deb81b
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
# Workload Distributor # Workload Distributor
# Version 0.1 # Version 0.2
# #
# Minimal python version 3.6 # Minimal python version 3.6
# on Mogon 2: run first # on Mogon 2: run first
...@@ -13,18 +13,20 @@ import platform ...@@ -13,18 +13,20 @@ import platform
import multiprocessing #to get number of cores import multiprocessing #to get number of cores
import argparse import argparse
import subprocess import subprocess
from time import sleep import time
import os import os
import datetime import datetime
import logging import logging
from threading import Thread from threading import Thread
logging.basicConfig(level=logging.WARNING,format='%(asctime)-15s %(name)-5s %(levelname)-8s %(message)s')
logger = logging.getLogger('workload-manager')
comm = MPI.COMM_WORLD comm = MPI.COMM_WORLD
size = comm.Get_size() size = comm.Get_size()
rank = comm.Get_rank() rank = comm.Get_rank()
logging.basicConfig(level=logging.WARNING,format='%(asctime)-15s %(name)-6s %(levelname)-8s %(message)s')
logger = logging.getLogger('rank'+str(rank))
if size<2: if size<2:
logger.error("Minimum number of MPI ranks = 2, please run this script with the appropriate launcher. eg mpirun -n 4 ./wkmgr.py [execname]") logger.error("Minimum number of MPI ranks = 2, please run this script with the appropriate launcher. eg mpirun -n 4 ./wkmgr.py [execname]")
exit(2) exit(2)
...@@ -32,18 +34,17 @@ if size<2: ...@@ -32,18 +34,17 @@ if size<2:
parser = argparse.ArgumentParser(description='Workload distributor for trivial parallelism.') parser = argparse.ArgumentParser(description='Workload distributor for trivial parallelism.')
parser.add_argument('execname', help='name of executable to call') parser.add_argument('execname', help='name of executable to call')
parser.add_argument("-v", "--verbose", help="increase output verbosity", default=False, action="store_true") parser.add_argument("-v", "--verbose", help="increase output verbosity", default=False, action="store_true")
parser.add_argument("-V", "--version", help="Returns the actual program version", action="version", version='%(prog)s 0.1') parser.add_argument("-V", "--version", help="Returns the actual program version", action="version", version='%(prog)s 0.2')
parser.add_argument("-d", "--delay", help="time delay in ms between starts of consecutive workers", default=50, type=int) parser.add_argument("-d", "--delay", help="time delay in ms between starts of consecutive workers", default=50, type=int)
parser.add_argument("inputdir", help="specifies the directory with files to process") parser.add_argument("inputdir", help="specifies the directory with files to process", default=".")
clients = {}
if rank == 0: if rank == 0:
args = parser.parse_args() args = parser.parse_args()
if args.verbose: if args.verbose:
logger.setLevel(logging.DEBUG) logger.setLevel(logging.DEBUG)
clients = {}
if rank == 0:
logger.info("Verbosity turned on") logger.info("Verbosity turned on")
logger.info("Input dir: "+args.inputdir) logger.info("Input dir: "+args.inputdir)
logger.info("Delay between files: "+str(args.delay)+" ms") logger.info("Delay between files: "+str(args.delay)+" ms")
...@@ -66,56 +67,122 @@ if rank == 0: ...@@ -66,56 +67,122 @@ if rank == 0:
os.mkdir(outputdir) os.mkdir(outputdir)
if args.verbose: if args.verbose:
logger.info("output directory created.") logger.info("output directory created.")
logger.info('Total number of workers '+str(size-1) ) logger.info('Total number of workers '+str(size) )
##### Send Data to master rank
# replace later with: MPI_Gathering
nodeinfo = {'machine':platform.machine(), 'hostname': platform.node(), 'Ncores': multiprocessing.cpu_count(), 'os': platform.system(), 'pythonVersion': platform.python_version() }
for i in range(1,size): h5 = comm.isend(nodeinfo, dest=0, tag=5)
clients[i] = comm.recv(source=i) if rank ==0:
h3 = {}
for i in range(0,size):
clients[i] = comm.recv(source=i, tag=5)
h5.wait()
### Distribute Settings
# Replace later with MPI_Broadcasting
if rank ==0:
for i in range(0,size):
#distribute settings #distribute settings
comm.send({"verbose":args.verbose},dest=i,tag=3) h3[i]=comm.isend({"verbose":args.verbose},dest=i,tag=3)
if args.verbose:
logger.info("All clients active. Printing clients details:") settings=comm.recv(source=0,tag=3)
logger.info(clients) if settings["verbose"]:
logger.setLevel(logging.DEBUG)
for i in range(len(worklist)): logger.info("Worker "+str(rank)+" initialised.")
if args.verbose: if rank ==0:
logger.info("ToDo: "+worklist[i]) for i in range(0,size):
logger.info("Wait for process...") h3[i].wait()
rec = comm.recv(tag=2) # get the rank of free worker
comm.send({"workstr":args.execname + " " + args.inputdir+"/"+worklist[i] + " "+outputdir+"/"+str(i)+"/outfile.txt" ,"jobid":i,"outputdir":outputdir}, dest=rec) logger.info("All clients active. Printing clients details:")
sleep(args.delay/1000) logger.info(clients)
for i in range(1,size): # End all workers by sending an empty task
comm.send({"workstr":""}, i)
#### Go into working loop
WLPosition = 0
SenderFSM = 0 #0=Preparing for receive, 1=waiting between jobs, 2=waiting for rank to ask for taks, 5=Send empty taks to all
WorkerFSM = 0 #0=Send a request to Master, 1=wait for sending to comnplete, prepare for answer, 2=waiting for answer, start processing, 3=running, 4=completed and reset
SenderLastJobTimeStamp = 0 # to allow for delays between launched jobs
while True:
####
# Distribution stuff
if (rank == 0):
if SenderFSM==0:
if (WLPosition < len(worklist)): # still some work to be distributed
r2 = comm.irecv(tag=2) # wait for a worker to connect and get the rank of the free worker
logger.info("Waiting for some worker to connect...")
SenderFSM = 1
else: #no more work in the list
SenderFSM = 5
for i in range(0,size): # End all workers by sending an empty task
logger.info("Sending an empty task to rank "+str(i)+"...")
comm.send({"workstr":""}, i, tag=4)
if SenderFSM == 1: #delay time between jobs
if (time.time() - SenderLastJobTimeStamp > args.delay/1000):
SenderFSM = 2
if SenderFSM == 2:
if r2.Get_status():
rec = r2.wait()
SenderFSM = 0
i = WLPosition
logger.info("ToDo: "+worklist[i])
logger.info("Send a new job to rank "+str(rec)+"...")
workstr = args.execname + " " + args.inputdir+"/"+worklist[i] + " "+outputdir+"/"+str(i)+"/outfile.txt"
# workstr = args.execname
s4 = comm.isend({"workstr":workstr, "jobid":i,"outputdir":outputdir}, dest=rec, tag=4)
WLPosition += 1
SenderLastJobTimeStamp = time.time()
###
# Worker itself
if WorkerFSM==0:
h2 = comm.isend(rank, dest=0, tag=2) # send a data package asking for more work
WorkerFSM = 1
if WorkerFSM==1:
if h2.Test():
h2.wait()
r4 = comm.irecv(source=0, tag=4)
WorkerFSM = 2
if WorkerFSM==2:
if r4.Get_status():
dic = r4.wait()
workstr = dic["workstr"]
if len(workstr)==0: #check if no more to do from master
break
outputdir = dic["outputdir"]
jobid = dic["jobid"]
logger.info("Worker "+str(rank)+" recieved (internal job nr="+str(jobid)+"): "+workstr)
os.mkdir(outputdir+"/"+str(jobid))
bashCommand = workstr + " > "+outputdir+"/"+str(jobid)+"/std_out.txt 2> "+outputdir+"/"+str(jobid)+"/err_out.txt"
try:
p = subprocess.Popen(bashCommand, shell=True)
logger.info('Worker startet job. ')
except subprocess.CalledProcessError as err:
logger.info('Worker '+str(rank)+' ERROR:', err)
WorkerFSM = 3
if WorkerFSM == 3: # Running
if p.poll() is not None:
logger.info("Process ended, ret code: "+str(p.returncode))
WorkerFSM = 4
if WorkerFSM==4:
logger.info("Worker "+str(rank)+": Work completed and reset")
WorkerFSM=0
if args.verbose: time.sleep(0.001) #to take some load off the node
logger.info("Controlling master ended.")
logger.info("Worker "+str(rank)+" ended.")
else:
nodeinfo = {'machine':platform.machine(), 'hostname': platform.node(), 'Ncores': multiprocessing.cpu_count(), 'os': platform.system(), 'pythonVersion': platform.python_version() } if rank ==0:
comm.send(nodeinfo, dest=0) logger.info("Controlling master ended.")
settings=comm.recv(source=0,tag=3)
if settings["verbose"]:
print ("Worker "+str(rank)+" started.")
while True:
comm.send(rank, dest=0, tag=2) # send a data package asking for more work
dic = comm.recv(source=0)
workstr = dic["workstr"]
if len(workstr)==0: #check if no more to do from master
break
outputdir = dic["outputdir"]
jobid = dic["jobid"]
if settings["verbose"]:
print("Worker "+str(rank)+" recieved (internal job nr="+str(jobid)+"): "+workstr)
os.mkdir(outputdir+"/"+str(jobid))
bashCommand = workstr + " > "+outputdir+"/"+str(jobid)+"/std_out.txt 2> "+outputdir+"/"+str(jobid)+"/err_out.txt"
try:
completed = subprocess.run(bashCommand, shell=True)
if settings["verbose"]:
print('Worker '+str(rank)+' returncode: ',completed.returncode)
except subprocess.CalledProcessError as err:
if settings["verbose"]:
print('Worker '+str(rank)+' ERROR:', err)
if settings["verbose"]:
print("Worker "+str(rank)+" ended.")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment