Skip to content
Snippets Groups Projects
Commit 25164bec authored by Lukas Kugler's avatar Lukas Kugler
Browse files

create_obs for all channels

parent 99147693
No related branches found
No related tags found
No related merge requests found
...@@ -7,12 +7,14 @@ class ExperimentConfiguration(object): ...@@ -7,12 +7,14 @@ class ExperimentConfiguration(object):
pass pass
exp = ExperimentConfiguration() exp = ExperimentConfiguration()
exp.expname = "exp_v1.11_LMU_filter2" exp.expname = "exp_v1.11_LMU_filter_domainobs"
exp.model_dx = 2000 exp.model_dx = 2000
exp.timestep = 10 exp.timestep = 10
exp.n_ens = 40 exp.n_ens = 40
exp.n_nodes = 10 exp.n_nodes = 10
exp.n_obs = 100 exp.n_obs = 100
exp.sat_channels = [1,]
exp.error_variance = 0.0009 exp.error_variance = 0.0009
......
...@@ -34,6 +34,7 @@ vsc.namelist = vsc.scriptsdir+'/../templates/namelist.input' ...@@ -34,6 +34,7 @@ vsc.namelist = vsc.scriptsdir+'/../templates/namelist.input'
vsc.run_WRF = '/gpfs/data/fs71386/lkugler/DART-WRF/scripts/osse/run_ens.vsc.sh' vsc.run_WRF = '/gpfs/data/fs71386/lkugler/DART-WRF/scripts/osse/run_ens.vsc.sh'
vsc.slurm_cfg = {"account": "p71386", "partition": "mem_0384", "qos": "p71386_0384", vsc.slurm_cfg = {"account": "p71386", "partition": "mem_0384", "qos": "p71386_0384",
"nodes": "1", "ntasks": "1",
"ntasks-per-node": "48", "ntasks-per-core": 1, "ntasks-per-node": "48", "ntasks-per-core": 1,
"mail-type": "FAIL", "mail-user": "lukas.kugler@univie.ac.at"} "mail-type": "FAIL", "mail-user": "lukas.kugler@univie.ac.at"}
......
...@@ -23,6 +23,12 @@ def my_Slurm(*args, cfg_update=dict(), **kwargs): ...@@ -23,6 +23,12 @@ def my_Slurm(*args, cfg_update=dict(), **kwargs):
""" """
return Slurm(*args, slurm_kwargs=dict(cluster.slurm_cfg, **cfg_update), **kwargs) return Slurm(*args, slurm_kwargs=dict(cluster.slurm_cfg, **cfg_update), **kwargs)
def slurm_submit(bashcmd, slurm_cfg_update=None, depends_on=None):
function_name = sys._getframe(1).f_code.co_name # magic
id = my_Slurm(function_name, cfg_update=slurm_cfg_update, **kwargs
).run(bashcmd, depends_on=[depends_on])
return id
def clear_logs(backup_existing_to_archive=True): def clear_logs(backup_existing_to_archive=True):
dirs = ['/logs/', '/slurm-scripts/'] dirs = ['/logs/', '/slurm-scripts/']
...@@ -40,11 +46,11 @@ def clear_logs(backup_existing_to_archive=True): ...@@ -40,11 +46,11 @@ def clear_logs(backup_existing_to_archive=True):
def prepare_wrfinput(): def prepare_wrfinput():
"""Create WRF/run directories and wrfinput files """Create WRF/run directories and wrfinput files
""" """
s = my_Slurm("pre_osse", cfg_update={"nodes": "1", "ntasks": "1", "time": "5", s = my_Slurm("pre_osse", cfg_update={"time": "5", "mail-type": "BEGIN"})
"mail-type": "BEGIN"})
id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_wrfinput.py') id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_wrfinput.py')
s = my_Slurm("ideal", cfg_update={"nodes": "1", "time": "10", "mem-per-cpu": "2G"}) s = my_Slurm("ideal", cfg_update={"ntasks": str(exp.n_nens), "time": "10",
"mem-per-cpu": "2G"})
cmd = """# run ideal.exe in parallel, then add geodata cmd = """# run ideal.exe in parallel, then add geodata
export SLURM_STEP_GRES=none export SLURM_STEP_GRES=none
for ((n=1; n<="""+str(exp.n_ens)+"""; n++)) for ((n=1; n<="""+str(exp.n_ens)+"""; n++))
...@@ -70,7 +76,7 @@ def update_wrfinput_from_archive(time, background_init_time, exppath, depends_on ...@@ -70,7 +76,7 @@ def update_wrfinput_from_archive(time, background_init_time, exppath, depends_on
"""Given that directories with wrfinput files exist, """Given that directories with wrfinput files exist,
update these wrfinput files according to wrfout files update these wrfinput files according to wrfout files
""" """
s = my_Slurm("upd_wrfinput", cfg_update={"nodes": "1", "ntasks": "1", "time": "5"}) s = my_Slurm("upd_wrfinput", cfg_update={"time": "5"})
# path of initial conditions, <iens> is replaced by member index # path of initial conditions, <iens> is replaced by member index
IC_path = exppath + background_init_time.strftime('/%Y-%m-%d_%H:%M/') \ IC_path = exppath + background_init_time.strftime('/%Y-%m-%d_%H:%M/') \
...@@ -82,7 +88,7 @@ def update_wrfinput_from_archive(time, background_init_time, exppath, depends_on ...@@ -82,7 +88,7 @@ def update_wrfinput_from_archive(time, background_init_time, exppath, depends_on
def run_ENS(begin, end, depends_on=None, **kwargs): def run_ENS(begin, end, depends_on=None, **kwargs):
prev_id = depends_on prev_id = depends_on
s = my_Slurm("preWRF", cfg_update=dict(nodes="1", ntasks="1", time="2")) s = my_Slurm("preWRF", cfg_update=dict(time="2"))
id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_namelist.py ' id = s.run(cluster.python+' '+cluster.scriptsdir+'/prepare_namelist.py '
+ begin.strftime('%Y-%m-%d_%H:%M')+' ' + begin.strftime('%Y-%m-%d_%H:%M')+' '
+ end.strftime('%Y-%m-%d_%H:%M'), + end.strftime('%Y-%m-%d_%H:%M'),
...@@ -103,12 +109,21 @@ def run_ENS(begin, end, depends_on=None, **kwargs): ...@@ -103,12 +109,21 @@ def run_ENS(begin, end, depends_on=None, **kwargs):
def gen_synth_obs(time, depends_on=None): def gen_synth_obs(time, depends_on=None):
s = my_Slurm("pre_gensynthobs", cfg_update=dict(nodes="1", ntasks="1", time="2"))
# prepare state of nature run, from which observation is sampled
id = slurm_submit(cluster.python+' '+cluster.scriptsdir+'/prepare_nature.py '
+time.strftime('%Y-%m-%d_%H:%M ') + str(channel_id),
cfg_update=dict(time="2"), depends_on=[depends_on])
for channel_id in exp.sat_channels:
s = my_Slurm("pre_gensynthobs", cfg_update=dict(time="2"))
id = s.run(cluster.python+' '+cluster.scriptsdir+'/pre_gen_synth_obs.py ' id = s.run(cluster.python+' '+cluster.scriptsdir+'/pre_gen_synth_obs.py '
+time.strftime('%Y-%m-%d_%H:%M'), depends_on=[depends_on]) +time.strftime('%Y-%m-%d_%H:%M ') + str(channel_id),
depends_on=[id])
s = my_Slurm("gensynth", cfg_update=dict(nodes="1", time="20")) s = my_Slurm("gensynth", cfg_update=dict(time="20"))
cmd = 'cd '+cluster.dartrundir+'; mpirun -np 24 ./perfect_model_obs' cmd = 'cd '+cluster.dartrundir+'; mpirun -np 24 ./perfect_model_obs; ' \
+ 'obs_seq.out >> obs_seq_all.out' # combine all observations
id2 = s.run(cmd, depends_on=[id]) id2 = s.run(cmd, depends_on=[id])
return id2 return id2
...@@ -120,22 +135,22 @@ def assimilate(assim_time, background_init_time, ...@@ -120,22 +135,22 @@ def assimilate(assim_time, background_init_time,
if first_guess is None: if first_guess is None:
first_guess = cluster.archivedir() first_guess = cluster.archivedir()
s = my_Slurm("preAssim", cfg_update=dict(nodes="1", ntasks="1", time="2")) s = my_Slurm("preAssim", cfg_update=dict(time="2"))
id = s.run(cluster.python+' '+cluster.scriptsdir+'/pre_assim.py ' \ id = s.run(cluster.python+' '+cluster.scriptsdir+'/pre_assim.py ' \
+assim_time.strftime('%Y-%m-%d_%H:%M ') +assim_time.strftime('%Y-%m-%d_%H:%M ')
+background_init_time.strftime('%Y-%m-%d_%H:%M ') +background_init_time.strftime('%Y-%m-%d_%H:%M ')
+first_guess, +first_guess,
depends_on=[prev_id]) depends_on=[prev_id])
s = my_Slurm("Assim", cfg_update=dict(nodes="1", time="50", mem="200G")) s = my_Slurm("Assim", cfg_update=dict(time="50", mem="200G"))
cmd = 'cd '+cluster.dartrundir+'; mpirun -np 48 ./filter' cmd = 'cd '+cluster.dartrundir+'; mpirun -np 48 ./filter; rm obs_seq_all.out'
id2 = s.run(cmd, depends_on=[id]) id2 = s.run(cmd, depends_on=[id])
s = my_Slurm("archiveAssim", cfg_update=dict(nodes="1", ntasks="1", time="10")) s = my_Slurm("archiveAssim", cfg_update=dict(time="10"))
id3 = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_assim.py ' id3 = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_assim.py '
+ assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id2]) + assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id2])
s = my_Slurm("updateIC", cfg_update=dict(nodes="1", ntasks="1", time="3")) s = my_Slurm("updateIC", cfg_update=dict(time="3"))
id4 = s.run(cluster.python+' '+cluster.scriptsdir+'/update_wrfinput_from_filteroutput.py ' id4 = s.run(cluster.python+' '+cluster.scriptsdir+'/update_wrfinput_from_filteroutput.py '
+assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id3]) +assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id3])
return id4 return id4
...@@ -143,8 +158,7 @@ def assimilate(assim_time, background_init_time, ...@@ -143,8 +158,7 @@ def assimilate(assim_time, background_init_time,
def mailme(depends_on=None): def mailme(depends_on=None):
id = depends_on id = depends_on
if id: if id:
s = my_Slurm("AllFinished", cfg_update={"nodes": "1", "ntasks": "1", "time": "1", s = my_Slurm("AllFinished", cfg_update={"time": "1", "mail-type": "BEGIN"})
"mail-type": "BEGIN"})
s.run('sleep 1', depends_on=[id]) s.run('sleep 1', depends_on=[id])
......
"""Create obs_seq.in """Create obs_seq.in
""" """
import os, sys import os, sys, warnings
import numpy as np import numpy as np
import datetime as dt import datetime as dt
from pysolar.solar import get_altitude, get_azimuth from pysolar.solar import get_altitude, get_azimuth
def degr_to_rad(degr): def degr_to_rad(degr):
"""Convert to DART convention = radians"""
if degr < 0: if degr < 0:
degr += 360 degr += 360
return degr/360*2*np.pi return degr/360*2*np.pi
...@@ -17,72 +18,114 @@ def round_to_day(dtobj): ...@@ -17,72 +18,114 @@ def round_to_day(dtobj):
def add_timezone_UTC(t): def add_timezone_UTC(t):
return dt.datetime(t.year, t.month, t.day, t.hour, t.minute, tzinfo=dt.timezone.utc) return dt.datetime(t.year, t.month, t.day, t.hour, t.minute, tzinfo=dt.timezone.utc)
def get_dart_date(time_dt):
days_since_010120 = 145731
ref_day = dt.datetime(2000,1,1, tzinfo=dt.timezone.utc)
dart_date_day = str((time_dt - ref_day).days + days_since_010120)
secs_thatday = str(int((time_dt - round_to_day(time_dt)).total_seconds()))
return dart_date_day, secs_thatday
def run(time_dt, n_obs, error_variance, output_path='./'): def run(time_dt, channel_id, n_obs, error_variance, output_path='./',
fpath_obs_locations=False):
"""Create obs_seq.in """Create obs_seq.in
implemented for Args:
- reflectivity of VIS 0.6 time_dt (dt.datetime): time of observation
channel_id (int): SEVIRI channel number
see https://nwp-saf.eumetsat.int/downloads/rtcoef_rttov12/ir_srf/rtcoef_msg_4_seviri_srf.html
n_obs (int):
number of observations (must be a square of an integer: 4, 9, 1000, ...)
error_variance (float):
gaussian error with this variance is added to the truth at observation time
output_path (str): directory where `obs_seq.in` will be saved
fpath_obs_locations (False or str):
write an obs_coords.pkl, can be used to check observation locations
if str, write to file
""" """
time_dt = add_timezone_UTC(time_dt) time_dt = add_timezone_UTC(time_dt)
# time_dt = dt.datetime(2008, 7, 30, 15, 30, tzinfo=dt.timezone.utc) # time_dt = dt.datetime(2008, 7, 30, 15, 30, tzinfo=dt.timezone.utc)
assert n_obs == int(n_obs) assert n_obs == int(n_obs)
n_obs_str = str(int(n_obs)) n_obs_str = str(int(n_obs))
error_variance = str(error_variance) error_variance = str(error_variance)
# solar angles # Brightness temperature or Reflectance?
channel_id = int(channel_id)
if channel_id in [1, 2, 3, 11]:
line_obstypedef = ' 256 MSG_4_SEVIRI_BDRF'
else:
line_obstypedef = ' 255 MSG_4_SEVIRI_TB'
channel_id = str(channel_id)
# position on earth to calculate solar angles
lat0 = 45. lat0 = 45.
lon0 = 0. lon0 = 0.
sat_az = "180.0" sat_az = "180.0"
sat_zen = "45.0" sat_zen = "45.0"
radius_earth = 6.371*1E6 radius_earth_meters = 6.371*1E6
distance_between_obs_meters = 40000
# equally spaced grid for satellite observations every 4 km
nx, ny = int(np.sqrt(n_obs)), int(np.sqrt(n_obs))
coords = [] coords = []
coords_from_domaincenter = False
if coords_from_domaincenter:
"""
Create equally spaced grid for satellite observations every 4 km
e.g. ny,nx = 10
-> obs locations are from -5 to +5 times dy in south_north direction
and from -5 to +5 times dx in west_east direction
"""
nx, ny = int(np.sqrt(n_obs)), int(np.sqrt(n_obs))
km_per_degree = 2*np.pi*radius_earth/360 m_per_degree = 2*np.pi*radius_earth_meters/360
dy_4km_in_degree = 4000/km_per_degree dy_4km_in_degree = distance_between_obs_meters/m_per_degree
#print(dy_4km_in_degree)
for iy in range(int(-ny/2), int(ny/2+1)): for iy in range(int(-ny/2), int(ny/2+1)):
for ix in range(int(-nx/2), int(nx/2+1)): for ix in range(int(-nx/2), int(nx/2+1)):
lat = lat0 + iy*dy_4km_in_degree lat = lat0 + iy*dy_4km_in_degree
km_per_degree_x = 2*np.pi*radius_earth*np.sin(lat/180*np.pi)/360 m_per_degree_x = 2*np.pi*radius_earth_meters*np.sin(lat/180*np.pi)/360
dx_4km_in_degree = 4000/km_per_degree_x dx_4km_in_degree = distance_between_obs_meters/m_per_degree_x
lon = lon0 + ix*dx_4km_in_degree lon = lon0 + ix*dx_4km_in_degree
coords.append((lat, lon)) coords.append((lat, lon))
else:
"""Observations spread evenly over domain"""
fcoords = '/home/fs71386/lkugler/run_DART/geo_em.d01.nc'
import xarray as xr
ds = xr.open_dataset(fcoords)
lons = ds.XLONG_M.isel(Time=0).values
lats = ds.XLAT_M.isel(Time=0).values
n_obs_x = int(np.sqrt(n_obs))
for i in range(n_obs_x):
for j in range(n_obs_x):
coords.append((lats[i,j], lons[i,j]))
if False: try:
import pickle import pickle
with open('obs_coords.pkl', 'wb') as f: with open(fpath_obs_locations, 'wb') as f:
pickle.dump(coords, f) pickle.dump(coords, f); print(fpath_obs_locations, 'saved.')
except Exception as e:
warnings.warn(str(e))
sun_az = str(get_azimuth(lat0, lon0, time_dt)) sun_az = str(get_azimuth(lat0, lon0, time_dt))
sun_zen = str(90. - get_altitude(lat0, lon0, time_dt)) sun_zen = str(90. - get_altitude(lat0, lon0, time_dt))
print('sunzen', sun_zen, 'sunazi', sun_az) print('sunzen', sun_zen, 'sunazi', sun_az)
days_since_010120 = 145731 dart_date_day, secs_thatday = get_dart_date(time_dt)
ref_day = dt.datetime(2000,1,1, tzinfo=dt.timezone.utc)
dart_date_day = str((time_dt - ref_day).days + days_since_010120)
secs_thatday = str(int((time_dt - round_to_day(time_dt)).total_seconds()))
print('secs, days:', secs_thatday, dart_date_day) print('secs, days:', secs_thatday, dart_date_day)
msg = """ msg = """
obs_sequence obs_sequence
obs_kind_definitions obs_kind_definitions
1 1
256 MSG_4_SEVIRI_BDRF """+line_obstypedef+"""
num_copies: 0 num_qc: 0 num_copies: 0 num_qc: 0
num_obs: """+n_obs_str+" max_num_obs: "+n_obs_str+""" num_obs: """+n_obs_str+" max_num_obs: "+n_obs_str+"""
first: 1 last: """+n_obs_str first: 1 last: """+n_obs_str
for i_obs in range(1, int(n_obs)+1): for i_obs in range(int(n_obs)):
# data # data
lon = coords[i_obs][1] lon = coords[i_obs][1]
...@@ -105,7 +148,7 @@ kind ...@@ -105,7 +148,7 @@ kind
visir visir
"""+sat_az+""" """+sat_zen+""" """+sun_az+""" """+sat_az+""" """+sat_zen+""" """+sun_az+"""
"""+sun_zen+""" """+sun_zen+"""
12 4 21 1 12 4 21 """+channel_id+"""
-888888.000000000 -888888.000000000
1 1
"""+secs_thatday+""" """+dart_date_day+""" """+secs_thatday+""" """+dart_date_day+"""
...@@ -123,19 +166,26 @@ kind ...@@ -123,19 +166,26 @@ kind
visir visir
"""+sat_az+""" """+sat_zen+""" """+sun_az+""" """+sat_az+""" """+sat_zen+""" """+sun_az+"""
"""+sun_zen+""" """+sun_zen+"""
12 4 21 1 12 4 21 """+channel_id+"""
-888888.000000000 -888888.000000000
1 1
"""+secs_thatday+""" """+dart_date_day+""" """+secs_thatday+""" """+dart_date_day+"""
"""+error_variance """+error_variance
#print(msg) fpath = output_path+'/obs_seq.in'
os.remove(output_path+'/obs_seq.in') try:
with open(output_path+'/obs_seq.in', 'w') as f: os.remove(fpath)
except OSError:
pass
with open(fpath, 'w') as f:
f.write(msg) f.write(msg)
print(fpath, 'saved.')
if __name__ == '__main__': if __name__ == '__main__':
time_dt = dt.datetime(2008, 7, 30, 15, 30, tzinfo=dt.timezone.utc) time_dt = dt.datetime(2008, 7, 30, 15, 30, tzinfo=dt.timezone.utc)
n_obs = 100 n_obs = 100
error_variance = str(.001) channel_id = 1
run(time_dt, n_obs, error_variance) error_variance = 0.001
run(time_dt, channel_id, n_obs, error_variance, output_path='./',
fpath_obs_locations='./domain.pkl')
...@@ -2,8 +2,10 @@ import os, sys, shutil ...@@ -2,8 +2,10 @@ import os, sys, shutil
import datetime as dt import datetime as dt
from config.cfg import exp, cluster from config.cfg import exp, cluster
from utils import symlink, copy, sed_inplace, append_file from utils import symlink, copy, sed_inplace, append_file
import create_obs_sat
time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M') time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
channel_id = int(sys.argv[2])
# ensure correct input.nml # ensure correct input.nml
copy(cluster.scriptsdir+'/../templates/input.nml', copy(cluster.scriptsdir+'/../templates/input.nml',
...@@ -12,18 +14,10 @@ sed_inplace(cluster.dartrundir+'/input.nml', '<n_ens>', str(int(exp.n_ens))) ...@@ -12,18 +14,10 @@ sed_inplace(cluster.dartrundir+'/input.nml', '<n_ens>', str(int(exp.n_ens)))
append_file(cluster.dartrundir+'/input.nml', cluster.scriptsdir+'/../templates/obs_def_rttov.VIS.nml') append_file(cluster.dartrundir+'/input.nml', cluster.scriptsdir+'/../templates/obs_def_rttov.VIS.nml')
# prepare observation file # prepare observation file
import create_obs_sat create_obs_sat.run(time, channel_id, exp.n_obs, exp.error_variance,
create_obs_sat.run(time, exp.n_obs, exp.error_variance, output_path=cluster.dartrundir) output_path=cluster.dartrundir,
fpath_obs_locations=cluster.archivedir()+time.strftime('/%Y-%m-%d_%H:%M')
+'/obs_coords_id'+str(channel_id)+'.pkl')
if not os.path.exists(cluster.dartrundir+'/obs_seq.in'): if not os.path.exists(cluster.dartrundir+'/obs_seq.in'):
raise RuntimeError('obs_seq.in does not exist in '+cluster.dartrundir) raise RuntimeError('obs_seq.in does not exist in '+cluster.dartrundir)
# get wrfout_d01 from nature run
shutil.copy(time.strftime(cluster.nature_wrfout),
cluster.dartrundir+'/wrfout_d01')
import wrfout_add_geo
wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc', cluster.dartrundir+'/wrfout_d01')
# DART may need a wrfinput file as well, which serves as a template for dimension sizes
symlink(cluster.dartrundir+'/wrfout_d01', cluster.dartrundir+'/wrfinput_d01')
import os, sys, shutil
import datetime as dt
from config.cfg import exp, cluster
from utils import symlink, copy
time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
# get wrfout_d01 from nature run
shutil.copy(time.strftime(cluster.nature_wrfout),
cluster.dartrundir+'/wrfout_d01')
import wrfout_add_geo
wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc', cluster.dartrundir+'/wrfout_d01')
# DART may need a wrfinput file as well, which serves as a template for dimension sizes
symlink(cluster.dartrundir+'/wrfout_d01', cluster.dartrundir+'/wrfinput_d01')
...@@ -31,7 +31,7 @@ ...@@ -31,7 +31,7 @@
async = 0, async = 0,
adv_ens_command = "../shell_scripts/advance_model.csh", adv_ens_command = "../shell_scripts/advance_model.csh",
ens_size = <n_ens>, ens_size = <n_ens>,
obs_sequence_in_name = "obs_seq.out", obs_sequence_in_name = "obs_seq_all.out",
obs_sequence_out_name = "obs_seq.final", obs_sequence_out_name = "obs_seq.final",
input_state_file_list = "input_list.txt" input_state_file_list = "input_list.txt"
output_state_file_list = "output_list.txt" output_state_file_list = "output_list.txt"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment