diff --git a/scripts/assim_synth_obs.py b/scripts/assim_synth_obs.py index ba6640dcbb981fadcfb9db3a1a190e787e83b54f..f80ddccc17c747f4ab3f53149498a0a513bc4921 100755 --- a/scripts/assim_synth_obs.py +++ b/scripts/assim_synth_obs.py @@ -268,19 +268,6 @@ def assimilate(nproc=96): os.system('mpirun -genv I_MPI_PIN_PROCESSOR_LIST=0-'+str(int(nproc)-1)+' -np '+str(int(nproc))+' ./filter > log.filter') print('./filter took', int(time_module.time()-t), 'seconds') -def archive_diagnostics(archive_dir, time): - print('archive obs space diagnostics') - mkdir(archive_dir) - fout = archive_dir+time.strftime('/%Y-%m-%d_%H:%M_obs_seq.final') - copy(cluster.dartrundir+'/obs_seq.final', fout) - print(fout, 'saved.') - - # try: # what are regression diagnostics?! - # print('archive regression diagnostics') - # copy(cluster.dartrundir+'/reg_diagnostics', archive_dir+'/reg_diagnostics') - # except Exception as e: - # warnings.warn(str(e)) - def recycle_output(): """Use output of assimilation (./filter) as input for another assimilation (with ./filter) Specifically, this copies the state fields from filter_restart_d01.000x to the wrfout files in advance_temp folders""" @@ -306,22 +293,40 @@ def recycle_output(): print('updating', updates, 'in', dart_input, 'from', dart_output) os.system(cluster.ncks+' -A -v '+updates+' '+dart_output+' '+dart_input) -def archive_output(archive_stage): +############### archiving + +def archive_assimilation(time) + + print('archive obs space diagnostics') + archive_dir = cluster.archivedir+'/obs_seq_final/' + mkdir(archive_dir) + fout = archive_dir+time.strftime('/%Y-%m-%d_%H:%M_obs_seq.final') + copy(cluster.dartrundir+'/obs_seq.final', fout) + print(fout, 'saved.') + + # try: # what are regression diagnostics?! + # print('archive regression diagnostics') + # copy(cluster.dartrundir+'/reg_diagnostics', archive_dir+'/reg_diagnostics') + # except Exception as e: + # warnings.warn(str(e)) + print('archiving output') - mkdir(archive_stage) - copy(cluster.dartrundir+'/input.nml', archive_stage+'/input.nml') + archive_assim = cluster.archivedir + '/assim_stage0/' + mkdir(archive_assim) + copy(cluster.dartrundir+'/input.nml', archive_assim+'/input.nml') - # single members - for iens in range(1, exp.n_ens+1): - #savedir = archive_stage+'/'+str(iens) - #mkdir(savedir) - # filter_in = cluster.dartrundir+'/preassim_member_'+str(iens).zfill(4)+'.nc' + for iens in range(1, exp.n_ens+1): # single members filter_out = cluster.dartrundir+'/filter_restart_d01.'+str(iens).zfill(4) - copy(filter_out, archive_stage+'/filter_restart_d01.'+str(iens).zfill(4)) + copy(filter_out, archive_assim+'/filter_restart_d01.'+str(iens).zfill(4)) + + for f in ['output_mean.nc', 'output_sd.nc']: # copy mean and sd to archive + copy(cluster.dartrundir+'/'+f, archive_assim+'/'+f) + +def archive_obs_generation(time): - # copy mean and sd to archive - for f in ['output_mean.nc', 'output_sd.nc']: - copy(cluster.dartrundir+'/'+f, archive_stage+'/'+f) + dir_obsseq = cluster.archivedir+'/obs_seq_out/' + os.makedirs(dir_obsseq, exist_ok=True) + copy(cluster.dartrundir+'/obs_seq.out', dir_obsseq+time.strftime('/%Y-%m-%d_%H:%M_obs_seq.out')) if __name__ == "__main__": @@ -361,9 +366,6 @@ if __name__ == "__main__": os.system('rm -f obs_seq.in obs_seq.out obs_seq.final') # remove any existing observation files print('create obs_seq.in from config') - istage = 0 - archive_stage = archive_time + '/assim_stage'+str(istage)+'/' - prepare_nature_dart(time) # link WRF files to DART directory ################################################ @@ -404,73 +406,21 @@ if __name__ == "__main__": for i, obscfg in enumerate(exp.observations): error_generate.extend(np.zeros(n_obs_3d) + obscfg['error_generate']) - osq.create_obsseqin_alltypes(time, exp.observations, obs_errors=error_generate, - archive_obs_coords=archive_stage+'/obs_coords.pkl') + osq.create_obsseqin_alltypes(time, exp.observations, obs_errors=error_generate) - first_obstype = exp.observations[0] + first_obstype = exp.observations[0] # TODO: different for each observation type set_DART_nml(cov_loc_radius_km=first_obstype['cov_loc_radius_km'], cov_loc_vert_km=first_obstype.get('cov_loc_vert_km', False)) run_perfect_model_obs() # actually create observations that are used to assimilate + archive_obs_generation(time) ################################################ print(' 3) assimilate with observation-errors for assimilation') - - replace_errors_obsseqout(cluster.dartrundir+'/obs_seq.out', error_assimilate) t = time_module.time() assimilate() print('filter took', time_module.time()-t, 'seconds') - dir_obsseq = cluster.archivedir+'/obs_seq_final/assim_stage'+str(istage) - archive_diagnostics(dir_obsseq, time) - archive_output(archive_stage) - - - #sys.exit() # below is the code for separate assimilation of different obs types - - #for istage, obscfg in enumerate(exp.observations): - # print('running observation stage', istage, obscfg) - - # archive_stage = archive_time + '/assim_stage'+str(istage)+'/' - # n_obs = obscfg['n_obs'] - # n_obs_3d = n_obs * len(obscfg['heights']) - # sat_channel = obscfg.get('sat_channel', False) - - # # debug option: overwrite time in prior files - # # for iens in range(1,41): - # # os.system('ncks -A -v Times '+cluster.dartrundir+'/wrfout_d01 '+cluster.dartrundir+'/advance_temp'+str(iens)+'/wrfout_d01') - - # if error_assimilate == False: - # # use error parametrization for assimilation-obs.errors - - # if sat_channel != 6: - # raise NotImplementedError('sat channel '+str(sat_channel)) - - # # depends on obs_seq.out produced before by run_perfect_model_obs() - # Hx_nat, _ = read_truth_obs_obsseq(cluster.dartrundir+'/obs_seq.out') - - # Hx_prior = obs_operator_ensemble(istage) # files are already linked to DART directory - # error_assimilate = calc_obserr_WV73(Hx_nat, Hx_prior) - - # else: - # # overwrite error values in obs_seq.out - # error_assimilate = np.zeros(n_obs_3d) + error_assimilate # ensure shape - - # replace_errors_obsseqout(cluster.dartrundir+'/obs_seq.out', error_assimilate) - # assimilate() - # dir_obsseq = cluster.archivedir+'/obs_seq_final/assim_stage'+str(istage) - # archive_diagnostics(dir_obsseq, time) - - # if istage < n_stages-1: - # # recirculation: filter output -> input - # archive_output(archive_stage) - # recycle_output() - - # elif istage == n_stages-1: - # # last assimilation, continue integration now - # archive_output(archive_stage) - - # else: - # RuntimeError('this should never occur?!') + archive_assimilation(time)