#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Workload Distributor
# Version 0.3
#
# Minimal python version 3.6
# on Mogon 2: run first
# module load lang/Python/3.6.6-foss-2018b

import argparse
import datetime
import logging
import multiprocessing  # to get number of cores
import os
import platform
import shlex
import subprocess
import time

from mpi4py import MPI

# --- MPI identity of this process
# `size` is the number of ranks in the world communicator, `rank` is this
# process' index in it. Rank 0 is treated as the master further down.
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# --- Logging
# The root logger is configured at WARNING; each rank gets its own named
# logger ("rank<N>") so that interleaved output can be attributed.
logging.basicConfig(level=logging.WARNING, format='%(asctime)-15s %(name)-6s %(levelname)-8s  %(message)s')
logger = logging.getLogger('rank' + str(rank))

# --- Command line interface
# The timestamp is taken once so the default output directory name is fixed
# for the lifetime of the process.
now = datetime.datetime.now()

parser = argparse.ArgumentParser(description='Workload distributor for trivial parallelism.')
parser.add_argument('execname', help='name of executable to call')
parser.add_argument("-v", "--verbose", help="increase output verbosity", default=False, action="store_true")
parser.add_argument("-V", "--version", help="Returns the actual program version", action="version",
                    version='%(prog)s 0.3')
parser.add_argument("-d", "--delay", help="time delay in ms between starts of consecutive jobs to help "
                    "distributing load", default=50, type=int)
parser.add_argument("-i", "--input-dir", help="specifies the directory with files to process", default=".")
# Default output directory: <cwd>/output_YYYYMMDDhhmmss
parser.add_argument("-o", "--output-dir", help="output directory, default = output[datetime]",
                    default=os.getcwd() + now.strftime("/output_%Y%m%d%H%M%S"))
# The argument template is expanded with str.format(); available fields are
# execname, inputdir, inputfilename, outputdir and jobid.
parser.add_argument("-a", "--argument", help="line called for each job. Default = "
                    "'{execname} {inputdir}{inputfilename} {outputdir}{jobid}/outfile.txt'",
                    default='{execname} {inputdir}{inputfilename} {outputdir}{jobid}/outfile.txt')

clients = {}

# --- Master-only initialisation: parse arguments, validate directories and
# build the list of shell command lines (one per input file).
if rank == 0:
    if size < 2:
        logger.warning(
            "Number of workers (MPI ranks) = 1, to fully unleash computer resources, please run this script with the "
            "appropriate launcher and a higher number of MPI ranks. eg mpirun -n 4 ./wkmgr.py [execname]")

    # argparse terminates via SystemExit (usage errors, --help, --version).
    # Only rank 0 parses, so a plain exit here would leave all other ranks
    # blocked in the collective calls that follow; abort the whole MPI job.
    try:
        args = parser.parse_args()
    except SystemExit as stop:
        comm.Abort(stop.code if isinstance(stop.code, int) else 1)
        raise
    if args.verbose:
        logger.setLevel(logging.DEBUG)

    # The trailing slash is required because the job template concatenates
    # "{inputdir}{inputfilename}" without a separator.
    inputdir = args.input_dir + "/"
    if not os.path.isdir(inputdir):
        logger.error("Input directory does not exist. Exiting.")
        comm.Abort(1)  # terminate every rank, not just the master
    logger.info("Input dir: " + inputdir)
    logger.info("Delay between files: " + str(args.delay) + " ms")
    logger.info("Executing: " + str(args.execname))

    # prepare list of files to process (regular files only, sorted so that
    # job ids are reproducible between runs)
    inputfilelist = sorted(f for f in os.listdir(inputdir) if os.path.isfile(os.path.join(inputdir, f)))
    logger.info("List of files to process (total: " + str(len(inputfilelist)) + "):")
    logger.info(inputfilelist)

    # prepare output directory
    outputdir = args.output_dir + "/"
    logger.info("Output dir: " + outputdir)
    # Test the path as given: the default is an absolute path, so prefixing
    # it with "./" would check a different (relative) location.
    if os.path.isdir(outputdir):
        logger.error("Output directory already exists. Exiting.")
        comm.Abort(2)
    os.mkdir(outputdir)
    logger.info("output directory created.")

    # prepare worklist: expand the job template once per input file
    # more hints: see https://pyformat.info
    worklist = [
        args.argument.format(execname=args.execname, inputdir=inputdir, jobid=jobid,
                             inputfilename=filename, outputdir=outputdir)
        for jobid, filename in enumerate(inputfilelist)
    ]
    logger.info("List if jobs:")
    logger.info(worklist)

# --- Send machine data to master rank
# Every rank contributes one dict; after the gather, rank 0 holds the list of
# all of them in `clients` (other ranks receive None from gather).
data = {
    'rank': rank,
    'machine': platform.machine(),
    'hostname': platform.node(),
    'Ncores': multiprocessing.cpu_count(),
    'os': platform.system(),
    'pythonVersion': platform.python_version(),
}
clients = comm.gather(data, root=0)

# --- Distribute Settings
# Only rank 0 parsed the command line, so it broadcasts the settings the
# other ranks need.
data = {"verbose": args.verbose} if rank == 0 else None
settings = comm.bcast(data, root=0)

if settings["verbose"]:
    logger.setLevel(logging.DEBUG)
logger.info("Worker fully initialised.")
if rank == 0:
    logger.info("All clients (total " + str(size) + ") active. Printing clients details:")
    logger.info(clients)

# ---
# Start of main loop
#
# Every rank runs the worker state machine; rank 0 additionally runs the
# sender (master) state machine in the same loop. Both are polled
# non-blockingly so that rank 0 can hand out jobs while its own job runs.
#
# Message tags: 2 = worker -> master "give me work" (payload: worker rank)
#               4 = master -> worker job description (empty "workstr" = stop)
WLPosition = 0  # Position in list of jobs: index of the next job to hand out
SenderLastJobTimeStamp = 0  # to allow for delays between launched jobs

# --- Variables for FSM of sender and worker
# Sender: 0 = post receive, 1 = honour delay, 2 = wait for request and
#         dispatch, 5 = all work distributed
# Worker: 0 = request work, 1 = wait for request to be sent, 2 = wait for
#         job and launch it, 3 = job running
SenderFSM = 0
WorkerFSM = 0

while True:
    # ---
    # Distribution stuff
    if rank == 0:
        if SenderFSM == 0:  # State 0: Preparing for receive
            if WLPosition < len(worklist):  # still some work to be distributed
                r2 = comm.irecv(tag=2)  # a free worker announces itself by sending its rank
                logger.info("Waiting for some worker to connect...")
                SenderFSM = 1
            else:  # no more work in the list
                SenderFSM = 5
                for target in range(size):  # End all workers by sending an empty task
                    logger.info("Sending an empty task to rank " + str(target) + "...")
                    comm.send({"workstr": ""}, target, tag=4)

        if SenderFSM == 1:  # State 1: delay time between jobs
            # args.delay is given in milliseconds, time.time() in seconds
            if time.time() - SenderLastJobTimeStamp > args.delay / 1000:
                SenderFSM = 2

        if SenderFSM == 2:  # State 2: waiting for rank to ask for task
            if r2.Get_status():  # poll, do not block
                rec = r2.wait()
                SenderFSM = 0

                jobstr = worklist[WLPosition]
                logger.info("Send a new job (" + jobstr + ") to rank " + str(rec) + "...")
                # NOTE(review): the request returned by isend is never waited
                # on -- confirm this is safe with the MPI library in use.
                s4 = comm.isend({"workstr": jobstr, "jobid": WLPosition, "outputdir": outputdir}, dest=rec, tag=4)
                WLPosition += 1
                SenderLastJobTimeStamp = time.time()

        # State 5: empty tasks were sent to all ranks, nothing left to do

    # ---
    # Worker itself
    if WorkerFSM == 0:  # State 0: Send a request to Master
        h2 = comm.isend(rank, dest=0, tag=2)  # send a data package asking for more work
        WorkerFSM = 1

    if WorkerFSM == 1:  # State 1: wait for sending to complete, prepare for answer
        if h2.Test():
            h2.wait()
            r4 = comm.irecv(source=0, tag=4)
            WorkerFSM = 2

    if WorkerFSM == 2:  # State 2: waiting for answer, start processing
        if r4.Get_status():
            dic = r4.wait()
            workstr = dic["workstr"]
            if not workstr:  # empty task: no more to do from master
                break
            jobid = dic["jobid"]
            logger.info("Worker recieved (internal job nr=" + str(jobid) + "): " + workstr)

            # each job gets its own sub-directory <outputdir><jobid>/
            jobdir = dic["outputdir"] + str(jobid)
            os.mkdir(jobdir)

            # workstr is a complete shell line and is passed through as is;
            # the redirect targets are quoted so that output paths containing
            # spaces or shell metacharacters still work.
            bashCommand = (workstr + " > " + shlex.quote(jobdir + "/std_out.txt")
                           + " 2> " + shlex.quote(jobdir + "/err_out.txt"))
            try:
                p = subprocess.Popen(bashCommand, shell=True)
            except OSError as err:
                # Popen reports launch failures as OSError. There is no
                # process to poll, so skip the "running" state and ask the
                # master for the next job instead.
                logger.error("Worker %s ERROR: %s", rank, err)
                WorkerFSM = 0
            else:
                logger.info('Worker startet job.')
                WorkerFSM = 3

    if WorkerFSM == 3:  # State 3: Running
        if p.poll() is not None:
            logger.info("Job ended, ret code: " + str(p.returncode))
            WorkerFSM = 0

    time.sleep(0.001)  # to take some load off the node

# Reached once this rank's worker loop has received the empty (stop) task.
logger.info("Worker ended.")

if rank == 0:
    logger.info("Controlling master ended.")