#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Workload Distributor
# Version 0.2
#
# Minimum Python version: 3.6
# On Mogon 2, first run:
#   module load lang/Python/3.6.6-foss-2018b

import argparse
import datetime
import logging
import multiprocessing  # to get number of cores
import os
import platform
import shlex
import subprocess
import sys
import time

from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# Root handler prints WARNING and above; ranks switch their own logger to DEBUG
# when verbosity is requested.
logging.basicConfig(level=logging.WARNING, format='%(asctime)-15s %(name)-6s %(levelname)-8s  %(message)s')
logger = logging.getLogger('rank' + str(rank))  # one named logger per MPI rank, e.g. "rank0"

# The script refuses to run with fewer than two MPI ranks.
if size < 2:
    logger.error(
        "Minimum number of MPI ranks = 2, please run this script with the appropriate launcher. eg mpirun -n 4 "
        "./wkmgr.py [execname]")
    # sys.exit instead of the site-module `exit`, which is intended for the interactive shell only.
    sys.exit(2)

# --- Command-line interface (parsed by the master rank, rank 0)
parser = argparse.ArgumentParser(description='Workload distributor for trivial parallelism.')
parser.add_argument('execname', help='name of executable to call')
parser.add_argument("-v", "--verbose", help="increase output verbosity", default=False, action="store_true")
parser.add_argument("-V", "--version", help="Returns the actual program version", action="version",
                    version='%(prog)s 0.2')
parser.add_argument("-d", "--delay", help="time delay in ms between starts of consecutive workers", default=50,
                    type=int)
# nargs="?" makes this positional optional. Without it argparse always requires
# the argument and the "." default could never take effect.
parser.add_argument("inputdir", nargs="?", help="specifies the directory with files to process", default=".")

# Node information reported by each rank, keyed by rank number (filled on rank 0 only).
clients = {}

if rank == 0:
    # Only the master rank parses the command line, builds the work list and
    # creates the output directory.
    args = parser.parse_args()
    if args.verbose:
        logger.setLevel(logging.DEBUG)

    # The info messages below are emitted only when verbose (logger at DEBUG).
    logger.info("Verbosity turned on")
    logger.info("Input dir: " + args.inputdir)
    logger.info("Delay between files: " + str(args.delay) + " ms")
    logger.info("Executing: " + str(args.execname))

    # prepare list of files to process: regular files directly inside inputdir
    worklist = [f for f in os.listdir(args.inputdir) if os.path.isfile(os.path.join(args.inputdir, f))]
    logger.info("Number of files to process: " + str(len(worklist)))
    logger.info("List of files to process:")
    logger.info(worklist)

    # prepare output directory: <cwd>/output_YYYYmmddHHMMSS (local time)
    outputdir = os.path.join(os.getcwd(), datetime.datetime.now().strftime("output_%Y%m%d%H%M%S"))
    logger.info("Output dir: " + outputdir)
    try:
        # Let mkdir itself detect an existing directory, so there is no
        # check-then-create race. outputdir is already an absolute path.
        os.mkdir(outputdir)
    except FileExistsError:
        logger.error("Output directory already exists. Exiting.")
        # Terminate the whole MPI job. Exiting rank 0 alone would leave the
        # other ranks blocked, waiting for messages from rank 0.
        comm.Abort(1)
    logger.info("output directory created.")
    logger.info('Total number of workers ' + str(size))

# --- Collect node information from every rank on the master rank
nodeinfo = {'machine': platform.machine(), 'hostname': platform.node(), 'Ncores': multiprocessing.cpu_count(),
            'os': platform.system(), 'pythonVersion': platform.python_version()}
# Collective gather: on rank 0 this is a list ordered by rank; on all other ranks it is None.
allnodeinfo = comm.gather(nodeinfo, root=0)
if rank == 0:
    clients.update(enumerate(allnodeinfo))  # clients[rank] = nodeinfo of that rank

# --- Distribute settings from the master rank to all ranks
# Only rank 0 has parsed the command line, so only it builds the settings dict.
# The other ranks pass None and receive rank 0's dict.
settings = comm.bcast({"verbose": args.verbose} if rank == 0 else None, root=0)
if settings["verbose"]:
    logger.setLevel(logging.DEBUG)
logger.info("Worker " + str(rank) + " initialised.")
if rank == 0:
    logger.info("All clients active. Printing clients details:")
    logger.info(clients)
# --- Go into working loop

WLPosition = 0  # index of the next worklist entry to hand out (used on rank 0)

# --- State variables of the two finite-state machines driven by the working loop
# Sender (rank 0 only):
#   0 = post a receive for the next work request, or, once the work list is
#       exhausted, send an empty task to every rank,
#   1 = wait until the configured delay since the last job was sent has passed,
#   2 = wait for a rank to ask for a task, then send it the next job,
#   5 = done: empty tasks have been sent to all ranks.
SenderFSM = 0
# Worker (every rank, including rank 0):
#   0 = send a work request to the master,
#   1 = wait for that send to complete, then post a receive for the answer,
#   2 = wait for the answer and start processing it (an empty task ends the loop),
#   3 = job running,
#   4 = job completed, reset to 0.
WorkerFSM = 0
SenderLastJobTimeStamp = 0  # time.time() of the last job sent; enforces the delay between job starts

# Non-blocking sends issued by rank 0 that may still be in flight. They are kept
# so that every request is completed rather than silently dropped.
pending_sends = []

while True:
    ####
    # Distribution (rank 0 only): hand out one work-list entry per worker request.

    if rank == 0:
        if SenderFSM == 0:
            if WLPosition < len(worklist):  # still some work to be distributed
                r2 = comm.irecv(tag=2)  # wait for a worker to connect and get the rank of the free worker
                logger.info("Waiting for some worker to connect...")
                SenderFSM = 1
            else:  # no more work in the list
                SenderFSM = 5
                for i in range(0, size):  # End all workers by sending an empty task
                    logger.info("Sending an empty task to rank " + str(i) + "...")
                    # Non-blocking: rank 0 is also a worker, and a blocking send to
                    # itself before its own receive is posted is unsafe in MPI.
                    pending_sends.append(comm.isend({"workstr": ""}, dest=i, tag=4))

        if SenderFSM == 1:  # delay time between jobs
            if time.time() - SenderLastJobTimeStamp > args.delay / 1000:
                SenderFSM = 2

        if SenderFSM == 2:
            if r2.Get_status():
                rec = r2.wait()  # rank of the worker asking for a job
                SenderFSM = 0

                i = WLPosition
                infile = os.path.join(args.inputdir, worklist[i])
                outfile = os.path.join(outputdir, str(i), "outfile.txt")
                logger.info("ToDo: " + worklist[i])
                logger.info("Send a new job to rank " + str(rec) + "...")
                # execname is used verbatim because it may carry its own arguments.
                # The paths are shell-quoted so that file names with spaces or shell
                # metacharacters reach the executable as single arguments.
                workstr = args.execname + " " + shlex.quote(infile) + " " + shlex.quote(outfile)
                # Drop sends that have already completed, then track the new one.
                pending_sends = [req for req in pending_sends if not req.Test()]
                pending_sends.append(
                    comm.isend({"workstr": workstr, "jobid": i, "outputdir": outputdir}, dest=rec, tag=4))
                WLPosition += 1
                SenderLastJobTimeStamp = time.time()

    ###
    # Worker (every rank): request a job, run it as a shell command, repeat.
    if WorkerFSM == 0:
        h2 = comm.isend(rank, dest=0, tag=2)  # send a data package asking for more work
        WorkerFSM = 1
    if WorkerFSM == 1:
        if h2.Test():
            h2.wait()
            r4 = comm.irecv(source=0, tag=4)
            WorkerFSM = 2
    if WorkerFSM == 2:
        if r4.Get_status():
            dic = r4.wait()
            workstr = dic["workstr"]
            if not workstr:  # check if no more to do from master
                # Complete all outstanding sends (non-empty only on rank 0) before leaving.
                MPI.Request.Waitall(pending_sends)
                break
            outputdir = dic["outputdir"]
            jobid = dic["jobid"]
            logger.info("Worker " + str(rank) + " recieved (internal job nr=" + str(jobid) + "): " + workstr)
            jobdir = os.path.join(outputdir, str(jobid))

            bashCommand = (workstr
                           + " > " + shlex.quote(os.path.join(jobdir, "std_out.txt"))
                           + " 2> " + shlex.quote(os.path.join(jobdir, "err_out.txt")))
            try:
                os.mkdir(jobdir)
                p = subprocess.Popen(bashCommand, shell=True)
            except OSError as err:
                # Popen never raises CalledProcessError. OSError covers a failed job
                # directory creation or a failed shell spawn.
                logger.error('Worker %d ERROR: %s', rank, err)
                WorkerFSM = 4  # nothing is running: skip to reset and ask for the next job
            else:
                logger.info('Worker startet job. ')
                WorkerFSM = 3
    if WorkerFSM == 3:  # Running
        if p.poll() is not None:
            logger.info("Process ended, ret code: " + str(p.returncode))
            WorkerFSM = 4

    if WorkerFSM == 4:
        logger.info("Worker " + str(rank) + ": Work completed and reset")
        WorkerFSM = 0

    time.sleep(0.001)  # to take some load off the node

# --- Shutdown reporting: every rank announces that it left the working loop,
# and rank 0 additionally reports the end of the controlling master.
logger.info("Worker {} ended.".format(rank))
if not rank:
    logger.info("Controlling master ended.")