import geopandas as gpd
import pandas as pd
import os
import sys
import yaml
import json
import shutil
from termcolor import cprint
def load_data_from_disk(casestudy: str, scenario: str, config: dict) -> dict:
"""loads the data required for the optimization model from disk.
:param casestudy: name of the case study
:type casestudy: str
:param scenario: name of the scenario
:type scenario: str
:param config: configuration dictionary
:type config: dict
:return: dictionary containing the dataframes and geodataframes for the optimization model
:rtype: dict
"""
scenario_dir = config['scenario_dir']
data_dir = config['data_dir']
heat_root_dir = config['model_data']['root_dir']
data_path = os.path.join(casestudy, scenario_dir, scenario, data_dir, heat_root_dir)
print("Loading data from: ", data_path)
df_heat_demand = gdf_heat_gen_units = gdf_heat_network = gdf_heat_nodes = df_waste_heat_prof = None
if os.path.exists(os.path.join(data_path, config['model_data']['heat_dem'])):
df_heat_demand = pd.read_csv(os.path.join(data_path, config['model_data']['heat_dem']), sep=',')
if os.path.exists(os.path.join(data_path, config['model_data']['heat_gen_units'])):
gdf_heat_gen_units = gpd.read_file(os.path.join(data_path, config['model_data']['heat_gen_units']))
if os.path.exists(os.path.join(data_path, config['model_data']['heat_network'])):
gdf_heat_network = gpd.read_file(os.path.join(data_path, config['model_data']['heat_network']))
if os.path.exists(os.path.join(data_path, config['model_data']['heat_nodes'])):
gdf_heat_nodes = gpd.read_file(os.path.join(data_path, config['model_data']['heat_nodes']))
if os.path.exists(os.path.join(data_path, config['model_data']['wh_profiles'])):
df_waste_heat_prof = pd.read_csv(os.path.join(data_path, config['model_data']['wh_profiles']), sep=',')
with open(os.path.join(data_path, config['model_data']['cost_parameter'])) as json_file:
dict_parameter_cost = json.load(json_file)
# merge them into a dictionary
model_input_data = {
'heat_demand': df_heat_demand,
'heat_gen_units': gdf_heat_gen_units,
'heat_network': gdf_heat_network,
'heat_nodes': gdf_heat_nodes,
'waste_heat_prof': df_waste_heat_prof,
'parameter_cost': dict_parameter_cost
}
return model_input_data
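
# Usage sketch (illustrative only; the 'Altstadt' case study and 'baseline' scenario
# names are hypothetical, not part of this module):
#   config = load_config()
#   model_input = load_data_from_disk('Altstadt', 'baseline', config)
#   model_input['heat_demand']   -> pd.DataFrame, or None if the file is missing
#   model_input['heat_network']  -> gpd.GeoDataFrame, or None if the file is missing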
def load_config() -> dict:
"""loads the configuration file
:return: configuration dictionary
:rtype: dict
"""
with open('_config.yaml', 'r') as file:
config = yaml.safe_load(file)
return config
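
# Sketch of the _config.yaml keys read by this module; only the key names are taken from the
# code below, the directory/file values are placeholders and have to match your project layout:
#   scenario_dir: ...
#   data_dir: ...
#   input_dir: ...
#   output_dir: ...
#   parameter_dir: ...
#   MCMC_dir: ...
#   building_data_dir: ...
#   default_data: ...
#   model_data:        {root_dir, heat_dem, heat_gen_units, heat_network, heat_nodes, wh_profiles, cost_parameter}
#   case_study_data:   {building_typology, outside_temp, solar_gain, transition_matrix_WD, transition_matrix_WE}
#   heat_source_data:  {heat_profiles, heat_sources}
#   parameter_costs:   {input_file}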
def load_cost_parameter(casestudy: str, scenario: str, config: dict) -> dict:
"""loads the cost and parameters for tje scenario from disk.
:param casestudy: name of the case study
:type casestudy: str
:param scenario: name of the scenario
:type scenario: str
:param config: configuration dictionary
:type config: dict
:return: dictionary containing the costs and parameters
:rtype: dict
"""
scenario_dir = config['scenario_dir']
data_dir = config['data_dir']
heat_root_dir = config['model_data']['root_dir']
data_path = os.path.join(casestudy, scenario_dir, scenario, data_dir, heat_root_dir)
with open(os.path.join(data_path, config['model_data']['cost_parameter'])) as json_file:
dict_parameter_cost = json.load(json_file)
return dict_parameter_cost
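
# Usage sketch (hypothetical names): after prepare_parameter_file() has written the JSON file,
#   costs = load_cost_parameter('Altstadt', 'baseline', load_config())
# returns a plain dict mapping parameter names to values; the keys depend on the parameter xlsx.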
# copy that and set the scenario_path to that directory
def load_model_data(scenario_path: str) -> dict:  # deprecated, not used anymore
    """Loads the model input data from the scenario path. Not used anymore; use load_data_from_disk instead.
    :param scenario_path: path to the correct scenario folder
    :type scenario_path: str
    :return: dictionary containing the dataframes and geodataframes for the optimization model
    :rtype: dict
    """
    cprint("Start: loading data from: " + scenario_path)
    # Load the data if the corresponding dataframes are not already in the workspace
    subfolder = 'model_input_data'
    assert os.path.exists(os.path.join(scenario_path, subfolder, 'Heat_Demand.csv')), 'Heat_Demand.csv not found!'
    df_heat_demand = pd.read_csv(os.path.join(scenario_path, subfolder, 'Heat_Demand.csv'))
    assert os.path.exists(os.path.join(scenario_path, subfolder, 'Heat_Generation_Units.geojson')), \
        'Heat_Generation_Units.geojson not found!'
    gdf_heat_gen_units = gpd.read_file(os.path.join(scenario_path, subfolder, 'Heat_Generation_Units.geojson'))
    assert os.path.exists(os.path.join(scenario_path, subfolder, 'Heat_Network.geojson')), \
        'Heat_Network.geojson not found!'
    gdf_heat_network = gpd.read_file(os.path.join(scenario_path, subfolder, 'Heat_Network.geojson'))
    assert os.path.exists(os.path.join(scenario_path, subfolder, 'Heat_Nodes.geojson')), \
        'Heat_Nodes.geojson not found!'
    gdf_heat_nodes = gpd.read_file(os.path.join(scenario_path, subfolder, 'Heat_Nodes.geojson'))
    df_waste_heat_prof = None  # waste heat profiles are optional
    if os.path.exists(os.path.join(scenario_path, subfolder, 'Heat_WH_Profiles.csv')):
        df_waste_heat_prof = pd.read_csv(os.path.join(scenario_path, subfolder, 'Heat_WH_Profiles.csv'), sep=';')
    # add here for the other parameters and import them
    # merge them into a dictionary
    model_input_data = {
        'heat_demand': df_heat_demand,
        'heat_gen_units': gdf_heat_gen_units,
        'heat_network': gdf_heat_network,
        'heat_nodes': gdf_heat_nodes,
        'waste_heat_prof': df_waste_heat_prof
    }
    cprint("Done: loading model input data", 'green')
    return model_input_data
def load_temp_data(scenario: str, config: dict) -> pd.DataFrame:
"""Loads the outside temperature data for the scenario from disk.
:param scenario: name of the scenario
:type scenario: str
:param config: configuration dictionary
:type config: dict
:return: dataframe containing the outside temperature data and the time index
:rtype: pd.DataFrame
"""
path_temp = os.path.join(scenario, config["parameter_dir"], config["MCMC_dir"], config['case_study_data']['outside_temp'])
df_temperature = pd.read_excel(path_temp) #, skiprows=[0,1,2])
df_temperature['time'] = pd.to_datetime(df_temperature['time'])
return df_temperature
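
# The temperature file is expected to be an Excel sheet with a 'time' column (parsed to datetime
# here) plus the temperature values. Sketch (column names other than 'time' are assumptions):
#   time                 temperature
#   2019-01-01 00:00:00  -2.3
#   2019-01-01 01:00:00  -2.1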
def load_solar_gain_data(scenario: str, config: dict) -> pd.DataFrame:
"""Loads the solar gain data for the scenario from disk.
Solar gain data here referes to the solar irradiation in W/m2 data that is used to calculate the solar gain for the buildings.
:param scenario: name of the scenario
:type scenario: str
:param config: configuration dictionary
:type config: dict
:return: dataframe containing the solar gain data and the time index
:rtype: pd.DataFrame
"""
path_temp = os.path.join(scenario, config["parameter_dir"], config["MCMC_dir"], config['case_study_data']['solar_gain'])
df_solar_gain = pd.read_excel(path_temp) #, skiprows=[0,1,2])
df_solar_gain['time'] = pd.to_datetime(df_solar_gain['time'])
return df_solar_gain
def generate_new_scenario(casestudy: str, scenario_dir: str, config: dict):
"""Generates a new scenario folder with default files and default values.
:param casestudy: name of the case study
:type casestudy: str
:param scenario_dir: name of the scenario directory to be created
:type scenario_dir: str
:param config: configuration dictionary
:type config: dict
"""
# create a new scenario folder
# copy the data from the template folder
# create a new scenario
scenario_path = os.path.join(casestudy, config['scenario_dir'], scenario_dir)
# check if the scenario folder exists
if os.path.exists(scenario_path):
print("Scenario folder exists already")
else:
print("Creating new scenario folder with default values")
os.makedirs(scenario_path)
# copy the data from the defailt folder
default_path = os.path.join(config['default_data'])
input_path = os.path.join(scenario_path,config['input_dir'])
os.makedirs(input_path)
# copy the data
shutil.copyfile(os.path.join(default_path,config['heat_source_data']['heat_profiles']), os.path.join(input_path,config['heat_source_data']['heat_profiles']))
shutil.copyfile(os.path.join(default_path,config['heat_source_data']['heat_sources']), os.path.join(input_path,config['heat_source_data']['heat_sources']))
shutil.copyfile(os.path.join(default_path,config['parameter_costs']['input_file']), os.path.join(input_path,config['parameter_costs']['input_file']))
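
# Usage sketch (hypothetical names):
#   config = load_config()
#   generate_new_scenario('Altstadt', 'high_expansion', config)
# creates <casestudy>/<scenario_dir>/high_expansion/<input_dir>/ with the default heat source,
# heat profile and cost parameter files copied from config['default_data'].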
def generate_new_case_study(casestudy: str, config: dict):
"""Generates a new case study folder with default files and default values.
:param casestudy: name of the case study to be created
:type casestudy: str
:param config: configuration dictionary
:type config: dict
"""
# create a new case study folder
# copy the data from the template folder
# create a new scenario
casestudy_path = os.path.join(casestudy)
# check if the scenario folder exists
if os.path.exists(casestudy_path):
print(f"Case study {casestudy_path} folder exists already")
else:
default_path = os.path.join(config['default_data'])
buidling_data_path = os.path.join(casestudy_path,config['parameter_dir'],config['building_data_dir'])
MCMC_data_path = os.path.join(casestudy_path,config['parameter_dir'],config['MCMC_dir'])
print(f"Creating new case study folder {casestudy_path} with default values. Please adjust the parameters in the config file!")
os.makedirs(buidling_data_path)
os.makedirs(MCMC_data_path)
# copy the data from the defailt folder
# copy the data
shutil.copyfile(os.path.join(default_path,config['case_study_data']['building_typology']), os.path.join(buidling_data_path,config['case_study_data']['building_typology']))
shutil.copyfile(os.path.join(default_path,config['case_study_data']['outside_temp']), os.path.join(MCMC_data_path,config['case_study_data']['outside_temp']))
shutil.copyfile(os.path.join(default_path,config['case_study_data']['solar_gain']), os.path.join(MCMC_data_path,config['case_study_data']['solar_gain']))
shutil.copyfile(os.path.join(default_path,config['case_study_data']['transition_matrix_WD']), os.path.join(MCMC_data_path,config['case_study_data']['transition_matrix_WD']))
shutil.copyfile(os.path.join(default_path,config['case_study_data']['transition_matrix_WE']), os.path.join(MCMC_data_path,config['case_study_data']['transition_matrix_WE']))
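
# Usage sketch (hypothetical name): a case study is typically scaffolded before its scenarios,
#   generate_new_case_study('Altstadt', config)
#   generate_new_scenario('Altstadt', 'baseline', config)
# which copies the default building typology into <parameter_dir>/<building_data_dir>/ and the
# temperature, solar gain and transition matrix files into <parameter_dir>/<MCMC_dir>/.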
def prepare_parameter_file(case_study_name: str, scenario_name: str, config: dict):
"""Prepares the parameter file for the scenario by loading the parameter xlsx file and saving it as a json file.
:param case_study_name: name of the case study
:type case_study_name: str
:param scenario_name: name of the scenario
:type scenario_name: str
:param config: configuration dictionary
:type config: dict
"""
#load the parameter xlsx
path_read_parameter = os.path.join(case_study_name, config['scenario_dir'], scenario_name, config['input_dir'])
path_write_parameter = os.path.join(case_study_name, config['scenario_dir'], scenario_name, config['data_dir'], config['model_data']['root_dir'])
print("Start: load heat generation units from: " + path_read_parameter)
df_parametercost = pd.read_excel(os.path.join(path_read_parameter,config['parameter_costs']['input_file']))
# change the columns parameter and value to dictionary
df_parametercost = df_parametercost[['Parameter','Value']]
print(df_parametercost)
dict_parametercost = df_parametercost.set_index('Parameter').T.to_dict('records')[0]
# check if the directory exists
if not os.path.exists(path_write_parameter):
os.makedirs(path_write_parameter)
# save dictionary to json
with open(os.path.join(path_write_parameter,config['model_data']['cost_parameter']), 'w') as fp:
json.dump(dict_parametercost, fp)
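
# The parameter xlsx is expected to contain at least the columns 'Parameter' and 'Value'; each row
# becomes one key/value pair in the written JSON. Sketch (the parameter names are assumptions):
#   Parameter        Value        ->   {"interest_rate": 0.04, "lifetime_pipes": 40}
#   interest_rate    0.04
#   lifetime_pipes   40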
def read_output_from_disk(case_study_name: str, model_name: str, config: dict) -> dict:
"""Reads the output data of the optimisation model from the disk for the given case study and model name.
:param case_study_name: name of the case study
:type case_study_name: str
:param model_name: name of the scenario
:type model_name: str
:param config: configuration dictionary
:type config: dict
:return: dictionary containing the output dataframes from the optimization model
:rtype: dict
"""
path_output = os.path.join(case_study_name, config['scenario_dir'], model_name, config['output_dir'])
# get all the file names in the folder
files = os.listdir(path_output)
model_output_data = {}
for file in files:
temp_filename = file.split('.')[0]
if file.endswith('.csv'):
model_output_data[temp_filename] = pd.read_csv(os.path.join(path_output, file), sep=',') # change sperator if needed (and changed for the rest)
elif file.endswith('.geojson'):
model_output_data[file] = gpd.read_file(os.path.join(path_output, file))
else:
print("File format not supported: ", file)
return model_output_data
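
# Usage sketch (hypothetical names):
#   results = read_output_from_disk('Altstadt', 'baseline', load_config())
# keys are the output file names without their extension, e.g. results['Heat_Production'] if the
# optimisation wrote a Heat_Production.csv into the output directory.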