Source code for model

# import libraries to work with Pyomo models
from data import *
import logging
import os

from itertools import product
from pyomo.environ import *
from pyomo.opt import SolverFactory
import numpy as np
import pandas as pd
import geopandas as gpd
import utils

from termcolor import colored, cprint

import time


[docs] class HeatNetworkModel(ConcreteModel): """Class to represent the core model of the heat network optimization problem. It inherits from ConcreteModel. """ def __init__(self, scenario_name:str, logger = None): """Constructor method for the HeatNetworkModel class. It calls the parent class's constructor and initializes the model's data structures. :param scenario_name: name of the scenario, used for the model name and logging :type scenario_name: str :param logger: optional logger to log the model's solution process. If not provided, logging is disabled. :type logger: _type_, optional """ super().__init__(name=scenario_name) self.data_dict = {} self.logging = False if logger is not None: self.logger = logger self.logging = True
[docs] def fill_model_data(self, input_dict:dict): """Read the model data from the input dictionary and set up the model sets and parameters in Pyomo. :param input_dict: input data from disk :type input_dict: dict """ df_heat_demand = input_dict['heat_demand'] df_heat_gen_units = input_dict['heat_gen_units'] df_heat_network = input_dict['heat_network'] df_heat_nodes = input_dict['heat_nodes'] dict_parameter_cost = input_dict['parameter_cost'] df_waste_heat_prof = None if input_dict['waste_heat_prof'] is not None: df_waste_heat_prof = input_dict['waste_heat_prof'] # check that the waste heat has an associated heat generation unit for wh in df_waste_heat_prof.columns[1:]: if wh not in df_heat_gen_units.source.unique(): self.logger.error(f"Waste heat profile {wh} has no associated heat generation unit") assert False, f"Waste heat profile {wh} has no associated heat generation unit" hours = df_heat_demand.hour.values.astype(int) hn = df_heat_nodes.node.unique() if self.logging: self.logger.info("######################") self.logger.info("Start: fill model data") self.logger.info("######################") # Define sets self.n = hn # Thermal nodes self.h = hours # Hourly index # General parameters self.pCostHNS = Param(initialize=dict_parameter_cost['pCostHNS']) self.pCostPumping = Param(initialize=dict_parameter_cost['pCostPumping']) self.pTsupply = Param(initialize=dict_parameter_cost['pTsupply']) self.pTreturn = Param(initialize=dict_parameter_cost['pTreturn']) self.pPipeCostIni = Param(initialize=dict_parameter_cost['pPipeCostIni']) self.pPipeCostSlope = Param(initialize=dict_parameter_cost['pPipeCostsSlope']) self.pMassFlowIni = Param(initialize=dict_parameter_cost['pMassFlowIni']) self.pAllowDoubleHeating = Param(initialize=dict_parameter_cost['allow_double_heating']) # will be replaced by a parameter of heat gen units later self.pTESlosses = Param(initialize=0.0) # Physical parameters, in kWh/(m³*K) water at standard conditions self.pCWater = Param(initialize=1.16389) start = time.time() # dictionary to create the heat network dict_heat_network = {} for idx, row in df_heat_network.iterrows(): dict_heat_network[(row.iat[0], row.iat[1])] = row['distance'] if self.logging: self.logger.info("Created heat network dictionary - {:.2f}".format(time.time() - start)) print("Done: create heat network dictionary") print("Time: ", time.time() - start) start = time.time() # use the network dictionary to create the connections index self.pc = Set(initialize=[(n1, n2) for n1, n2 in dict_heat_network.keys()]) # Pipe candidate connections self.hg = Set(initialize=df_heat_gen_units.source.unique()) # Heat generation units self.hgn = Set(initialize=[(row.source, row.heat_unit_id) for idx, row in df_heat_gen_units.iterrows()]) # Assignment of heat gen units to nodes self.tes = Set(within=self.hg, initialize=df_heat_gen_units[df_heat_gen_units.isTES == 1].source.unique()) # Thermal energy storage self.hb = Set(within=self.hg, initialize=df_heat_gen_units[df_heat_gen_units.isBoiler == 1].source.unique()) # Heat boilers if df_waste_heat_prof is not None: self.wh = Set(within=self.hg, initialize=df_waste_heat_prof.columns[1:]) # Waste heat generation units else: self.wh = Set(within=self.hg) # Waste heat generation units if self.logging: self.logger.info("Created heat network dictionary - {:.2f}".format(time.time() - start)) print("Done: add parameters") print("Time: ", time.time() - start) start = time.time() # add the heat demand data to the model dict_cost_dh_connect = {} dict_heat_demand = {} dict_max_dh_power = {} dict_local_heat_prod_cost = {} for n in self.n: # create connection cost dict_cost_dh_connect[n] = df_heat_nodes[df_heat_nodes.node == n].CostsDHconnect.values[0] # create max dh power dict_max_dh_power[n] = df_heat_nodes[df_heat_nodes.node == n].maxDHPower.values[0] # create local heat production cost dict_local_heat_prod_cost[n] = df_heat_nodes[df_heat_nodes.node == n].LocalHeatProdCost.values[0] aux_df = df_heat_demand.loc[:, ['hour', n]] # this could be improved to use itertuples, but it is not a bottleneck right now for idx, row in aux_df.iterrows(): dict_heat_demand[(n, int(row['hour']))] = row[n] if self.logging: self.logger.info("Created heat demand dictionary - {:.2f}".format(time.time() - start)) print("Done: created heat demand dictionary") print("Time: ", time.time() - start) start = time.time() self.pHeatDemand = Param(self.n, self.h, initialize=dict_heat_demand, within=NonNegativeReals) self.pCostDHConnect = Param(self.n, initialize=dict_cost_dh_connect, within=NonNegativeReals) self.pMaxDHPower = Param(self.n, initialize=dict_max_dh_power, within=NonNegativeReals) self.pCostLocalHeatProd = Param(self.n, initialize=dict_local_heat_prod_cost,within=NonNegativeReals) self.pPipeLength = Param(self.pc, initialize=dict_heat_network, within=NonNegativeReals) if self.logging: self.logger.info("Added heat demand data - {:.2f}".format(time.time() - start)) print("Done: add heat demand data") print("Time: ", time.time() - start) start = time.time() dict_waste_heat_prof = {} dict_cost_central_heat_prod = {} dict_cost_central_heat_prod_inv = {} dict_cost_tes_inv = {} for hg in self.hg: if not(np.isnan(df_heat_gen_units[df_heat_gen_units.source == hg].OMVarCost.values[0])): dict_cost_central_heat_prod[hg] = df_heat_gen_units[df_heat_gen_units.source == hg].OMVarCost.values[0] if not (np.isnan(df_heat_gen_units[df_heat_gen_units.source == hg].PowerInvCost.values[0])): dict_cost_central_heat_prod_inv[hg] = df_heat_gen_units[df_heat_gen_units.source == hg].PowerInvCost.values[0] if (df_heat_gen_units[df_heat_gen_units.source == hg].isTES.values[0] == 1 and not (np.isnan(df_heat_gen_units[df_heat_gen_units.source == hg].StorageInvCost.values[0]))): dict_cost_tes_inv[hg] = df_heat_gen_units[df_heat_gen_units.source == hg].StorageInvCost.values[0] if df_waste_heat_prof is not None: if hg in df_waste_heat_prof.columns: for idx, row in df_waste_heat_prof.iterrows(): dict_waste_heat_prof[(hg, row['hour'])] = row[hg] if self.logging: self.logger.info("Created heat generation dictionaries - {:.2f}".format(time.time() - start)) print("Done: add heat generation dictionaries") print("Time: ", time.time() - start) start = time.time() self.pMaxWHMF = Param(self.hg, self.h, initialize=dict_waste_heat_prof, within=NonNegativeReals) self.pCostCentrHeatProd = Param(self.hg, initialize=dict_cost_central_heat_prod, within=NonNegativeReals) self.pCostCentralHeatProdInv = Param(self.hg, initialize=dict_cost_central_heat_prod_inv, within=NonNegativeReals) self.pCostTESInv = Param(self.hg, initialize=dict_cost_tes_inv, within=NonNegativeReals) if self.logging: self.logger.info("Added heat generation data - {:.2f}".format(time.time() - start)) print("Done: add heat generation parameters") print("Time: ", time.time() - start) if self.logging: self.logger.info("######################") self.logger.info("End: fill model data") self.logger.info("######################")
[docs] def initialize_variables(self): """Initialize the variables of the model. """ if self.logging: self.logger.info("###########################") self.logger.info("Start: initialize variables") self.logger.info("###########################") start = time.time() # Continuous variables #self.vTotalVCost = Var(domain=NonNegativeReals) self.vMF = Var(self.pc, self.h, domain=NonNegativeReals) self.vHNS = Var(self.n, self.h, domain=NonNegativeReals) self.vLocalHeatProd = Var(self.n, self.h, domain=NonNegativeReals) self.vMFConsumption = Var(self.n, self.h, domain=NonNegativeReals) self.vMFInjection = Var(self.n, self.h, domain=NonNegativeReals) self.vCentralHeatProd = Var(self.hg, self.h, domain=NonNegativeReals) self.vCentralHeatProdInv = Var(self.hg, domain=NonNegativeReals) self.vTESLevel = Var(self.hg, self.h, domain=NonNegativeReals) self.vTESCharge = Var(self.hg, self.h, domain=NonNegativeReals) self.vTESDischarge = Var(self.hg, self.h, domain=NonNegativeReals) self.vTESCapacitivInv = Var(self.hg, domain=NonNegativeReals) self.vPipeMassFlowInv = Var(self.pc, domain=NonNegativeReals) # Binary variables self.vDHconnect = Var(self.n, domain=Binary, initialize=0) self.vBinBuildPipe = Var(self.pc, domain=Binary) if self.logging: self.logger.info("Added variables - {:.2f}".format(time.time() - start)) print("Done: add variables") print("Time: ", time.time() - start) if self.logging: self.logger.info("###########################") self.logger.info("End: initialize variables") self.logger.info("###########################")
[docs] def initialize_constraints(self): """Initialize the constraints of the model. """ if self.logging: self.logger.info("#############################") self.logger.info("Start: initialize constraints") self.logger.info("#############################") start = time.time() def hourly_cost_function(model, h): return ( sum(model.pCostHNS * model.vHNS[n, h] for n in model.n) + sum(model.pCostLocalHeatProd[n] * model.vLocalHeatProd[n, h] for n in model.n) + sum(model.pCostCentrHeatProd[hg] * model.vCentralHeatProd[hg, h] for hg in model.hg) + sum(model.pCostPumping * model.pPipeLength[n,m] * model.vMF[n, m, h] for (n, m) in model.pc) ) self.hourly_cost = Expression(self.h, rule=hourly_cost_function) # Objective function def objective_function(model): return ( sum(model.pCostDHConnect[n] * model.vDHconnect[n] for n in model.n) + sum(model.pCostCentralHeatProdInv[hg] * model.vCentralHeatProdInv[hg] for hg in model.hg) + sum(model.pCostTESInv[tes] * model.vTESCapacitivInv[tes] for tes in model.tes) + sum(model.pPipeCostIni * model.pPipeLength[n, m] * model.vBinBuildPipe[n, m] for (n, m) in model.pPipeLength) + sum(model.pPipeCostSlope * model.pPipeLength[n, m] * model.vPipeMassFlowInv[n, m] for (n, m) in model.pPipeLength) + # tbd add variable slope price sum(model.hourly_cost[h] for h in model.h) ) self.obj = Objective(rule=objective_function, sense=minimize) if self.logging: self.logger.info("Added objective function - {:.2f}".format(time.time() - start)) print("Done: add objective function") print("Time: ", time.time() - start) start = time.time() # Energy balance in kW for ever heat node and hour def energy_balance_rule(model, n, h): return ( model.vMFConsumption[n, h] == (model.pHeatDemand[n, h] - model.vLocalHeatProd[n, h]) / ((model.pTsupply - model.pTreturn) * model.pCWater) ) self.energy_balance = Constraint(self.n, self.h, rule=energy_balance_rule) if self.logging: self.logger.info("Added energy balance - {:.2f}".format(time.time() - start)) print("Done: add energy balance") print("Time: ", time.time() - start) start = time.time() # Limits use of distrct heating only to connected nodes and to a maximum power (as a specific bigM) def max_dh_power_rule(model, n, h): return ( model.vMFConsumption[n, h] <= model.vDHconnect[n] * model.pMaxDHPower[n] / ((model.pTsupply - model.pTreturn) * model.pCWater) ) self.max_dh_power = Constraint(self.n, self.h, rule=max_dh_power_rule) if self.logging: self.logger.info("Added max. district heating consumption - {:.2f}".format(time.time() - start)) print("Done: add max dh power") print("Time: ", time.time() - start) start = time.time() # Mass flow balacne between all nodes def mf_balance_rule(model, n, h): aux_pc_out = set((i, j) for i, j in model.pc if i == n) aux_pc_inf = set((j, i) for j, i in model.pc if i == n) return ( model.vMFConsumption[n, h] + sum(model.vTESCharge[tes, h] for tes in model.tes if (tes, n) in model.hgn) - model.vMFInjection[n, h] == # sum(model.vMF[m, n, h] for m in model.n if (m, n) in aux_pc) - sum(model.vMF[m, n, h] for m, _ in aux_pc_inf) - # sum(model.vMF[n, m, h] for m in model.n if (n, m) in aux_pc) sum(model.vMF[n, m, h] for _, m in aux_pc_out) ) self.mf_balance = Constraint(self.n, self.h, rule=mf_balance_rule) if self.logging: self.logger.info("Added mass flow balance system - {:.2f}".format(time.time() - start)) print("Done: add mf balance") print("Time: ", time.time() - start) start = time.time() # Mass flow injection into the network def mf_injection_rule(model, n, h): return ( model.vMFInjection[n,h] == sum(model.vCentralHeatProd[wh, h] for wh in model.wh if (wh, n) in model.hgn) / ((model.pTsupply - model.pTreturn) * model.pCWater) + sum(model.vCentralHeatProd[hb, h] for hb in model.hb if (hb, n) in model.hgn) / ((model.pTsupply - model.pTreturn) * model.pCWater) + sum(model.vCentralHeatProd[tes, h] for tes in model.tes if (tes, n) in model.hgn) / ((model.pTsupply - model.pTreturn) * model.pCWater) ) self.mf_injection = Constraint(self.n, self.h, rule=mf_injection_rule) if self.logging: self.logger.info("Added mass flow balance node - {:.2f}".format(time.time() - start)) print("Done: add mf injection") print("Time: ", time.time() - start) if self.pAllowDoubleHeating == 0: start = time.time() # Exclude decentral heating for nodes that are connected to the network def decentral_condition_rule(model, n, h): return ( model.vLocalHeatProd[n, h] <= (1 - model.vDHconnect[n]) * model.pMaxDHPower[n] ) self.decentral_condition = Constraint(self.n, self.h, rule=decentral_condition_rule) if self.logging: self.logger.info("Added local heating - {:.2f}".format(time.time() - start)) print('Info: No decentral heating allowed when building connected to the network') print("Done: add decentral condition") print("Time: ", time.time() - start) else: print('Info: Decentral heating allowed when building connected to the network!') start = time.time() # Limit the maximum power of the waste heat generation units def max_wh_power_rule(model, wh, h): return ( model.vCentralHeatProd[wh, h] <= model.pMaxWHMF[wh, h] * (model.pTsupply - model.pTreturn) * model.pCWater ) self.max_wh_power = Constraint(self.wh, self.h, rule=max_wh_power_rule) if self.logging: self.logger.info("Added max. waste heat - {:.2f}".format(time.time() - start)) print("Done: add max wh power") print("Time: ", time.time() - start) start = time.time() # calculate the heat production of the TES def tes_production_rule(model, tes, h): return ( model.vCentralHeatProd[tes, h] == model.vTESDischarge[tes, h] * (model.pTsupply - model.pTreturn) * model.pCWater ) self.tes_production = Constraint(self.tes, self.h, rule=tes_production_rule) if self.logging: self.logger.info("Added thermal storage production - {:.2f}".format(time.time() - start)) print("Done: add tes production") print("Time: ", time.time() - start) start = time.time() # Limit the maximum power to investments def max_power_invest_rule(model, hg, h): return ( model.vCentralHeatProd[hg, h] <= model.vCentralHeatProdInv[hg] ) self.max_power_invest = Constraint(self.hg, self.h, rule=max_power_invest_rule) if self.logging: self.logger.info("Added max. power invest - {:.2f}".format(time.time() - start)) print("Done: add max power invest") print("Time: ", time.time() - start) start = time.time() # Limit the size of the storage to its investments def tes_storage_invest_rule(model, tes, h): return ( model.vTESLevel[tes,h] <= model.vTESCapacitivInv[tes] ) self.tes_storage_invest = Constraint(self.tes, self.h, rule=tes_storage_invest_rule) if self.logging: self.logger.info("Added max. thermal storage invest - {:.2f}".format(time.time() - start)) print("Done: add tes storage invest") print("Time: ", time.time() - start) start = time.time() # Balance equation for the TES def tes_balance_rule(model, tes, h): if h == 1: return model.vTESLevel[tes, h] == model.vTESLevel[tes, len(model.h)] + model.vTESCharge[tes, h] - model.vTESDischarge[tes, h] return ( model.vTESLevel[tes, h] == model.vTESLevel[tes, h-1] * (1 - model.pTESlosses) + model.vTESCharge[tes, h] - model.vTESDischarge[tes, h] ) self.tes_balance = Constraint(self.tes, self.h, rule=tes_balance_rule) if self.logging: self.logger.info("Added thermal storage balance - {:.2f}".format(time.time() - start)) print("Done: add tes balance") print("Time: ", time.time() - start) start = time.time() # Limit mass flow through the pipes def max_mass_flow_rule(model, n, m, h): return ( model.vMF[n, m, h] <= model.vBinBuildPipe[n, m] * model.pMassFlowIni + model.vPipeMassFlowInv[n, m] ) self.max_mass_flow = Constraint(self.pc, self.h, rule=max_mass_flow_rule) def logic_mass_flow_rule(model, n, m): return ( model.vPipeMassFlowInv[n, m] <= model.vBinBuildPipe[n, m] * 1e4 ) self.logic_mass_flow = Constraint(self.pc, rule=logic_mass_flow_rule) if self.logging: self.logger.info("Added mass flow pipe - {:.2f}".format(time.time() - start)) print("Done: build max mass flow") print("Time: ", time.time() - start) if self.logging: self.logger.info("#############################") self.logger.info("End: initialize constraints") self.logger.info("#############################")
[docs] def model_run(self, config: dict): """Run the model to solve the optimization problem. The solver can be defined here. """ if self.logging: self.logger.info("#############################") self.logger.info("Start: model solve") self.logger.info("#############################") solver = config['solver'] mip_gap = config['mip_gap'] if solver is None: print("Warning: No solver defined, using default 'appsi_highs'") solver = 'appsi_highs' start = time.time() print("Start: solve model") if solver == 'gurobi': solver = SolverFactory('gurobi') results = solver.solve(self, tee=True, options={'MIPGap': mip_gap, 'Threads':os.cpu_count()}) elif solver == 'highs': solver = SolverFactory('appsi_highs') results = solver.solve(self, tee=True, options={'mip_rel_gap': mip_gap, 'threads':os.cpu_count()}) else: print("Error: Solver not recognized") return None print("Done: solve model") print("Time: ", time.time() - start) if self.logging: self.logger.info("#############################") self.logger.info("End: model solve") self.logger.info("#############################") return results
[docs] def export_results(self, case_study_name: str, model_name: str, config: dict): """Perform some ex-post calculations and export the results to csv files. :param case_study_name: name of the case study :type case_study_name: str :param model_name: name of the scenario / model :type model_name: str :param config: configuration dictionary :type config: dict """ print("Start: export results") # convert the DH connection to a dataframe df_dh_connection = pd.DataFrame() df_dh_connection['Node'] = self.n df_dh_connection['DH Connection'] = [self.vDHconnect[n].value for n in self.n] # convert the pipe connections to a dataframe df_pipe_connections = pd.DataFrame() df_pipe_connections['from'] = [n for (n, m) in self.pc] df_pipe_connections['to'] = [m for (n, m) in self.pc] df_pipe_connections['Build Pipe'] = [self.vBinBuildPipe[n, m].value for (n, m) in self.pc] df_pipe_connections['Max Mass Flow'] = [max(self.vMF[n, m, h].value for h in self.h) for (n, m) in self.pc] df_pipe_connections['Mass Flow Investment'] = [self.vBinBuildPipe[n, m].value * self.pMassFlowIni + self.vPipeMassFlowInv[n, m].value for (n, m) in self.pc] # convert the heat generation time series to a dataframe data = [] for hg in self.hg: for h in self.h: data.append([hg, h, self.vCentralHeatProd[hg, h].value]) df_heat_gen = pd.DataFrame(data, columns=['Generation Unit', 'Hour', 'Heat Production']) df_heat_gen = df_heat_gen.pivot(index='Generation Unit', columns='Hour', values='Heat Production').reset_index() df_heat_gen.loc['Local Heat Production', 'Generation Unit'] = 'Dezentral Heat Production' for h in self.h: df_heat_gen.loc['Local Heat Production', h] = sum(self.vLocalHeatProd[n, h].value for n in self.n) # write economic results to a dataframe df_economic_results = pd.DataFrame({ 'Costs in k€': [ 'Total Costs', 'DH Connection Cost', 'HNS Cost', 'Local Heat Production Cost', 'Central Heat Production Cost', 'Central Heat Production Investment Cost', 'TES Investment Cost', 'Pumping Cost', 'Pipe Base Investment Cost', 'Pipe Slope Investment Cost' ], 'Value': [ round(value(self.obj)/1e3,1), round(sum(self.pCostDHConnect[n] * self.vDHconnect[n].value for n in self.n if self.vDHconnect[n].value is not None)/1e3,1), round(sum(self.pCostHNS * self.vHNS[n, h].value for n in self.n for h in self.h if self.vHNS[n, h].value is not None)/1e3,1), round(sum(self.pCostLocalHeatProd[n] * self.vLocalHeatProd[n, h].value for n in self.n for h in self.h if self.vLocalHeatProd[n, h].value is not None)/1e3,1), round(sum(self.pCostCentrHeatProd[hg] * self.vCentralHeatProd[hg, h].value for hg in self.hg for h in self.h if self.vCentralHeatProd[hg, h].value is not None)/1e3,1), round(sum(self.pCostCentralHeatProdInv[hg] * self.vCentralHeatProdInv[hg].value for hg in self.hg if self.vCentralHeatProdInv[hg].value is not None)/1e3,1), round(sum(self.pCostTESInv[tes] * self.vTESCapacitivInv[tes].value for tes in self.tes if self.vTESCapacitivInv[tes].value is not None)/1e3,1), round(sum(self.pCostPumping * self.pPipeLength[n,m] * self.vMF[n, m, h].value for (n, m) in self.pc for h in self.h if self.vMF[n, m, h].value is not None)/1e3,1), round(sum(self.pPipeCostIni * self.pPipeLength[n, m] * self.vBinBuildPipe[n, m].value for (n, m) in self.pc if self.vBinBuildPipe[n, m].value is not None)/1e3,1), round(sum(self.pPipeCostSlope * self.pPipeLength[n, m] * self.vPipeMassFlowInv[n, m].value for (n, m) in self.pc if self.vPipeMassFlowInv[n, m].value is not None)/1e3,1) ] }) # write the investment decisions per technology to a dataframe df_investment_decisions = pd.DataFrame(columns=['Generation Unit', 'Type', 'Capacity Investments / kW', 'Storage Investments / m3']) # add if the generation unit is a boiler, waste heat or TES for hg in (self.hg): df_investment_decisions.loc[hg,'Generation Unit'] = hg df_investment_decisions.loc[hg,'Type'] = 'Boiler' if hg in self.hb else 'Waste Heat' if hg in self.wh else 'TES' if hg in self.tes else 'Central Heat' df_investment_decisions.loc[hg,'Capacity Investments / kW'] = self.vCentralHeatProdInv[hg].value df_investment_decisions.loc[hg,'Storage Investments / m3'] = self.vTESCapacitivInv[hg].value if hg in self.tes else 0 output_folder = os.path.join(case_study_name, config['scenario_dir'], model_name, config['expost_dir']) if not os.path.exists(output_folder): os.makedirs(output_folder) # write the economic results to a csv file df_economic_results.to_csv(os.path.join(output_folder, 'Economic_Results.csv'), sep=';', index=False) df_investment_decisions.to_csv(os.path.join(output_folder, 'Investment_Decisions.csv'), sep=';', index=False) # write the results to a csv file df_dh_connection.to_csv(os.path.join(output_folder,'DH_Connection.csv'), sep=';', index=False) df_pipe_connections.to_csv(os.path.join(output_folder,'Pipe_Connections.csv'), sep=';', index=False) df_heat_gen.to_csv(os.path.join(output_folder,'Heat_Generation_TS.csv'), sep=';', index=False) print("Done: export results")
[docs] def export_model_variables(self): """" This function exports the variables from a pyomo model to a dictionary of dataframes. Does not depend on the model's structure, all the data is extracted from the model's components """ # a dictionary of dataframes to store all the variables data, includingg their indices and values. # The keys are the variables' names d_vars = {} # get all the variables from the model variables = self.component_objects(Var) for v in variables: # get the variable's representation inside the model var_object = getattr(self, str(v)) # create a dictionary to store the variable's data: its index set and its value aux_d = {} aux_d['value'] = [] # get the indices of the variable idx_elements = list(var_object.index_set()) aux_data = v.get_values() aux_data_df = pd.DataFrame({'indices': list(aux_data.keys()), 'value': list(aux_data.values())}) # if it is a tuple, the variable has more than one index set if isinstance(idx_elements[0], tuple): # count the number of index sets len_test_elem = len(idx_elements[0]) aux_cols_idx = ['index_set_' + str(i+1) for i in range(0, len_test_elem)] aux_data_df[aux_cols_idx] = pd.DataFrame(aux_data_df['indices'].tolist(), index=aux_data_df.index) aux_data_df.drop(columns=['indices'], inplace=True) # if it is not a tuple, the variable one index set and we can the values directly else: aux_d['index_set_1'] = [] aux_cols_idx = ['index_set_1'] aux_data_df[aux_cols_idx] = pd.DataFrame(aux_data_df['indices'].tolist(), index=aux_data_df.index) aux_data_df.drop(columns=['indices'], inplace=True) # store the dataframe in the dictionary with all the other variables d_vars[str(v)] = aux_data_df return d_vars
[docs] def export_model_parameters(self): """This function exports the parameters from a pyomo model to a dictionary of dataframes. Does not depend on the model's structure, all the data is extracted from the model's components. """ # a dictionary of dataframes to store all the variables data, including their indices and values. # The keys are the variables' names d_params = {} # get all the variables from the model parameters = self.component_objects(Param) for p in parameters: param_object = getattr(self, str(p)) # create a dictionary to store the variable's data: its index set and its value aux_d = {} aux_d['value'] = [] # get the indices of the variable idx_elements = list(param_object.index_set()) aux_data = p.values() aux_data_df = pd.DataFrame({'indices': idx_elements, 'value': list(aux_data)}) # if it is a tuple, the variable has more than one index set if isinstance(idx_elements[0], tuple): # count the number of index sets len_test_elem = len(idx_elements[0]) aux_cols_idx = ['index_set_' + str(i+1) for i in range(0, len_test_elem)] aux_data_df[aux_cols_idx] = pd.DataFrame(aux_data_df['indices'].tolist(), index=aux_data_df.index) aux_data_df.drop(columns=['indices'], inplace=True) # if it is not a tuple, the variable one index set and we can the values directly else: aux_d['index_set_1'] = [] aux_cols_idx = ['index_set_1'] aux_data_df[aux_cols_idx] = pd.DataFrame(aux_data_df['indices'].tolist(), index=aux_data_df.index) aux_data_df.drop(columns=['indices'], inplace=True) # store the dataframe in the dictionary with all the other variables d_params[str(v)] = aux_data_df return d_params
[docs] def run_model(case_study_name: str, model_name: str): """Run the complete workflow to load data, fill the model, run the optimization and export the results. This function is the main entry point for running the model. :param case_study_name: the name of the case study :type case_study_name: str :param model_name: the name of the model :type model_name: str """ config = load_config() logger = utils.create_logger(case_study_name + "_" + model_name, config['log_dir']) dh_model = HeatNetworkModel(case_study_name, logger) input_dict = load_data_from_disk(case_study_name,model_name, config) error = False if input_dict['heat_nodes'] is None: logger.error("Heat nodes data is missing") error = True if input_dict['heat_gen_units'] is None: logger.error("Heat generation units data is missing") error = True assert False, "Heat generation units data is missing" if input_dict['heat_demand'] is None: logger.error("Heat demand data is missing") error = True assert False, "Heat demand data is missing" if input_dict['waste_heat_prof'] is None: logger.warning("Waste heat profile data is missing") if error: assert False, "Missing input data (e.g., heat nodes, heat generation units, heat demand)" input_dict['heat_nodes'].rename(columns={'cluster_id': 'node'}, inplace=True) input_dict['heat_nodes'].fillna(0, inplace=True) input_dict['heat_gen_units'].rename(columns={'unit': 'source'}, inplace=True) input_dict['heat_gen_units'].rename(columns={'O&M and Fuel Costs': 'OMVarCost'}, inplace=True) input_dict['heat_gen_units'].rename(columns={'Power Investment Costs': 'PowerInvCost'}, inplace=True) input_dict['heat_gen_units'].rename(columns={'Storage Investment Costs': 'StorageInvCost'}, inplace=True) if input_dict['waste_heat_prof'] is not None: input_dict['waste_heat_prof'].rename(columns={'unit': 'source'}, inplace=True) # Even zero demand is needed to create the balance at each node heat_gen_units = input_dict['heat_gen_units'].source.values for hg in heat_gen_units: input_dict['heat_demand']['heat_unit_'+str(hg)] = 0 dh_model.fill_model_data(input_dict) dh_model.initialize_variables() dh_model.initialize_constraints() slvr_res = dh_model.model_run(config) logger.info("MODEL SOLUTION") logger.info("==============") logger.info(str(slvr_res)) res_dict = dh_model.export_model_variables() output_folder = os.path.join(case_study_name, config['scenario_dir'], model_name, config['output_dir']) if not os.path.exists(output_folder): os.makedirs(output_folder) for k, v in res_dict.items(): v.to_csv(os.path.join(output_folder, k + '.csv'), sep=',', index=False) print("Exported: ", k) dh_model.export_results(case_study_name, model_name, config) logging.shutdown()
if __name__ == "__main__": case_study_name = 'Puertollano' model_name = 'realistic_costs' config = load_config() logger = utils.create_logger(case_study_name + "_" + model_name, config['log_dir']) dh_model = HeatNetworkModel(case_study_name, logger) input_dict = load_data_from_disk(case_study_name,model_name, config) error = False if input_dict['heat_nodes'] is None: logger.error("Heat nodes data is missing") error = True if input_dict['heat_gen_units'] is None: logger.error("Heat generation units data is missing") error = True assert False, "Heat generation units data is missing" if input_dict['heat_demand'] is None: logger.error("Heat demand data is missing") error = True assert False, "Heat demand data is missing" if input_dict['waste_heat_prof'] is None: logger.warning("Waste heat profile data is missing") if error: assert False, "Missing input data (e.g., heat nodes, heat generation units, heat demand)" input_dict['heat_nodes'].rename(columns={'cluster_id': 'node'}, inplace=True) input_dict['heat_nodes'].fillna(0, inplace=True) input_dict['heat_gen_units'].rename(columns={'unit': 'source'}, inplace=True) input_dict['heat_gen_units'].rename(columns={'O&M and Fuel Costs': 'OMVarCost'}, inplace=True) input_dict['heat_gen_units'].rename(columns={'Power Investment Costs': 'PowerInvCost'}, inplace=True) input_dict['heat_gen_units'].rename(columns={'Storage Investment Costs': 'StorageInvCost'}, inplace=True) if input_dict['waste_heat_prof'] is not None: input_dict['waste_heat_prof'].rename(columns={'unit': 'source'}, inplace=True) # Even zero demand is needed to create the balance at each node heat_gen_units = input_dict['heat_gen_units'].source.values for hg in heat_gen_units: input_dict['heat_demand']['heat_unit_'+str(hg)] = 0 dh_model.fill_model_data(input_dict) dh_model.initialize_variables() dh_model.initialize_constraints() slvr_res = dh_model.model_run(config) logger.info("MODEL SOLUTION") logger.info("==============") logger.info(str(slvr_res)) res_dict = dh_model.export_model_variables() output_folder = os.path.join(case_study_name, config['scenario_dir'], model_name, config['output_dir']) if not os.path.exists(output_folder): os.makedirs(output_folder) for k, v in res_dict.items(): v.to_csv(os.path.join(output_folder, k + '.csv'), sep=',', index=False) print("Exported: ", k) dh_model.export_results(case_study_name, model_name, config) logging.shutdown()