#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Workload Distributor
# Version 0.1
#
# Minimum Python version: 3.6
# On Mogon 2, first run:
#   module load lang/Python/3.6.6-foss-2018b

import argparse
import datetime
import multiprocessing #to get number of cores
import os
import platform
import shlex
import subprocess
from time import sleep

from mpi4py import MPI

# MPI bookkeeping shared by the rest of the script: the world communicator,
# the total number of ranks, and this process's own rank.
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# Rank 0 only coordinates; at least one additional rank is needed to do work.
if size < 2:
  print("Minimum number of MPI ranks = 2, please run this script with the appropriate launcher. eg mpirun -n 4 ./wkmgr.py [execname|]")
  # SystemExit instead of the interactive-only exit() helper from `site`.
  raise SystemExit(2)

def _non_negative_int(text):
  """argparse type: parse *text* as an int and reject negative values.

  The delay is later handed to time.sleep(), which raises ValueError for
  negative numbers; rejecting them here turns that into a usage error.
  """
  try:
    value = int(text)
  except ValueError:
    raise argparse.ArgumentTypeError("invalid int value: %r" % (text,)) from None
  if value < 0:
    raise argparse.ArgumentTypeError("must be >= 0, got %d" % value)
  return value


# Command line interface: two positionals (executable, input directory)
# plus optional verbosity, version and start-delay switches.
parser = argparse.ArgumentParser(description='Workload distributor for trivial parallelism.')
parser.add_argument('execname', help='name of executable to call')
parser.add_argument("-v", "--verbose", help="increase output verbosity", default=False, action="store_true")
parser.add_argument("-V", "--version", help="Returns the actual program version", action="version", version='%(prog)s 0.1')
parser.add_argument("-d", "--delay", help="time delay in ms between starts of consecutive workers", default=50, type=_non_negative_int)
parser.add_argument("inputdir", help="specifies the directory with files to process")

if rank == 0:
  # Only the master parses the command line, so usage/help text appears once.
  try:
    args = parser.parse_args()
  except SystemExit as exc:
    # argparse exits on --help, --version and usage errors. The worker ranks
    # are meanwhile blocked waiting for their settings from rank 0, so a
    # plain exit here would leave them hanging; abort the whole MPI job.
    comm.Abort(exc.code if isinstance(exc.code, int) else 1)

# Message tags used between the master (rank 0) and the workers.
TAG_REQUEST = 2   # worker -> master: "I am idle"; payload is the worker's rank
TAG_SETTINGS = 3  # master -> worker: dict with run-wide settings

clients = {}  # filled on the master: worker rank -> node description dict

if rank == 0:
  # ------------------------------------------------------------------
  # Master: builds the job list, hands one file at a time to whichever
  # worker asks for work, then tells every worker to stop.
  # ------------------------------------------------------------------
  if args.verbose:
    print("Verbosity turned on")
    print("Input dir: "+args.inputdir)
    print("Delay between files: "+str(args.delay)+" ms")
    print("Executing: ",args.execname)

  #prepare list of files to process (sorted so job ids are reproducible)
  try:
    worklist = sorted(f for f in os.listdir(args.inputdir) if os.path.isfile(os.path.join(args.inputdir, f)))
  except OSError as err:
    # Workers are blocked in recv(); exiting only this rank would hang them.
    print("Cannot read input directory:", err)
    comm.Abort(1)
  if args.verbose:
    print("Number of files to process: "+str(len(worklist)))
    print("List of files to process:")
    print(worklist)

  #prepare output directory (absolute path, time-stamped to the second)
  now = datetime.datetime.now()
  outputdir = os.path.join(os.getcwd(), now.strftime("output_%Y%m%d%H%M%S"))
  if args.verbose:
    print("Output dir: ",outputdir)
  try:
    os.mkdir(outputdir)
  except FileExistsError:
    print("Output directory already exists. Exiting.")
    comm.Abort(1)
  except OSError as err:
    print("Cannot create output directory:", err)
    comm.Abort(1)
  if args.verbose:
    print("output directory created.")
    print('Total number of workers ',size-1)

  # Handshake: each worker first reports its node details, then gets the settings.
  for i in range(1,size):
    clients[i] = comm.recv(source=i)
    #distribute settings
    comm.send({"verbose":args.verbose},dest=i,tag=TAG_SETTINGS)
  if args.verbose:
    print("All clients active. Printing clients details:")
    print(clients)

  delay_s = max(args.delay, 0)/1000  # sleep() rejects negative values
  for jobid, fname in enumerate(worklist):
    if args.verbose:
      print("ToDo: "+fname)
      print("Wait for process...")
    rec = comm.recv(tag=TAG_REQUEST) # get the rank of free worker
    # Paths are shell-quoted because the worker runs the string through a shell;
    # execname is left as given so it may carry its own arguments.
    infile = shlex.quote(os.path.join(args.inputdir, fname))
    outfile = shlex.quote(os.path.join(outputdir, str(jobid), "outfile.txt"))
    comm.send({"workstr":args.execname + " " + infile + " " + outfile, "jobid":jobid, "outputdir":outputdir}, dest=rec)
    sleep(delay_s)

  # End all workers by answering each one's next work request with an empty
  # task. Receiving the request first consumes every message the workers send
  # and means the master only finishes once all jobs have completed.
  for _ in range(1,size):
    rec = comm.recv(tag=TAG_REQUEST)
    comm.send({"workstr":""}, dest=rec)

  if args.verbose:
    print("Controlling master ended.")

else:
  # ------------------------------------------------------------------
  # Worker: announce this node, receive settings, then loop asking the
  # master for work until an empty task arrives.
  # ------------------------------------------------------------------
  nodeinfo = {'machine':platform.machine(), 'hostname': platform.node(), 'Ncores': multiprocessing.cpu_count(), 'os': platform.system(), 'pythonVersion': platform.python_version() }
  comm.send(nodeinfo, dest=0)
  settings=comm.recv(source=0,tag=TAG_SETTINGS)
  verbose = settings["verbose"]
  if verbose:
    print ("Worker "+str(rank)+" started.")

  while True:
    comm.send(rank, dest=0, tag=TAG_REQUEST) # send a data package asking for more work
    dic = comm.recv(source=0)
    workstr = dic["workstr"]
    if not workstr: #check if no more to do from master
      break
    jobid = dic["jobid"]
    jobdir = os.path.join(dic["outputdir"], str(jobid))
    if verbose:
      print("Worker "+str(rank)+" recieved (internal job nr="+str(jobid)+"): "+workstr)

    try:
      os.makedirs(jobdir, exist_ok=True)
      # Capture the command's stdout/stderr in per-job files. The redirection
      # is done here rather than in the shell string so the paths need no quoting.
      with open(os.path.join(jobdir, "std_out.txt"), "wb") as out_f, \
           open(os.path.join(jobdir, "err_out.txt"), "wb") as err_f:
        completed = subprocess.run(workstr, shell=True, stdout=out_f, stderr=err_f)
    except OSError as err:
      # A failed job must not kill the worker: the master would then wait
      # forever for this rank's next request. Report it and ask for more work.
      print('Worker '+str(rank)+' ERROR:', err)
      continue
    if verbose:
      print('Worker '+str(rank)+' returncode: ',completed.returncode)

  if verbose:
    print("Worker "+str(rank)+" ended.")