wkmgr.py 8.51 KB
Newer Older
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
1 2 3 4
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Workload Distributor
5
# Version 0.4
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
6 7 8 9 10
#
# Minimal python version 3.6
# on Mogon 2: run first 
# module load lang/Python/3.6.6-foss-2018b

Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
11 12 13 14
try:
  from mpi4py import MPI
except ImportError:
  print("mpi4py module not loaded. On Mogon2 / HIMster2 type\n  module load lang/Python/3.6.6-foss-2018b\nfirst.\n\n")
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
15
import platform
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
16
import multiprocessing  # to get number of cores
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
17 18
import argparse
import subprocess
19
import time
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
20 21
import os
import datetime
22
import logging
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
23 24 25 26

# MPI communicator and this process' position in it; rank 0 distributes the
# work and, like every other rank, also processes jobs itself.
comm = MPI.COMM_WORLD
size, rank = comm.Get_size(), comm.Get_rank()

# Only warnings are shown by default; the -v option lowers the level later on.
# Every log line carries the rank as logger name, e.g. "rank3".
logging.basicConfig(
    level=logging.WARNING,
    format='%(asctime)-15s %(name)-6s %(levelname)-8s  %(message)s',
)
logger = logging.getLogger('rank{}'.format(rank))
30

Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
31
# Start time of the run; used to build a unique default output directory name.
now = datetime.datetime.now()

# Command-line interface. Note: only rank 0 actually parses the arguments.
parser = argparse.ArgumentParser(description='Workload distributor for trivial parallelism.')
parser.add_argument("-v", "--verbosity", help="increase output verbosity", default=0, action="count")
parser.add_argument("-V", "--version", help="Returns the actual program version", action="version",
                    version='%(prog)s 0.4')
parser.add_argument("-d", "--delay", help="time delay in ms between starts of consecutive jobs to help "
                    "distributing load", default=50, type=int)
parser.add_argument("-i", "--input-dir", help="specifies the directory with files to process", default=".")
# Default: <cwd>/output_YYYYMMDDhhmmss
parser.add_argument("-o", "--output-dir", help="output directory, default = output[datetime]",
                    default=os.path.join(os.getcwd(), now.strftime("output_%Y%m%d%H%M%S")))
parser.add_argument("-s", "--single-argument", help="do not add the default placeholders to the execname",
                    action="store_true")
# The adjacent string literals below are concatenated by Python; each piece
# must carry its own separating space (previously "If no" + "placeholders"
# rendered as "If noplaceholders").
parser.add_argument('execname', help='name of executable to call OR complete shell command line. If no '
                    'placeholders are provided, " {inputdir}{inputfilename} {outputdir}{jobid}/outfile.txt" '
                    'is added.')
# Everything after execname is collected here and appended to it, so options
# meant for the called program are passed through untouched.
parser.add_argument('remainderargs', nargs=argparse.REMAINDER)

clients = {}
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
49

50 51 52 53 54 55
def join_l(l, sep):
    """Return every element of *l* converted to text and prefixed by *sep*.

    Unlike ``sep.join(l)``, the separator is also placed in front of the first
    element, e.g. ``join_l(['-a', 'x'], ' ') == ' -a x'``, so the result can be
    appended directly to another string. An empty *l* yields ``''``.
    """
    # str.join avoids the quadratic cost of repeated string concatenation.
    return ''.join('{}{}'.format(sep, el) for el in l)

Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
56
def _terminate(code):
    """Stop the run from rank 0 with exit status *code*.

    Only rank 0 parses and validates the arguments; all other ranks continue
    straight to the following comm.gather() and would wait there forever if
    rank 0 simply exited. With more than one rank the whole MPI job is
    therefore aborted instead.
    """
    import sys
    if size < 2:
        sys.exit(code)
    # MPI_Abort kills the processes immediately; make sure already written
    # messages (e.g. argparse help or errors) are not lost in the buffers.
    sys.stdout.flush()
    sys.stderr.flush()
    comm.Abort(code)


# Rank 0 only: parse the command line, check the directories and build the
# list of complete shell command lines (one per input file).
if rank == 0:
    if size < 2:
        logger.warning(
            "Number of workers (MPI ranks) = 1, to fully unleash computer resources, please run this script with the "
            "appropriate launcher and a higher number of MPI ranks. eg mpirun -n 4 ./wkmgr.py [execname]")

    try:
        args = parser.parse_args()
    except SystemExit as err:
        # argparse exits on --help, --version and on invalid arguments.
        _terminate(err.code if isinstance(err.code, int) else 1)

    # -v shows INFO messages, -vv (or more) additionally DEBUG messages.
    if args.verbosity > 1:
        logger.setLevel(logging.DEBUG)
    elif args.verbosity > 0:
        logger.setLevel(logging.INFO)

    inputdir = args.input_dir + "/"
    if not os.path.isdir(inputdir):
        logger.error("Input directory does not exist. Exiting.")
        _terminate(1)

    # prepare execname: executable plus all remaining command-line arguments
    execname = args.execname + join_l(args.remainderargs, " ")
    if '{' not in execname:
        if args.single_argument:
            logger.info("Single-Argument option: no placeholders are added to the execname.")
        else:
            logger.info("No placeholders are provided with the execname. Default will be added.")
            execname += " {inputdir}{inputfilename} {outputdir}{jobid}/outfile.txt"

    logger.info("Input dir: " + inputdir)
    logger.info("Delay between files: " + str(args.delay) + " ms")
    logger.info("Executing: \"" + execname + "\"")

    # prepare list of files to process; sorted so that the jobid -> file
    # mapping does not depend on the arbitrary order of os.listdir()
    inputfilelist = sorted(f for f in os.listdir(inputdir) if os.path.isfile(os.path.join(inputdir, f)))
    logger.info("List of files to process (total: " + str(len(inputfilelist)) + "):")
    logger.info(inputfilelist)

    # prepare output directory
    outputdir = args.output_dir + "/"
    logger.info("Output dir: " + outputdir)
    # Check the path exactly as given: a "./" prefix would turn an absolute
    # path (such as the default one) into a relative one.
    if os.path.isdir(outputdir):
        logger.error("Output directory already exists. Exiting.")
        _terminate(2)
    try:
        os.mkdir(outputdir)
    except OSError as err:
        logger.error("Cannot create output directory (" + str(err) + "). Exiting.")
        _terminate(2)
    logger.info("output directory created.")

    # prepare worklist: one complete command line per input file
    # more hints on the placeholder syntax: see https://pyformat.info
    try:
        worklist = [execname.format(execname=args.execname, inputdir=inputdir, jobid=jobid,
                                    inputfilename=filename, outputdir=outputdir)
                    for jobid, filename in enumerate(inputfilelist)]
    except (KeyError, IndexError, ValueError) as err:
        # e.g. an unknown placeholder or a literal brace inside a shell command
        logger.error("Cannot fill in the placeholders of the execname (" + repr(err) + "). Exiting.")
        _terminate(1)
    logger.info("List if jobs:")
    logger.info(worklist)
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
109

110 111
# --- Send machine data to master rank
# Every rank describes its host; comm.gather returns the list of all these
# dicts on rank 0 and None on every other rank.
data = {'rank': rank, 'machine': platform.machine(), 'hostname': platform.node(),
        'Ncores': multiprocessing.cpu_count(), 'os': platform.system(),
        'pythonVersion': platform.python_version()}
clients = comm.gather(data, root=0)

# --- Distribute Settings
# Only rank 0 has parsed the command line; broadcast what the others need.
data = None
if rank == 0:
    data = {"verbosity": args.verbosity}
settings = comm.bcast(data, root=0)

# -v shows INFO messages, -vv (or more) additionally DEBUG messages.
# (Previously any -v ended at DEBUG because the ">0" check overrode ">1".)
if settings["verbosity"] > 1:
    logger.setLevel(logging.DEBUG)
elif settings["verbosity"] > 0:
    logger.setLevel(logging.INFO)
logger.info("Worker fully initialised.")
if rank == 0:
    logger.info("All clients (total "+str(size)+") active. Printing clients details:")
    logger.info(clients)
129

130 131 132 133
# ---
# Start of main loop.
# Every rank (rank 0 included) runs the worker state machine; rank 0 also runs
# the sender state machine that hands out the entries of worklist.
WLPosition = 0  # index of the next worklist entry to hand out (rank 0 only)
SenderLastJobTimeStamp = 0  # time.time() of the last dispatch, to honour args.delay
pending_sends = []  # rank 0's outstanding isend requests, completed before exit

# --- Variables for FSM of sender and worker
SenderFSM = 0
WorkerFSM = 0

while True:
    # ---
    # Distribution stuff (master only)
    if rank == 0:
        if SenderFSM == 0:  # State 0: preparing for receive
            if WLPosition < len(worklist):  # still some work to be distributed
                r2 = comm.irecv(tag=2)  # completes with the rank of a free worker
                logger.info("Waiting for some worker to connect...")
                SenderFSM = 1
            else:  # no more work in the list
                SenderFSM = 5
                for i in range(0, size):  # End all workers by sending an empty task
                    logger.info("Sending an empty task to rank " + str(i) + "...")
                    # Non-blocking: rank 0 is one of the receivers and has not
                    # posted its own receive yet, so a blocking send to itself
                    # is not guaranteed to return.
                    pending_sends.append(comm.isend({"workstr": ""}, dest=i, tag=4))

        if SenderFSM == 1:  # State 1: delay time between jobs
            if time.time() - SenderLastJobTimeStamp > args.delay / 1000:
                SenderFSM = 2

        if SenderFSM == 2:  # State 2: waiting for a rank to ask for a task
            if r2.Get_status():
                rec = r2.wait()
                SenderFSM = 0

                i = WLPosition
                logger.info("Send a new job ("+worklist[i]+") to rank " + str(rec) + "...")
                workstr = worklist[i]
                # Keep the request so the send can be completed; drop those that
                # already finished so the list does not grow with every job.
                pending_sends = [req for req in pending_sends if not req.Test()]
                pending_sends.append(
                    comm.isend({"workstr": workstr, "jobid": i, "outputdir": outputdir}, dest=rec, tag=4))
                WLPosition += 1
                SenderLastJobTimeStamp = time.time()

        if SenderFSM == 5:  # State 5: empty tasks sent to all, no further work
            pass

    # ---
    # Worker itself (all ranks)
    if WorkerFSM == 0:  # State 0: send a request for work to the master
        h2 = comm.isend(rank, dest=0, tag=2)
        WorkerFSM = 1

    if WorkerFSM == 1:  # State 1: wait for the request to be sent, prepare for the answer
        if h2.Test():
            h2.wait()
            r4 = comm.irecv(source=0, tag=4)
            WorkerFSM = 2

    if WorkerFSM == 2:  # State 2: waiting for answer, start processing
        if r4.Get_status():
            dic = r4.wait()
            workstr = dic["workstr"]
            if not workstr:  # an empty task means the master has no more work
                break
            outputdir = dic["outputdir"]
            jobid = dic["jobid"]
            logger.info("Worker recieved (internal job nr=" + str(jobid) + "): " + workstr)
            jobdir = outputdir + str(jobid)
            os.mkdir(jobdir)

            try:
                # Capture the output through file handles instead of appending
                # "> ... 2> ..." to the command: a shell redirect only applies to
                # the last command of a compound line such as "a; b".
                with open(jobdir + "/std_out.txt", "w") as stdout_file, \
                        open(jobdir + "/err_out.txt", "w") as stderr_file:
                    p = subprocess.Popen(workstr, shell=True, stdout=stdout_file, stderr=stderr_file)
            except OSError as err:
                # Popen raises OSError (not CalledProcessError) when the shell
                # cannot be started; skip this job and ask for the next one.
                logger.error('Worker %d ERROR: %s', rank, err)
                WorkerFSM = 0
            else:
                logger.info('Worker startet job.')
                WorkerFSM = 3

    if WorkerFSM == 3:  # State 3: job running, poll for completion
        if p.poll() is not None:
            logger.info("Job ended, ret code: " + str(p.returncode))
            WorkerFSM = 0

    time.sleep(0.001)  # to take some load off the node

# Complete the master's outstanding sends (no-op on the other ranks).
if pending_sends:
    MPI.Request.Waitall(pending_sends)

logger.info("Worker ended.")

if rank == 0:
    logger.info("Controlling master ended.")