#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# wkmgr.py -- Workload Distributor
# Version 0.1
#
# Minimal Python version: 3.6
# On Mogon 2, run this first:
#   module load lang/Python/3.6.6-foss-2018b

import argparse
import datetime
import multiprocessing  # to get number of cores
import os
import platform
import subprocess
import sys
from time import sleep

from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

parser = argparse.ArgumentParser(description='Workload distributor for trivial parallelism.')
parser.add_argument('execname', help='name of executable to call')
parser.add_argument("-v", "--verbose", help="increase output verbosity", default=False, action="store_true")
parser.add_argument("-V", "--version", help="Returns the actual program version", action="version", version='%(prog)s 0.1')
parser.add_argument("-d", "--delay", help="time delay in ms between starts of consecutive workers", default=50, type=int)

# Parse the command line on rank 0 only, so help/usage/error text is printed
# once, then share the outcome with every rank. argparse reports --help,
# --version and usage errors by raising SystemExit. If that happened on rank 0
# alone, the worker ranks would block forever waiting for the master.
args = None
exit_code = None
if rank == 0:
  try:
    args = parser.parse_args()
  except SystemExit as exc:
    exit_code = exc.code
args, exit_code = comm.bcast((args, exit_code), root=0)
if args is None:
  # Parsing ended the program on rank 0; stop all ranks with the same status.
  sys.exit(exit_code)

if rank == 0:
  print("Delay: "+str(args.delay)+" ms")
clients = {}

worklist = ['file1.txt', 'file2.txt', 'file3.txt', 'file4.txt']

if rank == 0:
  print ("Number of files to process: "+str(len(worklist)))

# The output directory name is derived from the wall-clock time. Compute it on
# rank 0 only and broadcast it. If every rank read its own clock, ranks that
# started in different seconds would disagree on the name, and the workers'
# os.mkdir(<outputdir>/<jobid>) would fail because that parent never exists.
outputdir = None
if rank == 0:
  now = datetime.datetime.now()
  outputdir = os.path.join(os.getcwd(), "output_" + now.strftime("%Y%m%d%H%M%S"))
  # outputdir is already absolute, so test it directly. Prefixing "./" would
  # yield ".//abs/path", which never names an existing directory.
  if os.path.isdir(outputdir):
    print("Output directory already exists. Exiting.")
    outputdir = None  # signals every rank to stop
outputdir = comm.bcast(outputdir, root=0)
if outputdir is None:
  # All ranks exit together; exiting on rank 0 alone would hang the workers.
  sys.exit(1)

if rank == 0:
  # ---- master: hand out the worklist to the worker ranks ----
  if args.verbose:
    print("verbosity turned on")
  print("Executing: ",args.execname)
  print("Output dir: ",outputdir)

  # prepare output directory; each worker creates one sub-directory per job in it
  os.mkdir(outputdir)

  # Handshake: every worker first sends a dict describing its node.
  print('Total number of workers ',size-1)
  for i in range(1,size):
    clients[i] = comm.recv(source=i)
  print("All clients active. Printing clients details:")
  print(clients)

  # Dispatch: give each work item to whichever worker next reports that it is
  # free (a tag-2 message carrying its rank).
  for jobid, item in enumerate(worklist):
    print("ToDo: "+item)
    print("Wait for process...")
    rec = comm.recv(tag=2) # get the rank of free worker
    # NOTE(review): only the executable name and the job index are sent; the
    # work item `item` itself never reaches the worker. Confirm this is intended.
    comm.send({"workstr":args.execname,"jobid":jobid}, dest=rec)
    sleep(args.delay/1000)  # delay is given in ms

  # Shutdown: once the worklist is exhausted, every worker has exactly one
  # outstanding "free" message (tag 2). Receive it before replying with an
  # empty workstr. This leaves no send unmatched at MPI finalisation, and each
  # worker finishes its current job before being told to stop.
  for _ in range(1,size):
    rec = comm.recv(tag=2)
    comm.send({"workstr":"","jobid":rec}, dest=rec)

else:
  # ---- worker: report node details, then request and run jobs ----
  print ("rank "+str(rank))
  nodeinfo = {'machine':platform.machine(), 'hostname': platform.node(), 'Ncores': multiprocessing.cpu_count(), 'os': platform.system(), 'pythonVersion': platform.python_version() }
  comm.send(nodeinfo, dest=0)

  while True:
    comm.send(rank, dest=0, tag=2) # send a data package asking for more work
    dic = comm.recv(source=0)
    workstr = dic["workstr"]
    if not workstr: # an empty workstr is the master's stop signal
      break
    jobid = dic["jobid"]
    print("Rank "+str(rank)+" recieved (internal job nr="+str(jobid)+"): "+workstr)
    jobdir = os.path.join(outputdir, str(jobid))
    os.mkdir(jobdir)

    # Redirect stdout/stderr through file handles instead of appending
    # "> path 2> path" to the shell string. That way an outputdir containing
    # spaces or shell metacharacters still works. workstr itself is still
    # run by the shell, as before.
    try:
      with open(os.path.join(jobdir, "std_out.txt"), "w") as out_file:
        with open(os.path.join(jobdir, "err_out.txt"), "w") as err_file:
          completed = subprocess.run(workstr, shell=True, stdout=out_file, stderr=err_file)
      print('Rank '+str(rank)+' returncode: ',completed.returncode)
    except OSError as exc:
      # subprocess.run() without check=True never raises CalledProcessError.
      # The realistic failures are opening the output files or starting the shell.
      print('Rank '+str(rank)+' ERROR:', exc)

if rank == 0:
  print(worklist)