Commit 526d315f authored by lkugler

WV62, restructure etc

parent 4af099b3
from multiprocessing.sharedctypes import Value
import os, sys, shutil, warnings
import time as time_module
import datetime as dt
import numpy as np
from scipy.interpolate import interp1d
from config.cfg import exp, cluster
from dartwrf.utils import symlink, copy, sed_inplace, append_file, mkdir, try_remove, print, shell
import dartwrf.create_obsseq as osq
from dartwrf import wrfout_add_geo
from dartwrf import obsseq
from dartwrf.obs import error_models as err
earth_radius_km = 6370
def OE_model_harnisch_WV73(ci):
    if ci >= 0 and ci < 16:
        # Kelvin, fit of Fig 7b, Harnisch 2016
        x_ci = [0, 5, 10.5, 13, 16]   # average cloud impact [K]
        y_oe = [1, 4.5, 10, 12, 13]   # adjusted observation error [K]
        oe_73_linear = interp1d(x_ci, y_oe, assume_sorted=True)
        return oe_73_linear(ci)
    else:
        return 13.0
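
# Usage sketch (illustrative, not part of the original workflow): the
# piecewise-linear fit above yields about 1 K observation error for a cloud
# impact of 0 K and saturates at 13 K once the mean cloud impact leaves the
# fitted range:
#
#   >>> float(OE_model_harnisch_WV73(0.0))
#   1.0
#   >>> float(OE_model_harnisch_WV73(20.0))
#   13.0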
def cloudimpact_73(bt_mod, bt_obs):
    """
    follows Harnisch 2016
    """
    biascor_obs = 0
    bt_lim = 255  # Kelvin for 7.3 micron WV channel

    ci_obs = max(0, bt_lim - (bt_obs - biascor_obs))
    ci_mod = max(0, bt_lim - bt_mod)
    ci = (ci_obs + ci_mod) / 2
    return ci
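
# Worked example with illustrative numbers (not from the original code): for an
# observed brightness temperature bt_obs = 235 K and a model value bt_mod = 250 K,
#   ci_obs = max(0, 255 - 235) = 20 K,   ci_mod = max(0, 255 - 250) = 5 K,
# so the symmetric cloud impact is ci = (20 + 5) / 2 = 12.5 K.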
def read_prior_obs(f_obs_prior):
    """Reads observed value, truth and prior ensemble values from an obs_seq.final file."""
    obsseq = open(f_obs_prior, "r").readlines()
    OBSs = []
    # read observations from obs_seq.final
    for i, line in enumerate(obsseq):
        if " OBS " in line:
            observed = float(obsseq[i + 1].strip())
            truth = float(obsseq[i + 2].strip())
            prior_ensmean = float(obsseq[i + 3].strip())
            prior_enssd = float(obsseq[i + 4].strip())
            prior_ens = []
            for j in range(5, 5 + exp.n_ens):
                prior_ens.append(float(obsseq[i + j].strip()))
            OBSs.append(dict(observed=observed, truth=truth, prior_ens=np.array(prior_ens)))
    return OBSs
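
# The parser above relies on the per-observation block layout of obs_seq.final,
# where each " OBS " header is followed by one line per copy in this order:
# observed value, truth, prior ensemble mean, prior ensemble spread, then one
# line per prior ensemble member. Illustrative block for n_ens = 3 (numbers are
# made up):
#
#    OBS            1
#      287.35            <- observed
#      288.10            <- truth
#      287.90            <- prior ensemble mean
#        0.55            <- prior ensemble spread
#      287.60            <- prior member 1
#      288.30            <- prior member 2
#      287.80            <- prior member 3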
def read_truth_obs_obsseq(f):
    """Reads observed and true values from obs_seq.out/final files."""
    obsseq = open(f, "r").readlines()
    true = []
    obs = []
    # read observations from obs_seq.out
    for i, line in enumerate(obsseq):
        if " OBS " in line:
            observed = float(obsseq[i + 1].strip())
            truth = float(obsseq[i + 2].strip())
            true.append(truth)
            obs.append(observed)
    return true, obs
def replace_errors_obsseqout(f, new_errors):
    """Replaces existing observation errors in obs_seq.final files

    new_errors (np.array) : standard deviation,
                            shape must match the number of observations
    """
    debug = True
    obsseq = open(f, "r").readlines()

    # find number of lines between two ' OBS ' lines:
    first_obs = second_obs = None
    for i, line in enumerate(obsseq):
        if " OBS " in line:
            if first_obs is None:
                first_obs = i
            else:
                second_obs = i
                break
    if not second_obs:
        raise RuntimeError('just one OBS in this file?! '+str(f))
    lines_between = second_obs - first_obs
    lines_obserr_after_obsnr = lines_between - 1  # obserr line is one before ' OBS ' line

    # replace values in list obsseq
    i_obs = 0
    for i, line in enumerate(obsseq):
        if " OBS " in line:
            line_error_obs_i = i + lines_obserr_after_obsnr
            previous_err_var = obsseq[line_error_obs_i]
            new_err_obs_i = new_errors[i_obs] ** 2  # variance in obs_seq.out
            if debug:
                print(
                    line.strip(),
                    "previous err var ",
                    float(previous_err_var.strip()),
                    "new error",
                    new_err_obs_i,
                )
            obsseq[line_error_obs_i] = " " + str(new_err_obs_i) + " \n"
            i_obs += 1  # next iteration

    with open(f, "w") as file:
        for line in obsseq:
            file.write(line)
    print("replaced obs errors in", f)
def set_DART_nml(just_prior_values=False):
    def to_radian_horizontal(cov_loc_horiz_km):
@@ -142,6 +34,7 @@ def set_DART_nml(just_prior_values=False):
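    # Hedged sketch (an assumption, the body of to_radian_horizontal is not
    # shown here): DART takes horizontal localization radii in radians on the
    # sphere, so the conversion is typically of the form
    #
    #   cov_loc_radian = cov_loc_horiz_km / earth_radius_km
    #
    # with earth_radius_km as defined at the top of this module.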
    # options keys are replaced in input.nml with values
    options = {
        "<filter_kind>": str(int(exp.filter_kind)),
        "<sampling_error_correction>": '.true.' if exp.sec else '.false.',
        "<post_inflation>": '4' if exp.inflation else '0',
        "<n_ens>": str(int(exp.n_ens)),
@@ -151,12 +44,13 @@
    }
    # Note: only one value of vertical localization possible
    try:
        cov_loc_vert_km, cov_loc_horiz_km = exp.cov_loc_vert_km_horiz_km
        vert_norm_hgt = to_vertical_normalization(cov_loc_vert_km, cov_loc_horiz_km)
        options["<vert_norm_hgt>"] = str(vert_norm_hgt)
        options["<horiz_dist_only>"] = ".false."
    except Exception as e:
        warnings.warn(str(e)+' - not using vertical localization.')
        options["<horiz_dist_only>"] = ".true."
        options["<vert_norm_hgt>"] = "99999.0"  # dummy value
@@ -168,53 +62,14 @@
    append_file(cluster.dartrundir + "/input.nml", rttov_nml)
def run_Hx(time, obscfg):
    """Run the obs operator H(x) on the prior ensemble.

    Assumes that the prior ensemble is already linked to advance_temp<i>/wrfout_d01.
    Creates:
        obs_seq.final (file): observations on the (non-averaged) grid
    """
    # existing obs_seq.out is a precondition for filter
    # needs to know where to take observations
    osq.create_obsseqin_alltypes(time, [obscfg, ])
    run_perfect_model_obs()

    print("running H(x) : obs operator on ensemble prior")
    # set input.nml to calculate prior Hx
    set_DART_nml(just_prior_values=True)

    # run filter to calculate prior Hx
    shell("mpirun -np 12 ./filter &> log.filter.preassim")

    osf = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.final")

    if hasattr(exp, "superob_km"):
        df = osf.df
        print("superobbing to", exp.superob_km, "km")
        osf.df = osf.df.superob(window_km=exp.superob_km)
        osf.to_dart(cluster.dartrundir + "/obs_seq.final")
    return osf
def obs_operator_nature(time):
    print("getting true values in obs space from nature run")
    prepare_nature_dart(time)
    run_perfect_model_obs()
    true, _ = read_truth_obs_obsseq(cluster.dartrundir + "/obs_seq.out")
    return true
def link_nature_to_dart_truth(time):
    # get wrfout_d01 from nature run
    shutil.copy(time.strftime(exp.nature_wrfout), cluster.dartrundir + "/wrfout_d01")

    # DART may need a wrfinput file as well, which serves as a template for dimension sizes
    symlink(cluster.dartrundir + "/wrfout_d01", cluster.dartrundir + "/wrfinput_d01")

    print(
        "linked", time.strftime(exp.nature_wrfout),
        "to", cluster.dartrundir + "/wrfout_d01",
    )
@@ -261,21 +116,8 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_
        # this seems to be necessary (else wrong level selection)
        wrfout_add_geo.run(cluster.dartrundir + "/../geo_em.d01.nc", wrfout_dart)

    write_list_of_inputfiles_prior()
    write_list_of_outputfiles()

    print("removing preassim and filter_restart")
    os.system("rm -rf " + cluster.dartrundir + "/preassim_*")
@@ -286,113 +128,74 @@ def prepare_prior_ensemble(assim_time, prior_init_time, prior_valid_time, prior_
    os.system("rm -rf " + cluster.dartrundir + "/obs_seq.fina*")
def calc_obserr_WV73(Hx_nature, Hx_prior):
    """Calculate parametrized error (for assimilation)

    Args:
        Hx_nature (np.array): dim (observations)
        Hx_prior (np.array): dim (ensemble_members, observations)

    Returns:
        np.array: observation error std-deviation with dim (observations)
    """
    debug = False
    n_obs = len(Hx_nature)
    OEs = np.ones(n_obs)
    for iobs in range(n_obs):
        bt_y = Hx_nature[iobs]
        bt_x_ens = Hx_prior[:, iobs]

        # compute Cloud impact for every pair (ensemble, truth)
        CIs = [cloudimpact_73(bt_x, bt_y) for bt_x in bt_x_ens]
        mean_CI = np.mean(CIs)

        oe_model = OE_model_harnisch_WV73(mean_CI)
        if debug:
            print("BT_nature=", bt_y, "=> mean_CI=", mean_CI, "=> OE_assim=", oe_model)
        OEs[iobs] = oe_model
    return OEs
def write_txt(lines, fpath):
    try_remove(fpath)
    with open(fpath, "w") as file:
        for line in lines:
            file.write(line+'\n')
def write_list_of_inputfiles_prior():
    files = []
    for iens in range(1, exp.n_ens+1):
        files.append("./advance_temp" + str(iens) + "/wrfout_d01")
    write_txt(files, cluster.dartrundir+'/input_list.txt')

def write_list_of_inputfiles_posterior(assim_time):
    filedir = cluster.archivedir+assim_time.strftime("/%Y-%m-%d_%H:%M/assim_stage0/")
    files = []
    for iens in range(1, exp.n_ens+1):
        files.append(filedir+'filter_restart_d01.'+str(iens).zfill(4))
    write_txt(files, cluster.dartrundir+'/input_list.txt')

def write_list_of_outputfiles():
    files = []
    for iens in range(1, exp.n_ens+1):
        files.append("./filter_restart_d01." + str(iens).zfill(4))
    write_txt(files, cluster.dartrundir+'/output_list.txt')
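
# For illustration (assuming a 3-member ensemble), the helpers above produce
# plain text lists like:
#
#   input_list.txt (prior)           output_list.txt
#   ./advance_temp1/wrfout_d01       ./filter_restart_d01.0001
#   ./advance_temp2/wrfout_d01       ./filter_restart_d01.0002
#   ./advance_temp3/wrfout_d01       ./filter_restart_d01.0003
#
# write_list_of_inputfiles_posterior(assim_time) instead points input_list.txt
# at the archived filter_restart_d01.* files of that assimilation time.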
def run_perfect_model_obs(nproc=12, verbose=True):
    if verbose:
        print("generating observations - running ./perfect_model_obs")
    os.chdir(cluster.dartrundir)

    try_remove(cluster.dartrundir + "/obs_seq.out")
    if not os.path.exists(cluster.dartrundir + "/obs_seq.in"):
        raise RuntimeError("obs_seq.in does not exist in " + cluster.dartrundir)

    shell('mpirun -np '+str(nproc)+' '+cluster.container+" ./perfect_model_obs > log.perfect_model_obs")

    if not os.path.exists(cluster.dartrundir + "/obs_seq.out"):
        raise RuntimeError(
            "obs_seq.out does not exist in " + cluster.dartrundir,
            "\n look for " + cluster.dartrundir + "/log.perfect_model_obs")
def filter(nproc=48):
    print("time now", dt.datetime.now())
    print("running filter")
    os.chdir(cluster.dartrundir)
    try_remove(cluster.dartrundir + "/obs_seq.final")

    t = time_module.time()
    #shell('mpirun -np 12 ./filter &> log.filter')
    if nproc < 12:
        shell('mpirun -np 12 '+cluster.container+' ./filter &> log.filter')
    else:  # -genv I_MPI_PIN_PROCESSOR_LIST=0-"+str(int(nproc) - 1)
        shell("mpirun -np "+str(int(nproc))+' '+cluster.container+" ./filter > log.filter")
    print("./filter took", int(time_module.time() - t), "seconds")

    if not os.path.isfile(cluster.dartrundir + "/obs_seq.final"):
        raise RuntimeError(
            "obs_seq.final does not exist in " + cluster.dartrundir,
            "\n look for " + cluster.dartrundir + "/log.filter")
# currently unused
# def recycle_output():
# """Use output of assimilation (./filter) as input for another assimilation (with ./filter)
# Specifically, this copies the state fields from filter_restart_d01.000x to the wrfout files in advance_temp folders"""
# update_vars = ['U', 'V', 'T', 'PH', 'MU', 'QVAPOR', 'QCLOUD', 'QICE', 'QRAIN', 'U10', 'V10', 'T2', 'Q2', 'TSK', 'PSFC', 'CLDFRA']
# updates = ','.join(update_vars)
# print('recycle DART output to be used as input')
# for iens in range(1, exp.n_ens+1):
# dart_output = cluster.dartrundir+'/filter_restart_d01.'+str(iens).zfill(4)
# dart_input = cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01'
# #print('check for non-monotonic vertical pressure')
# # convert link to file in order to be able to update the content
# if os.path.islink(dart_input):
# l = os.readlink(dart_input)
# os.remove(dart_input)
# copy(l, dart_input)
# # print('move DART output to input: '+dart_output+' -> '+dart_input)
# # os.rename(dart_output, dart_input) # probably doesnt work
# print('updating', updates, 'in', dart_input, 'from', dart_output)
# os.system(cluster.ncks+' -A -v '+updates+' '+dart_output+' '+dart_input)
############### archiving

def archive_filteroutput(time):
    print("archiving ...")

    archive_dir = cluster.archivedir + "/obs_seq_final/"
    mkdir(archive_dir)
    fout = archive_dir + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.final")
    copy(cluster.dartrundir + "/obs_seq.final", fout)
    print(fout, "saved.")

    archive_assim = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/assim_stage0/")
    mkdir(archive_assim)
    copy(cluster.dartrundir + "/input.nml", archive_assim + "/input.nml")
@@ -416,27 +219,7 @@ def archive_filteroutput(time):
    except Exception as e:
        warnings.warn(str(e))
def archive_osq_out(time):
    dir_obsseq = cluster.archivedir + "/obs_seq_out/"
    os.makedirs(dir_obsseq, exist_ok=True)
    copy(cluster.dartrundir + "/obs_seq.out",
         dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out"))

    if os.path.isfile(cluster.dartrundir + "/obs_seq.out-orig"):
        try:
            copy(cluster.dartrundir + "/obs_seq.out-orig",
                 dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out-orig"))
        except Exception as e:
            warnings.warn(str(e))
def is_assim_error_parametrized(obscfg):
    if (obscfg["error_assimilate"] == False) and (
            obscfg.get("sat_channel") == 6):
        return True
    else:
        return False
def get_parametrized_error(obscfg, osf_prior):
    """Calculate the parametrized error for an ObsConfig (one obs type)

    Args
@@ -446,70 +229,72 @@ def get_parametrized_error(obscfg):
    Returns
        np.array observation error std-dev for assimilation
    """
    df_obs = osf_prior
    Hx_prior = df_obs.get_prior_Hx().T
    Hx_truth = df_obs.get_truth_Hx()

    # compute the obs error for assimilation on the averaged grid
    # since the assimilation is done on the averaged grid
    channel = obscfg.get("sat_channel")
    if channel == 5:
        return err.calc_obserr_WV('WV62', Hx_truth, Hx_prior)
    if channel == 6:
        return err.calc_obserr_WV('WV73', Hx_truth, Hx_prior)
    else:
        raise NotImplementedError('sat_channel not implemented', obscfg.get("sat_channel"))
def set_obserr_assimilate_in_obsseqout(oso, osf_prior, outfile="./obs_seq.out"):
    for obscfg in exp.observations:
        kind_str = obscfg['kind']
        kind = osq.obs_kind_nrs[kind_str]

        # modify each kind separately, one after each other
        where_oso_iskind = oso.df.kind == kind
        where_osf_iskind = osf_prior.df.kind == kind

        if obscfg["error_assimilate"] == False:
            assim_err = get_parametrized_error(obscfg, osf_prior.df[where_osf_iskind])
            oso.df.loc[where_oso_iskind, 'variance'] = assim_err**2
            #assert np.allclose(assim_err, oso.df['variance']**2)  # check
        else:
            # overwrite with user-defined values
            oso.df.loc[where_oso_iskind, 'variance'] = obscfg["error_assimilate"]**2

    oso.to_dart(outfile)
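
# Illustrative observation config (hypothetical values, patterned after the
# SEVIRI entries in exp.observations): error_assimilate=False selects the
# parametrized, cloud-impact-dependent error above, whereas a number is used
# directly as the assimilation error std-deviation.
#
#   wv73 = dict(kind='MSG_4_SEVIRI_TB', sat_channel=6, n_obs=n_obs,
#               error_generate=1., error_assimilate=False,  # False -> parametrized
#               cov_loc_radius_km=20)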
def qc_obs(time, oso, osf_prior):
    # obs should be superobbed already!
    for i, obscfg in enumerate(exp.observations):
        if i > 0:
            raise NotImplementedError()

        obs = oso.df.observations.values
        Hx_prior_mean = osf_prior.df['prior ensemble mean']
        n_obs_orig = len(obs)

        if obscfg.get("sat_channel") == 1:
            if False:
                print('removing obs with abs(FGD) < 0.05')
                Hx_prior = osf_prior.df.get_prior_Hx().T
                # Hx_prior_mean = np.mean(Hx_prior, axis=0)
                #Hx_prior_spread = osf_prior.df['prior ensemble spread'].values
                #Hx_prior_spread[Hx_prior_spread < 1e-9] = 1e-9

                abs_FGD = abs(obs - Hx_prior_mean)  # /Hx_prior_spread
                oso.df = oso.df[abs_FGD > 0.05]  # Hx_prior_spread]

                # obs_dist_to_priormean = abs(obs - Hx_prior_mean)
                # oso.df = oso.df[obs_dist_to_priormean > 5]
                # print('removed', n_obs_orig-len(oso.df), 'observations with abs(FGD) smaller than 5')
            else:
                # set obs to prior mean => no change in mean but spread is adjusted.
                abs_FGD = abs(obs - Hx_prior_mean)  # /Hx_prior_spread
                Hx_prior_median = np.median(osf_prior.df.get_prior_Hx(), axis=1)
                oso.df.loc[abs_FGD <= 0.05, 'observations'] = Hx_prior_median[abs_FGD <= 0.05]
elif obscfg.get("sat_channel") == 6: # WV73
......@@ -517,16 +302,127 @@ def qc_obs(oso, outfile='./obs_seq.out'):
obs = oso.df.observations.values
n_obs_orig = len(obs)
Hx_prior = df_final.get_prior_Hx().T
Hx_prior_mean = np.mean(Hx_prior, axis=0)
abs_FGD = abs(obs - Hx_prior_mean) # /Hx_prior_spread
oso.df = oso.df[abs_FGD > 5]
        else:
            raise NotImplementedError('no obs QC implemented for this obs type')

        print('QC removed', n_obs_orig-len(oso.df), 'observations')

    # for archiving
    f_out_archive = cluster.archivedir + "/obs_seq_out/" + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out-beforeQC")
    os.makedirs(cluster.archivedir + "/obs_seq_out/", exist_ok=True)
    copy(cluster.dartrundir + "/obs_seq.out", f_out_archive)

    # for assimilation later
    f_out_dart = cluster.dartrundir+'/obs_seq.out'
    oso.to_dart(f_out_dart)
    print('saved', f_out_dart)
def evaluate(assim_time,
             output_format="%Y-%m-%d_%H:%M_obs_seq.final-eval_posterior_allobs"):
    """Depending on input_list.txt, this function calculates either prior or posterior obs space values.
    """
    os.makedirs(cluster.dartrundir, exist_ok=True)  # create directory to run DART in
    os.chdir(cluster.dartrundir)

    # link DART binaries to run_DART
    os.system(cluster.python + " " + cluster.scripts_rundir + "/link_dart_rttov.py")

    # remove any existing observation files
    os.system("rm -f input.nml obs_seq.final")

    print("prepare nature")
    prepare_nature_dart(assim_time)  # link WRF files to DART directory

    if not os.path.isfile(cluster.dartrundir+'/obs_seq.out'):
        raise RuntimeError(cluster.dartrundir+'/obs_seq.out does not exist')

    set_DART_nml(just_prior_values=True)
    filter(nproc=6)

    # archiving
    fout = cluster.archivedir + "/obs_seq_final/" + assim_time.strftime(output_format)
    os.makedirs(cluster.archivedir + "/obs_seq_final/", exist_ok=True)
    copy(cluster.dartrundir + "/obs_seq.final", fout)
    print(fout, "saved.")

    osf = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.final")
    return osf
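
# Hedged usage sketch: whether evaluate() yields prior or posterior obs-space
# values is controlled by which files input_list.txt points to, e.g.
#
#   write_list_of_inputfiles_prior()          # H(x) of the prior ensemble
#   osf_prior = evaluate(time, output_format="%Y-%m-%d_%H:%M_obs_seq.final-eval_prior_allobs")
#
#   write_list_of_inputfiles_posterior(time)  # H(x) of the analysis ensemble
#   osf_post = evaluate(time)                 # default output_format is the posterior one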
def generate_obsseq_out(time):

    def ensure_physical_vis(oso):  # set reflectance < surface albedo to surface albedo
        print(" 2.2) removing obs below surface albedo ")
        if_vis_obs = oso.df['kind'].values == 262
        if_obs_below_surface_albedo = oso.df['observations'].values < 0.2928

        oso.df.loc[if_vis_obs & if_obs_below_surface_albedo, ('observations')] = 0.2928
        oso.to_dart(f=cluster.dartrundir + "/obs_seq.out")
        return oso

    def apply_superobbing(oso):
        try:
            f_oso = dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out-before_superob")
            shutil.copy(cluster.dartrundir + "/obs_seq.out-before_superob", f_oso)
            print('saved', f_oso)
        except Exception as e:
            warnings.warn(str(e))

        print(" 2.3) superobbing to", exp.superob_km, "km")
        oso.df = oso.df.superob(window_km=exp.superob_km)
        oso.to_dart(f=cluster.dartrundir + "/obs_seq.out")
        return oso  # return needed so that `oso = apply_superobbing(oso)` below keeps the object

    ##############################

    dir_obsseq = cluster.archivedir + "/obs_seq_out/"
    os.makedirs(dir_obsseq, exist_ok=True)

    osq.create_obsseqin_alltypes(time, exp.observations)
    run_perfect_model_obs()  # generate observation, draws from gaussian

    print(" 2.1) obs preprocessing")
    oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out")
    oso = ensure_physical_vis(oso)

    if getattr(exp, "superob_km", False):
        oso = apply_superobbing(oso)

    # archive complete obsseqout
    f_oso = dir_obsseq + time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out")
    shutil.copy(cluster.dartrundir + "/obs_seq.out", f_oso)
    print('saved', f_oso)

    return oso
def get_obsseq_out(time):

    # did we specify an obsseqout inputfile?
    if exp.use_existing_obsseq != False:
        f_obsseq = time.strftime(exp.use_existing_obsseq)
        copy(f_obsseq, cluster.dartrundir+'/obs_seq.out')
        print(f_obsseq, 'copied to', cluster.dartrundir+'/obs_seq.out')

        oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out")
    else:
        # did we already generate observations?
        f_oso_thisexp = cluster.archivedir+'/obs_seq_out/'+time.strftime("/%Y-%m-%d_%H:%M_obs_seq.out")
        if os.path.isfile(f_oso_thisexp):
            # oso exists
            copy(f_oso_thisexp, cluster.dartrundir+'/obs_seq.out')
            print('copied existing obsseqout from', f_oso_thisexp)
            oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out")
        else:
            # we need to generate observations
            oso = generate_obsseq_out(time)
    return oso
if __name__ == "__main__":
@@ -539,10 +435,12 @@ if __name__ == "__main__":
    2) create obs_seq.in
    3) create obs from nature (obs_seq.out) with user-defined obs-errors
    4) Assimilate with assigned errors

    Args:
        assim_time: time of output
        prior_init_time: forecast start of prior
        prior_valid_time: valid time of prior (may be different to assim_time)
        prior_path_exp: path to prior experiment

    Example call:
        python assim.py 2008-08-07_12:00 2008-08-06:00 2008-08-07_13:00 /home/fs71386/lkugler/data/sim_archive/exp_v1.18_Pwbub-1-ensprof_40mem
@@ -552,6 +450,10 @@ if __name__ == "__main__":
    prior_init_time = dt.datetime.strptime(sys.argv[2], "%Y-%m-%d_%H:%M")
    prior_valid_time = dt.datetime.strptime(sys.argv[3], "%Y-%m-%d_%H:%M")
    prior_path_exp = str(sys.argv[4])

    options = []
    if len(sys.argv) > 4:
        options = sys.argv[5:]
    nproc = 6 if 'headnode' in options else 48

    archive_time = cluster.archivedir + time.strftime("/%Y-%m-%d_%H:%M/")
    os.makedirs(cluster.dartrundir, exist_ok=True)  # create directory to run DART in
@@ -571,44 +473,29 @@ if __name__ == "__main__":
    prepare_prior_ensemble(time, prior_init_time, prior_valid_time, prior_path_exp)

    ################################################
    print(" 1) get observations with specified obs-error")
    oso = get_obsseq_out(time)

    ################################################
    print('3.1) evaluate prior')  # evaluate prior observations for all locations
    osf_prior = evaluate(time, output_format="%Y-%m-%d_%H:%M_obs_seq.final-eval_prior_allobs")

    print(" 3.2) assign observation-errors for assimilation ")
    set_obserr_assimilate_in_obsseqout(oso, osf_prior, outfile=cluster.dartrundir + "/obs_seq.out")

    if getattr(exp, "reject_smallFGD", False):
        print(" 3.3) reject observations? ")
        qc_obs(time, oso, osf_prior)

    print(" 3.4) assimilate (run filter) ")
    set_DART_nml()
    filter(nproc=nproc)
    archive_filteroutput(time)

    # evaluate posterior observations for all locations
    write_list_of_inputfiles_posterior(time)
    if getattr(exp, "reject_smallFGD", False):
        copy(cluster.archivedir+'/obs_seq_out/'+time.strftime('%Y-%m-%d_%H:%M_obs_seq.out-beforeQC'),
             cluster.dartrundir+'/obs_seq.out')
    evaluate(time, output_format="%Y-%m-%d_%H:%M_obs_seq.final-eval_posterior_allobs")
from multiprocessing.sharedctypes import Value
import os, sys, shutil, warnings
import time as time_module
import datetime as dt
import numpy as np
from config.cfg import exp, cluster
from dartwrf.utils import copy, print
import dartwrf.create_obsseq as osq
from dartwrf import obsseq
from dartwrf import assim_synth_obs as aso
tformat = '%Y-%m-%d_%H:%M'
if __name__ == "__main__":
    """Create observations for multiple times upfront
    """
    args = sys.argv[1:]
    times = []
    for tstr in args:
        times.append(dt.datetime.strptime(tstr, tformat))

    dir_for_obsseqout = exp.nature_wrfout + '/../../../obs_seq_out/'+exp.use_existing_obsseq
    os.makedirs(dir_for_obsseqout, exist_ok=True)  # create directory for obs_seq.out files
    print('will save obsseq to', dir_for_obsseqout)

    os.chdir(cluster.dartrundir)

    # link DART binaries to run_DART
    os.system(cluster.python + " " + cluster.scripts_rundir + "/link_dart_rttov.py")

    # remove any existing observation files
    os.system("rm -f input.nml obs_seq.in obs_seq.out obs_seq.out-orig obs_seq.final")

    aso.set_DART_nml()

    for time in times:
        print('obsseq for time', time)
        aso.prepare_nature_dart(time)  # link WRF files to DART directory

        osq.create_obsseqin_alltypes(time, exp.observations)  # obs_seq.in
        aso.run_perfect_model_obs(nproc=6)  # create observations (obs_seq.out)

        oso = obsseq.ObsSeq(cluster.dartrundir + "/obs_seq.out")

        if True:  # set reflectance < surface albedo to surface albedo
            print(" 2.2) removing obs below surface albedo ")
            if_vis_obs = oso.df['kind'].values == 262
            if_obs_below_surface_albedo = oso.df['observations'].values < 0.2928

            oso.df.loc[if_vis_obs & if_obs_below_surface_albedo, ('observations')] = 0.2928
            oso.to_dart(f=cluster.dartrundir + "/obs_seq.out")

        if getattr(exp, "superob_km", False):
            print(" 2.3) superobbing to", exp.superob_km, "km")
            oso.df = oso.df.superob(window_km=exp.superob_km)
            copy(cluster.dartrundir + "/obs_seq.out", cluster.dartrundir + "/obs_seq.out-orig")
            oso.to_dart(f=cluster.dartrundir + "/obs_seq.out")

        aso.archive_osq_out(time, dir_obsseq=dir_for_obsseqout)
@@ -454,10 +454,15 @@ if __name__ == '__main__':
                error_generate=0.03, error_assimilate=0.06,
                cov_loc_radius_km=32)

    wv62 = dict(plotname='Brightness temperature WV 6.2µm', plotunits='[K]',
                kind='MSG_4_SEVIRI_TB', sat_channel=5, n_obs=n_obs,
                error_generate=1., error_assimilate=False,
                cov_loc_radius_km=20)

    wv73 = dict(plotname='Brightness temperature WV 7.3µm', plotunits='[K]',
                kind='MSG_4_SEVIRI_TB', sat_channel=6, n_obs=n_obs,
                error_generate=1., error_assimilate=False,
                cov_loc_radius_km=20)

    ir108 = dict(plotname='Brightness temperature IR 10.8µm', plotunits='[K]',
                 kind='MSG_4_SEVIRI_TB', sat_channel=9, n_obs=n_obs,
@@ -482,7 +487,7 @@ if __name__ == '__main__':
    #create_obsseq_in(time_dt, radar, archive_obs_coords=False) #'./coords_stage1.pkl')
    create_obsseqin_alltypes(time_dt, [wv62], archive_obs_coords=False) #'./obs_coords.pkl')

    if False:
        error_assimilate = 5.*np.ones(n_obs*len(radar['heights']))
......
import numpy as np
from scipy.interpolate import interp1d
def calc_obserr_WV(channel, Hx_nature, Hx_prior):
    """Calculate parametrized error (for assimilation)

    Args:
        channel (str): satellite channel
        Hx_nature (np.array): dim (observations)
        Hx_prior (np.array): dim (ensemble_members, observations)

    Returns:
        np.array: observation error std-deviation with dim (observations)
    """
    assert channel in ['WV62', 'WV73']
    debug = False

    n_obs = len(Hx_nature)
    OEs = np.ones(n_obs)
    for iobs in range(n_obs):
        bt_y = Hx_nature[iobs]
        bt_x_ens = Hx_prior[:, iobs]

        # compute Cloud impact for every pair (ensemble, truth)
        CIs = [cloudimpact(channel, bt_x, bt_y) for bt_x in bt_x_ens]
        mean_CI = np.mean(CIs)

        if channel == 'WV62':
            oe_model = OE_model_harnisch_WV62(mean_CI)
        elif channel == 'WV73':
            oe_model = OE_model_harnisch_WV73(mean_CI)

        if debug:
            print("BT_nature=", bt_y, "=> mean_CI=", mean_CI, "=> OE_assim=", oe_model)
        OEs[iobs] = oe_model
    return OEs
def cloudimpact(channel, bt_mod, bt_obs):
    """
    follows Harnisch 2016, Figure 3
    """
    if channel == 'WV73':
        biascor_obs = 0
        bt_lim = 255  # Kelvin for 7.3 micron WV channel
    elif channel == 'WV62':
        biascor_obs = 0
        bt_lim = 232.5  # Kelvin for 6.2 micron WV channel

    ci_obs = max(0, bt_lim - (bt_obs - biascor_obs))
    ci_mod = max(0, bt_lim - bt_mod)
    ci = (ci_obs + ci_mod) / 2
    return ci
def OE_model_harnisch_WV62(ci):
    if ci >= 0 and ci < 7.5:
        # Kelvin, fit of Fig 7a, Harnisch 2016
        x_ci = [0, 2.5, 4.5, 5.5, 7.5]   # average cloud impact [K]
        y_oe = [1.2, 3, 5, 6, 6.5]       # adjusted observation error [K]
        oe_62_linear = interp1d(x_ci, y_oe, assume_sorted=True)
        return oe_62_linear(ci)
    else:  # assign highest observation error
        return 6.5
def OE_model_harnisch_WV73(ci):
    if ci >= 0 and ci < 16:
        # Kelvin, fit of Fig 7b, Harnisch 2016
        x_ci = [0, 5, 10.5, 13, 16]   # average cloud impact [K]
        y_oe = [1, 4.5, 10, 12, 13]   # adjusted observation error [K]
        oe_73_linear = interp1d(x_ci, y_oe, assume_sorted=True)
        return oe_73_linear(ci)
    else:  # assign highest observation error
        return 13.0
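
# Minimal usage sketch (illustrative numbers, assuming the array shapes
# documented in calc_obserr_WV):
#
#   >>> import numpy as np
#   >>> Hx_nature = np.array([230.0, 255.0])            # (observations,)
#   >>> Hx_prior = np.array([[235.0, 254.0],
#   ...                      [240.0, 256.0]])           # (members, observations)
#   >>> calc_obserr_WV('WV73', Hx_nature, Hx_prior)     # -> approx. array([13.0, 1.175])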