wkmgr.py 11 KB
Newer Older
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
1 2 3 4
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Workload Distributor
5
# Version 0.4
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
6 7 8 9 10
#
# Minimal python version 3.6
# on Mogon 2: run first 
# module load lang/Python/3.6.6-foss-2018b

Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
11 12 13 14
# mpi4py is mandatory: MPI.COMM_WORLD is needed right after the imports.
try:
  from mpi4py import MPI
except ImportError:
  print("mpi4py module not loaded. On Mogon2 / HIMster2 type\n  module load lang/Python/3.6.6-foss-2018b\nfirst.\n\n")
  # Stop here with a non-zero exit status. Carrying on would only fail a few
  # lines later with a NameError on the first use of MPI, hiding the hint above.
  raise SystemExit(1)
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
15
import platform
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
16
import multiprocessing  # to get number of cores
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
17 18
import argparse
import subprocess
19
import time
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
20 21
import os
import datetime
22
import logging
23
#from numpy import arange #not possible to use due to conflicts with sundials module
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
24 25 26 27

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
28

Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
29 30
logging.basicConfig(level=logging.WARNING, format='%(asctime)-15s %(name)-6s %(levelname)-8s  %(message)s')
logger = logging.getLogger('rank' + str(rank))
31

Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
32
now = datetime.datetime.now()
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
33
parser = argparse.ArgumentParser(description='Workload distributor for trivial parallelism.')
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
34
parser.add_argument("-v", "--verbosity", help="increase output verbosity", default=0, action="count")
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
35
parser.add_argument("-V", "--version", help="Returns the actual program version", action="version",
36
                    version='%(prog)s 0.4')
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
37
parser.add_argument("-d", "--delay", help="time delay in ms between starts of consecutive jobs to help "
38 39
                    "distributing load", default=100, type=int)
parser.add_argument('-a', "--arg-variable", help="add variables to use as execname argument.", action="append", nargs='?')
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
40 41 42 43
parser.add_argument("-i", "--input-dir", help="specifies the directory with files to process", default=".")
parser.add_argument("-o", "--output-dir", help="output directory, default = output[datetime]", default=os.getcwd() + 
                    "/output_{:04d}{:02d}{:02d}{:02d}{:02d}{:02d}".format(now.year, now.month, now.day,
                                                                           now.hour, now.minute, now.second) )
44
parser.add_argument("-s", "--single-argument", help="do not add the default placeholders to the execname", action="store_true")
45 46 47
parser.add_argument("-ni", "--no-inputfiles", help="set this flag if no iteration over input files requested.", action="store_true", default=False)
parser.add_argument("-n", "--dry-run", help="Do not launch workers, just build test settings.", action="store_true", default=False)
parser.add_argument('execname', help='name of executable to call OR complete shell command line. If no '
48 49 50
                    'placeholders are provided, " {inputdir}{inputfilename} {outputdir}{jobid}/outfile.txt" '
                    'is added.')
parser.add_argument('remainderargs', nargs=argparse.REMAINDER) #the remainder shall be added to the args.execname
51 52

clients = {}
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
53

54 55 56 57 58 59
def join_l(l, sep):
    """Return the items of ``l`` concatenated, each one preceded by ``sep``.

    Unlike ``sep.join(l)`` the separator is also emitted in front of the
    first item, e.g. ``join_l(["a", "b"], " ")`` gives ``" a b"``. An empty
    ``l`` gives ``''``. Items are rendered with ``str.format``, so they do not
    have to be strings.
    """
    return ''.join('{}{}'.format(sep, el) for el in l)

Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
60
if rank == 0:
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
61 62
    if size < 2:
        logger.warning(
63
            "Number of workers (MPI ranks) = 1, to fully unleash computer resources, run with the "
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
64 65
            "appropriate launcher and a higher number of MPI ranks. eg mpirun -n 4 ./wkmgr.py [execname]")

Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
66
    args = parser.parse_args()
67
    logger.setLevel(logging.WARNING-(args.verbosity*10 if args.verbosity <=2 else 20) )
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
68

Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
69 70 71 72
    inputdir = args.input_dir+"/"
    if not os.path.isdir(inputdir):
        logger.error("Input directory does not exist. Exiting.")
        exit(1)
73 74 75 76 77 78 79 80 81 82 83 84
    
    # prepare arg-variable
    # Every -a value has the form "name", "name,stop", "name,start,stop" or
    # "name,start,stop,step" and is normalised to [name, start, stop, step]
    # (missing start defaults to 0, missing stop and step default to 1).
    # Each value is converted exactly once; the former nested loop re-converted
    # all earlier entries on every pass and turned their int defaults into floats.
    logger.debug("Arg_Variable: ")
    logger.debug(args.arg_variable)
    argvariables = []
    if args.arg_variable:
      for spec in args.arg_variable:
        if spec is None:
          # "-a" was given without a value (allowed by nargs='?'), so there is
          # no variable name to work with.
          logger.error("Option --arg-variable given without a value. Exiting.")
          exit(1)
        fields = spec.split(',')
        argname = fields[0]
        if len(fields) == 1:
          argvariables.append([argname, 0, 1, 1])
        elif len(fields) == 2:
          argvariables.append([argname, 0, float(fields[1]), 1])
        elif len(fields) == 3:
          argvariables.append([argname, float(fields[1]), float(fields[2]), 1])
        else:
          # four or more fields: anything after the step is ignored
          argvariables.append([argname, float(fields[1]), float(fields[2]), float(fields[3])])
      logger.debug("Interpreted as:")
      logger.debug(argvariables)
    else:
        logger.debug("No Arg_Variables provided.")
90 91 92 93 94

    # prepare execname
    execname = args.execname+join_l(args.remainderargs," ")
    if execname.find('{') == -1:
        if args.single_argument:
95
            logger.debug("Single-Argument option: no placeholders are added to the execname.")
96 97
        else:
            logger.info("No placeholders are provided with the execname. Default will be added.")
98 99 100 101 102 103
            for v2 in argvariables:
                execname += " {"+str(v2[0])+"}"
            if not args.no_inputfiles:
              execname += " {inputdir}{inputfilename}"
            execname += " {outputdir}{jobid}/outfile.txt"
            logger.debug("execname: "+execname)
104

Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
105
    logger.info("Input dir: " + inputdir)
106 107
    logger.debug("Delay between files: " + str(args.delay) + " ms")
    logger.debug("Executing: \"" + execname +"\"")
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
108 109

    # prepare list of files to process
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
110 111 112
    inputfilelist = [f for f in os.listdir(inputdir) if os.path.isfile(os.path.join(inputdir, f))]
    logger.info("List of files to process (total: "+str(len(inputfilelist))+"):")
    logger.info(inputfilelist)
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
113 114

    # prepare output directory
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
115 116
    outputdir = args.output_dir+"/"
    logger.info("Output dir: " + outputdir)
    # Test the path exactly as os.mkdir() below uses it. Prefixing "./" turned
    # an absolute path (such as the default, which is built from os.getcwd())
    # into an unrelated relative one, so an existing directory went unnoticed
    # and os.mkdir() failed with an exception instead of this clean exit.
    if os.path.isdir(outputdir):
        logger.error("Output directory already exists. Exiting.")
        exit(2)
    # A dry run must not leave anything behind on disk.
    if not args.dry_run:
      os.mkdir(outputdir)
      logger.debug("output directory created.")
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
123 124 125

    #prepare worklist
    worklist = []
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162
    jobid = 0
    varData = {}
    def myarange(start, stop, step):
      """Return the list start, start+step, start+2*step, ... of all values < stop.

      The values are produced by repeated addition, so non-integer steps carry
      the usual floating point rounding error. Returns [] if start >= stop.

      Raises ValueError if start < stop while step is zero or negative: the
      sequence would never reach stop and the loop would run forever.
      """
      if start < stop and step <= 0:
        raise ValueError(
            "myarange: step must be > 0 to count from {} up to {}, got {}".format(start, stop, step))
      tv = []
      i = start
      while i < stop:
        tv.append(i)
        i += step
      return tv

    def buildWorklist(NArgVar, fvarData):
        """Recursively expand all arg-variable combinations into command lines.

        NArgVar  -- index into argvariables of the variable handled at this
                    recursion level
        fvarData -- dict holding the placeholder values chosen so far; it is
                    modified in place and shared by all recursion levels

        Once every variable has a value, the placeholders execname, inputdir,
        jobid and outputdir are added and execname is formatted into one
        command line per entry of inputfilelist, or into a single command line
        if --no-inputfiles is set. Every command line is appended to the
        global worklist and uses up one value of the global jobid counter.
        """
        global worklist, jobid

        if NArgVar < len(argvariables):
            # Still variables left to fix: iterate over this one and recurse.
            spec = argvariables[NArgVar]
            for value in myarange(spec[1], spec[2], spec[3]):
                fvarData[spec[0]] = value
                buildWorklist(NArgVar + 1, fvarData)
            return

        # All variables are fixed: add the standard placeholders.
        fvarData.update({'execname': args.execname, 'inputdir': inputdir,
                         'jobid': jobid, 'outputdir': outputdir})

        if args.no_inputfiles:
            logger.debug("Flag --no-inputfiles provided.")
            logger.debug(fvarData)
            worklist.append(execname.format(**fvarData))
            jobid += 1
            return

        # One job per input file, each with its own job id.
        for fname in inputfilelist:
            fvarData['jobid'] = jobid
            fvarData['inputfilename'] = fname
            logger.debug(fvarData)
            worklist.append(execname.format(**fvarData))
            jobid += 1
    
    buildWorklist(0, varData)
      
    logger.debug("List of jobs (total={}):".format(len(worklist)))
    logger.debug(worklist)
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
163

164 165
# --- Send machine data to master rank
data = {'rank': rank, 'machine': platform.machine(), 'hostname': platform.node(), 'Ncores': multiprocessing.cpu_count(),
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
166
            'os': platform.system(), 'pythonVersion': platform.python_version()}
167
clients = comm.gather(data, root=0)
168

Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
169
# --- Distribute Settings
170
data = None
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
171
if rank == 0:
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
172
    data = {"verbosity": args.verbosity}
173
settings = comm.bcast(data, root=0)
174

175 176
logger.setLevel(logging.WARNING-(settings["verbosity"]*10 if settings["verbosity"] <=2 else 20) )

177
logger.info("Worker fully initialised.")
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
178
if rank == 0:
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
179
    logger.info("All clients (total "+str(size)+") active. Printing clients details:")
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
180
    logger.info(clients)
181

182 183 184 185
# --- 
# Start of main loop
WLPosition = 0 #Position in list of files
SenderLastJobTimeStamp = 0  # to allow for delays between launched jobs
186 187 188 189
if rank == 0:
  if args.dry_run:
    logger.info("Skipping execution loop, dry-run activated.")
    WLPosition = len(worklist)
190

Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
191 192 193
# --- Variables for FSM of sender and worker
SenderFSM = 0
WorkerFSM = 0
194 195

while True:
196
    # ---
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
197 198
    # Distribution stuff
    if (rank == 0):
199
        if SenderFSM == 0: #State 0: Preparing for receive
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
200 201 202 203 204 205 206 207 208 209
            if WLPosition < len(worklist):  # still some work to be distributed
                # Post a non-blocking receive for a work request (tag 2); the
                # request object is polled and completed in sender state 2.
                r2 = comm.irecv(tag=2)  # wait for a worker to connect and get the rank of the free worker
                logger.info("Waiting for some worker to connect...")
                SenderFSM = 1
            else:  # no more work in the list
                # State 5 is terminal for the sender: nothing more is handed out.
                SenderFSM = 5
                # The range starts at 0, so the worker part of rank 0 gets the
                # stop message as well. A worker leaves its loop when the
                # received "workstr" is empty.
                for i in range(0, size):  # End all workers by sending an empty task
                    logger.info("Sending an empty task to rank " + str(i) + "...")
                    comm.send({"workstr": ""}, i, tag=4)

210
        if SenderFSM == 1: #State 1: delay time between jobs
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
211 212 213
            if time.time() - SenderLastJobTimeStamp > args.delay / 1000:
                SenderFSM = 2

214
        if SenderFSM == 2: #State 2: waiting for rank to ask for taks
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
215 216 217 218 219
            if r2.Get_status():
                rec = r2.wait()
                SenderFSM = 0

                i = WLPosition
220
                logger.info("Send a new job ("+worklist[i]+") to rank " + str(rec) + "...")
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
221 222
                #workstr = args.execname + " " + inputdir + "/" + worklist[i] + " " + outputdir + str(i) + "/outfile.txt"
                workstr = worklist[i]
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
223 224 225
                s4 = comm.isend({"workstr": workstr, "jobid": i, "outputdir": outputdir}, dest=rec, tag=4)
                WLPosition += 1
                SenderLastJobTimeStamp = time.time()
226 227
        if SenderFSM == 5: #State 5: Send empty taks to all and do no further work
            pass
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
228

229
    # ---
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
230
    # Worker itself
231
    if WorkerFSM == 0: #State 0: Send a request to Master
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
232 233
        h2 = comm.isend(rank, dest=0, tag=2)  # send a data package asking for more work
        WorkerFSM = 1
234 235

    if WorkerFSM == 1: #State 1: wait for sending to comnplete, prepare for answer,
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
236 237 238 239
        if h2.Test():
            h2.wait()
            r4 = comm.irecv(source=0, tag=4)
            WorkerFSM = 2
240 241

    if WorkerFSM == 2: #State 2: waiting for answer, start processing
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
242 243 244 245 246 247 248
        if r4.Get_status():
            dic = r4.wait()
            workstr = dic["workstr"]
            if len(workstr) == 0:  # check if no more to do from master
                break
            outputdir = dic["outputdir"]
            jobid = dic["jobid"]
249 250
            logger.info("Worker recieved (internal job nr=" + str(jobid) + "): " + workstr)
            os.mkdir(outputdir + str(jobid))
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
251

252
            bashCommand = workstr + " > " + outputdir + str(jobid) + "/std_out.txt 2> " + outputdir + str(jobid) + "/err_out.txt"
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
253
            try:
254 255 256 257 258 259
                if rank == 0:
                    p = subprocess.Popen(bashCommand, shell=True) # start job in background
                    logger.info('Worker startet job.')
                else:
                    completed = subprocess.run(bashCommand, shell=True) # start job and wait for return
                    logger.info('Job ended, return code: '+str(completed.returncode) )
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
260 261 262
            except subprocess.CalledProcessError as err:
                logger.info('Worker ' + str(rank) + ' ERROR:', err)

263
            WorkerFSM = (3 if rank == 0 else 0)
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
264

265
    if WorkerFSM == 3: #State 3: Running, used only for rank = 0 with subprocess.Popen
266
        if p.poll() is not None:
267
            logger.info("Job ended, return code: " + str(p.returncode))
268
            WorkerFSM = 0
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
269

270
    time.sleep(0.01)  # to take some load off the node
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
271

272 273

logger.info("Worker ended.")
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
274

275
comm.Barrier()
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
276
if rank == 0:
277
    logger.info("Controlling master and all workers ended.")