diff --git a/Source/Python/Classes/EcFlexpart.py b/Source/Python/Classes/EcFlexpart.py index 65ba97f7e4212e4187accda307d79436bee23231..00f61a340f48d9c3d3d38110711569b70c28a12a 100644 --- a/Source/Python/Classes/EcFlexpart.py +++ b/Source/Python/Classes/EcFlexpart.py @@ -1902,10 +1902,22 @@ class EcFlexpart(object): 'FILES FAILED!') if c.ectrans and _config.FLAG_ON_ECMWFSERVER: - execute_subprocess(['ectrans', '-overwrite', '-gateway', - c.gateway, '-remote', c.destination, - '-source', ofile], - error_msg='TRANSFER TO LOCAL SERVER FAILED!') + proc = execute_subprocess(['ectrans', '-overwrite', '-gateway', + c.gateway, '-remote', c.destination, + '-source', ofile], + error_msg='TRANSFER TO LOCAL SERVER FAILED!') + # apl: start + # if the ectrans routine returns 1 => file should have + # been copied in tmp folder, where it will be deleted + # after a successful transfer + try: + import time + time.sleep(10) + if(proc==1): + execute_subprocess(['rm', ofile]) + except TypeError: + break + # apl: end if c.ecstorage and _config.FLAG_ON_ECMWFSERVER: execute_subprocess(['ecp', '-o', ofile, diff --git a/Source/Python/Mods/tools.py b/Source/Python/Mods/tools.py index 0286555e38617a01a78fdff51f94f6575d02ab7f..fb6585e18be5b4a999f5f3a6f13a1371a70dd9d3 100644 --- a/Source/Python/Mods/tools.py +++ b/Source/Python/Mods/tools.py @@ -722,6 +722,40 @@ def submit_job_to_ecserver(target, jobname): The id number of the job as a reference at the ECMWF server. ''' + # apl: start + noEXEC = 99 + noWAIT = 99 + waittime = 10 + + while noEXEC>10: + # run "ecaccess-job-list | grep EXEC" and split the resulting + # string at the whitespaces and then count the number of + # occurences of EXEC => # of active jobs + ps = subprocess.Popen(('ecaccess-job-list'), stdout=subprocess.PIPE) + try: + output = str(subprocess.check_output(('grep', 'EXEC'), stdin=ps.stdout)).split() + except subprocess.CalledProcessError: + break + noEXEC = output.count('EXEC') + print('number of active jobs: ', str(noEXEC)) + import time + print('waiting until less processes are active (checking every '+str(waittime)+'s)') + time.sleep(waittime) + + # do the same for waiting jobs + while noWAIT>5: + ps = subprocess.Popen(('ecaccess-job-list'), stdout=subprocess.PIPE) + try: + output = str(subprocess.check_output(('grep', 'WAIT'), stdin=ps.stdout)).split() + except subprocess.CalledProcessError: + break + noWAIT = output.count('WAIT') + print('number of waiting jobs: ', str(noWAIT)) + import time + print('waiting until less processes are active (checking every '+str(waittime)+'s)') + time.sleep(waittime) + # apl: end + try: job_id = subprocess.check_output(['ecaccess-job-submit', '-queueName', target, jobname]) @@ -868,6 +902,7 @@ def execute_subprocess(cmd_list, error_msg='SUBPROCESS FAILED!'): try: subprocess.check_call(cmd_list) + e = 1 except subprocess.CalledProcessError as e: print('... ERROR CODE: ' + str(e.returncode)) print('... ERROR MESSAGE:\n \t ' + str(e)) @@ -879,7 +914,7 @@ def execute_subprocess(cmd_list, error_msg='SUBPROCESS FAILED!'): sys.exit('... ' + error_msg) - return + return e def generate_retrieval_period_boundary(c):