#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Workload Distributor
# Version 0.1
#
# Minimum Python version: 3.6
# On Mogon 2, load a suitable Python first:
#   module load lang/Python/3.6.6-foss-2018b

import argparse
import datetime
import logging
import multiprocessing  # to get number of cores
import os
import platform
import shlex
import subprocess
import sys
from threading import Thread
from time import sleep

from mpi4py import MPI

# Root logging is configured at WARNING, so the script's logger.info() progress
# messages stay hidden unless this logger's level is lowered (done for --verbose).
logging.basicConfig(
  level=logging.WARNING,
  format='%(asctime)-15s %(name)-5s %(levelname)-8s  %(message)s',
)
logger = logging.getLogger('workload-manager')

# MPI layout used by this script: rank 0 is the master that hands out jobs and
# every other rank is a worker, hence at least two ranks are required.
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()
if size < 2:
  logger.error("Minimum number of MPI ranks = 2, please run this script with the appropriate launcher. eg mpirun -n 4 ./wkmgr.py [execname]")
  # exit() is the site-module helper intended for interactive sessions;
  # raising SystemExit gives the same exit status without relying on it.
  raise SystemExit(2)

def _non_negative_int(text):
  """Convert a command-line string to an int >= 0 (argparse ``type=`` hook).

  Raises argparse.ArgumentTypeError for non-integers and for negative values,
  so the problem is reported as a usage error instead of surfacing later when
  the delay is passed to sleep().
  """
  try:
    value = int(text)
  except ValueError:
    raise argparse.ArgumentTypeError("invalid int value: %r" % text)
  if value < 0:
    raise argparse.ArgumentTypeError("must be >= 0, got %r" % text)
  return value


# Command-line interface: two positionals (executable, input directory) plus
# optional verbosity, version and start-delay switches.
parser = argparse.ArgumentParser(description='Workload distributor for trivial parallelism.')
parser.add_argument('execname', help='name of executable to call')
parser.add_argument("-v", "--verbose", help="increase output verbosity", default=False, action="store_true")
parser.add_argument("-V", "--version", help="Returns the actual program version", action="version", version='%(prog)s 0.1')
parser.add_argument("-d", "--delay", help="time delay in ms between starts of consecutive workers", default=50, type=_non_negative_int)
parser.add_argument("inputdir", help="specifies the directory with files to process")

# Only the master (rank 0) parses the command line; the workers are sent the
# settings they need by the master over MPI.
if rank == 0:
  try:
    args = parser.parse_args()
  except SystemExit as exc:
    # argparse has already printed its message (usage error, --help or
    # --version). Exiting only this rank would leave the workers blocked in
    # their receive from rank 0, so terminate the whole MPI job instead.
    # Abort does not run normal interpreter shutdown, so flush output first.
    sys.stdout.flush()
    sys.stderr.flush()
    comm.Abort(exc.code if isinstance(exc.code, int) else 1)
    raise
  if args.verbose:
    # DEBUG on this logger makes the logger.info() progress messages visible.
    logger.setLevel(logging.DEBUG)

# Node descriptions reported by the workers, keyed by MPI rank (filled on rank 0).
clients = {}

if rank == 0:
  # ------------------------------------------------------------------
  # Master: build the job list, then hand one job at a time to whichever
  # worker asks for work. Message tags used below:
  #   tag 2 - worker -> master, "I am idle" (payload: the worker's rank)
  #   tag 3 - master -> worker, settings dictionary
  #   default tag - master -> worker, job dictionary (empty "workstr" = stop)
  # The logger.info() calls are only emitted with --verbose, because that is
  # what lowers the logger level below the WARNING default.
  # ------------------------------------------------------------------
  logger.info("Verbosity turned on")
  logger.info("Input dir: %s", args.inputdir)
  logger.info("Delay between files: %s ms", args.delay)
  logger.info("Executing: %s", args.execname)

  # prepare list of files to process (regular files only, no subdirectories)
  worklist = [
    entry for entry in os.listdir(args.inputdir)
    if os.path.isfile(os.path.join(args.inputdir, entry))
  ]
  logger.info("Number of files to process: %d", len(worklist))
  logger.info("List of files to process:")
  logger.info(worklist)

  # prepare output directory: <cwd>/output_YYYYMMDDhhmmss
  now = datetime.datetime.now()
  outputdir = os.path.join(os.getcwd(), now.strftime("output_%Y%m%d%H%M%S"))
  logger.info("Output dir: %s", outputdir)
  # outputdir is already absolute, so it is tested as-is (prefixing "./"
  # would turn it into an unrelated relative path).
  if os.path.isdir(outputdir):
    logger.error("Output directory already exists. Exiting.")
    # The workers are blocked waiting for their settings; abort the whole
    # MPI job rather than exiting only this rank.
    comm.Abort(1)
    raise SystemExit(1)
  os.mkdir(outputdir)
  logger.info("output directory created.")
  logger.info("Total number of workers %d", size - 1)

  # Handshake: collect each worker's node description, then distribute settings.
  for worker in range(1, size):
    clients[worker] = comm.recv(source=worker)
    comm.send({"verbose": args.verbose}, dest=worker, tag=3)
  logger.info("All clients active. Printing clients details:")
  logger.info(clients)

  for jobid, filename in enumerate(worklist):
    logger.info("ToDo: %s", filename)
    logger.info("Wait for process...")
    idle = comm.recv(tag=2)  # rank of a worker that is asking for work
    # The command is run through a shell by the worker, so the two paths are
    # quoted to survive spaces and shell metacharacters in file names.
    # execname is deliberately left unquoted so it may carry its own arguments.
    workstr = " ".join([
      args.execname,
      shlex.quote(os.path.join(args.inputdir, filename)),
      shlex.quote(os.path.join(outputdir, str(jobid), "outfile.txt")),
    ])
    comm.send({"workstr": workstr, "jobid": jobid, "outputdir": outputdir}, dest=idle)
    sleep(args.delay / 1000)

  # End all workers by sending an empty task. Each worker issues exactly one
  # more tag-2 request after its last job; receive it before answering so no
  # send is left unmatched and relying on MPI-internal buffering to complete.
  for _ in range(1, size):
    idle = comm.recv(tag=2)
    comm.send({"workstr": ""}, dest=idle)

  logger.info("Controlling master ended.")

else:
  # ------------------------------------------------------------------
  # Worker: report node details, receive settings, then request and run
  # jobs until the master answers with an empty "workstr".
  # ------------------------------------------------------------------
  nodeinfo = {
    'machine': platform.machine(),
    'hostname': platform.node(),
    'Ncores': multiprocessing.cpu_count(),
    'os': platform.system(),
    'pythonVersion': platform.python_version(),
  }
  comm.send(nodeinfo, dest=0)
  settings = comm.recv(source=0, tag=3)
  verbose = settings["verbose"]
  if verbose:
    print("Worker "+str(rank)+" started.")

  while True:
    comm.send(rank, dest=0, tag=2)  # send a data package asking for more work
    dic = comm.recv(source=0)
    workstr = dic["workstr"]
    if not workstr:  # empty command: no more work from the master
      break
    outputdir = dic["outputdir"]
    jobid = dic["jobid"]
    if verbose:
      print("Worker "+str(rank)+" recieved (internal job nr="+str(jobid)+"): "+workstr)

    # Each job gets its own sub-directory holding the captured stdout/stderr.
    jobdir = os.path.join(outputdir, str(jobid))
    stdout_path = os.path.join(jobdir, "std_out.txt")
    stderr_path = os.path.join(jobdir, "err_out.txt")
    try:
      os.mkdir(jobdir)
      with open(stdout_path, "wb") as out, open(stderr_path, "wb") as err:
        # shell=True because the command (executable plus arguments) arrives
        # as a single string taken from the operator's command line.
        # Without check=True, run() reports a failing job via returncode
        # rather than raising CalledProcessError.
        completed = subprocess.run(workstr, shell=True, stdout=out, stderr=err)
    except OSError as err_info:
      # Directory/file creation or process start failed; report it and keep
      # serving further jobs instead of taking this worker down.
      logger.error("Worker %s could not run job %s: %s", rank, jobid, err_info)
    else:
      if verbose:
        print('Worker '+str(rank)+' returncode: ', completed.returncode)

  if verbose:
    print("Worker "+str(rank)+" ended.")