Commit de26359e authored by Lukas Kugler

small fix

parent 25164bec
@@ -23,13 +23,21 @@ def my_Slurm(*args, cfg_update=dict(), **kwargs):
     """
     return Slurm(*args, slurm_kwargs=dict(cluster.slurm_cfg, **cfg_update), **kwargs)
 
-def slurm_submit(bashcmd, slurm_cfg_update=None, depends_on=None):
-    function_name = sys._getframe(1).f_code.co_name  # magic
-    id = my_Slurm(function_name, cfg_update=slurm_cfg_update, **kwargs
-                  ).run(bashcmd, depends_on=[depends_on])
+def slurm_submit(bashcmd, name=None, cfg_update=None, depends_on=None):
+    """Submit a 'workflow task' (= script = job) to the SLURM queue.
+
+    Args:
+        bashcmd (str): command to run (i.e. the call to a script)
+        name (str): SLURM job name (useful for debugging)
+        cfg_update (dict): enforce these SLURM parameters
+        depends_on (list of int): SLURM job ids; this job starts once they have finished
+
+    Returns:
+        int: SLURM job id, usable in `depends_on` of another `slurm_submit` call
+    """
+    if name is None:  # SLURM job name defaults to the name of the calling function
+        name = sys._getframe(1).f_code.co_name
+    id = my_Slurm(name, cfg_update=cfg_update).run(bashcmd, depends_on=depends_on)
+    return id
 
 def clear_logs(backup_existing_to_archive=True):
     dirs = ['/logs/', '/slurm-scripts/']
     for d in dirs:
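For review context, a minimal usage sketch of the new `slurm_submit` API, chaining two jobs via `depends_on` (the second job and its echo command are illustrative only; `cluster` is the module's own config object):

    # submit a script with an explicit job name and SLURM settings
    id_prep = slurm_submit(cluster.python+' '+cluster.scriptsdir+'/prepare_wrfinput.py',
                           name='prep_wrfinput', cfg_update={"time": "5"})
    # this job starts only after id_prep finishes; without name=, the job is
    # named after the calling function via sys._getframe
    id_next = slurm_submit('echo done', depends_on=[id_prep])

Note that `my_Slurm` merges the overrides over the defaults with `dict(cluster.slurm_cfg, **cfg_update)`, so callers only pass the keys they change.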
@@ -46,11 +54,11 @@ def clear_logs(backup_existing_to_archive=True):
 def prepare_wrfinput():
     """Create WRF/run directories and wrfinput files
     """
-    s = my_Slurm("pre_osse", cfg_update={"time": "5", "mail-type": "BEGIN"})
-    id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_wrfinput.py')
+    # s = my_Slurm("pre_osse", cfg_update={"time": "5", "mail-type": "BEGIN"})
+    # id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_wrfinput.py')
+    id = slurm_submit(cluster.python+' '+cluster.scriptsdir+'/prepare_wrfinput.py',
+                      name='prep_wrfinput', cfg_update={"time": "5", "mail-type": "BEGIN"})
 
-    s = my_Slurm("ideal", cfg_update={"ntasks": str(exp.n_nens), "time": "10",
-                 "mem-per-cpu": "2G"})
     cmd = """# run ideal.exe in parallel, then add geodata
 export SLURM_STEP_GRES=none
 for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
@@ -69,7 +77,8 @@ do
     cp $rundir/wrfinput_d01 """+cluster.archivedir()+"""/wrfinput/$n/wrfinput_d01
 done
 """
-    id = s.run(cmd, depends_on=[id])
+    id = slurm_submit(cmd, name="ideal", cfg_update={"ntasks": str(exp.n_ens),
+                      "time": "10", "mem-per-cpu": "2G"}, depends_on=[id])
     return id
 
 def update_wrfinput_from_archive(time, background_init_time, exppath, depends_on=None):
@@ -112,7 +121,7 @@ def gen_synth_obs(time, depends_on=None):
     # prepare state of nature run, from which observation is sampled
     id = slurm_submit(cluster.python+' '+cluster.scriptsdir+'/prepare_nature.py '
-                      +time.strftime('%Y-%m-%d_%H:%M ') + str(channel_id),
+                      +time.strftime('%Y-%m-%d_%H:%M'), name='prep_nature',
                       cfg_update=dict(time="2"), depends_on=[depends_on])
 
     for channel_id in exp.sat_channels:
@@ -121,9 +130,9 @@
                           +time.strftime('%Y-%m-%d_%H:%M ') + str(channel_id),
                           depends_on=[id])
 
-    s = my_Slurm("gensynth", cfg_update=dict(time="20"))
-    cmd = 'cd '+cluster.dartrundir+'; mpirun -np 24 ./perfect_model_obs; ' \
-          + 'obs_seq.out >> obs_seq_all.out'  # combine all observations
+    s = my_Slurm("gensynth", cfg_update=dict(ntasks="48", time="20"))
+    cmd = 'cd '+cluster.dartrundir+'; mpirun -np 48 ./perfect_model_obs; ' \
+          + 'cat obs_seq.out >> obs_seq_all.out'  # combine all observations
     id2 = s.run(cmd, depends_on=[id])
     return id2
@@ -166,9 +175,9 @@ def mailme(depends_on=None):
 print('starting osse')
 
-clear_logs(backup_existing_to_archive=True)
+clear_logs(backup_existing_to_archive=False)
 
-is_new_run = True
+is_new_run = False
 
 if is_new_run:
     id = prepare_wrfinput()  # create initial conditions
@@ -180,18 +189,21 @@ if is_new_run:
                        depends_on=id)
     time = integration_end_time
 else:
+    #id = prepare_wrfinput()  # create initial conditions
+    id = None
     # get initial conditions from archive
     background_init_time = dt.datetime(2008, 7, 30, 10, 45)
     time = dt.datetime(2008, 7, 30, 11, 0)
     exppath_arch = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.11_LMU_filter'
     first_guess = exppath_arch
-    id = update_wrfinput_from_archive(time, background_init_time, exppath_arch)
+    #id = update_wrfinput_from_archive(time, background_init_time, exppath_arch,
+    #                                  depends_on=id)
 
 # now, start the ensemble data assimilation cycles
 timedelta_integrate = dt.timedelta(minutes=15)
 timedelta_btw_assim = dt.timedelta(minutes=15)
 
-while time < dt.datetime(2008, 7, 30, 14, 15):
+while time < dt.datetime(2008, 7, 30, 16, 15):
     assim_time = time
     id = gen_synth_obs(assim_time, depends_on=id)
@@ -200,7 +212,7 @@ while time < dt.datetime(2008, 7, 30, 14, 15):
                            first_guess=first_guess,
                            depends_on=id)
 
-    # first_guess = None  #
+    first_guess = None
 
     background_init_time = assim_time  # start integration from here
     integration_end_time = assim_time + timedelta_integrate
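Taken together, the driver hunks run one assimilation cycle per loop iteration, threading SLURM job ids through `depends_on`. A hypothetical condensed view of the pattern (the assimilation call is truncated in this diff, so `assimilate` and the stub return values are stand-ins, runnable only as an illustration):

    import datetime as dt

    def gen_synth_obs(time, depends_on=None): return 1                 # stub, real one above
    def assimilate(time, first_guess=None, depends_on=None): return 2  # hypothetical stand-in

    id = None                                       # no prior job to wait for
    first_guess = '/path/to/prior/experiment'       # toy value for the first cycle
    time = dt.datetime(2008, 7, 30, 11, 0)
    timedelta_btw_assim = dt.timedelta(minutes=15)
    while time < dt.datetime(2008, 7, 30, 16, 15):
        id = gen_synth_obs(time, depends_on=id)     # synthetic obs for this cycle
        id = assimilate(time, first_guess=first_guess, depends_on=id)
        first_guess = None                          # later cycles start from the cycled state
        time = time + timedelta_btw_assim           # advance to the next analysis time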
@@ -51,7 +51,7 @@ def run(time_dt, channel_id, n_obs, error_variance, output_path='./',
     # Brightness temperature or Reflectance?
     channel_id = int(channel_id)
 
-    if channel_id in [1, 2, 3, 11]:
+    if channel_id in [1, 2, 3, 12]:
         line_obstypedef = ' 256 MSG_4_SEVIRI_BDRF'
     else:
         line_obstypedef = ' 255 MSG_4_SEVIRI_TB'
@@ -97,13 +97,17 @@
     lons = ds.XLONG_M.isel(Time=0).values
     lats = ds.XLAT_M.isel(Time=0).values
 
     n_obs_x = int(np.sqrt(n_obs))
+    dx = int(len(ds.south_north)/n_obs_x)
+    skip = int(dx/2)
 
     for i in range(n_obs_x):
         for j in range(n_obs_x):
-            coords.append((lats[i,j], lons[i,j]))
+            coords.append((lats[skip+i*dx, skip+j*dx],
+                           lons[skip+i*dx, skip+j*dx]))
 
     try:
         import pickle
+        os.makedirs(os.path.dirname(fpath_obs_locations), exist_ok=True)
         with open(fpath_obs_locations, 'wb') as f:
             pickle.dump(coords, f); print(fpath_obs_locations, 'saved.')
     except Exception as e:
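The new `dx`/`skip` indexing spreads the observation points evenly over the domain instead of taking the first `n_obs_x` × `n_obs_x` grid points in one corner. A self-contained sketch of the index pattern on a toy grid (all sizes illustrative):

    ny = 10                    # toy grid length, stands in for len(ds.south_north)
    n_obs_x = 2                # observations per direction, sqrt(n_obs)
    dx = int(ny / n_obs_x)     # spacing between observation rows/columns -> 5
    skip = int(dx / 2)         # half-spacing offset that centres the pattern -> 2
    idx = [skip + i*dx for i in range(n_obs_x)]
    print(idx)                 # [2, 7]: evenly spaced and away from the domain edges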
@@ -125,16 +129,14 @@ obs_kind_definitions
 num_obs: """+n_obs_str+" max_num_obs: "+n_obs_str+"""
 first: 1 last: """+n_obs_str
 
-    for i_obs in range(int(n_obs)):
-        # data
+    for i_obs in range(1, int(n_obs)+1):
 
-        lon = coords[i_obs][1]
-        lat = coords[i_obs][0]
+        lon = coords[i_obs-1][1]
+        lat = coords[i_obs-1][0]
 
         lon_rad = str(degr_to_rad(lon))
         lat_rad = str(degr_to_rad(lat))
 
         # compile text
         if i_obs < int(n_obs):
             msg += """
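The loop now counts observations from 1, matching the 1-based numbering of entries in DART `obs_seq` files, while Python list lookups shift back with `i_obs-1`. A runnable toy illustration of the off-by-one handling:

    coords = [(48.2, 16.4), (48.3, 16.5), (48.4, 16.6)]  # toy (lat, lon) pairs
    n_obs = len(coords)
    for i_obs in range(1, n_obs + 1):   # obs_seq entries are numbered 1..n_obs
        lat, lon = coords[i_obs - 1]    # convert back to a 0-based list index
        is_last = i_obs == n_obs        # the final entry is formatted differently
        print(i_obs, lat, lon, is_last)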
@@ -11,7 +11,12 @@ channel_id = int(sys.argv[2])
 copy(cluster.scriptsdir+'/../templates/input.nml',
      cluster.dartrundir+'/input.nml')
 sed_inplace(cluster.dartrundir+'/input.nml', '<n_ens>', str(int(exp.n_ens)))
-append_file(cluster.dartrundir+'/input.nml', cluster.scriptsdir+'/../templates/obs_def_rttov.VIS.nml')
+if channel_id in [1, 2, 3, 12]:
+    rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.VIS.nml'
+else:
+    rttov_nml = cluster.scriptsdir+'/../templates/obs_def_rttov.IR.nml'
+append_file(cluster.dartrundir+'/input.nml', rttov_nml)
 
 # prepare observation file
 create_obs_sat.run(time, channel_id, exp.n_obs, exp.error_variance,
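The visible-channel list `[1, 2, 3, 12]` now appears both here and in `create_obs_sat`; a hypothetical follow-up refactor (not part of this commit) would keep the mapping in one shared place, e.g.:

    # hypothetical shared constant: SEVIRI channels treated as reflectance (BDRF)
    # rather than brightness temperature; cluster.scriptsdir as in the scripts above
    VIS_CHANNELS = {1, 2, 3, 12}

    def rttov_nml_for(channel_id):
        kind = 'VIS' if int(channel_id) in VIS_CHANNELS else 'IR'
        return cluster.scriptsdir + '/../templates/obs_def_rttov.' + kind + '.nml'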