wkmgr.py 6.5 KB
Newer Older
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
1 2 3 4
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Workload Distributor
5
# Version 0.2
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
6 7 8 9 10 11 12
#
# Minimal python version 3.6
# on Mogon 2: run first 
# module load lang/Python/3.6.6-foss-2018b

from mpi4py import MPI
import platform
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
13
import multiprocessing  # to get number of cores
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
14 15
import argparse
import subprocess
16
import time
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
17 18
import os
import datetime
19
import logging
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
20 21 22 23

# MPI bookkeeping: the world communicator, this process' rank in it and the
# total number of ranks taking part in the run.
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
24

Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
25 26
# Root logger stays at WARNING; each process logs through its own logger
# named after its MPI rank so interleaved output can be attributed.
logging.basicConfig(
    format='%(asctime)-15s %(name)-6s %(levelname)-8s  %(message)s',
    level=logging.WARNING,
)
logger = logging.getLogger('rank{}'.format(rank))
27

Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
28 29 30 31 32
# Refuse to run with fewer than two MPI ranks and tell the user how to
# launch the script correctly.
if size < 2:
    logger.error(
        "Minimum number of MPI ranks = 2, please run this script with the appropriate launcher. eg mpirun -n 4 "
        "./wkmgr.py [execname]")
    # `exit()` is a helper installed by the `site` module for interactive
    # sessions and is not guaranteed to exist; raising SystemExit directly
    # terminates with the same exit status (2).
    raise SystemExit(2)
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
33 34 35 36

# Command-line interface of the workload distributor.
parser = argparse.ArgumentParser(description='Workload distributor for trivial parallelism.')
parser.add_argument('execname', help='name of executable to call')
parser.add_argument("-v", "--verbose", help="increase output verbosity", default=False, action="store_true")
parser.add_argument("-V", "--version", help="Returns the actual program version", action="version",
                    version='%(prog)s 0.2')
parser.add_argument("-d", "--delay", help="time delay in ms between starts of consecutive workers", default=50,
                    type=int)
# nargs="?" makes this positional optional; without it argparse treats the
# argument as required and the declared default "." could never take effect.
parser.add_argument("inputdir", nargs="?", help="specifies the directory with files to process", default=".")

# Placeholder; replaced by the result of comm.gather further down.
clients = {}
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
44 45

# Master-only initialisation: parse the command line, collect the list of
# input files and create a fresh, timestamped output directory.
if rank == 0:
    args = parser.parse_args()
    if args.verbose:
        logger.setLevel(logging.DEBUG)

    logger.info("Input dir: " + args.inputdir)
    logger.info("Delay between files: " + str(args.delay) + " ms")
    logger.info("Executing: " + str(args.execname))

    # prepare list of files to process (regular files directly inside
    # inputdir; sub-directories are skipped)
    worklist = [f for f in os.listdir(args.inputdir) if os.path.isfile(os.path.join(args.inputdir, f))]
    logger.info("Number of files to process: " + str(len(worklist)))
    logger.info("List of files to process:")
    logger.info(worklist)

    # prepare output directory: <cwd>/output_YYYYMMDDhhmmss/ (the trailing
    # slash is relied upon where job sub-directories are appended later)
    now = datetime.datetime.now()
    outputdir = os.getcwd() + "/output_{:04d}{:02d}{:02d}{:02d}{:02d}{:02d}/".format(
        now.year, now.month, now.day, now.hour, now.minute, now.second)
    if args.verbose:
        logger.info("Output dir: " + outputdir)
    # Create the directory and react to "already exists" instead of testing
    # beforehand: outputdir is an absolute path, so the former check of
    # "./" + outputdir looked at the wrong location, and check-then-create
    # is racy anyway.
    try:
        os.mkdir(outputdir)
    except FileExistsError:
        logger.error("Output directory already exists. Exiting.")
        # All other ranks go on to the comm.gather below and would wait for
        # this rank; abort the whole MPI job rather than exiting alone.
        comm.Abort(1)
    if args.verbose:
        logger.info("output directory created.")
        logger.info('Total number of workers ' + str(size))
74 75
# --- Send machine data to master rank
# Every rank describes the machine it runs on; rank 0 collects the
# descriptions of all ranks via gather.
data = {
    'rank': rank,
    'machine': platform.machine(),
    'hostname': platform.node(),
    'Ncores': multiprocessing.cpu_count(),
    'os': platform.system(),
    'pythonVersion': platform.python_version(),
}
clients = comm.gather(data, root=0)
78

Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
79
# --- Distribute Settings
# Only rank 0 has parsed the command line, so it broadcasts the settings
# the other ranks need (currently just the verbosity flag).
data = {"verbose": args.verbose} if rank == 0 else None
settings = comm.bcast(data, root=0)

if settings["verbose"]:
    logger.setLevel(logging.DEBUG)
logger.info("Worker fully initialised.")

if rank == 0:
    logger.info("All clients active. Printing clients details:")
    logger.info(clients)
91

92 93 94 95
# ---
# Start of main loop
# Timestamp of the most recently dispatched job (enforces the delay between
# launches) and the index of the next file in the work list.
SenderLastJobTimeStamp = 0
WLPosition = 0
96

Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
97 98 99
# --- Variables for FSM of sender and worker
# Both finite state machines start in state 0.
SenderFSM = WorkerFSM = 0
100 101

# Main loop. Every rank runs the worker state machine; rank 0 additionally
# runs the sender state machine that hands out one input file per request.
# Both machines are polled once per iteration and use non-blocking MPI calls,
# so rank 0 can serve requests while its own job is running.
while True:
    # ---
    # Distribution stuff
    if rank == 0:
        if SenderFSM == 0:  # State 0: Preparing for receive
            if WLPosition < len(worklist):  # still some work to be distributed
                r2 = comm.irecv(tag=2)  # wait for a worker to connect and get the rank of the free worker
                logger.info("Waiting for some worker to connect...")
                SenderFSM = 1
            else:  # no more work in the list
                SenderFSM = 5
                # NOTE(review): these are blocking sends, including one to
                # rank 0 itself; presumably fine for such small messages.
                for i in range(0, size):  # End all workers by sending an empty task
                    logger.info("Sending an empty task to rank " + str(i) + "...")
                    comm.send({"workstr": ""}, i, tag=4)

        if SenderFSM == 1:  # State 1: delay time between jobs
            # args.delay is given in milliseconds, time.time() in seconds
            if time.time() - SenderLastJobTimeStamp > args.delay / 1000:
                SenderFSM = 2

        if SenderFSM == 2:  # State 2: waiting for rank to ask for task
            if r2.Get_status():
                rec = r2.wait()  # payload of the request is the rank asking for work
                SenderFSM = 0

                i = WLPosition
                logger.info("Send a new job ("+worklist[i]+") to rank " + str(rec) + "...")
                # NOTE(review): the command is later run through a shell;
                # file names containing spaces or shell metacharacters are
                # not quoted here.
                workstr = args.execname + " " + args.inputdir + "/" + worklist[i] + " " + outputdir + str(i) + "/outfile.txt"
                s4 = comm.isend({"workstr": workstr, "jobid": i, "outputdir": outputdir}, dest=rec, tag=4)
                WLPosition += 1
                SenderLastJobTimeStamp = time.time()
        if SenderFSM == 5:  # State 5: empty tasks were sent to all, do no further work
            pass

    # ---
    # Worker itself
    if WorkerFSM == 0:  # State 0: Send a request to Master
        h2 = comm.isend(rank, dest=0, tag=2)  # send a data package asking for more work
        WorkerFSM = 1

    if WorkerFSM == 1:  # State 1: wait for sending to complete, prepare for answer
        if h2.Test():
            h2.wait()
            r4 = comm.irecv(source=0, tag=4)
            WorkerFSM = 2

    if WorkerFSM == 2:  # State 2: waiting for answer, start processing
        if r4.Get_status():
            dic = r4.wait()
            workstr = dic["workstr"]
            if len(workstr) == 0:  # check if no more to do from master
                break
            outputdir = dic["outputdir"]
            jobid = dic["jobid"]
            logger.info("Worker recieved (internal job nr=" + str(jobid) + "): " + workstr)
            os.mkdir(outputdir + str(jobid))

            # stdout and stderr of the job go to files in the job's own directory
            bashCommand = workstr + " > " + outputdir + str(jobid) + "/std_out.txt 2> " + outputdir + str(jobid) + "/err_out.txt"
            try:
                p = subprocess.Popen(bashCommand, shell=True)
            except OSError as err:
                # Popen reports launch failures as OSError (it never raises
                # CalledProcessError). There is no process to poll in that
                # case, so go back to asking the master for the next job
                # instead of entering the "running" state.
                logger.error('Worker ' + str(rank) + ' ERROR: ' + str(err))
                WorkerFSM = 0
            else:
                logger.info('Worker startet job.')
                WorkerFSM = 3

    if WorkerFSM == 3:  # State 3: Running
        if p.poll() is not None:  # poll() returns None while the job is still running
            logger.info("Job ended, ret code: " + str(p.returncode))
            WorkerFSM = 0

    time.sleep(0.001)  # to take some load off the node

173 174

# Reached by every rank once its worker loop has been left; rank 0
# additionally reports the end of the controlling master.
logger.info("Worker ended.")

if rank == 0:
    logger.info("Controlling master ended.")