Source code for topfarm.cost_models.cost_model_wrappers

from openmdao.core.explicitcomponent import ExplicitComponent
import numpy as np
import time
from _collections import defaultdict
# from topfarm.constraint_components.post_constraint import PostConstraint
import warnings


[docs]class CostModelComponent(ExplicitComponent): """Wrapper for pure-Python cost functions"""
[docs] def __init__(self, input_keys, n_wt, cost_function, cost_gradient_function=None, output_keys=["Cost"], output_unit="", additional_input=[], additional_output=[], max_eval=None, objective=True, maximize=False, output_vals=[0.0], input_units=[], step={}, use_constraint_violation=True, **kwargs): """Initialize wrapper for pure-Python cost function Parameters ---------- input_keys : list of str Inputs to the cost function n_wt : int Number of wind turbines cost_function : function handle Function to evaluate cost cost_gradient_function : function handle, optional Function to evaluate gradient of the cost function output_key : list Name of output key output_unit : str Units of output of cost function additional_input : list of str Other (non-design-variable) inputs required by the cost function\n Gradients will not be computed for these inputs additional_output : list of str or list of tuples Other outputs to request\n if list of str: ['add_out_name',...]\n if list of tuples [('add_out_name', val),...], where val is a template value of the output The cost function must return: cost, {'add_out1_name': add_out1, ...} max_eval : int Maximum number of function evaluations objective : boolean Must be True for standalone CostModelComponents and the final component in a TopFarmGroup maximize : boolean If True: objective is maximised during optimization\n If False: Objective is minimized during optimization output_vals : float or array_like Format of output input_units : list of str Units of the respective input_keys step : dict of {str : float} Finite difference step size for each input key, e.g. {input_key : step_size, input_key2 : step_size2} """ if 'income_model' in kwargs: warnings.warn("""income_model is deprecated; use keyword maximize instead""", DeprecationWarning, stacklevel=2) maximize = kwargs['income_model'] kwargs.pop('income_model') if 'output_key' in kwargs: warnings.warn("""output_key is deprecated; use keyword output_keys instead""", DeprecationWarning, stacklevel=2) output_keys = [kwargs['output_key']] kwargs.pop('output_key') else: self.output_key = output_keys[0] if 'output_val' in kwargs: warnings.warn("""output_val is deprecated; use keyword output_vals instead""", DeprecationWarning, stacklevel=2) output_vals = [kwargs['output_val']] kwargs.pop('output_val') super().__init__(**kwargs) assert isinstance(n_wt, int), n_wt self.input_keys = list(input_keys) self.input_keys_only = list([(i, i[0])[isinstance(i, tuple)] for i in self.input_keys]) self.additional_input = additional_input self.all_input_keys = self.input_keys_only + list([(i, i[0])[isinstance(i, tuple)] for i in self.additional_input]) self.cost_function = cost_function self.cost_gradient_function = cost_gradient_function self.n_wt = n_wt if not isinstance(output_keys, list): output_keys = [output_keys] if not isinstance(output_vals, list): output_vals = [output_vals] self.output_keys = output_keys self.out_keys_only = list([(o, o[0])[isinstance(o, tuple)] for o in self.output_keys]) self.output_unit = output_unit self.output_key = output_keys[0] if isinstance(self.output_key, tuple): self.output_key = self.output_key[0] self.additional_output = [((x, np.zeros(self.n_wt)), x)[isinstance(x, tuple)] for x in additional_output] self.max_eval = max_eval or 1e100 self.objective = objective if maximize: self.cost_factor = -1.0 else: self.cost_factor = 1.0 output_vals = output_vals[:len(output_keys)] + [output_vals[0]] * (len(output_keys) - len(output_vals)) # extend output_vals if it is not same length as output_keys self.output_vals = output_vals n_input = len(self.input_keys) + len(additional_input) self.input_units = (input_units + [None] * n_input)[:n_input] self.n_func_eval = 0 self.func_time_sum = 0 self.n_grad_eval = 0 self.grad_time_sum = 0 self.step = step self.use_constraint_violation = use_constraint_violation
def setup(self): for i, u in zip(self.input_keys + self.additional_input, self.input_units): if isinstance(i, tuple) and len(i) == 2: self.add_input(i[0], val=i[1], units=u) else: self.add_input(i, val=np.zeros(self.n_wt), units=u) if self.use_constraint_violation: self.add_input('constraint_violation', val=0.0) if self.objective: self.add_output('cost', val=0.0) self.add_output('cost_comp_eval', val=0.0) for o, v in zip(self.output_keys, self.output_vals): if isinstance(o, tuple) and len(o) == 2: self.add_output(o[0], val=o[1]) else: self.add_output(o, val=v) for key, val in self.additional_output: self.add_output(key, val=val) if self.cost_gradient_function: if self.objective: self.declare_partials('cost', self.input_keys_only, method='exact') for o in self.out_keys_only: self.declare_partials(o, self.input_keys_only, method='exact') else: if self.step == {}: if self.objective: self.declare_partials('cost', self.input_keys_only, method='fd') for o in self.out_keys_only: self.declare_partials(o, self.input_keys_only, method='fd') else: for i in self.input_keys_only: if self.objective: self.declare_partials('cost', i, step=self.step[i], method='fd') for o in self.out_keys_only: self.declare_partials(o, i, step=self.step[i], method='fd') @property def counter(self): counter = float(self.n_func_eval) if self.grad_time_sum > 0 and self.func_time_sum > 0 and self.n_grad_eval > 0 and self.n_func_eval > 0: ratio = ((self.grad_time_sum / self.n_grad_eval) / (self.func_time_sum / self.n_func_eval)) counter += self.n_grad_eval * max(ratio, 1) else: counter += self.n_grad_eval return int(counter) def compute(self, inputs, outputs): """Compute cost model""" if self.use_constraint_violation: if (inputs['constraint_violation'] > 1e5) and (self.n_func_eval > 0): return if self.counter >= self.max_eval: return t = time.time() if self.additional_output: c, additional_output = self.cost_function(**{x: inputs[x] for x in self.all_input_keys}) for k, v in additional_output.items(): outputs[k] = v else: c = self.cost_function(**{x: inputs[x] for x in self.all_input_keys}) if not isinstance(c, list): c = [c] if self.objective: outputs['cost'] = c[0] * self.cost_factor outputs['cost_comp_eval'] = self.counter for o, _c in zip(self.out_keys_only, c): outputs[o] = _c self.func_time_sum += time.time() - t self.n_func_eval += 1 def compute_partials(self, inputs, J): if self.counter >= self.max_eval: return if hasattr(self, 'skip_linearize'): if self.skip_linearize: return t = time.time() if self.cost_gradient_function: for k, dCostdk in zip(self.input_keys_only, self.cost_gradient_function(**{x: inputs[x] for x in self.all_input_keys})): if dCostdk is not None: if not isinstance(dCostdk, list): dCostdk = [dCostdk] if self.objective: J['cost', k] = dCostdk[0] * self.cost_factor for o, _d in zip(self.out_keys_only, dCostdk): J[o, k] = _d self.grad_time_sum += time.time() - t self.n_grad_eval += 1
[docs]class AEPCostModelComponent(CostModelComponent): """Wrapper for pure-Python cost functions"""
[docs] def __init__(self, input_keys, n_wt, cost_function, cost_gradient_function=None, output_unit="", additional_input=[], additional_output=[], max_eval=None, output_key="AEP", **kwargs): """Initialize Sub class of the CostModelComponent for AEP maximization Parameters ---------- input_keys : list of str Inputs to the cost function n_wt : int Number of wind turbines cost_function : function handle Function to evaluate cost cost_gradient_function : function handle, optional Function to evaluate gradient of the cost function output_key : list Name of output key, default is AEP output_unit : str Units of output of cost function additional_input : list of str Other (non-design-variable) inputs required by the cost function\n Gradients will not be computed for these inputs additional_output : list of str or list of tuples Other outputs to request\n if list of str: ['add_out_name',...]\n if list of tuples [('add_out_name', val),...], where val is a template value of the output The cost function must return: cost, {'add_out1_name': add_out1, ...} max_eval : int Maximum number of function evaluations """ CostModelComponent.__init__(self, input_keys, n_wt, cost_function, cost_gradient_function=cost_gradient_function, output_key=output_key, output_unit=output_unit, additional_input=additional_input, additional_output=additional_output, max_eval=max_eval, maximize=True, **kwargs)
[docs]class AEPMaxLoadCostModelComponent(CostModelComponent): """Wrapper for pure-Python cost functions"""
[docs] def __init__(self, input_keys, n_wt, aep_load_function, max_loads, aep_load_gradient=None, output_keys=["AEP", 'loads'], step={}, maximize=True, **kwargs): """Initialize Sub class of the CostModelComponent for AEP maximization and load constraints Parameters ---------- input_keys : list of str Inputs to the cost function n_wt : int Number of wind turbines aep_load_function : function handle Function to evaluate cost aep_load_gradient : function handle, optional Function to evaluate gradient of the cost function output_keys : list Name of output key, default is AEP and loads step : dict of {str : float} Finite difference step size for each input key, e.g. {input_key : step_size, input_key2 : step_size2} maximize : boolean If True: objective is maximised during optimization\n If False: Objective is minimized during optimization """ self.max_loads = max_loads def cost_function(**kwargs): aep, load = aep_load_function(**kwargs) return [aep, load] if aep_load_gradient: def gradient_function(**kwargs): d_aep, d_load = aep_load_gradient(**kwargs) return [[d_aep, d_load]] else: gradient_function = None # additional_output = kwargs.get('additional_output', []) + [('loads', max_loads)] CostModelComponent.__init__(self, input_keys, n_wt, cost_function=cost_function, cost_gradient_function=gradient_function, output_keys=output_keys, step=step, maximize=maximize, **kwargs) self.post_constraint = ('loads', {'upper': max_loads})
# PostConstraint.__init__(self, 'loads', upper=max_loads) # def setup(self): # AEPCostModelComponent.setup(self) # input_keys = list([(i, i[0])[isinstance(i, tuple)] for i in self.input_keys]) # self.declare_partials('loads', input_keys, method='fd')