Last week there was a temporary error with email sending that lead to GitLab notification emails not being delivered. The problem should be solved now - however, failed notification emails are not being resent!

Commit 925a11a4 authored by Peter-Bernd Otte's avatar Peter-Bernd Otte

more compact code by introducing broadcasting and gathering

parent 850efb1e
......@@ -47,7 +47,6 @@ if rank == 0:
if args.verbose:
logger.setLevel(logging.DEBUG)
logger.info("Verbosity turned on")
logger.info("Input dir: " + args.inputdir)
logger.info("Delay between files: " + str(args.delay) + " ms")
logger.info("Executing: " + str(args.execname))
......@@ -60,7 +59,7 @@ if rank == 0:
# prepare output directory
now = datetime.datetime.now()
outputdir = os.getcwd() + "/output_{:04d}{:02d}{:02d}{:02d}{:02d}{:02d}".format(now.year, now.month, now.day,
outputdir = os.getcwd() + "/output_{:04d}{:02d}{:02d}{:02d}{:02d}{:02d}/".format(now.year, now.month, now.day,
now.hour, now.minute, now.second)
if args.verbose:
logger.info("Output dir: " + outputdir)
......@@ -72,55 +71,38 @@ if rank == 0:
logger.info("output directory created.")
logger.info('Total number of workers ' + str(size))
# --- Send Data to master rank
# replace later with: MPI_Gathering
nodeinfo = {'machine': platform.machine(), 'hostname': platform.node(), 'Ncores': multiprocessing.cpu_count(),
# --- Send machine data to master rank
data = {'rank': rank, 'machine': platform.machine(), 'hostname': platform.node(), 'Ncores': multiprocessing.cpu_count(),
'os': platform.system(), 'pythonVersion': platform.python_version()}
h5 = comm.isend(nodeinfo, dest=0, tag=5)
if rank == 0:
h3 = {}
for i in range(0, size):
clients[i] = comm.recv(source=i, tag=5)
h5.wait()
clients = comm.gather(data, root=0)
# --- Distribute Settings
# Replace later with MPI_Broadcasting
data = None
if rank == 0:
for i in range(0, size):
# distribute settings
h3[i] = comm.isend({"verbose": args.verbose}, dest=i, tag=3)
data = {"verbose": args.verbose}
settings = comm.bcast(data, root=0)
settings = comm.recv(source=0, tag=3)
if settings["verbose"]:
logger.setLevel(logging.DEBUG)
logger.info("Worker " + str(rank) + " initialised.")
logger.info("Worker fully initialised.")
if rank == 0:
for i in range(0, size):
h3[i].wait()
logger.info("All clients active. Printing clients details:")
logger.info(clients)
# --- Go into working loop
WLPosition = 0
# ---
# Start of main loop
WLPosition = 0 #Position in list of files
SenderLastJobTimeStamp = 0 # to allow for delays between launched jobs
# --- Variables for FSM of sender and worker
# 0=Preparing for receive, 1=waiting between jobs,
# 2=waiting for rank to ask for taks, 5=Send empty taks to all
SenderFSM = 0
# 0=Send a request to Master, 1=wait for sending to comnplete, prepare for answer,
# 2=waiting for answer, start processing, 3=running, 4=completed and reset
WorkerFSM = 0
SenderLastJobTimeStamp = 0 # to allow for delays between launched jobs
while True:
####
# ---
# Distribution stuff
if (rank == 0):
if SenderFSM == 0:
if SenderFSM == 0: #State 0: Preparing for receive
if WLPosition < len(worklist): # still some work to be distributed
r2 = comm.irecv(tag=2) # wait for a worker to connect and get the rank of the free worker
logger.info("Waiting for some worker to connect...")
......@@ -131,36 +113,37 @@ while True:
logger.info("Sending an empty task to rank " + str(i) + "...")
comm.send({"workstr": ""}, i, tag=4)
if SenderFSM == 1: # delay time between jobs
if SenderFSM == 1: #State 1: delay time between jobs
if time.time() - SenderLastJobTimeStamp > args.delay / 1000:
SenderFSM = 2
if SenderFSM == 2:
if SenderFSM == 2: #State 2: waiting for rank to ask for taks
if r2.Get_status():
rec = r2.wait()
SenderFSM = 0
i = WLPosition
logger.info("ToDo: " + worklist[i])
logger.info("Send a new job to rank " + str(rec) + "...")
workstr = args.execname + " " + args.inputdir + "/" + worklist[i] + " " + outputdir + "/" + str(
i) + "/outfile.txt"
# workstr = args.execname
logger.info("Send a new job ("+worklist[i]+") to rank " + str(rec) + "...")
workstr = args.execname + " " + args.inputdir + "/" + worklist[i] + " " + outputdir + str(i) + "/outfile.txt"
s4 = comm.isend({"workstr": workstr, "jobid": i, "outputdir": outputdir}, dest=rec, tag=4)
WLPosition += 1
SenderLastJobTimeStamp = time.time()
if SenderFSM == 5: #State 5: Send empty taks to all and do no further work
pass
###
# ---
# Worker itself
if WorkerFSM == 0:
if WorkerFSM == 0: #State 0: Send a request to Master
h2 = comm.isend(rank, dest=0, tag=2) # send a data package asking for more work
WorkerFSM = 1
if WorkerFSM == 1:
if WorkerFSM == 1: #State 1: wait for sending to comnplete, prepare for answer,
if h2.Test():
h2.wait()
r4 = comm.irecv(source=0, tag=4)
WorkerFSM = 2
if WorkerFSM == 2:
if WorkerFSM == 2: #State 2: waiting for answer, start processing
if r4.Get_status():
dic = r4.wait()
workstr = dic["workstr"]
......@@ -168,30 +151,27 @@ while True:
break
outputdir = dic["outputdir"]
jobid = dic["jobid"]
logger.info("Worker " + str(rank) + " recieved (internal job nr=" + str(jobid) + "): " + workstr)
os.mkdir(outputdir + "/" + str(jobid))
logger.info("Worker recieved (internal job nr=" + str(jobid) + "): " + workstr)
os.mkdir(outputdir + str(jobid))
bashCommand = workstr + " > " + outputdir + "/" + str(jobid) + "/std_out.txt 2> " + outputdir + "/" + str(
jobid) + "/err_out.txt"
bashCommand = workstr + " > " + outputdir + str(jobid) + "/std_out.txt 2> " + outputdir + str(jobid) + "/err_out.txt"
try:
p = subprocess.Popen(bashCommand, shell=True)
logger.info('Worker startet job. ')
logger.info('Worker startet job.')
except subprocess.CalledProcessError as err:
logger.info('Worker ' + str(rank) + ' ERROR:', err)
WorkerFSM = 3
if WorkerFSM == 3: # Running
if p.poll() is not None:
logger.info("Process ended, ret code: " + str(p.returncode))
WorkerFSM = 4
if WorkerFSM == 4:
logger.info("Worker " + str(rank) + ": Work completed and reset")
WorkerFSM = 0
if WorkerFSM == 3: #State 3: Running
if p.poll() is not None:
logger.info("Job ended, ret code: " + str(p.returncode))
WorkerFSM = 0
time.sleep(0.001) # to take some load off the node
logger.info("Worker " + str(rank) + " ended.")
logger.info("Worker ended.")
if rank == 0:
logger.info("Controlling master ended.")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment