Source code for topfarm.constraint_components.spacing

import numpy as np
from numpy import newaxis as na
from topfarm.constraint_components import Constraint, ConstraintComponent
import topfarm


[docs]class SpacingConstraint(Constraint):
[docs] def __init__(self, min_spacing, units=None, aggregation_function=None, full_aggregation=False, name='spacing_comp'): """Initialize SpacingConstraint Parameters ---------- min_spacing : int or float Minimum spacing between turbines [m] aggregation_function : topfarm.utils.AggregationFunction or None if None: compute returns all wt-wt spacings (n_wt *(n_wt-1))/2 if AggregationFunction: compute returns an aggregated (minimum) spacing """ self.min_spacing = min_spacing self.aggregation_function = aggregation_function self.full_aggregation = full_aggregation self.const_id = name self.units = units
@property def constraintComponent(self): return self.spacing_comp def _setup(self, problem): self.n_wt = problem.n_wt self.spacing_comp = SpacingComp(self.n_wt, self.min_spacing, self.const_id, self.units, aggregation_function=self.aggregation_function, full_aggregation=self.full_aggregation) problem.model.constraint_group.add_subsystem(self.const_id, self.spacing_comp, promotes=[topfarm.x_key, topfarm.y_key, 'wtSeparationSquared']) def setup_as_constraint(self, problem): self._setup(problem) problem.model.add_constraint('wtSeparationSquared', lower=self.min_spacing**2) def setup_as_penalty(self, problem): self._setup(problem)
class SpacingComp(ConstraintComponent): """ Calculates inter-turbine spacing for all turbine pairs. """ def __init__(self, n_wt, min_spacing, const_id=None, units=None, aggregation_function=None, full_aggregation=False): super().__init__() self.n_wt = n_wt self.min_spacing = min_spacing self.const_id = const_id if aggregation_function: if full_aggregation: self.veclen = 1 else: self.veclen = n_wt else: self.veclen = int((n_wt - 1.) * n_wt / 2.) self.units = units self.aggregation_function = aggregation_function self.full_aggregation = full_aggregation self.constraint_key = 'wtSeparationSquared' def setup(self): # Explicitly size input arrays self.add_input(topfarm.x_key, val=np.zeros(self.n_wt), desc='x coordinates of turbines in wind dir. ref. frame', units=self.units) self.add_input(topfarm.y_key, val=np.zeros(self.n_wt), desc='y coordinates of turbines in wind dir. ref. frame', units=self.units) # self.add_output('constraint_violation_' + self.const_id, val=0.0) # Explicitly size output array self.add_output(self.constraint_key, val=np.zeros(self.veclen), desc='spacing of all turbines in the wind farm') col_pairs = np.array([(i, j) for i in range(self.n_wt - 1) for j in range(i + 1, self.n_wt)]) if self.aggregation_function: self.declare_partials(self.constraint_key, [topfarm.x_key, topfarm.y_key]) self.partial_indices = np.array([np.r_[np.where(col_pairs[:, 1] == i)[0], np.where(col_pairs[:, 0] == i)[0]] for i in range(self.n_wt)]).T self.partial_sign = (np.ones((self.n_wt, self.n_wt)) - 2 * np.triu(np.ones((self.n_wt, self.n_wt)), 1))[:-1] self.col_pairs = col_pairs else: # Sparse partial declaration cols = col_pairs.flatten() rows = np.repeat(np.arange(self.veclen), 2) self.declare_partials(self.constraint_key, [topfarm.x_key, topfarm.y_key], rows=rows, cols=cols) def compute(self, inputs, outputs): self.x = inputs[topfarm.x_key] self.y = inputs[topfarm.y_key] separation_squared = self._compute(self.x, self.y) if self.aggregation_function: if self.full_aggregation: outputs[self.constraint_key] = self.aggregation_function(separation_squared) else: outputs[self.constraint_key] = self.aggregation_function( separation_squared[self.partial_indices], 0) # print(outputs[self.constraint_key]) else: outputs[self.constraint_key] = separation_squared # outputs['constraint_violation_' + self.const_id] = -np.minimum(separation_squared - self.min_spacing**2, 0).sum() def _compute(self, x, y): n_wt = self.n_wt # compute distance matrixes dX, dY = [np.subtract(*np.meshgrid(xy, xy, indexing='ij')).T for xy in [x, y]] dXY2 = dX**2 + dY**2 # return upper triangle (above diagonal) return dXY2[np.triu_indices(n_wt, 1)] def compute_partials(self, inputs, J): # obtain necessary inputs x = inputs[topfarm.x_key] y = inputs[topfarm.y_key] # gradient of spacing [(i,j) for i=0..n_wt-1 and j=i+1..n_wt] wrt (wt_x[i], wt_x[j]) and (wt_y[i], wt_y[j]) dS_dxij, dS_dyij = self._compute_partials(x, y) if self.aggregation_function: if self.full_aggregation: # gradient of aggregated (minimum) spacing wrt. spacing(i,j) dSagg_dS = self.aggregation_function.gradient(self._compute(x, y)).flatten() # partial_indices extracts the spacing elements that each wt contributes to # partial sign gives it the right sign # and finally we sum the contributions of each wt J[self.constraint_key, topfarm.x_key] = ( (dS_dxij[:, 0] * dSagg_dS)[self.partial_indices] * self.partial_sign).sum(0).T J[self.constraint_key, topfarm.y_key] = ( (dS_dyij[:, 0] * dSagg_dS)[self.partial_indices] * self.partial_sign).sum(0).T else: # gradient of aggregated (minimum) spacing wrt. spacing(i,j) dSdwtx = (dS_dxij[:, 0])[self.partial_indices] * self.partial_sign dSdwty = (dS_dyij[:, 0])[self.partial_indices] * self.partial_sign S = self._compute(x, y)[self.partial_indices] dSagg_dS = self.aggregation_function.gradient(S, 0) # partial_indices extracts the spacing elements that each wt contributes to # partial sign gives it the right sign # and finally we sum the contributions of each wt dSagg_dwtx = dSdwtx * dSagg_dS * self.partial_sign dSagg_dwty = dSdwty * dSagg_dS * self.partial_sign dSagg_dx = np.zeros((self.n_wt, self.n_wt)) # np.diag((dSagg_dwtx).sum(0)) dSagg_dy = np.zeros((self.n_wt, self.n_wt)) # np.diag((dSagg_dwty).sum(0)) i = range(self.n_wt) for j in range(dSagg_dwtx.shape[0]): ai, bi = self.col_pairs[self.partial_indices][j, i, :].T dSagg_dx[ai, i] += dSagg_dwtx[j, i] dSagg_dx[bi, i] -= dSagg_dwtx[j, i] dSagg_dy[ai, i] += dSagg_dwty[j, i] dSagg_dy[bi, i] -= dSagg_dwty[j, i] J[self.constraint_key, topfarm.x_key] = dSagg_dx.T J[self.constraint_key, topfarm.y_key] = dSagg_dy.T else: # populate Jacobian dict J[self.constraint_key, topfarm.x_key] = dS_dxij.flatten() J[self.constraint_key, topfarm.y_key] = dS_dyij.flatten() def _compute_partials(self, x, y): # get number of turbines n_wt = self.n_wt # S = (xi-xj)^2 + (yi-yj)^2 = dx^2 + dy^2 # dS/dx = [dS/dx_i, dS/dx_j] = -2dx, 2dx # compute distance matrixes dX, dY = [np.subtract(*np.meshgrid(xy, xy, indexing='ij')).T for xy in [x, y]] # upper triangle -> 1 row per WT pair [(0,1), (0,2),..(n-1,n)] dx, dy = dX[np.triu_indices(n_wt, 1)], dY[np.triu_indices(n_wt, 1)] dSdx = np.array([-2 * dx, 2 * dx]).T dSdy = np.array([-2 * dy, 2 * dy]).T return dSdx, dSdy def plot(self, ax=None): from matplotlib.pyplot import Circle import matplotlib.pyplot as plt ax = ax or plt.gca() def get_xy(xy): if not hasattr(self, xy): setattr(self, xy, dict(self.list_inputs(out_stream=None))[f'constraint_group.{self.name}.{xy}']['value']) xy = getattr(self, xy) return xy if not isinstance(xy, tuple) else xy[0] for x, y in zip(get_xy('x'), get_xy('y')): circle = Circle((x, y), self.min_spacing / 2, color='k', ls='--', fill=False) ax.add_artist(circle) def satisfy(self, state, n_iter=100, step_size=0.1): x, y = [state[xy].astype(float) for xy in [topfarm.x_key, topfarm.y_key]] pair_i, pair_j = np.triu_indices(len(x), 1) for _ in range(n_iter): dist = self._compute(x, y) dx, dy = self._compute_partials(x, y) index = np.argmin(dist) if dist[index] < self.min_spacing**2: i, j = pair_i[index], pair_j[index] x[i] += dx[index, 0] * step_size x[j] += dx[index, 1] * step_size y[i] += dy[index, 0] * step_size y[j] += dy[index, 1] * step_size else: break state.update({topfarm.x_key: x, topfarm.y_key: y}) return state class SpacingTypeConstraint(SpacingConstraint): def __init__(self, min_spacing, units=None, aggregation_function=None, full_aggregation=False, name='spacing_type_comp'): """Initialize SpacingConstraint Parameters ---------- min_spacing : array_like Minimum spacing around turbines [m] (diameter of circle around a turbine that no other turbines can occupy) aggregation_function : topfarm.utils.AggregationFunction or None if None: compute returns all wt-wt spacings (n_wt *(n_wt-1))/2 if AggregationFunction: compute returns an aggregated (minimum) spacing """ super().__init__(min_spacing=min_spacing, units=units, aggregation_function=aggregation_function, full_aggregation=full_aggregation, name=name) self.min_spacing = np.asarray(min_spacing) def _setup(self, problem): self.n_wt = problem.n_wt self.spacing_comp = SpacingTypeComp(self.n_wt, self.min_spacing, self.const_id, self.units, aggregation_function=self.aggregation_function, full_aggregation=self.full_aggregation) problem.model.constraint_group.add_subsystem(self.const_id, self.spacing_comp, promotes=[topfarm.x_key, topfarm.y_key, topfarm.type_key, 'wtRelativeSeparationSquared']) def setup_as_constraint(self, problem): self._setup(problem) problem.model.add_constraint('wtRelativeSeparationSquared', lower=0) class SpacingTypeComp(SpacingComp): """ Calculates inter-turbine spacing for all turbine pairs. """ def __init__(self, n_wt, min_spacing, const_id=None, units=None, aggregation_function=None, full_aggregation=False, types=None): super().__init__(n_wt=n_wt, min_spacing=min_spacing, const_id=const_id, units=units, aggregation_function=aggregation_function, full_aggregation=full_aggregation) self.constraint_key = 'wtRelativeSeparationSquared' self.types = types def setup(self): super().setup() self.add_input(topfarm.type_key, val=self.types or np.zeros(self.n_wt), desc='turbine type number') def compute(self, inputs, outputs): self.x = inputs[topfarm.x_key] self.y = inputs[topfarm.y_key] self.type = inputs[topfarm.type_key] relative_separation_squared = self._compute(self.x, self.y, self.type) if self.aggregation_function: if self.full_aggregation: outputs[self.constraint_key] = self.aggregation_function(relative_separation_squared) else: outputs[self.constraint_key] = self.aggregation_function( relative_separation_squared[self.partial_indices], 0) # print(outputs[self.constraint_key]) else: outputs[self.constraint_key] = relative_separation_squared # outputs['constraint_violation_' + self.const_id] = -np.minimum(relative_separation_squared, 0).sum() def get_min_eff_spacing(self, t): return (self.min_spacing[np.atleast_1d(t).astype(int)][:, na] + self.min_spacing[np.atleast_1d(t).astype(int)][na, :]) / 2 def _compute(self, x, y, t): n_wt = self.n_wt # compute distance matrixes dX, dY = [np.subtract(*np.meshgrid(xy, xy, indexing='ij')).T for xy in [x, y]] dXY2 = dX**2 + dY**2 - self.get_min_eff_spacing(t)**2 # return upper triangle (above diagonal) return dXY2[np.triu_indices(n_wt, 1)] def plot(self, ax=None): from matplotlib.pyplot import Circle import matplotlib.pyplot as plt ax = ax or plt.gca() def get_xy(xy): if not hasattr(self, xy): setattr(self, xy, dict(self.list_inputs(out_stream=None))[f'constraint_group.{self.name}.{xy}']['value']) xy = getattr(self, xy) return xy if not isinstance(xy, tuple) else xy[0] for x, y, t in zip(get_xy('x'), get_xy('y'), get_xy('type')): circle = Circle((x, y), self.get_min_eff_spacing(t).ravel() / 2, color='k', ls='--', fill=False) ax.add_artist(circle) def satisfy(self, state, n_iter=100, step_size=0.1): x, y, t = [state[xy].astype(float) for xy in [topfarm.x_key, topfarm.y_key, topfarm.type_key]] pair_i, pair_j = np.triu_indices(len(x), 1) for _ in range(n_iter): dist = self._compute(x, y, t) dx, dy = self._compute_partials(x, y) index = np.argmin(dist) if dist[index] < self.get_min_eff_spacing(t)[np.triu_indices(self.n_wt, 1)][index]**2: i, j = pair_i[index], pair_j[index] x[i] += dx[index, 0] * step_size x[j] += dx[index, 1] * step_size y[i] += dy[index, 0] * step_size y[j] += dy[index, 1] * step_size else: break state.update({topfarm.x_key: x, topfarm.y_key: y}) return state