Skip to content
Snippets Groups Projects
Commit ded54c86 authored by Lukas Kugler's avatar Lukas Kugler
Browse files

multi obstype, rewrite v1

parent 83221903
No related branches found
No related tags found
No related merge requests found
......@@ -19,7 +19,7 @@ n_obs = 64 # radar: n_obs for each observation height level
vis = dict(sat_channel=1, n_obs=n_obs, err_std=0.03,
cov_loc_radius_km=10)
wv = dict(sat_channel=6, n_obs=n_obs, err_std=5.,
wv = dict(sat_channel=6, n_obs=n_obs, err_std=False,
cov_loc_radius_km=10)
ir108 = dict(sat_channel=9, n_obs=n_obs, err_std=5.,
cov_loc_radius_km=10)
......
......@@ -34,8 +34,7 @@ vsc.namelist = vsc.scriptsdir+'/../templates/namelist.input'
vsc.run_WRF = '/gpfs/data/fs71386/lkugler/DART-WRF/scripts/osse/run_ens.vsc.sh'
vsc.slurm_cfg = {"account": "p71386", "partition": "mem_0384", "qos": "p71386_0384",
"nodes": "1", "ntasks": "1",
"ntasks-per-node": "48", "ntasks-per-core": 1,
"ntasks": "1", "nodes": "1", "ntasks-per-node": "48", "ntasks-per-core": "1",
"mail-type": "FAIL", "mail-user": "lukas.kugler@univie.ac.at"}
......
......@@ -85,7 +85,7 @@ do
mv $rundir/rsl.out.0000 $rundir/rsl.out.input
done
"""
s = my_Slurm("ideal", cfg_update={"ntasks": str(exp.n_ens),
s = my_Slurm("ideal", cfg_update={"ntasks": str(exp.n_ens), "nodes": "1",
"time": "10", "mem-per-cpu": "2G"})
id = s.run(cmd, depends_on=[id])
return id
......@@ -188,18 +188,18 @@ def assimilate(assim_time, prior_init_time,
+prior_path_exp, depends_on=[depends_on])
# prepare nature run, generate observations
s = my_Slurm("genSynthObs", cfg_update=dict(ntasks="48", time="12"))
id = s.run(cluster.python+' '+cluster.scriptsdir+'/gen_synth_obs.py '
s = my_Slurm("Assim", cfg_update=dict(nodes="1", ntasks="48", time="50", mem="200G"))
id = s.run(cluster.python+' '+cluster.scriptsdir+'/assim_synth_obs.py '
+time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id])
# actuall assimilation step
s = my_Slurm("Assim", cfg_update=dict(ntasks="48", time="50", mem="200G"))
cmd = 'cd '+cluster.dartrundir+'; mpirun -np 48 ./filter; rm obs_seq_all.out'
id = s.run(cmd, depends_on=[id])
# # actuall assimilation step
# s = my_Slurm("Assim", cfg_update=dict(nodes="1", ntasks="48", time="50", mem="200G"))
# cmd = 'cd '+cluster.dartrundir+'; mpirun -np 48 ./filter; rm obs_seq_all.out'
# id = s.run(cmd, depends_on=[id])
s = my_Slurm("archiveAssim", cfg_update=dict(time="10"))
id = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_assim.py '
+ assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id])
# s = my_Slurm("archiveAssim", cfg_update=dict(time="10"))
# id = s.run(cluster.python+' '+cluster.scriptsdir+'/archive_assim.py '
# + assim_time.strftime('%Y-%m-%d_%H:%M'), depends_on=[id])
s = my_Slurm("updateIC", cfg_update=dict(time="8"))
id = s.run(cluster.python+' '+cluster.scriptsdir+'/update_wrfinput_from_filteroutput.py '
......@@ -245,27 +245,28 @@ if is_new_run:
first_guess = False
elif start_from_existing_state:
id = prepare_wrfinput() # create initial conditions
#id = prepare_wrfinput() # create initial conditions
# get initial conditions from archive
init_time = dt.datetime(2008, 7, 30, 6)
time = dt.datetime(2008, 7, 30, 10)
exppath_arch = '/gpfs/data/fs71386/lkugler/sim_archive/exp_v1.11_LMU_filter'
id = update_wrfinput_from_archive(time, init_time, exppath_arch, depends_on=id)
#id = update_wrfinput_from_archive(time, init_time, exppath_arch, depends_on=id)
# values for assimilation
assim_time = time
prior_init_time = init_time
prior_path_exp = exppath_arch
prior_path_exp = False # use own exp path
while time <= dt.datetime(2008, 7, 30, 18):
while time <= dt.datetime(2008, 7, 30, 10):
id = assimilate(assim_time,
prior_init_time,
prior_path_exp=prior_path_exp,
depends_on=id)
prior_path_exp = False # use own exp path
#prior_path_exp = False # use own exp path
# integration
this_forecast_init = assim_time # start integration from here
......@@ -282,6 +283,6 @@ while time <= dt.datetime(2008, 7, 30, 18):
assim_time = time
prior_init_time = assim_time - timedelta_btw_assim
create_satimages(depends_on=id)
#create_satimages(depends_on=id)
mailme(id)
......@@ -137,9 +137,7 @@ def obs_operator_ensemble():
copy(cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01',
cluster.dartrundir+'/wrfout_d01')
t = dt.datetime.now()
wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc', cluster.dartrundir+'/wrfout_d01')
print((dt.datetime.now()-t).total_seconds(), 'secs for adding geodata')
# DART may need a wrfinput file as well, which serves as a template for dimension sizes
symlink(cluster.dartrundir+'/wrfout_d01', cluster.dartrundir+'/wrfinput_d01')
......@@ -148,8 +146,8 @@ def obs_operator_ensemble():
os.system('mpirun -np 12 ./perfect_model_obs > /dev/null')
# truth values in obs_seq.out are H(x) values
vals, _ = read_obsseqout(cluster.dartrundir+'/obs_seq.out')
list_ensemble_truths.append(vals)
true, _ = read_obsseqout(cluster.dartrundir+'/obs_seq.out')
list_ensemble_truths.append(true)
n_obs = len(list_ensemble_truths[0])
np_array = np.full((exp.n_ens, n_obs), np.nan)
......@@ -159,15 +157,16 @@ def obs_operator_ensemble():
else:
raise NotImplementedError()
def obs_operator_nature():
prepare_nature_dart()
if __name__ == "__main__":
time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
fpath_obs_coords = cluster.archivedir()+time.strftime('/%Y-%m-%d_%H:%M/obs_coords.pkl')
set_input_nml(sat_channel=sat_channel)
os.chdir(cluster.dartrundir)
os.system('rm -f obs_seq_*.out obs_seq.in obs_seq.final') # remove any existing observation files
os.remove(cluster.dartrundir+'/obs_seq.out')
os.system('mpirun -np 12 ./perfect_model_obs')
true, _ = read_obsseqout(cluster.dartrundir+'/obs_seq.out')
return true
def prepare_nature_dart():
# get wrfout_d01 from nature run
......@@ -179,97 +178,154 @@ if __name__ == "__main__":
# DART may need a wrfinput file as well, which serves as a template for dimension sizes
symlink(cluster.dartrundir+'/wrfout_d01', cluster.dartrundir+'/wrfinput_d01')
prepare_nature_dart()
def calc_obserr_WV73(Hx_nature, Hx_prior):
# loop over observation types
for i_obs, obscfg in enumerate(exp.observations):
n_obs = len(Hx_nature)
OEs = np.ones(n_obs)
for iobs in range(n_obs):
n_obs = obscfg['n_obs']
error_var = (obscfg['err_std'])**2
sat_channel = obscfg.get('sat_channel', False)
cov_loc = obscfg['cov_loc_radius_km']
dist_obs = obscfg.get('distance_between_obs_km', False)
cov_loc_vert_km = obscfg.get('cov_loc_vert_km', False)
heights = obscfg.get('heights', False)
bt_y = Hx_nature[iobs]
bt_x_ens = Hx_prior[:, iobs]
CIs = [cloudimpact_73(bt_x, bt_y) for bt_x in bt_x_ens]
mean_CI = np.mean(CIs)
# generate obs_seq.in
obs_coords = osq.calc_obs_locations(n_obs, coords_from_domaincenter=False,
distance_between_obs_km=dist_obs,
fpath_obs_locations=fpath_obs_coords)
oe_nature = oe_73(mean_CI)
print('oe_nature:', oe_nature, ', bt_y:', bt_y, ', mean_CI:', mean_CI)
OEs[iobs] = oe_nature
return OEs
if sat_channel:
osq.sat(time, sat_channel, obs_coords, error_var,
output_path=cluster.dartrundir)
else:
osq.generic_obs(obscfg['kind'], time, obs_coords, error_var,
heights=heights,
output_path=cluster.dartrundir)
def generate_observations():
set_input_nml(sat_channel=sat_channel, cov_loc_radius_km=cov_loc,
cov_loc_vert_km=cov_loc_vert_km)
# generate actual observations (with correct error)
os.chdir(cluster.dartrundir)
os.remove(cluster.dartrundir+'/obs_seq.out')
if not os.path.exists(cluster.dartrundir+'/obs_seq.in'):
raise RuntimeError('obs_seq.in does not exist in '+cluster.dartrundir)
os.system('mpirun -np 12 ./perfect_model_obs')
def assimilate():
os.chdir(cluster.dartrundir)
if sat_channel == 6:
"""cloud dependent observation error
# methodologically:
1) gen H(x_nature)
2) gen H(x_prior)
3) find the observation error for a pair of (H(x_nature), H(x_background))
4) generate actual observation
with the observation error as function of H(x_nature) and H(x_background)
# technically:
4) the file 'obs_seq.in' needs to be edited to show corrected observation errors
"""
# 1) gen H(x_nature)
set_input_nml(sat_channel=sat_channel,
cov_loc_radius_km=cov_loc,
cov_loc_vert_km=cov_loc_vert_km)
os.system('mpirun -np 12 ./perfect_model_obs')
Hx_nature, _ = read_obsseqout(cluster.dartrundir+'/obs_seq.out')
os.remove(cluster.dartrundir+'/obs_seq.final')
if not os.path.exists(cluster.dartrundir+'/obs_seq.out'):
raise RuntimeError('obs_seq.out does not exist in '+cluster.dartrundir)
os.system('mpirun -np 48 ./filter')
def archive_diagnostics(time):
print('archive obs space diagnostics')
savedir = cluster.archivedir()+'/obs_seq_final/'
mkdir(savedir)
copy(cluster.dartrundir+'/obs_seq.final', savedir+time.strftime('/%Y-%m-%d_%H:%M_obs_seq.final'))
try:
print('archive regression diagnostics')
savedir = cluster.archivedir()+'/reg_factor/'
mkdir(savedir)
copy(cluster.dartrundir+'/reg_diagnostics', savedir+time.strftime('/%Y-%m-%d_%H:%M_reg_diagnostics'))
except Exception as e:
warnings.warn(str(e))
def recycle_output():
print('move output to input')
for iens in range(1, exp.n_ens+1):
os.rename(cluster.dartrundir+'/filter_restart_d01.'+str(iens).zfill(4),
cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01')
# 2) gen H(x_prior) for the whole ensemble
Hx_prior = obs_operator_ensemble() # files are already linked to DART directory
# 3) find the observation error for a pair of (H(x_nature), H(x_background))
# necessary to achieve a certain FGD distribution which is near to operational
n_obs = len(Hx_nature)
OEs = []
for iobs in range(n_obs):
def archive_output_mean(time, istage):
istage = str(istage)
bt_y = Hx_nature[iobs]
bt_x_ens = Hx_prior[:,iobs]
CIs = [cloudimpact_73(bt_x, bt_y) for bt_x in bt_x_ens]
mean_CI = np.mean(CIs)
print('copy prior posterior members to archive')
for iens in range(1, exp.n_ens+1):
savedir = cluster.archivedir()+time.strftime('/%Y-%m-%d_%H:%M/')+str(iens)
mkdir(savedir)
oe_nature = oe_73(mean_CI)
print('oe_nature=', oe_nature, ' K')
OEs.append(oe_nature)
copy(cluster.dartrundir+'/input.nml',
cluster.archivedir()+time.strftime('/%Y-%m-%d_%H:%M/input '+istage+'.nml'))
# correct obs_err in obs_seq.in (to produce actual observations later on)
fpath_obsseqout = cluster.dartrundir+'/obs_seq.in'
edit_obserr_in_obsseq(fpath_obsseqout, OEs)
# filter_in = cluster.dartrundir+'/preassim_member_'+str(iens).zfill(4)+'.nc'
# filter_out = cluster.dartrundir+'/filter_restart_d01.'+str(iens).zfill(4)
# ensure correct nature file linked
# nature should be the real nature again (was changed in the meantime)
prepare_nature_dart()
# copy mean and sd to archive
print('copy preassim, postassim mean and sd')
for f in ['output_mean.nc', 'output_sd.nc']:
copy(cluster.dartrundir+'/'+f,
cluster.archivedir()+'/'+f[:-3]+time.strftime('_'+istage+'_%Y-%m-%d_%H:%M:%S'))
# correct input.nml for actual assimilation later on
set_input_nml(sat_channel=sat_channel,
cov_loc_radius_km=cov_loc,
cov_loc_vert_km=cov_loc_vert_km)
# 4) generate actual observations (with correct error)
os.chdir(cluster.dartrundir)
os.system('mpirun -np 12 ./perfect_model_obs')
if __name__ == "__main__":
"""Generate observations (obs_seq.out file)
as defined in config/cfg.py
for a certain timestamp (argument) of the nature run (defined in config/clusters.py)
Workflow:
for each assimilation stage (one obs_seq.in and e.g. one observation type):
1) prepare nature run for DART
optional: 2) calculate obs-error from parametrization
3) create obs_seq.in with obs-errors from 2)
4) generate actual observations (obs_seq.out) with obs_seq.in from 3)
# rename according to i_obs
os.rename(cluster.dartrundir+'/obs_seq.out',
cluster.dartrundir+'/obs_seq_'+str(i_obs)+'.out')
- calculate obs-error from parametrization
1) create obs_seq.in with obs error=0
2) calculate y_nat = H(x_nature) and y_ens = H(x_ensemble)
3) calculate obs error as function of y_nat, y_ensmean
Assumptions:
- x_ensemble is already linked for DART to advance_temp<iens>/wrfout_d01
"""
time = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
folder_obs_coords = cluster.archivedir()+time.strftime('/%Y-%m-%d_%H:%M/')
# concatenate the created obs_seq_*.out files
os.chdir(cluster.dartrundir)
os.system('cat obs_seq_*.out >> obs_seq_all.out')
os.system('rm -f obs_seq.out obs_seq.in obs_seq.final') # remove any existing observation files
print(dt.datetime.now())
n_stages = len(exp.observations)
for istage, obscfg in enumerate(exp.observations):
n_obs = obscfg['n_obs']
sat_channel = obscfg.get('sat_channel', False)
obs_coords = osq.calc_obs_locations(n_obs,
coords_from_domaincenter=False,
distance_between_obs_km=obscfg.get('distance_between_obs_km', False),
folder_obs_coords=folder_obs_coords)
set_DART_nml(sat_channel=sat_channel,
cov_loc=obscfg['cov_loc_radius_km'],
cov_loc_vert_km=obscfg.get('cov_loc_vert_km', False))
use_error_parametrization = obscfg['err_std'] == False
if use_error_parametrization:
if sat_channel != 6:
raise NotImplementedError('sat channel '+str(sat_channel))
osq.create_obsseq_in(obscfg, np.zeros(n_obs))
Hx_nat = obs_operator_nature()
Hx_prior = obs_operator_ensemble() # files are already linked to DART directory
obserr_var = calc_obserr_WV73(Hx_nat, Hx_prior)
else:
obserr_var = np.ones(n_obs) * obscfg['err_std']**2
osq.create_obsseq_in(obscfg, obserr_var) # now with correct errors
generate_observations()
assimilate()
archive_diagnostics(time)
if istage < n_stages-1:
# recirculation: filter output -> input
recycle_output()
archive_output_mean(time, istage)
elif istage == n_stages-1:
# last assimilation, continue integration now
pass # call update wrfinput from filteroutput later
else:
RuntimeError('this should never occur?!')
......@@ -34,7 +34,7 @@ def get_dart_date(time_dt):
return dart_date_day, secs_thatday
def calc_obs_locations(n_obs, coords_from_domaincenter=True,
distance_between_obs_km=9999, fpath_obs_locations=False):
distance_between_obs_km=9999, folder_obs_coords=False):
"""Calculate coordinate pairs for locations
Args:
......@@ -104,6 +104,27 @@ def calc_obs_locations(n_obs, coords_from_domaincenter=True,
assert len(coords) == n_obs, (len(coords), n_obs)
return coords
def main_obspart(obs, last=False):
"""
Args:
obs (object)
last (bool): True if this is the last observation in the obs_seq file
"""
if last:
line_link = " "+str(obs.i-1)+" -1 -1"
else:
line_link = " -1 "+str(obs.i+1)+" -1"
return """
OBS """+str(obs.i)+"""
"""+line_link+"""
obdef
loc3d
"""+obs.lon_rad+" "+obs.lat_rad+" "+obs.vert+" "+obs.vert_coord_sys+"""
kind
"""+obs.kind_nr+"""
"""+obs.secs_thatday+""" """+obs.dart_date_day+"""
"""+obs.error_var
def write_generic_obsseq(obs_name, obs_kind_nr, error_var, coords,
dart_date_day, secs_thatday, output_path,
......@@ -324,23 +345,95 @@ def generic_obs(obs_kind, time_dt, coords, error_var, heights=False, output_path
dart_date_day, secs_thatday, output_path,
vert_coord_sfc=is_sfc_obs)
def obskind_read():
raw_obskind_dart = """
5 RADIOSONDE_TEMPERATURE
6 RADIOSONDE_SPECIFIC_HUMIDITY
12 AIRCRAFT_U_WIND_COMPONENT
13 AIRCRAFT_V_WIND_COMPONENT
14 AIRCRAFT_TEMPERATURE
16 ACARS_U_WIND_COMPONENT
17 ACARS_V_WIND_COMPONENT
18 ACARS_TEMPERATURE
29 LAND_SFC_PRESSURE
30 SAT_U_WIND_COMPONENT
31 SAT_V_WIND_COMPONENT
36 DOPPLER_RADIAL_VELOCITY
37 RADAR_REFLECTIVITY
83 GPSRO_REFRACTIVITY
94 SYNOP_SURFACE_PRESSURE
95 SYNOP_SPECIFIC_HUMIDITY
96 SYNOP_TEMPERATURE
254 MSG_4_SEVIRI_RADIANCE
255 MSG_4_SEVIRI_TB
256 MSG_4_SEVIRI_BDRF"""
# lookup table for kind nr
alist = raw_obskind_dart.split()
assert len(alist) % 2 == 0, alist
obskind_nrs = {}
for i in range(0, len(alist)-1, 2):
obskind_nrs[alist[i+1]] = alist[i]
return obskind_nrs
obskind_nrs = obskind_read()
def create_obsseq_in(obscfg, obserr_var):
"""
Args:
obserr_var (np.array): observation error variance
shape (n_obs,), one value for each observation,
"""
self.coords = osq.calc_obs_locations(obscfg['n_obs'],
coords_from_domaincenter=False,
distance_between_obs_km=obscfg.get('distance_between_obs_km', False),
fpath_obs_locations=folder_obs_coords+'/obs_coords_'+obscfg['kind']+'.pkl')
# for i_obs in obscfg['n_obs']:
# instruction = dict(kind_nr = obskind_nrs[obscfg['kind']],
# sat_channel = obscfg.get('sat_channel', False),
# heights = obscfg.get('heights', False),
# obs_kinds, time_dt, coords, error_var, heights=False, output_path='./'):
if 'SYNOP' in obs_kind:
is_sfc_obs = True
heights = [2,]
else:
is_sfc_obs = False
if not heights:
heights = [5000., ]
coords = calc_obs_locations_3d(coords, heights)
dart_date_day, secs_thatday = get_dart_date(add_timezone_UTC(time_dt))
print('secs, days:', secs_thatday, dart_date_day)
obs_kind_nr = obs_kind_nrs[obs_kind]
for obs_kind in obs_kinds:
write_generic_obsseq2(obs_kind, obs_kind_nr, error_var, coords,
dart_date_day, secs_thatday, output_path,
vert_coord_sfc=is_sfc_obs)
if __name__ == '__main__':
time_dt = dt.datetime(2008, 7, 30, 16, 31)
n_obs = 100
time_dt = dt.datetime(2008, 7, 30, 10, 0)
n_obs = 64
sat_channel = 1
distance_between_obs_meters = 10000
error_var = 0.001
#error_var = 0.001
obs_coords = calc_obs_locations(n_obs, coords_from_domaincenter=False,
distance_between_obs_km=distance_between_obs_meters,
fpath_obs_locations=None)
sat(time_dt, sat_channel, obs_coords, error_var, output_path='./')
#sat(time_dt, sat_channel, obs_coords, error_var, output_path='./')
# error_var = (5.)**2
# generic_obs('RADAR', time_dt, n_obs, error_var, distance_between_obs_meters,
# output_path='./', fpath_obs_locations='./domain.pkl')
# generic_obs('RADAR_REFLECTIVITY', time_dt, obs_coords, error_var, heights=[5000.,], output_path='./')
# error_var = (0.5)**2
# generic_obs('RASO_T', time_dt, n_obs, error_var, distance_between_obs_meters,
# output
error_var = (0.5)**2
generic_obs('RADIOSONDE_TEMPERATURE', time_dt, obs_coords, error_var, heights=[5000.,])
......@@ -31,7 +31,7 @@
async = 0,
adv_ens_command = "../shell_scripts/advance_model.csh",
ens_size = <n_ens>,
obs_sequence_in_name = "obs_seq_all.out",
obs_sequence_in_name = "obs_seq.out",
obs_sequence_out_name = "obs_seq.final",
input_state_file_list = "input_list.txt"
output_state_file_list = "output_list.txt"
......@@ -160,7 +160,10 @@
'MSG_4_SEVIRI_RADIANCE',
'MSG_4_SEVIRI_TB',
'MSG_4_SEVIRI_BDRF',
'LAND_SFC_PRESSURE'
'LAND_SFC_PRESSURE',
'SYNOP_SURFACE_PRESSURE',
'SYNOP_TEMPERATURE',
'SYNOP_SPECIFIC_HUMIDITY'
evaluate_these_obs_types = 'RADIOSONDE_SPECIFIC_HUMIDITY',
/
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment