wkmgr.py 2.42 KB
Newer Older
Peter-Bernd Otte's avatar
Peter-Bernd Otte committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Workload Distributor
# Version 0.1
#
# Minimum Python version: 3.6
# On Mogon 2, first run:
#   module load lang/Python/3.6.6-foss-2018b

from mpi4py import MPI
import platform
import multiprocessing #to get number of cores
import argparse
import subprocess
from time import sleep
import os
import datetime

# MPI context: rank 0 acts as the coordinator, ranks 1..size-1 are workers.
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

parser = argparse.ArgumentParser(description='Workload distributor for trivial parallelism.')
parser.add_argument('execname', help='name of executable to call')
parser.add_argument("-v", "--verbose", help="increase output verbosity", default=False, action="store_true")
parser.add_argument("-V", "--version", help="Returns the actual program version", action="version", version='%(prog)s 0.1')
parser.add_argument("-d", "--delay", help="time delay in ms between starts of consecutive workers", default=50, type=int)

# Parse on EVERY rank, not only on rank 0: all ranks receive the same argv,
# and argparse terminates the process on --help/--version or on a usage
# error. If only rank 0 parsed, those cases would end rank 0 alone while the
# workers stay blocked in their send/recv with rank 0.
args = parser.parse_args()
if args.delay < 0:
  # A negative value would make sleep() raise on rank 0 halfway through
  # handing out work; reject it up front, consistently on all ranks.
  parser.error("--delay must be a non-negative number of milliseconds")

if rank == 0:
  print("Delay: "+str(args.delay)+" ms")


clients = {}  # filled on rank 0: worker rank -> node info dict sent by that worker
# Placeholder work items; so far they are only printed at the end.
worklist = ['file1.txt', 'file2.txt', 'file3.txt']

# The output directory name is derived from the current time. Rank 0 computes
# it once, creates it, and broadcasts the path so that all ranks agree on the
# same directory even if they started in different seconds. On failure rank 0
# broadcasts None and every rank exits, so no rank is left blocked waiting on
# a peer that has already terminated.
outputdir = None
if rank == 0:
  now = datetime.datetime.now()
  outputdir = os.path.join(os.getcwd(), "output_" + now.strftime("%Y%m%d%H%M%S"))
  try:
    # Create-and-catch instead of check-then-create: no race window.
    os.mkdir(outputdir)
  except FileExistsError:
    print("Output directory already exists. Exiting.")
    outputdir = None
  except OSError as err:
    print("Cannot create output directory:", err)
    outputdir = None
outputdir = comm.bcast(outputdir, root=0)
if outputdir is None:
  raise SystemExit(1)

if rank == 0:
  if args.verbose:
    print("verbosity turned on")
  print("Executing: ",args.execname)
  print("Output dir: ",outputdir)

  print('Total number of workers ',size-1)
  # Each worker first reports a description of the node it runs on.
  for i in range(1,size):
    clients[i] = comm.recv(source=i)
  print(clients)

  # Hand the command to each worker, staggering the starts by --delay ms.
  for i in range(1,size):
    comm.send(args.execname, dest=i)
    sleep(args.delay/1000)

else:
  print ("rank "+str(rank))
  nodeinfo = {'machine':platform.machine(), 'hostname': platform.node(), 'Ncores': multiprocessing.cpu_count(), 'os': platform.system(), 'pythonVersion': platform.python_version() }
  comm.send(nodeinfo, dest=0)

  workstr = comm.recv(source=0)
  print("Rank "+str(rank)+" received: "+workstr)

  # The command string is run through the shell (it may contain arguments).
  # Its stdout goes to a per-rank file inside the shared output directory.
  # The file handle is passed directly instead of appending a shell
  # redirection, so the capture covers the whole command string and is not
  # broken by spaces in the directory path.
  outfile = os.path.join(outputdir, "output"+str(rank)+".txt")
  try:
    with open(outfile, "w") as out:
      completed = subprocess.run(workstr, shell=True, stdout=out)
    print('Rank '+str(rank)+' returncode: ',completed.returncode)
  except OSError as err:
    # run() without check=True never raises CalledProcessError; the
    # realistic failures here are opening the output file or spawning the shell.
    print('Rank '+str(rank)+' ERROR:', err)

if rank == 0:
  print(worklist)

# TODO(review): `worklist` is only printed so far; presumably its entries are
#   meant to be distributed to the workers instead of a single command.
# TODO(review): decide whether workers should also run the command with
#   `outputdir` as their working directory (currently only stdout goes there).