#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Workload Distributor
# Version 0.2
#
# Minimal python version 3.6
# on Mogon 2: run first 
# module load lang/Python/3.6.6-foss-2018b

import argparse
import datetime
import logging
import multiprocessing  # to get the number of local CPU cores
import os
import platform
import subprocess
import sys
import time
from threading import Thread

from mpi4py import MPI

# MPI bootstrap: every rank learns the communicator size and its own rank.
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# One logger per rank so interleaved output from several ranks can be
# attributed to its origin.
logging.basicConfig(level=logging.WARNING,format='%(asctime)-15s %(name)-6s %(levelname)-8s  %(message)s')
logger = logging.getLogger('rank'+str(rank))

# Need at least one master (rank 0) plus one worker rank to do anything.
if size<2:
  logger.error("Minimum number of MPI ranks = 2, please run this script with the appropriate launcher. eg mpirun -n 4 ./wkmgr.py [execname]")
  # sys.exit instead of the site-module exit() builtin, which is not
  # guaranteed to exist in every interpreter configuration.
  sys.exit(2)

# Command-line interface of the workload distributor.
parser = argparse.ArgumentParser(description='Workload distributor for trivial parallelism.')
parser.add_argument('execname', help='name of executable to call')
parser.add_argument("-v", "--verbose", help="increase output verbosity", default=False, action="store_true")
parser.add_argument("-V", "--version", help="Returns the actual program version", action="version", version='%(prog)s 0.2')
parser.add_argument("-d", "--delay", help="time delay in ms between starts of consecutive workers", default=50, type=int)
# BUGFIX: a default on a required positional is silently ignored by
# argparse; nargs='?' makes the argument optional so the documented
# default "." actually takes effect (existing two-argument calls are
# unaffected).
parser.add_argument("inputdir", help="specifies the directory with files to process", default=".", nargs='?')

# rank -> node-info dict, filled on the master rank during the start-up
# hand-shake further below.
clients = {}

# Master-only start-up: parse arguments, build the list of input files and
# create the timestamped output directory.
if rank == 0:
  args = parser.parse_args()
  if args.verbose:
    logger.setLevel(logging.DEBUG)

  logger.info("Verbosity turned on")
  logger.info("Input dir: "+args.inputdir)
  logger.info("Delay between files: "+str(args.delay)+" ms")
  logger.info("Executing: "+str(args.execname))

  #prepare list of files to process (plain files only, subdirectories are skipped)
  worklist = [f for f in os.listdir(args.inputdir) if os.path.isfile(os.path.join(args.inputdir, f))]
  logger.info("Number of files to process: "+str(len(worklist)))
  logger.info("List of files to process:")
  logger.info(worklist)

  #prepare output directory, named by the current timestamp (second resolution)
  now = datetime.datetime.now()
  outputdir = os.getcwd() + "/output_{:04d}{:02d}{:02d}{:02d}{:02d}{:02d}".format(now.year,now.month,now.day,now.hour,now.minute,now.second)
  if args.verbose:
    logger.info("Output dir: "+outputdir )
  # BUGFIX: outputdir is already an absolute path; the previous check
  # prefixed "./" and therefore tested a wrong, relative location, so an
  # existing output directory was never detected.
  if os.path.isdir(outputdir):
    logger.error("Output directory already exists. Exiting.")
    sys.exit(1)
  os.mkdir(outputdir)
  if args.verbose:
    logger.info("output directory created.")
    logger.info('Total number of workers '+str(size) )

##### Send Data to master rank
# Every rank (including rank 0 itself) reports its node characteristics so
# the master can log which hosts take part in the run.
# replace later with: MPI_Gathering
nodeinfo = {'machine':platform.machine(), 'hostname': platform.node(), 'Ncores': multiprocessing.cpu_count(), 'os': platform.system(), 'pythonVersion': platform.python_version() }

# Non-blocking send so that rank 0 can post its own message before it starts
# receiving — including the receive from itself.
h5 = comm.isend(nodeinfo, dest=0, tag=5)
if rank ==0:
  h3 = {}  # isend handles of the settings messages, completed further below
  for i in range(0,size):
    clients[i] = comm.recv(source=i, tag=5)
h5.wait()

### Distribute Settings
# Rank 0 pushes the run-time settings (currently only the verbosity flag)
# to every rank, again including itself.
# Replace later with MPI_Broadcasting
if rank ==0:
  for i in range(0,size):
    #distribute settings
    h3[i]=comm.isend({"verbose":args.verbose},dest=i,tag=3)

settings=comm.recv(source=0,tag=3)
if settings["verbose"]:
  logger.setLevel(logging.DEBUG)
logger.info("Worker "+str(rank)+" initialised.")
if rank ==0:
  for i in range(0,size):
    h3[i].wait()  # ensure every settings message has actually been sent

  logger.info("All clients active. Printing clients details:")
  logger.info(clients)


#### Go into working loop
# Every rank (rank 0 included) runs this loop as a worker.  Rank 0
# additionally acts as the job distributor.  Both roles are written as
# small non-blocking state machines so a single-threaded rank can serve
# both at once.

# index of the next worklist entry to hand out (used on rank 0 only)
WLPosition = 0

SenderFSM = 0 #0=Preparing for receive, 1=waiting between jobs, 2=waiting for rank to ask for taks,  5=Send empty taks to all
WorkerFSM = 0 #0=Send a request to Master, 1=wait for sending to comnplete, prepare for answer, 2=waiting for answer, start processing, 3=running,  4=completed and reset
SenderLastJobTimeStamp = 0 # to allow for delays between launched jobs

while True:
  ####
  # Distribution stuff (executed on rank 0 only)

  if (rank == 0):
    if SenderFSM==0:
      if (WLPosition < len(worklist)): # still some work to be distributed
        r2 = comm.irecv(tag=2) # wait for a worker to connect and get the rank of the free worker
        logger.info("Waiting for some worker to connect...")
        SenderFSM = 1
      else: #no more work in the list
        SenderFSM = 5
        for i in range(0,size): # End all workers by sending an empty task
          logger.info("Sending an empty task to rank "+str(i)+"...")
          comm.send({"workstr":""}, i, tag=4)

    if SenderFSM == 1: #delay time between jobs
      # args.delay is in milliseconds, time.time() counts seconds
      if (time.time() - SenderLastJobTimeStamp > args.delay/1000):
        SenderFSM = 2

    if SenderFSM == 2:
      # Get_status() polls without blocking; only wait() once a worker's
      # request has actually arrived.
      if r2.Get_status():
        rec = r2.wait()  # rec = rank of the requesting worker
        SenderFSM = 0

        i = WLPosition
        logger.info("ToDo: "+worklist[i])
        logger.info("Send a new job to rank "+str(rec)+"...")
        # job command line: <execname> <inputfile> <per-job output file>
        workstr = args.execname + " " + args.inputdir+"/"+worklist[i] + " "+outputdir+"/"+str(i)+"/outfile.txt"
        s4 = comm.isend({"workstr":workstr, "jobid":i,"outputdir":outputdir}, dest=rec, tag=4)
        WLPosition += 1
        SenderLastJobTimeStamp = time.time()


  ###
  # Worker itself (runs on every rank, including rank 0)
  if WorkerFSM==0:
    h2 = comm.isend(rank, dest=0, tag=2) # send a data package asking for more work
    WorkerFSM = 1
  if WorkerFSM==1:
    # request has reached the master -> post the receive for the job message
    if h2.Test():
      h2.wait()
      r4 = comm.irecv(source=0, tag=4)
      WorkerFSM = 2
  if WorkerFSM==2:
    if r4.Get_status():
      dic = r4.wait()
      workstr = dic["workstr"]
      if len(workstr)==0: #check if no more to do from master 
        break  # empty work string is the shutdown signal
      outputdir = dic["outputdir"]
      jobid = dic["jobid"]
      logger.info("Worker "+str(rank)+" recieved (internal job nr="+str(jobid)+"): "+workstr)
      os.mkdir(outputdir+"/"+str(jobid))  # per-job directory for stdout/stderr

      # NOTE(review): shell=True with a command assembled from input file
      # names — names containing shell metacharacters would be interpreted
      # by the shell.  Verify the input directory is trusted.
      bashCommand = workstr + " > "+outputdir+"/"+str(jobid)+"/std_out.txt 2> "+outputdir+"/"+str(jobid)+"/err_out.txt"
      try:
        p = subprocess.Popen(bashCommand, shell=True)
        logger.info('Worker startet job. ')
      except subprocess.CalledProcessError as err:
        # NOTE(review): Popen does not raise CalledProcessError; launch
        # failures surface as OSError — confirm the intended exception type.
        logger.info('Worker '+str(rank)+' ERROR:', err)

      WorkerFSM = 3
  if WorkerFSM == 3: # Running
    if p.poll() is not None:  # non-blocking check whether the job finished
      logger.info("Process ended, ret code: "+str(p.returncode))
      WorkerFSM = 4

  if WorkerFSM==4:
    logger.info("Worker "+str(rank)+": Work completed and reset")
    WorkerFSM=0

  time.sleep(0.001) #to take some load off the node

# Final shutdown messages: every rank reports, the master adds one line.
logger.info("Worker %s ended.", rank)

if rank == 0:
  logger.info("Controlling master ended.")