# -*- coding: utf-8 -*-
#
# lsm_rapid_process.py
# RAPIDpy
#
# Created by Alan D. Snow.
# Copyright © 2015-2016 Alan D Snow. All rights reserved.
# License: BSD 3-Clause
from datetime import datetime, timedelta
import multiprocessing
import os
import re
import traceback
# external packages
import pandas as pd
import pangaea
from netCDF4 import Dataset
import numpy as np
# local imports
from ..rapid import RAPID
from .CreateInflowFileFromERAInterimRunoff import CreateInflowFileFromERAInterimRunoff
from .CreateInflowFileFromLDASRunoff import CreateInflowFileFromLDASRunoff
from .CreateInflowFileFromWRFHydroRunoff import CreateInflowFileFromWRFHydroRunoff
from ..postprocess.generate_return_periods import generate_return_periods
from ..postprocess.generate_seasonal_averages import generate_seasonal_averages
from ..utilities import (case_insensitive_file_search,
get_valid_directory_list,
partition)
# ------------------------------------------------------------------------------
# MULTIPROCESSING FUNCTION
# ------------------------------------------------------------------------------
def generate_inflows_from_runoff(args):
"""
prepare runoff inflow file for rapid
"""
runoff_file_list = args[0]
file_index_list = args[1]
weight_table_file = args[2]
grid_type = args[3]
rapid_inflow_file = args[4]
RAPID_Inflow_Tool = args[5]
mp_lock = args[6]
time_start_all = datetime.utcnow()
if not isinstance(runoff_file_list, list):
runoff_file_list = [runoff_file_list]
else:
runoff_file_list = runoff_file_list
if not isinstance(file_index_list, list):
file_index_list = [file_index_list]
else:
file_index_list = file_index_list
if runoff_file_list and file_index_list:
# prepare ECMWF file for RAPID
index_string = "Index: {0}".format(file_index_list[0])
if len(file_index_list) > 1:
index_string += " to {0}".format(file_index_list[-1])
print(index_string)
runoff_string = "File(s): {0}".format(runoff_file_list[0])
if len(runoff_file_list) > 1:
runoff_string += " to {0}".format(runoff_file_list[-1])
print(runoff_string)
print("Converting inflow ...")
try:
RAPID_Inflow_Tool.execute(nc_file_list=runoff_file_list,
index_list=file_index_list,
in_weight_table=weight_table_file,
out_nc=rapid_inflow_file,
grid_type=grid_type,
mp_lock=mp_lock,
)
except Exception:
# This prints the type, value, and stack trace of the
# current exception being handled.
traceback.print_exc()
raise
time_finish_ecmwf = datetime.utcnow()
print("Time to convert inflows: {0}".format(time_finish_ecmwf-time_start_all))
# ------------------------------------------------------------------------------
# UTILITY FUNCTIONS
# ------------------------------------------------------------------------------
DEFAULT_LSM_INPUTS = {
't255': {
'file_datetime_re_pattern': r'\d{8}',
'file_datetime_pattern': "%Y%m%d",
},
't511': {
'file_datetime_re_pattern': r'\d{8}',
'file_datetime_pattern': "%Y%m%d",
},
't159': {
'file_datetime_re_pattern': r'\d{8}',
'file_datetime_pattern': "%Y%m%d",
},
'gldas2': {
'file_datetime_re_pattern': r'\d{8}\.\d{2}',
'file_datetime_pattern': "%Y%m%d.%H",
},
'gldas': {
'file_datetime_re_pattern': r'\d{8}\.\d{2}',
'file_datetime_pattern': "%Y%m%d.%H",
},
'nldas': {
'file_datetime_re_pattern': r'\d{8}\.\d{2}',
'file_datetime_pattern': "%Y%m%d.%H",
},
'cmip5': {
'file_datetime_re_pattern': r'\d{4}',
'file_datetime_pattern': "%Y",
},
'lis': {
'file_datetime_re_pattern': r'\d{10}',
'file_datetime_pattern': "%Y%m%d%H",
},
'joules': {
'file_datetime_re_pattern': r'\d{10}',
'file_datetime_pattern': "%Y%m%d%H",
},
'wrf': {
'file_datetime_re_pattern': r'\d{10}',
'file_datetime_pattern': "%Y%m%d%H",
},
}
def identify_lsm_grid(lsm_grid_path):
"""
This is used to idenfity the input LSM grid
"""
# check to see what kind of file we are dealing with
lsm_example_file = Dataset(lsm_grid_path)
# INDENTIFY LAT/LON DIMENSIONS
dim_list = lsm_example_file.dimensions.keys()
latitude_dim = "lat"
if 'latitude' in dim_list:
latitude_dim = 'latitude'
elif 'g0_lat_0' in dim_list:
# GLDAS/NLDAS MOSAIC
latitude_dim = 'g0_lat_0'
elif 'lat_110' in dim_list:
# NLDAS NOAH/VIC
latitude_dim = 'lat_110'
elif 'north_south' in dim_list:
# LIS/Joules
latitude_dim = 'north_south'
elif 'south_north' in dim_list:
# WRF Hydro
latitude_dim = 'south_north'
elif 'Y' in dim_list:
# FLDAS
latitude_dim = 'Y'
longitude_dim = "lon"
if 'longitude' in dim_list:
longitude_dim = 'longitude'
elif 'g0_lon_1' in dim_list:
# GLDAS/NLDAS MOSAIC
longitude_dim = 'g0_lon_1'
elif 'lon_110' in dim_list:
# NLDAS NOAH/VIC
longitude_dim = 'lon_110'
elif 'east_west' in dim_list:
# LIS/Joules
longitude_dim = 'east_west'
elif 'west_east' in dim_list:
# WRF Hydro
longitude_dim = 'west_east'
elif 'X' in dim_list:
# FLDAS
longitude_dim = 'X'
time_dim = None
if 'time' in dim_list:
time_dim = 'time'
elif 'Time' in dim_list:
time_dim = 'Time'
elif 'Times' in dim_list:
time_dim = 'Times'
elif 'times' in dim_list:
time_dim = 'times'
lat_dim_size = len(lsm_example_file.dimensions[latitude_dim])
lon_dim_size = len(lsm_example_file.dimensions[longitude_dim])
# IDENTIFY VARIABLES
var_list = lsm_example_file.variables.keys()
latitude_var = "lat"
if 'latitude' in var_list:
latitude_var = 'latitude'
elif 'g0_lat_0' in var_list:
latitude_var = 'g0_lat_0'
elif 'lat_110' in var_list:
latitude_var = 'lat_110'
elif 'north_south' in var_list:
latitude_var = 'north_south'
elif 'XLAT' in var_list:
# WRF
latitude_var = 'XLAT'
elif 'Y' in var_list:
# FLDAS
latitude_var = 'Y'
longitude_var = "lon"
if 'longitude' in var_list:
longitude_var = 'longitude'
elif 'g0_lon_1' in var_list:
longitude_var = 'g0_lon_1'
elif 'lon_110' in var_list:
longitude_var = 'lon_110'
elif 'east_west' in var_list:
longitude_var = 'east_west'
elif 'XLONG' in var_list:
# WRF
longitude_var = 'XLONG'
elif 'X' in var_list:
# FLDAS
longitude_var = 'X'
time_var = None
if 'time' in var_list:
time_var = 'time'
elif 'Time' in var_list:
time_var = 'Time'
elif 'Times' in var_list:
time_var = 'Times'
elif 'times' in var_list:
time_var = 'times'
surface_runoff_var = ""
subsurface_runoff_var = ""
total_runoff_var = ""
for var in var_list:
if var.startswith("SSRUN"):
# NLDAS/GLDAS
surface_runoff_var = var
elif var.startswith("BGRUN"):
# NLDAS/GLDAS
subsurface_runoff_var = var
elif var == "Qs_acc":
# GLDAS v2
surface_runoff_var = var
elif var == "Qsb_acc":
# GLDAS v2
subsurface_runoff_var = var
elif var == "Qs_tavg":
# FLDAS
surface_runoff_var = var
elif var == "Qsb_tavg":
# FLDAS
subsurface_runoff_var = var
elif var == "Qs_inst":
# LIS
surface_runoff_var = var
elif var == "Qsb_inst":
# LIS
subsurface_runoff_var = var
elif var == "SFROFF":
# WRF Hydro
surface_runoff_var = var
elif var == "UDROFF":
# WRF Hydro
subsurface_runoff_var = var
elif var.lower() == "ro":
# ERA Interim
total_runoff_var = var
elif var == "total runoff":
# CMIP5 data
total_runoff_var = var
# IDENTIFY GRID TYPE
lsm_file_data = {
"weight_file_name": "",
"grid_type": "",
"model_name": "",
"description": "",
"rapid_inflow_tool": None,
"latitude_var": latitude_var,
"longitude_var": longitude_var,
"time_var": time_var,
"latitude_dim": latitude_dim,
"longitude_dim": longitude_dim,
"time_dim": time_dim,
}
institution = ""
title = ""
try:
institution = lsm_example_file.getncattr("institution")
except AttributeError:
pass
try:
title = lsm_example_file.getncattr("title")
except AttributeError:
pass
runoff_vars = [surface_runoff_var, subsurface_runoff_var]
if institution == "European Centre for Medium-Range Weather Forecasts" \
or total_runoff_var.lower() == "ro":
# these are the ECMWF models
if lat_dim_size == 361 and lon_dim_size == 720:
print("Runoff file identified as ERA Interim Low Res (T255) GRID")
# A) ERA Interim Low Res (T255)
# Downloaded as 0.5 degree grid
# dimensions:
# longitude = 720 ;
# latitude = 361 ;
lsm_file_data["description"] = "ERA Interim (T255 Grid)"
lsm_file_data["model_name"] = "erai"
lsm_file_data["weight_file_name"] = r'weight_era_t255\.csv'
lsm_file_data["grid_type"] = 't255'
elif lat_dim_size == 512 and lon_dim_size == 1024:
print("Runoff file identified as ERA Interim High Res (T511) GRID")
# B) ERA Interim High Res (T511)
# dimensions:
# lon = 1024 ;
# lat = 512 ;
lsm_file_data["description"] = "ERA Interim (T511 Grid)"
lsm_file_data["weight_file_name"]= r'weight_era_t511\.csv'
lsm_file_data["model_name"] = "erai"
lsm_file_data["grid_type"] = 't511'
elif lat_dim_size == 161 and lon_dim_size == 320:
print("Runoff file identified as ERA 20CM (T159) GRID")
# C) ERA 20CM (T159) - 3hr - 10 ensembles
# Downloaded as 1.125 degree grid
# dimensions:
# longitude = 320 ;
# latitude = 161 ;
lsm_file_data["description"] = "ERA 20CM (T159 Grid)"
lsm_file_data["weight_file_name"] = r'weight_era_t159\.csv'
lsm_file_data["model_name"] = "era_20cm"
lsm_file_data["grid_type"] = 't159'
else:
lsm_example_file.close()
raise Exception("Unsupported ECMWF grid.")
lsm_file_data["rapid_inflow_tool"] = \
CreateInflowFileFromERAInterimRunoff()
elif institution == "NASA GSFC":
if title == "GLDAS2.0 LIS land surface model output":
print("Runoff file identified as GLDAS v2 LIS GRID")
# this is the LIS model
lsm_file_data["weight_file_name"] = r'weight_gldas2\.csv'
lsm_file_data["grid_type"] = 'gldas2'
lsm_file_data["description"] = "GLDAS2.0 LIS"
lsm_file_data["model_name"] = "nasa"
else:
print("Runoff file identified as LIS GRID")
# this is the LIS model (can be FLDAS)
# THIS CASE CAN ALSO BE FOR FLDAS, however you will need to add
# the file_datetime_pattern && file_datetime_re_pattern for it to
# work if it is not 3-hourly time step.
lsm_file_data["weight_file_name"] = r'weight_lis\.csv'
lsm_file_data["grid_type"] = 'lis'
lsm_file_data["description"] = "NASA GSFC LIS"
lsm_file_data["model_name"] = "nasa"
elif institution == "Met Office, UK":
print("Runoff file identified as Joules GRID")
lsm_file_data["weight_file_name"] = r'weight_joules\.csv'
lsm_file_data["grid_type"] = 'joules'
lsm_file_data["description"] = "Met Office Joules"
lsm_file_data["model_name"] = "met_office"
elif institution == "NCAR, USACE, USBR":
print("Runoff file identified as CMIP5")
lsm_file_data["weight_file_name"] = r'weight_cmip5\.csv'
lsm_file_data["grid_type"] = 'cmip5'
lsm_file_data["description"] = "CMIP5 Runoff"
lsm_file_data["model_name"] = "cmip5"
runoff_vars = [total_runoff_var]
elif surface_runoff_var.startswith("SSRUN") \
and subsurface_runoff_var.startswith("BGRUN"):
lsm_file_data["model_name"] = "nasa"
if lat_dim_size == 600 and lon_dim_size == 1440:
print("Runoff file identified as GLDAS GRID")
# GLDAS NC FILE
# dimensions:
# g0_lat_0 = 600 ;
# g0_lon_1 = 1440 ;
# variables
# SSRUN_GDS0_SFC_ave1h (surface), BGRUN_GDS0_SFC_ave1h (subsurface)
# or
# SSRUNsfc_GDS0_SFC_ave1h (surface), BGRUNsfc_GDS0_SFC_ave1h (subsurface)
lsm_file_data["description"] = "GLDAS"
lsm_file_data["weight_file_name"] = r'weight_gldas\.csv'
lsm_file_data["grid_type"] = 'gldas'
elif lat_dim_size <= 224 and lon_dim_size <= 464:
print("Runoff file identified as NLDAS GRID")
# NLDAS MOSAIC FILE
# dimensions:
# g0_lat_0 = 224 ;
# g0_lon_1 = 464 ;
# NLDAS NOAH/VIC FILE
# dimensions:
# lat_110 = 224 ;
# lon_110 = 464 ;
lsm_file_data["description"] = "NLDAS"
lsm_file_data["weight_file_name"] = r'weight_nldas\.csv'
lsm_file_data["grid_type"] = 'nldas'
else:
lsm_example_file.close()
raise Exception("Unsupported runoff grid.")
else:
title = ""
try:
title = lsm_example_file.getncattr("TITLE")
except AttributeError:
pass
if "WRF" in title:
lsm_file_data["description"] = "WRF/WRF-Hydro Runoff"
lsm_file_data["weight_file_name"] = r'weight_wrf\.csv'
lsm_file_data["model_name"] = 'wrf'
lsm_file_data["grid_type"] = 'wrf'
lsm_file_data['rapid_inflow_tool'] = \
CreateInflowFileFromWRFHydroRunoff(
latitude_dim,
longitude_dim,
latitude_var,
longitude_var,
surface_runoff_var,
subsurface_runoff_var,
)
else:
lsm_example_file.close()
raise Exception("Unsupported LSM grid.")
lsm_example_file.close()
# set the inflow tool to use the LDAS tool by default
if lsm_file_data["rapid_inflow_tool"] is None:
lsm_file_data["rapid_inflow_tool"] = \
CreateInflowFileFromLDASRunoff(
latitude_dim,
longitude_dim,
latitude_var,
longitude_var,
runoff_vars,
)
return lsm_file_data
def determine_start_end_timestep(lsm_file_list, file_re_match=None, file_datetime_pattern=None,
expected_time_step=None, lsm_grid_info=None):
"""
Determine the start and end date from LSM input files
"""
if lsm_grid_info is None:
lsm_grid_info = identify_lsm_grid(lsm_file_list[0])
if None in (lsm_grid_info['time_var'], lsm_grid_info['time_dim'])\
or lsm_grid_info['model_name'] in ('era_20cm', 'erai'):
# NOTE: the ERA20CM and ERA 24hr time variables in the tests are erroneous
if None in (file_re_match, file_datetime_pattern):
raise ValueError("LSM files missing time dimension and/or variable."
"To mitigate this, add the 'file_re_match' and "
"'file_datetime_pattern' arguments.")
if lsm_grid_info['time_dim'] is None:
print("Assuming time dimension is 1")
file_size_time = 1
else:
lsm_example_file = Dataset(lsm_file_list[0])
file_size_time = len(lsm_example_file.dimensions[lsm_grid_info['time_dim']])
lsm_example_file.close()
total_num_time_steps = int(file_size_time * len(lsm_file_list))
# determine the start time from the existing files
actual_simulation_start_datetime = datetime.strptime(file_re_match.search(lsm_file_list[0]).group(0),
file_datetime_pattern)
# check to see if the time step matches expected
if len(lsm_file_list) > 1:
time_step = int((datetime.strptime(file_re_match.search(lsm_file_list[1]).group(0), file_datetime_pattern)
- actual_simulation_start_datetime).total_seconds()
/ float(file_size_time))
elif expected_time_step is not None:
time_step = int(expected_time_step)
else:
raise ValueError("Only one LSM file with one timestep present. "
"'expected_time_step' parameter required to continue.")
# determine the end datetime
actual_simulation_end_datetime = \
datetime.strptime(file_re_match.search(lsm_file_list[-1]).group(0),
file_datetime_pattern) \
+ timedelta(seconds=(file_size_time-1) * time_step)
else:
with pangaea.open_mfdataset(lsm_file_list,
lat_var=lsm_grid_info['latitude_var'],
lon_var=lsm_grid_info['longitude_var'],
time_var=lsm_grid_info['time_var'],
lat_dim=lsm_grid_info['latitude_dim'],
lon_dim=lsm_grid_info['longitude_dim'],
time_dim=lsm_grid_info['time_dim']) as xds:
datetime_arr = [pd.to_datetime(dval) for dval in xds.lsm.datetime.values]
actual_simulation_start_datetime = datetime_arr[0]
actual_simulation_end_datetime = datetime_arr[-1]
total_num_time_steps = len(datetime_arr)
if total_num_time_steps <= 1:
if expected_time_step is not None:
time_step = int(expected_time_step)
else:
raise ValueError("Only one LSM file with one timestep present. "
"'expected_time_step' parameter required to continue.")
else:
time_step = int(np.diff(xds.lsm.datetime.values)[0] / np.timedelta64(1, 's'))
if expected_time_step is not None:
if time_step != int(expected_time_step):
print("WARNING: The time step used {0} is different than expected {1}".format(time_step,
expected_time_step))
return actual_simulation_start_datetime, actual_simulation_end_datetime, time_step, total_num_time_steps
# ------------------------------------------------------------------------------
# MAIN PROCESS
# ------------------------------------------------------------------------------
[docs]def run_lsm_rapid_process(rapid_executable_location,
lsm_data_location,
rapid_io_files_location=None,
rapid_input_location=None,
rapid_output_location=None,
simulation_start_datetime=None,
simulation_end_datetime=datetime.utcnow(),
file_datetime_pattern=None,
file_datetime_re_pattern=None,
initial_flows_file=None,
ensemble_list=[None],
generate_rapid_namelist_file=True,
run_rapid_simulation=True,
generate_return_periods_file=False,
return_period_method='weibul',
generate_seasonal_averages_file=False,
generate_seasonal_initialization_file=False,
generate_initialization_file=False,
use_all_processors=True,
num_processors=1,
mpiexec_command="mpiexec",
cygwin_bin_location="",
modeling_institution="US Army Engineer Research and Development Center",
convert_one_hour_to_three=False,
expected_time_step=None,
):
"""
This is the main process to generate inflow for RAPID and to run RAPID.
Args:
rapid_executable_location(str): Path to the RAPID executable.
lsm_data_location(str): Path to the directory containing the Land Surface Model output files.
rapid_io_files_location(Optional[str]): Path to the directory containing the input and output folders for RAPID. This is for running multiple watersheds.
rapid_input_location(Optional[str]): Path to directory with RAPID simulation input data. Required if `rapid_io_files_location` is not set.
rapid_output_location(Optional[str]): Path to directory to put output. Required if `rapid_io_files_location` is not set.
simulation_start_datetime(Optional[datetime]): Datetime object with date bound of earliest simulation start.
simulation_end_datetime(Optional[datetime]): Datetime object with date bound of latest simulation end. Defaults to datetime.utcnow().
file_datetime_pattern(Optional[str]): Datetime pattern for files (Ex. '%Y%m%d%H'). If set, file_datetime_re_pattern is required. Various defaults used by each model.
file_datetime_re_pattern(Optional[raw str]): Regex pattern to extract datetime (Ex. r'\d{10}'). If set, file_datetime_pattern is required. Various defaults used by each model.
initial_flows_file(Optional[str]): If given, this is the path to a file with initial flows for the simulaion.
ensemble_list(Optional[list]): This is the expexted ensemble name appended to the end of the file name.
generate_rapid_namelist_file(Optional[bool]): If True, this will create a RAPID namelist file for the run in your RAPID input directory. Default is True.
run_rapid_simulation(Optional[bool]): If True, the RAPID simulation will run after generating the inflow file. Default is True.
generate_return_periods_file(Optional[bool]): If True, the return period file will be generated in the output. Default is False.
return_period_method(Optional[str]): If True, the return period file will be generated in the output. Default is False.
generate_seasonal_averages_file(Optional[bool]): If True, the season average file will be generated. Default is False.
generate_seasonal_initialization_file(Optional[bool]): If True, an intialization based on the seasonal average for the current day of the year will be created. Default is False.
generate_initialization_file(Optional[bool]): If True, an initialization file from the last time step of the simulation willl be created. Default is False.
use_all_processors(Optional[bool]): If True, it will use all available processors to perform this operation. Default is True.
num_processors(Optional[int]): If use_all_processors is False, this argument will determine the number of processors to use. Default is 1.
mpiexec_command(Optional[str]): This is the command to execute RAPID. Default is "mpiexec".
cygwin_bin_location(Optional[str]): If using Windows, this is the path to the Cygwin bin location. Default is "".
modeling_institution(Optional[str]): This is the institution performing the modeling and is in the output files. Default is "US Army Engineer Research and Development Center".
convert_one_hour_to_three(Optional[bool]): If the time step is expected to be 1-hr it will convert to 3. Set to False if the LIS, NLDAS, or Joules grid time step is greater than 1-hr.
expected_time_step(Optional[int]): The time step in seconds of your LSM input data if only one file is given. Required if only one file is present.
Returns:
list: A list of output file information.
Example of regular run:
.. code:: python
from datetime import datetime
from RAPIDpy.inflow import run_lsm_rapid_process
#------------------------------------------------------------------------------
#main process
#------------------------------------------------------------------------------
if __name__ == "__main__":
run_lsm_rapid_process(
rapid_executable_location='/home/alan/rapid/src/rapid',
rapid_io_files_location='/home/alan/rapid-io',
lsm_data_location='/home/alan/era_data',
)
Example of single input/output run:
.. code:: python
from datetime import datetime
from RAPIDpy.inflow import run_lsm_rapid_process
#------------------------------------------------------------------------------
#main process
#------------------------------------------------------------------------------
if __name__ == "__main__":
run_lsm_rapid_process(
rapid_executable_location='/home/alan/rapid/src/rapid',
rapid_input_location='/home/alan/rapid-io/input/provo_watershed',
rapid_output_location='/home/alan/rapid-io/output/provo_watershed',
lsm_data_location='/home/alan/era_data',
)
Example of run with FLDAS and datetime filter:
.. note:: http://disc.sci.gsfc.nasa.gov/uui/datasets?keywords=FLDAS
.. code:: python
from datetime import datetime
from RAPIDpy.inflow import run_lsm_rapid_process
#------------------------------------------------------------------------------
#main process
#------------------------------------------------------------------------------
if __name__ == "__main__":
run_lsm_rapid_process(
rapid_executable_location='/home/alan/rapid/src/rapid',
rapid_io_files_location='/home/alan/rapid-io',
lsm_data_location='/home/alan/lsm_data',
simulation_start_datetime=datetime(1980, 1, 1),
file_datetime_re_pattern = r'\d{8}',
file_datetime_pattern = "%Y%m%d",
)
Example of run with CMIP5:
.. note:: http://gdo-dcp.ucllnl.org/downscaled_cmip_projections/techmemo/BCSD5HydrologyMemo.pdf
.. code:: python
from datetime import datetime
from RAPIDpy.inflow import run_lsm_rapid_process
#------------------------------------------------------------------------------
#main process
#------------------------------------------------------------------------------
if __name__ == "__main__":
run_lsm_rapid_process(
rapid_executable_location='/home/jimwlewis/rapid/src/rapid',
rapid_io_files_location='/data/rapid-io4',
lsm_data_location='/data/rapid-io4/input/cmip5-jun01',
simulation_start_datetime=datetime(2001, 1, 1),
simulation_end_datetime=datetime(2002, 12, 31),
file_datetime_pattern="%Y",
file_datetime_re_pattern=r'\d{4}',
)
"""
time_begin_all = datetime.utcnow()
# use all processors makes precedent over num_processors arg
if use_all_processors == True:
NUM_CPUS = multiprocessing.cpu_count()
elif num_processors > multiprocessing.cpu_count():
print("WARNING: Num processors requested exceeded max. Set to max ...")
NUM_CPUS = multiprocessing.cpu_count()
else:
NUM_CPUS = num_processors
# get list of correclty formatted rapid input directories in rapid directory
rapid_directories = []
if rapid_io_files_location is not None:
main_rapid_input_directory = os.path.join(rapid_io_files_location, 'input')
for watershed_directory in get_valid_directory_list(main_rapid_input_directory):
watershed_input_path = os.path.join(main_rapid_input_directory,
watershed_directory)
watershed_output_path = os.path.join(rapid_io_files_location,
'output',
watershed_directory)
rapid_directories.append((watershed_input_path, watershed_output_path))
elif None not in (rapid_input_location, rapid_output_location):
rapid_directories = [(rapid_input_location, rapid_output_location)]
else:
raise ValueError("Need 'rapid_io_files_location' or 'rapid_input_location' "
"and 'rapid_output_location' set to continue.")
all_output_file_information = []
for ensemble in ensemble_list:
output_file_information = {
'ensemble' : ensemble,
}
ensemble_file_ending = ".nc"
ensemble_file_ending4 = ".nc4"
if ensemble != None:
ensemble_file_ending = "_{0}.nc".format(ensemble)
ensemble_file_ending4 = "_{0}.nc4".format(ensemble)
# get list of files
lsm_file_list = []
for subdir, dirs, files in os.walk(lsm_data_location, followlinks=True):
for lsm_file in files:
if lsm_file.endswith(ensemble_file_ending) or lsm_file.endswith(ensemble_file_ending4):
lsm_file_list.append(os.path.join(subdir, lsm_file))
lsm_file_list = sorted(lsm_file_list)
# IDENTIFY THE GRID
lsm_file_data = identify_lsm_grid(lsm_file_list[0])
# load in the datetime pattern
if file_datetime_pattern is None or file_datetime_re_pattern is None:
file_datetime_re_pattern = DEFAULT_LSM_INPUTS[lsm_file_data['grid_type']]['file_datetime_re_pattern']
file_datetime_pattern = DEFAULT_LSM_INPUTS[lsm_file_data['grid_type']]['file_datetime_pattern']
file_re_match = re.compile(file_datetime_re_pattern)
# get subset based on time bounds
if simulation_start_datetime is not None:
print("Filtering files by datetime ...")
lsm_file_list_subset = []
for lsm_file in lsm_file_list:
match = file_re_match.search(lsm_file)
file_date = datetime.strptime(match.group(0), file_datetime_pattern)
if file_date > simulation_end_datetime:
break
if file_date >= simulation_start_datetime:
lsm_file_list_subset.append(os.path.join(subdir, lsm_file))
lsm_file_list = sorted(lsm_file_list_subset)
print("Running from {0} to {1}".format(lsm_file_list[0],
lsm_file_list[-1]))
# get number of time steps in file
actual_simulation_start_datetime, actual_simulation_end_datetime, time_step, total_num_time_steps = \
determine_start_end_timestep(lsm_file_list,
file_re_match=file_re_match,
file_datetime_pattern=file_datetime_pattern,
expected_time_step=expected_time_step,
lsm_grid_info=lsm_file_data)
# VALIDATING INPUT IF DIVIDING BY 3
time_step_multiply_factor = 1
if (lsm_file_data['grid_type'] in ('nldas', 'lis', 'joules')) and convert_one_hour_to_three:
num_extra_files = total_num_time_steps % 3
if num_extra_files != 0:
print("WARNING: Number of files needs to be divisible by 3. Remainder is {0}".format(num_extra_files))
print("This means your simulation will be truncated")
total_num_time_steps /= 3
time_step *= 3
# compile the file ending
out_file_ending = "{0}_{1}_{2}hr_{3:%Y%m%d}to{4:%Y%m%d}{5}".format(lsm_file_data['model_name'],
lsm_file_data['grid_type'],
int(time_step/3600),
actual_simulation_start_datetime,
actual_simulation_end_datetime,
ensemble_file_ending)
# run LSM processes
for master_watershed_input_directory, master_watershed_output_directory in rapid_directories:
print("Running from: {0}".format(master_watershed_input_directory))
try:
os.makedirs(master_watershed_output_directory)
except OSError:
pass
# create inflow to dump data into
master_rapid_runoff_file = os.path.join(master_watershed_output_directory,
'm3_riv_bas_{0}'.format(out_file_ending))
weight_table_file = case_insensitive_file_search(master_watershed_input_directory,
lsm_file_data['weight_file_name'])
try:
in_rivid_lat_lon_z_file = case_insensitive_file_search(master_watershed_input_directory,
r'comid_lat_lon_z\.csv')
except Exception:
in_rivid_lat_lon_z_file = ""
print("WARNING: comid_lat_lon_z file not found. The lat/lon will not be added ...")
pass
print("Writing inflow file to: {0}".format(master_rapid_runoff_file))
lsm_file_data['rapid_inflow_tool'].generateOutputInflowFile(
out_nc=master_rapid_runoff_file,
start_datetime_utc=actual_simulation_start_datetime,
number_of_timesteps=total_num_time_steps,
simulation_time_step_seconds=time_step,
in_rapid_connect_file=case_insensitive_file_search(master_watershed_input_directory,
r'rapid_connect\.csv'),
in_rivid_lat_lon_z_file=in_rivid_lat_lon_z_file,
land_surface_model_description=lsm_file_data['description'],
modeling_institution=modeling_institution
)
job_combinations = []
if (lsm_file_data['grid_type'] in ('nldas', 'lis', 'joules')) and convert_one_hour_to_three:
print("Grouping {0} in threes".format(lsm_file_data['grid_type']))
lsm_file_list = [lsm_file_list[nldas_index:nldas_index+3]
for nldas_index in range(0, len(lsm_file_list), 3)
if len(lsm_file_list[nldas_index:nldas_index+3]) == 3]
if len(lsm_file_list) < NUM_CPUS:
NUM_CPUS = len(lsm_file_list)
mp_lock = multiprocessing.Manager().Lock()
partition_list, partition_index_list = partition(lsm_file_list, NUM_CPUS)
for loop_index, cpu_grouped_file_list in enumerate(partition_list):
if cpu_grouped_file_list and partition_index_list[loop_index]:
job_combinations.append((cpu_grouped_file_list,
partition_index_list[loop_index],
weight_table_file,
lsm_file_data['grid_type'],
master_rapid_runoff_file,
lsm_file_data['rapid_inflow_tool'],
mp_lock))
# COMMENTED CODE IS FOR DEBUGGING
# generate_inflows_from_runoff((cpu_grouped_file_list,
# partition_index_list[loop_index],
# lsm_file_data['weight_table_file'],
# lsm_file_data['grid_type'],
# master_rapid_runoff_file,
# lsm_file_data['rapid_inflow_tool'],
# mp_lock))
pool = multiprocessing.Pool(NUM_CPUS)
pool.map(generate_inflows_from_runoff,
job_combinations)
pool.close()
pool.join()
# set up RAPID manager
rapid_manager = RAPID(rapid_executable_location=rapid_executable_location,
cygwin_bin_location=cygwin_bin_location,
num_processors=NUM_CPUS,
mpiexec_command=mpiexec_command,
ZS_TauR=time_step, # duration of routing procedure (time step of runoff data)
ZS_dtR=15 * 60, # internal routing time step
ZS_TauM=total_num_time_steps * time_step, # total simulation time
ZS_dtM=time_step # RAPID recommended internal time step (1 day)
)
if initial_flows_file and os.path.exists(initial_flows_file):
rapid_manager.update_parameters(
Qinit_file=initial_flows_file,
BS_opt_Qinit=True
)
# run RAPID for the watershed
lsm_rapid_output_file = os.path.join(master_watershed_output_directory,
'Qout_{0}'.format(out_file_ending))
rapid_manager.update_parameters(
rapid_connect_file=case_insensitive_file_search(master_watershed_input_directory,
r'rapid_connect\.csv'),
Vlat_file=master_rapid_runoff_file,
riv_bas_id_file=case_insensitive_file_search(master_watershed_input_directory,
r'riv_bas_id\.csv'),
k_file=case_insensitive_file_search(master_watershed_input_directory,
r'k\.csv'),
x_file=case_insensitive_file_search(master_watershed_input_directory,
r'x\.csv'),
Qout_file=lsm_rapid_output_file
)
rapid_manager.update_reach_number_data()
output_file_information[os.path.basename(master_watershed_input_directory)] = {
'm3_riv': master_rapid_runoff_file,
'qout': lsm_rapid_output_file
}
if generate_rapid_namelist_file:
rapid_manager.generate_namelist_file(os.path.join(master_watershed_input_directory,
"rapid_namelist_{}".format(out_file_ending[:-3])))
if run_rapid_simulation:
rapid_manager.run()
rapid_manager.make_output_CF_compliant(
simulation_start_datetime=actual_simulation_start_datetime,
comid_lat_lon_z_file=in_rivid_lat_lon_z_file,
project_name="{0} Based Historical flows by {1}".format(lsm_file_data['description'],
modeling_institution)
)
# generate return periods
if generate_return_periods_file and os.path.exists(lsm_rapid_output_file) and lsm_rapid_output_file:
return_periods_file = os.path.join(master_watershed_output_directory,
'return_periods_{0}'.format(out_file_ending))
# assume storm has 3 day length
storm_length_days = 3
generate_return_periods(qout_file=lsm_rapid_output_file,
return_period_file=return_periods_file,
num_cpus=NUM_CPUS,
storm_duration_days=storm_length_days,
method=return_period_method)
# generate seasonal averages file
if generate_seasonal_averages_file and os.path.exists(lsm_rapid_output_file) and lsm_rapid_output_file:
seasonal_averages_file = os.path.join(master_watershed_output_directory,
'seasonal_averages_{0}'.format(out_file_ending))
generate_seasonal_averages(lsm_rapid_output_file,
seasonal_averages_file,
NUM_CPUS)
# generate seasonal initialization file
if generate_seasonal_initialization_file and os.path.exists(lsm_rapid_output_file) and lsm_rapid_output_file:
seasonal_qinit_file = os.path.join(master_watershed_input_directory,
'seasonal_qinit_{0}.csv'.format(out_file_ending[:-3]))
rapid_manager.generate_seasonal_intitialization(seasonal_qinit_file)
# generate initialization file
if generate_initialization_file and os.path.exists(lsm_rapid_output_file) and lsm_rapid_output_file:
qinit_file = os.path.join(master_watershed_input_directory,
'qinit_{0}.csv'.format(out_file_ending[:-3]))
rapid_manager.generate_qinit_from_past_qout(qinit_file)
all_output_file_information.append(output_file_information)
# print info to user
time_end = datetime.utcnow()
print("Time Begin All: {0}".format(time_begin_all))
print("Time Finish All: {0}".format(time_end))
print("TOTAL TIME: {0}".format(time_end-time_begin_all))
return all_output_file_information