Commit d88bcbeb authored by Lukas Kugler

fixes

parent 8ed445f0
@@ -127,7 +127,7 @@ def run_ENS(begin, end, depends_on=None):
id = s.run(cmd, depends_on=[id])
# apply forward operator (DART filter without assimilation)
s = my_Slurm("fwOP-1m", cfg_update=dict(time="2"))
s = my_Slurm("fwOP-1m", cfg_update=dict(time="10"))
id = s.run(cluster.python+' '+cluster.scriptsdir+'/apply_obs_op_dart.py '
+ begin.strftime('%Y-%m-%d_%H:%M')+' '
+ begin_plus1.strftime('%Y-%m-%d_%H:%M'),
@@ -140,7 +140,7 @@ def run_ENS(begin, end, depends_on=None):
id = s.run(' '.join([cluster.python,
cluster.scriptsdir+'/prepare_namelist.py',
begin.strftime('%Y-%m-%d_%H:%M'),
begin_plus1.strftime('%Y-%m-%d_%H:%M'),
end.strftime('%Y-%m-%d_%H:%M'),
str(hist_interval), str(radt),]),
depends_on=[id])
@@ -257,16 +257,15 @@ elif start_from_existing_state:
# values for assimilation
assim_time = time
prior_init_time = init_time
prior_path_exp = False # use own exp path
prior_path_exp = exppath_arch
while time <= dt.datetime(2008, 7, 30, 10):
while time <= dt.datetime(2008, 7, 30, 16):
id = assimilate(assim_time,
prior_init_time,
prior_path_exp=prior_path_exp,
depends_on=id)
#prior_path_exp = False # use own exp path
prior_path_exp = False # use own exp path
# integration
this_forecast_init = assim_time # start integration from here
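A self-contained toy sketch of the cycling logic in this hunk, assuming the surviving lines are prior_path_exp = exppath_arch before the loop and the un-commented reset inside it; the stub assimilate(), the archive path and the hourly interval below are hypothetical:

import datetime as dt

def assimilate(assim_time, prior_init_time, prior_path_exp=False, depends_on=None):
    """Stand-in for the real SLURM submission; returns a fake job id."""
    print(assim_time, prior_init_time, prior_path_exp)
    return 0

exppath_arch = '/archive/previous_experiment'   # hypothetical archive path
time = dt.datetime(2008, 7, 30, 10)
assim_time, prior_init_time, id = time, time, None

prior_path_exp = exppath_arch                   # first cycle: prior from the archived experiment
while time <= dt.datetime(2008, 7, 30, 16):
    id = assimilate(assim_time, prior_init_time,
                    prior_path_exp=prior_path_exp, depends_on=id)
    prior_path_exp = False                      # later cycles: use this experiment's own forecasts
    time += dt.timedelta(hours=1)               # hypothetical cycle length; real time bookkeeping omitted
    assim_time = prior_init_time = time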
......
@@ -4,9 +4,10 @@ import numpy as np
from scipy.interpolate import interp1d
from config.cfg import exp, cluster
from utils import symlink, copy, mkdir, sed_inplace, append_file
from utils import symlink, copy, mkdir, sed_inplace, append_file, print
import create_obsseq as osq
from gen_synth_obs import read_prior_obs, set_input_nml
import assim_synth_obs as aso
from assim_synth_obs import read_prior_obs, set_DART_nml, generate_observations, assimilate
import pre_assim
def run_operator(obscfg, time):
@@ -16,12 +17,11 @@ def run_operator(obscfg, time):
# get observation file (obs not important, but their locations)
# this should correspond to configuration to have same locations as in real assim
obs_seq_all_out = cluster.dartrundir + '/obs_seq_all.out'
os.chdir(cluster.dartrundir)
n_obs = obscfg['n_obs']
error_var = (obscfg['err_std'])**2
sat_channel = obscfg.get('channel', False)
sat_channel = obscfg.get('sat_channel', False)
cov_loc = obscfg['cov_loc_radius_km']
dist_obs = obscfg.get('distance_between_obs_km', False)
@@ -38,37 +38,56 @@ def run_operator(obscfg, time):
wrfout_add_geo.run(cluster.dartrundir+'/geo_em.d01.nc',
cluster.dartrundir+'/wrfout_d01')
print('running perfect model obs', flush=True)
print('running perfect model obs')
os.system('mpirun -np 12 ./perfect_model_obs')
# set namelist for filter (calc only forward op)
set_input_nml(sat_channel=obscfg.get('channel', False),
aso.set_DART_nml(sat_channel=sat_channel,
just_prior_values=True)
# run filter
os.system('mv obs_seq.out obs_seq_all.out')
assert os.path.exists(obs_seq_all_out)
print('running filter', flush=True)
assert os.path.exists(cluster.dartrundir+'/obs_seq.out')
print('running filter')
os.system('mpirun -np 40 ./filter')
# copy output to archive
savedir = cluster.archivedir()+'/obs_seq_final_1min/'
mkdir(savedir)
obsname = obscfg.get('kind', 'satch'+str(obscfg['sat_channel']))
fout = time.strftime('/%Y-%m-%d_%H:%M_'+obsname+'_obs_seq.final')
obsname = obscfg['kind']
copy(cluster.dartrundir+'/obs_seq.final', savedir+fout)
print('output of observation operator saved to', fout)
if __name__ == '__main__':
prev_forecast_init = dt.datetime.strptime(sys.argv[1], '%Y-%m-%d_%H:%M')
time_to_run_fw_op = dt.datetime.strptime(sys.argv[2], '%Y-%m-%d_%H:%M')
time = dt.datetime.strptime(sys.argv[2], '%Y-%m-%d_%H:%M')
exppath_firstguess = cluster.archivedir()
print(prev_forecast_init, time_to_run_fw_op)
print(prev_forecast_init, time)
# link ensemble states to run_DART directory
pre_assim.run(time_to_run_fw_op, prev_forecast_init, exppath_firstguess)
pre_assim.run(time, prev_forecast_init, exppath_firstguess)
savedir = cluster.archivedir()+'/obs_seq_final_1min/'
mkdir(savedir)
n_stages = len(exp.observations)
for istage, obscfg in enumerate(exp.observations):
kind = obscfg['kind']
n_obs = obscfg['n_obs']
sat_channel = obscfg.get('sat_channel', False)
obscfg['folder_obs_coords'] = False
aso.set_DART_nml(sat_channel=sat_channel,
cov_loc_radius_km=obscfg['cov_loc_radius_km'],
cov_loc_vert_km=obscfg.get('cov_loc_vert_km', False),
just_prior_values=True)
osq.create_obsseq_in(time, obscfg)
aso.generate_observations()
aso.assimilate()
archive_stage = savedir+kind
aso.archive_diagnostics(archive_stage, time.strftime('/%Y-%m-%d_%H:%M_obs_seq.final'))
# run fwd operator, save to archive
for i_obs, obscfg in enumerate(exp.observations):
run_operator(obscfg, time_to_run_fw_op)
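A condensed sketch of the per-stage forward-operator sequence in the new __main__ block above; the obscfg values are hypothetical placeholders (the real stages come from exp.observations in config.cfg):

import datetime as dt
from config.cfg import cluster
from utils import mkdir
import create_obsseq as osq
import assim_synth_obs as aso

time = dt.datetime(2008, 7, 30, 12)                    # hypothetical valid time
savedir = cluster.archivedir() + '/obs_seq_final_1min/'
mkdir(savedir)

obscfg = dict(kind='MSG_4_SEVIRI_BT',                  # hypothetical single-stage configuration
              n_obs=961, sat_channel=6, err_std=False,
              cov_loc_radius_km=32, cov_loc_vert_km=False,
              folder_obs_coords=False)

aso.set_DART_nml(sat_channel=obscfg.get('sat_channel', False),
                 cov_loc_radius_km=obscfg['cov_loc_radius_km'],
                 cov_loc_vert_km=obscfg.get('cov_loc_vert_km', False),
                 just_prior_values=True)               # forward operator only, no state update
osq.create_obsseq_in(time, obscfg)                     # observation locations for this stage
aso.generate_observations()                            # perfect_model_obs -> obs_seq.out
aso.assimilate()                                       # filter with prior values -> obs_seq.final
aso.archive_diagnostics(savedir + obscfg['kind'],
                        time.strftime('/%Y-%m-%d_%H:%M_obs_seq.final'))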
import os, sys, shutil
import warnings
import datetime as dt
import numpy as np
from scipy.interpolate import interp1d
from config.cfg import exp, cluster
from utils import symlink, copy, sed_inplace, append_file
from utils import symlink, copy, sed_inplace, append_file, mkdir, try_remove, print
import create_obsseq as osq
import wrfout_add_geo
@@ -66,34 +67,38 @@ def read_obsseqout(f):
obs.append(observed)
return true, obs
def edit_obserr_in_obsseq(fpath_obsseqin, OEs):
"""
overwrite observation errors in a obs_seq.out file
according to the values in OEs
"""
# write to txt (write whole obs_seq.out again)
obsseq = open(fpath_obsseqin, 'r').readlines()
obsseq_new = obsseq.copy()
i_obs = 0
for i, line in enumerate(obsseq):
if 'kind\n' in line:
i_line_oe = i+9 # 9 for satellite obs
obsseq_new[i_line_oe] = ' '+str(OEs[i_obs])+' \n'
i_obs += 1
os.rename(fpath_obsseqin, fpath_obsseqin+'-bak') # backup
# write cloud dependent errors (actually whole file)
with open(fpath_obsseqin, 'w') as f:
for line in obsseq_new:
f.write(line)
def set_DART_nml(sat_channel=False, cov_loc_radius_km=32, cov_loc_vert_km=False):
# def edit_obserr_in_obsseq(fpath_obsseqin, OEs):
# """
# overwrite observation errors in a obs_seq.out file
# according to the values in OEs
# """
# # write to txt (write whole obs_seq.out again)
# obsseq = open(fpath_obsseqin, 'r').readlines()
# obsseq_new = obsseq.copy()
# i_obs = 0
# for i, line in enumerate(obsseq):
# if 'kind\n' in line:
# i_line_oe = i+9 # 9 for satellite obs
# obsseq_new[i_line_oe] = ' '+str(OEs[i_obs])+' \n'
# i_obs += 1
# os.rename(fpath_obsseqin, fpath_obsseqin+'-bak') # backup
# # write cloud dependent errors (actually whole file)
# with open(fpath_obsseqin, 'w') as f:
# for line in obsseq_new:
# f.write(line)
def set_DART_nml(sat_channel=False, cov_loc_radius_km=32, cov_loc_vert_km=False,
just_prior_values=False):
"""descr"""
cov_loc_radian = cov_loc_radius_km/earth_radius_km
copy(cluster.scriptsdir+'/../templates/input.nml',
cluster.dartrundir+'/input.nml')
if just_prior_values:
template = cluster.scriptsdir+'/../templates/input.prioronly.nml'
else:
template = cluster.scriptsdir+'/../templates/input.nml'
copy(template, cluster.dartrundir+'/input.nml')
# options are overwritten with settings
options = {'<n_ens>': str(int(exp.n_ens)),
@@ -123,6 +128,7 @@ def set_DART_nml(sat_channel=False, cov_loc_radius_km=32, cov_loc_vert_km=False)
append_file(cluster.dartrundir+'/input.nml', rttov_nml)
def obs_operator_ensemble():
print('running obs operator on ensemble forecast')
os.chdir(cluster.dartrundir)
if sat_channel:
@@ -155,6 +161,7 @@ def obs_operator_ensemble():
raise NotImplementedError()
def obs_operator_nature():
print('running obs operator on nature run')
prepare_nature_dart()
os.chdir(cluster.dartrundir)
@@ -163,7 +170,6 @@ def obs_operator_nature():
true, _ = read_obsseqout(cluster.dartrundir+'/obs_seq.out')
return true
def prepare_nature_dart():
# get wrfout_d01 from nature run
shutil.copy(time.strftime(cluster.nature_wrfout),
@@ -191,24 +197,25 @@ def calc_obserr_WV73(Hx_nature, Hx_prior):
return OEs
def generate_observations():
# generate actual observations (with correct error)
print('generate actual observations')
os.chdir(cluster.dartrundir)
os.remove(cluster.dartrundir+'/obs_seq.out')
try_remove(cluster.dartrundir+'/obs_seq.out')
if not os.path.exists(cluster.dartrundir+'/obs_seq.in'):
raise RuntimeError('obs_seq.in does not exist in '+cluster.dartrundir)
os.system('mpirun -np 12 ./perfect_model_obs')
def assimilate():
print('running filter')
os.chdir(cluster.dartrundir)
os.remove(cluster.dartrundir+'/obs_seq.final')
try_remove(cluster.dartrundir+'/obs_seq.final')
if not os.path.exists(cluster.dartrundir+'/obs_seq.out'):
raise RuntimeError('obs_seq.out does not exist in '+cluster.dartrundir)
os.system('mpirun -np 48 ./filter')
def archive_diagnostics(archive_stage):
def archive_diagnostics(archive_stage, fname_final):
print('archive obs space diagnostics')
mkdir(archive_stage)
copy(cluster.dartrundir+'/obs_seq.final', archive_stage+'/obs_seq.final')
copy(cluster.dartrundir+'/obs_seq.final', archive_stage+'/'+fname_final)
try:
print('archive regression diagnostics')
@@ -223,12 +230,9 @@ def recycle_output():
cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01')
def archive_output_mean(archive_stage):
for iens in range(1, exp.n_ens+1):
savedir = archive_stage+'/'+str(iens)
mkdir(savedir)
copy(cluster.dartrundir+'/input.nml', archive_stage+'/input.nml')
# for iens in range(1, exp.n_ens+1):
# savedir = archive_stage+'/'+str(iens)
# mkdir(savedir)
# filter_in = cluster.dartrundir+'/preassim_member_'+str(iens).zfill(4)+'.nc'
# filter_out = cluster.dartrundir+'/filter_restart_d01.'+str(iens).zfill(4)
@@ -269,13 +273,13 @@ if __name__ == "__main__":
for istage, obscfg in enumerate(exp.observations):
kind = obscfg['kind']
archive_stage = archive_time + '/stage'+str(istage)+'_'+kind
archive_stage = archive_time + '/assim_stage'+str(istage)+'_'+kind
n_obs = obscfg['n_obs']
sat_channel = obscfg.get('sat_channel', False)
obscfg['folder_obs_coords'] = archive_stage+'/obs_coords.pkl'
set_DART_nml(sat_channel=sat_channel,
cov_loc=obscfg['cov_loc_radius_km'],
cov_loc_radius_km=obscfg['cov_loc_radius_km'],
cov_loc_vert_km=obscfg.get('cov_loc_vert_km', False))
use_error_parametrization = obscfg['err_std'] == False
@@ -296,15 +300,17 @@ if __name__ == "__main__":
generate_observations()
assimilate()
archive_diagnostics(archive_stage)
archive_diagnostics(archive_stage, '/obs_seq.final')
if istage < n_stages-1:
# recirculation: filter output -> input
recycle_output()
copy(cluster.dartrundir+'/input.nml', archive_stage+'/input.nml')
archive_output_mean(archive_stage)
elif istage == n_stages-1:
# last assimilation, continue integration now
copy(cluster.dartrundir+'/input.nml', archive_stage+'/input.nml')
pass # call update wrfinput from filteroutput later
else:
......
import os, sys, shutil, glob
import builtins as __builtin__
#copy = shutil.copy
def print(*args):
__builtin__.print(*args, flush=True)
def copy(src, dst):
try:
os.remove(dst)
@@ -8,6 +12,12 @@ def copy(src, dst):
pass
shutil.copy(src, dst)
def try_remove(f):
try:
os.remove(f)
except:
pass
def mkdir(path):
os.system('mkdir -p '+path)
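A small usage sketch for the helpers added to utils above (paths are hypothetical); print is shadowed deliberately so that messages are flushed immediately, e.g. in batch-job logs:

from utils import print, copy, mkdir, try_remove

mkdir('/tmp/dart_test')                                        # 'mkdir -p': fine if it already exists
with open('/tmp/dart_test/input.nml', 'w') as f:               # dummy file for the example
    f.write('&filter_nml /\n')
copy('/tmp/dart_test/input.nml', '/tmp/dart_test/input.bak')   # copy() removes an existing destination first
try_remove('/tmp/dart_test/obs_seq.out')                       # silently ignores a missing file
print('running filter')                                        # builtin print, but with flush=True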
......
@@ -570,3 +570,4 @@
interp_test_vertcoord = 'VERTISHEIGHT'
verbose = .false.
/