wkmgr.py 7.8 KB
Newer Older
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
1 2 3 4
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Workload Distributor
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
5
# Version 0.3
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
6 7 8 9 10
#
# Minimum Python version: 3.6
# On Mogon 2, run first:
#   module load lang/Python/3.6.6-foss-2018b

Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
11 12 13 14
try:
    from mpi4py import MPI
except ImportError:
    # Everything below needs MPI (MPI.COMM_WORLD is used right away), so stop
    # here with the hint instead of failing later with a confusing NameError.
    print("mpi4py module not loaded. On Mogon2 / HIMster2 type\n  module load lang/Python/3.6.6-foss-2018b\nfirst.\n\n")
    raise SystemExit(1)
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
15
import platform
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
16
import multiprocessing  # to get number of cores
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
17 18
import argparse
import subprocess
19
import time
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
20 21
import os
import datetime
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
22
import logging
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
23 24 25 26

# MPI handles of this process.
comm = MPI.COMM_WORLD
rank = comm.Get_rank()  # index of this process within COMM_WORLD
size = comm.Get_size()  # number of processes in COMM_WORLD

# One logger per rank: the logger name ("rank0", "rank1", ...) shows up in
# every message through the %(name)s field of the format below.
logging.basicConfig(level=logging.WARNING,
                    format='%(asctime)-15s %(name)-6s %(levelname)-8s  %(message)s')
logger = logging.getLogger(f"rank{rank}")
30

Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
31
now = datetime.datetime.now()

# Command line interface of the workload distributor.
parser = argparse.ArgumentParser(description='Workload distributor for trivial parallelism.')
parser.add_argument('execname', help='name of executable to call')
parser.add_argument("-v", "--verbosity", action="count", default=0,
                    help="increase output verbosity")
parser.add_argument("-V", "--version", action="version", version='%(prog)s 0.3',
                    help="Returns the actual program version")
parser.add_argument("-d", "--delay", type=int, default=50,
                    help="time delay in ms between starts of consecutive jobs to help distributing load")
parser.add_argument("-i", "--input-dir", default=".",
                    help="specifies the directory with files to process")
# Default output directory: <cwd>/output_YYYYMMDDhhmmss, stamped with the start time.
parser.add_argument("-o", "--output-dir",
                    default=os.getcwd() + "/output_" + now.strftime("%Y%m%d%H%M%S"),
                    help="output directory, default = output[datetime]")
# Template for the command line of each job; the {placeholders} are filled
# in per input file with str.format.
parser.add_argument("-a", "--argument",
                    default='{execname} {inputdir}{inputfilename} {outputdir}{jobid}/outfile.txt',
                    help="line called for each job. Default = "
                         "'{execname} {inputdir}{inputfilename} {outputdir}{jobid}/outfile.txt'")
46 47

import sys

clients = {}


def _abort_job(code):
    """Flush buffered output, then terminate every rank of the MPI job.

    Used instead of exit(): if rank 0 left on its own, the other ranks could
    stay blocked in the collective calls (gather/bcast) that follow this setup
    section. comm.Abort skips the normal interpreter shutdown, so pending
    stdout/stderr data (e.g. argparse's help text) is flushed first.
    """
    sys.stdout.flush()
    sys.stderr.flush()
    comm.Abort(code)


if rank == 0:
    if size < 2:
        logger.warning(
            "Number of workers (MPI ranks) = 1, to fully unleash computer resources, please run this script with the "
            "appropriate launcher and a higher number of MPI ranks. eg mpirun -n 4 ./wkmgr.py [execname]")

    # Only the master parses the command line. argparse raises SystemExit for
    # --help, --version and invalid arguments; turn that into a job-wide abort.
    try:
        args = parser.parse_args()
    except SystemExit as exc:
        _abort_job(exc.code if isinstance(exc.code, int) else 1)

    # -v -> INFO, -vv (or more) -> DEBUG. The higher level is tested first so
    # that a single -v does not already select the most verbose level.
    if args.verbosity > 1:
        logger.setLevel(logging.DEBUG)
    elif args.verbosity > 0:
        logger.setLevel(logging.INFO)

    inputdir = args.input_dir+"/"
    if not os.path.isdir(inputdir):
        logger.error("Input directory does not exist. Exiting.")
        _abort_job(1)
    logger.info("Input dir: " + inputdir)
    logger.info("Delay between files: " + str(args.delay) + " ms")
    logger.info("Executing: " + str(args.execname))

    # prepare list of files to process; sorted so that the job ids (and thus
    # the numbered job sub-directories) are reproducible between runs
    inputfilelist = sorted(f for f in os.listdir(inputdir) if os.path.isfile(os.path.join(inputdir, f)))
    logger.info("List of files to process (total: "+str(len(inputfilelist))+"):")
    logger.info(inputfilelist)

    # prepare output directory
    outputdir = args.output_dir+"/"
    logger.info("Output dir: " + outputdir)
    # Test the path as given: the default output dir is absolute, and prefixing
    # "./" would turn it into a different, relative path.
    if os.path.isdir(outputdir):
        logger.error("Output directory already exists. Exiting.")
        _abort_job(2)
    try:
        os.mkdir(outputdir)
    except OSError as err:
        logger.error("Cannot create output directory: " + str(err))
        _abort_job(3)
    logger.info("output directory created.")

    # prepare worklist: one command line per input file, produced by filling
    # the --argument template with str.format (more hints: https://pyformat.info)
    worklist = []
    for i, v in enumerate(inputfilelist):
        data = {'execname': args.execname, 'inputdir': inputdir, 'jobid': i, 'inputfilename': v, 'outputdir': outputdir}
        worklist.append(args.argument.format(**data))
    logger.info("List if jobs:")
    logger.info(worklist)
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
92

93 94
# --- Send machine data to master rank
# Every rank describes the host it runs on; comm.gather collects these
# descriptions on rank 0 (root=0).
data = dict(
    rank=rank,
    machine=platform.machine(),
    hostname=platform.node(),
    Ncores=multiprocessing.cpu_count(),
    os=platform.system(),
    pythonVersion=platform.python_version(),
)
clients = comm.gather(data, root=0)
97

Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
98
# --- Distribute Settings
# Only rank 0 parsed the command line; broadcast what the other ranks need.
data = None
if rank == 0:
    data = {"verbosity": args.verbosity}
settings = comm.bcast(data, root=0)

# -v -> INFO, -vv (or more) -> DEBUG. The higher level is tested first so that
# a single -v does not already select the most verbose level.
if settings["verbosity"] > 1:
    logger.setLevel(logging.DEBUG)
elif settings["verbosity"] > 0:
    logger.setLevel(logging.INFO)
logger.info("Worker fully initialised.")
if rank == 0:
    logger.info("All clients (total "+str(size)+") active. Printing clients details:")
    logger.info(clients)
112

113 114 115 116
# ---
# Start of main loop
WLPosition = 0  # Position in list of files (index of the next job to hand out)
SenderLastJobTimeStamp = 0  # to allow for delays between launched jobs

# --- Variables for FSM of sender and worker
SenderFSM = 0
WorkerFSM = 0

# Requests of the non-blocking sends issued by the master (jobs and the final
# empty tasks). They are completed after the loop so that no message is still
# in flight when the program ends.
pending_sends = []

while True:
    # ---
    # Distribution stuff: rank 0 hands out one job per worker request.
    if rank == 0:
        if SenderFSM == 0:  # State 0: Preparing for receive
            if WLPosition < len(worklist):  # still some work to be distributed
                r2 = comm.irecv(tag=2)  # wait for a worker to connect and get the rank of the free worker
                logger.info("Waiting for some worker to connect...")
                SenderFSM = 1
            else:  # no more work in the list
                SenderFSM = 5
                for i in range(0, size):  # End all workers by sending an empty task
                    logger.info("Sending an empty task to rank " + str(i) + "...")
                    # Non-blocking on purpose: a standard-mode blocking send may
                    # wait until the destination posts its receive, i.e. until a
                    # busy worker has finished its job. For i == 0 the destination
                    # is this very process, whose worker part below cannot run
                    # while the master is blocked here.
                    pending_sends.append(comm.isend({"workstr": ""}, dest=i, tag=4))

        if SenderFSM == 1:  # State 1: delay time between jobs
            if time.time() - SenderLastJobTimeStamp > args.delay / 1000:
                SenderFSM = 2

        if SenderFSM == 2:  # State 2: waiting for a rank to ask for a task
            if r2.Get_status():
                rec = r2.wait()  # rank of the worker asking for work
                SenderFSM = 0

                i = WLPosition
                logger.info("Send a new job ("+worklist[i]+") to rank " + str(rec) + "...")
                workstr = worklist[i]
                pending_sends.append(
                    comm.isend({"workstr": workstr, "jobid": i, "outputdir": outputdir}, dest=rec, tag=4))
                WLPosition += 1
                SenderLastJobTimeStamp = time.time()

        if SenderFSM == 5:  # State 5: empty tasks sent to all ranks, no further work
            pass

    # ---
    # Worker itself (runs on every rank, rank 0 included)
    if WorkerFSM == 0:  # State 0: Send a request to Master
        h2 = comm.isend(rank, dest=0, tag=2)  # send a data package asking for more work
        WorkerFSM = 1

    if WorkerFSM == 1:  # State 1: wait for sending to complete, prepare for answer
        if h2.Test():
            h2.wait()
            r4 = comm.irecv(source=0, tag=4)
            WorkerFSM = 2

    if WorkerFSM == 2:  # State 2: waiting for answer, start processing
        if r4.Get_status():
            dic = r4.wait()
            workstr = dic["workstr"]
            if len(workstr) == 0:  # an empty task from the master means: no more work
                break
            outputdir = dic["outputdir"]
            jobid = dic["jobid"]

            logger.info("Worker recieved (internal job nr=" + str(jobid) + "): " + workstr)
            os.mkdir(outputdir + str(jobid))

            # workstr is a complete command line built from the --argument
            # template, hence shell=True; stdout/stderr go to per-job files.
            bashCommand = workstr + " > " + outputdir + str(jobid) + "/std_out.txt 2> " + outputdir + str(jobid) + "/err_out.txt"
            try:
                p = subprocess.Popen(bashCommand, shell=True)
            except OSError as err:
                # Popen reports launch failures as OSError (it never raises
                # CalledProcessError). No process exists to be polled, so go
                # back to asking for the next job instead of entering state 3
                # with an undefined or stale `p`.
                logger.error("Worker " + str(rank) + " ERROR: " + str(err))
                WorkerFSM = 0
            else:
                logger.info('Worker startet job.')
                WorkerFSM = 3

    if WorkerFSM == 3:  # State 3: Running
        if p.poll() is not None:
            logger.info("Job ended, ret code: " + str(p.returncode))
            if p.returncode != 0:
                # Warning level, so failed jobs are visible even without -v.
                logger.warning("Job " + str(jobid) + " failed with ret code " + str(p.returncode))
            WorkerFSM = 0

    time.sleep(0.001)  # to take some load off the node

if rank == 0:
    # Complete every job / termination send before the program finishes.
    MPI.Request.Waitall(pending_sends)

logger.info("Worker ended.")

if rank == 0:
    logger.info("Controlling master ended.")