Source code for topfarm.constraint_components.boundary

import numpy as np
from numpy import newaxis as na
from scipy.spatial import ConvexHull
from topfarm.constraint_components import Constraint, ConstraintComponent
from topfarm.utils import smooth_max, smooth_max_gradient, is_number
import topfarm
from shapely.geometry import Polygon, MultiPolygon, LineString
from shapely.ops import unary_union
import warnings
from dataclasses import dataclass
from typing import Dict, List, Optional
from enum import Enum
import matplotlib.pyplot as plt
from matplotlib.pyplot import Circle


[docs] class XYBoundaryConstraint(Constraint):
[docs] def __init__(self, boundary, boundary_type='convex_hull', units=None, relaxation=False, **kwargs): """Initialize XYBoundaryConstraint Parameters ---------- boundary : array_like (n,2) or list of tuples (array_like (n,2), boolean) boundary coordinates. If boundary is array_like (n,2) it indicates a single boundary and can be used with boundary types: 'convex_hull', 'polygon', 'rectangle','square'. If boundary is list of tuples (array_like (n,2), boolean), it is multiple boundaries where the boolean is 1 for inclusion zones and 0 for exclusion zones and can be used with the boundary type: 'multi_polygon'. boundary_type : 'convex_hull', 'polygon', 'rectangle','square' - 'convex_hull' (default): Convex hul around boundary points\n - 'polygon': Polygon boundary (may be non convex). Less suitable for gradient-based optimization\n - 'rectangle': Smallest axis-aligned rectangle covering the boundary points\n - 'square': Smallest axis-aligned square covering the boundary points - 'multi_polygon': Mulitple polygon boundaries incl. exclusion zones (may be non convex).\n - 'turbine_specific': Set of multiple polygon boundaries that depend on the wind turbine type. \n """ if boundary_type == 'multi_polygon': self.zones = boundary self.boundary = np.asarray(self.zones[0].boundary) elif boundary_type == 'turbine_specific': self.zones = boundary assert 'turbines' in list(kwargs) self.turbines = kwargs['turbines'] self.boundary = np.asarray(self.zones[0].boundary) else: self.boundary = np.asarray(boundary) self.boundary_type = boundary_type self.const_id = 'xyboundary_comp_{}'.format(boundary_type) self.units = units self.relaxation = relaxation
def get_comp(self, n_wt): if not hasattr(self, 'boundary_comp'): if self.boundary_type == 'polygon': self.boundary_comp = PolygonBoundaryComp( n_wt, self.boundary, self.const_id, self.units, self.relaxation) elif self.boundary_type == 'multi_polygon': self.boundary_comp = MultiPolygonBoundaryComp(n_wt, self.zones, const_id=self.const_id, units=self.units, relaxation=self.relaxation) elif self.boundary_type == 'turbine_specific': self.boundary_comp = TurbineSpecificBoundaryComp(n_wt, self.turbines, self.zones, const_id=self.const_id, units=self.units, relaxation=self.relaxation) else: self.boundary_comp = ConvexBoundaryComp(n_wt, self.boundary, self.boundary_type, self.const_id, self.units) return self.boundary_comp @property def constraintComponent(self): assert hasattr( self, "boundary_comp" ), "Boundary component not initialized, call setup first" return self.boundary_comp def set_design_var_limits(self, design_vars): if self.boundary_type in ['multi_polygon', 'turbine_specific']: bound_min = np.vstack([(bound).min(0) for bound, _ in self.boundary_comp.boundaries]).min(0) bound_max = np.vstack([(bound).max(0) for bound, _ in self.boundary_comp.boundaries]).max(0) else: bound_min = self.boundary_comp.xy_boundary.min(0) bound_max = self.boundary_comp.xy_boundary.max(0) for k, l, u in zip([topfarm.x_key, topfarm.y_key], bound_min, bound_max): if k in design_vars: if len(design_vars[k]) == 4: design_vars[k] = (design_vars[k][0], np.maximum(design_vars[k][1], l), np.minimum(design_vars[k][2], u), design_vars[k][-1]) else: design_vars[k] = (design_vars[k][0], l, u, design_vars[k][-1]) def _setup(self, problem, group='constraint_group'): n_wt = problem.n_wt self.boundary_comp = self.get_comp(n_wt) self.boundary_comp.problem = problem self.set_design_var_limits(problem.design_vars) # problem.xy_boundary = np.r_[self.boundary_comp.xy_boundary, self.boundary_comp.xy_boundary[:1]] problem.indeps.add_output('xy_boundary', self.boundary_comp.xy_boundary) getattr(problem.model, group).add_subsystem('xy_bound_comp', self.boundary_comp, promotes=['*']) def setup_as_constraint(self, problem, group='constraint_group'): self._setup(problem, group=group) if problem.n_wt == 1: lower = 0 else: lower = self.boundary_comp.zeros problem.model.add_constraint('boundaryDistances', lower=lower) def setup_as_penalty(self, problem, group='constraint_group'): self._setup(problem, group=group)
[docs] class CircleBoundaryConstraint(XYBoundaryConstraint):
[docs] def __init__(self, center, radius): """Initialize CircleBoundaryConstraint Parameters ---------- center : (float, float) center position (x,y) radius : int or float circle radius """ self.center = np.array(center) self.radius = radius self.const_id = 'circle_boundary_comp_{}_{}'.format( '_'.join([str(int(c)) for c in center]), int(radius)).replace('.', '_')
def get_comp(self, n_wt): if not hasattr(self, 'boundary_comp'): self.boundary_comp = CircleBoundaryComp(n_wt, self.center, self.radius, self.const_id) return self.boundary_comp def set_design_var_limits(self, design_vars): for k, l, u in zip([topfarm.x_key, topfarm.y_key], self.center - self.radius, self.center + self.radius): if len(design_vars[k]) == 4: design_vars[k] = (design_vars[k][0], np.maximum(design_vars[k][1], l), np.minimum(design_vars[k][2], u), design_vars[k][-1]) else: design_vars[k] = (design_vars[k][0], l, u, design_vars[k][-1])
class BoundaryBaseComp(ConstraintComponent): def __init__(self, n_wt, xy_boundary=None, const_id=None, units=None, relaxation=False, **kwargs): super().__init__(**kwargs) self.n_wt = n_wt self.xy_boundary = np.array(xy_boundary) self.const_id = const_id self.units = units self.relaxation = relaxation if xy_boundary is not None and np.any(self.xy_boundary[0] != self.xy_boundary[-1]): self.xy_boundary = np.r_[self.xy_boundary, self.xy_boundary[:1]] def setup(self): # Explicitly size input arrays self.add_input(topfarm.x_key, np.zeros(self.n_wt), desc='x coordinates of turbines in global ref. frame', units=self.units) self.add_input(topfarm.y_key, np.zeros(self.n_wt), desc='y coordinates of turbines in global ref. frame', units=self.units) if self.relaxation: self.add_input('time', 0) if hasattr(self, 'types'): self.add_input('type', np.zeros(self.n_wt)) # self.add_output('constraint_violation_' + self.const_id, val=0.0) # Explicitly size output array # (vector with positive elements if turbines outside of hull) self.add_output('boundaryDistances', self.zeros, desc="signed perpendicular distances from each turbine to each face CCW; + is inside") self.declare_partials('boundaryDistances', [topfarm.x_key, topfarm.y_key]) if self.relaxation: self.declare_partials('boundaryDistances', 'time') # self.declare_partials('boundaryDistances', ['boundaryVertices', 'boundaryNormals'], method='fd') def compute(self, inputs, outputs): # calculate distances from each point to each face args = {x: inputs[x] for x in [topfarm.x_key, topfarm.y_key, topfarm.type_key] if x in inputs} boundaryDistances = self.distances(**args) outputs['boundaryDistances'] = boundaryDistances # outputs['constraint_violation_' + self.const_id] = np.sum(np.minimum(boundaryDistances, 0) ** 2) def compute_partials(self, inputs, partials): # return Jacobian dict if not self.relaxation: dx, dy = self.gradients(**{xy: inputs[k] for xy, k in zip('xy', [topfarm.x_key, topfarm.y_key])}) else: dx, dy, dt = self.gradients(**{xy: inputs[k] for xy, k in zip('xy', [topfarm.x_key, topfarm.y_key])}) partials['boundaryDistances', topfarm.x_key] = dx partials['boundaryDistances', topfarm.y_key] = dy if self.relaxation: partials['boundaryDistances', 'time'] = dt def plot(self, ax): """Plot boundary""" ax.plot( self.xy_boundary[:, 0].tolist() + [self.xy_boundary[0, 0]], self.xy_boundary[:, 1].tolist() + [self.xy_boundary[0, 1]], "k", linewidth=1, ) class ConvexBoundaryComp(BoundaryBaseComp): def __init__(self, n_wt, xy_boundary=None, boundary_type='convex_hull', const_id=None, units=None): self.boundary_type = boundary_type self.calculate_boundary_and_normals(xy_boundary) super().__init__(n_wt, self.xy_boundary, const_id, units) self.calculate_gradients() self.zeros = np.zeros([self.n_wt, self.nVertices]) def calculate_boundary_and_normals(self, xy_boundary): xy_boundary = np.asarray(xy_boundary) if self.boundary_type == 'convex_hull': # find the points that actually comprise a convex hull hull = ConvexHull(list(xy_boundary)) # keep only xy_vertices that actually comprise a convex hull and arrange in CCW order self.xy_boundary = xy_boundary[hull.vertices] elif self.boundary_type == 'square': min_ = xy_boundary.min(0) max_ = xy_boundary.max(0) range_ = (max_ - min_) x_c, y_c = min_ + range_ / 2 r = range_.max() / 2 self.xy_boundary = np.array([(x_c - r, y_c - r), (x_c + r, y_c - r), (x_c + r, y_c + r), (x_c - r, y_c + r)]) elif self.boundary_type == 'rectangle': min_ = xy_boundary.min(0) max_ = xy_boundary.max(0) range_ = (max_ - min_) x_c, y_c = min_ + range_ / 2 r = range_ / 2 self.xy_boundary = np.array([(x_c - r[0], y_c - r[1]), (x_c + r[0], y_c - r[1]), (x_c + r[0], y_c + r[1]), (x_c - r[0], y_c + r[1])]) else: raise NotImplementedError("Boundary type '%s' is not implemented" % self.boundary_type) # get the real number of xy_vertices self.nVertices = self.xy_boundary.shape[0] # initialize normals array unit_normals = np.zeros([self.nVertices, 2]) # determine if point is inside or outside of each face, and distances from each face for j in range(0, self.nVertices): # calculate the unit normal vector of the current face (taking points CCW) if j < self.nVertices - 1: # all but the set of point that close the shape normal = np.array([self.xy_boundary[j + 1, 1] - self.xy_boundary[j, 1], -(self.xy_boundary[j + 1, 0] - self.xy_boundary[j, 0])]) unit_normals[j] = normal / np.linalg.norm(normal) else: # the set of points that close the shape normal = np.array([self.xy_boundary[0, 1] - self.xy_boundary[j, 1], -(self.xy_boundary[0, 0] - self.xy_boundary[j, 0])]) unit_normals[j] = normal / np.linalg.norm(normal) self.unit_normals = unit_normals def calculate_gradients(self): unit_normals = self.unit_normals # initialize array to hold distances from each point to each face dfaceDistance_dx = np.zeros([self.n_wt * self.nVertices, self.n_wt]) dfaceDistance_dy = np.zeros([self.n_wt * self.nVertices, self.n_wt]) for i in range(0, self.n_wt): # determine if point is inside or outside of each face, and distances from each face for j in range(0, self.nVertices): # define the derivative vectors from the point of interest to the first point of the face dpa_dx = np.array([-1.0, 0.0]) dpa_dy = np.array([0.0, -1.0]) # find perpendicular distances derivatives from point to current surface (vector projection) ddistanceVec_dx = np.vdot(dpa_dx, unit_normals[j]) * unit_normals[j] ddistanceVec_dy = np.vdot(dpa_dy, unit_normals[j]) * unit_normals[j] # calculate derivatives for the sign of perpendicular distances from point to current face dfaceDistance_dx[i * self.nVertices + j, i] = np.vdot(ddistanceVec_dx, unit_normals[j]) dfaceDistance_dy[i * self.nVertices + j, i] = np.vdot(ddistanceVec_dy, unit_normals[j]) # return Jacobian dict self.dfaceDistance_dx = dfaceDistance_dx self.dfaceDistance_dy = dfaceDistance_dy def calculate_distance_to_boundary(self, points): """ :param points: points that you want to calculate the distances from to the faces of the convex hull :return face_distace: signed perpendicular distances from each point to each face; + is inside """ nPoints = np.array(points).shape[0] xy_boundary = self.xy_boundary[:-1] nVertices = xy_boundary.shape[0] vertices = xy_boundary unit_normals = self.unit_normals # initialize array to hold distances from each point to each face face_distance = np.zeros([nPoints, nVertices]) from numpy import newaxis as na # define the vector from the point of interest to the first point of the face PA = (vertices[:, na] - points[na]) # find perpendicular distances from point to current surface (vector projection) dist = np.sum(PA * unit_normals[:, na], 2) # calculate the sign of perpendicular distances from point to current face (+ is inside, - is outside) d_vec = dist[:, :, na] * unit_normals[:, na] face_distance = np.sum(d_vec * unit_normals[:, na], 2) return face_distance.T def distances(self, x, y): return self.calculate_distance_to_boundary(np.array([x, y]).T) def gradients(self, x, y): return self.dfaceDistance_dx, self.dfaceDistance_dy def satisfy(self, state, pad=1.1): x, y = [np.asarray(state[xyz], dtype=float) for xyz in [topfarm.x_key, topfarm.y_key]] dist = self.distances(x, y) dist = np.where(dist < 0, np.minimum(dist, -.01), dist) dx, dy = self.gradients(x, y) # independent of position dx = dx[:self.nVertices, 0] dy = dy[:self.nVertices, 0] for i in np.where(dist.min(1) < 0)[0]: # loop over turbines that violate edges # find smallest movement that where the constraints are satisfied d = dist[i] v = np.linspace(-np.abs(d.min()), np.abs(d.min()), 100) X, Y = np.meshgrid(v, v) m = np.ones_like(X) for dx_, dy_, d in zip(dx, dy, dist.T): m = np.logical_and(m, X * dx_ + Y * dy_ >= -d[i]) index = np.argmin(X[m]**2 + Y[m]**2) x[i] += X[m][index] y[i] += Y[m][index] state[topfarm.x_key] = x state[topfarm.y_key] = y return state class PolygonBoundaryComp(BoundaryBaseComp): def __init__(self, n_wt, xy_boundary, const_id=None, units=None, relaxation=False): self.nTurbines = n_wt self.const_id = const_id self.zeros = np.zeros(self.nTurbines) self.units = units self.boundary_properties = self.get_boundary_properties(xy_boundary) BoundaryBaseComp.__init__(self, n_wt, xy_boundary=self.boundary_properties[0], const_id=const_id, units=units, relaxation=relaxation) self._cache_input = None self._cache_output = None self.relaxation = relaxation def get_boundary_properties(self, xy_boundary, inclusion_zone=True): vertices = np.array(xy_boundary) def get_edges(vertices, counter_clockwise): if np.any(vertices[0] != vertices[-1]): vertices = np.r_[vertices, vertices[:1]] x1, y1 = A = vertices[:-1].T x2, y2 = B = vertices[1:].T double_area = np.sum((x1 - x2) * (y1 + y2)) # 2 x Area (+: counterclockwise assert double_area != 0, "Area must be non-zero" if (counter_clockwise and double_area < 0) or (not counter_clockwise and double_area > 0): # return get_edges(vertices[::-1], counter_clockwise) else: return vertices[:-1], A, B # inclusion zones are defined counter clockwise (unit-normal vector pointing in) while # exclusion zones are defined clockwise (unit-normal vector pointing out) xy_boundary, A, B = get_edges(vertices, inclusion_zone) dx, dy = AB = B - A AB_len = np.linalg.norm(AB, axis=0) edge_unit_normal = (np.array([-dy, dx]) / AB_len) # A_normal and B_normal are the normal vectors at the nodes A,B (the mean of the adjacent edge normal vectors A_normal = (edge_unit_normal + np.roll(edge_unit_normal, 1, 1)) / 2 B_normal = np.roll(A_normal, -1, 1) # for (x, y), (dx, dy), (unx, uny) in zip(A.T, AB.T, edge_unit_normal.T): # plt.arrow(x, y, dx, dy, color='k', head_width=.2) # plt.arrow(x, y, unx, uny, color='r', head_width=.2) # for (x, y), (nx, ny) in zip(A.T, A_normal.T): # plt.arrow(x, y, nx, ny, color='b', head_width=.2) # for (x, y), (nx, ny) in zip(B.T, B_normal.T): # plt.arrow(x, y, nx / 2, ny / 2, color='g', head_width=.2) return (xy_boundary, A, B, AB, AB_len, edge_unit_normal, A_normal, B_normal) def _calc_distance_and_gradients(self, x, y, boundary_properties=None): """ distances point, P=(x,y) to edge(A->B) +/-: inside/outside """ def vec_len(vec): return np.linalg.norm(vec, axis=0) boundary_properties = boundary_properties or self.boundary_properties[1:] A, B, AB, AB_len, edge_unit_normal, A_normal, B_normal = boundary_properties """ A: edge start point B: edge end point edge_unit_normal: unit vector perpendicular to edge pointing to the good side (i.e. inside for inclusion zones and outside for exclusion zones) AB: Vector from A to B (edge) AB_len: length of AB (edge) A_normal: mean of edge unit normal vectors adjacent to A B_normal: mean of edge unit normal vectors adjacent to B """ # Add dim to match (2, #P, #Edges), where the first dimension is (x,y) P = np.array([x, y])[:, :, na] A, B, AB = A[:, na], B[:, na], AB[:, na] edge_unit_normal, A_normal, B_normal = edge_unit_normal[:, na], A_normal[:, na], B_normal[:, na] AB_len = AB_len[na] # =============================================================================================================== # Determine if P is closer to A, B or the edge (between A and B) # =============================================================================================================== AP = P - A # vector from edge start to point BP = P - B # vector from edge end to point # signed component of AP on the edge vector a_tilde = np.sum(AP * AB, axis=0) / AB_len # a_tilde < 0: closer to A # a_tilde > |AB|: closer to B # else: closer to edge (between A and B) use_A = 0 > a_tilde use_B = a_tilde > AB_len # =============================================================================================================== # Calculate distance from P to closer point on edge # =============================================================================================================== # Perpendicular distances to edge (AP dot edge_unit_normal product). # This is the distance to the edge if not use_A or use_B distance = np.sum((AP) * edge_unit_normal, 0) # Update distance for points closer to A good_side_of_A = (np.sum((AP * A_normal)[:, use_A], 0) > 0) sign_use_A = np.where(good_side_of_A, 1, -1) distance[use_A] = (vec_len(AP[:, use_A]) * sign_use_A) # Update distance for points closer to B good_side_of_B = np.sum((BP * B_normal)[:, use_B], 0) > 0 sign_use_B = np.where(good_side_of_B, 1, -1) distance[use_B] = (vec_len(BP[:, use_B]) * sign_use_B) # =============================================================================================================== # Calculate gradient of distance from P to closer point on edge wrt. x and y # =============================================================================================================== # Gradient of perpendicular distances to edge. # This is the gradient if not use_A or use_B ddist_dxy = np.tile(edge_unit_normal, (1, len(x), 1)) # Update gradient for points closer to A or B eps = 1e-7 # avoid division by zero ddist_dxy[:, use_A] = sign_use_A * (AP[:, use_A] / (vec_len(AP[:, use_A]) + eps)) ddist_dxy[:, use_B] = sign_use_B * (BP[:, use_B] / (vec_len(BP[:, use_B]) + eps)) ddist_dX, ddist_dY = ddist_dxy return distance, ddist_dX, ddist_dY def calc_distance_and_gradients(self, x, y): if not np.shape([x, y]) == np.shape(self._cache_input): pass elif np.all(np.array([x, y]) == self._cache_input): return self._cache_output distance, ddist_dX, ddist_dY = self._calc_distance_and_gradients(x, y) closest_edge_index = np.argmin(np.abs(distance), 1) self._cache_input = np.array([x, y]) self._cache_output = [ # pick only closest edge v[np.arange(len(closest_edge_index)), closest_edge_index] for v in [distance, ddist_dX, ddist_dY] ] return self._cache_output def distances(self, x, y): return self.calc_distance_and_gradients(x, y)[0] def gradients(self, x, y): _, dx, dy = self.calc_distance_and_gradients(x, y) return np.diagflat(dx), np.diagflat(dy) def satisfy(self, state, pad=1.1): x, y = [np.asarray(state[xy], dtype=float) for xy in [topfarm.x_key, topfarm.y_key]] dist = self.distances(x, y) dx, dy = map(np.diag, self.gradients(x, y)) m = dist < 0 x[m] -= dx[m] * dist[m] * pad y[m] -= dy[m] * dist[m] * pad state[topfarm.x_key] = x state[topfarm.y_key] = y return state class CircleBoundaryComp(PolygonBoundaryComp): def __init__(self, n_wt, center, radius, const_id=None, units=None): self.center = center self.radius = radius t = np.linspace(0, 2 * np.pi, 100) xy_boundary = self.center + np.array([np.cos(t), np.sin(t)]).T * self.radius BoundaryBaseComp.__init__(self, n_wt, xy_boundary, const_id, units) self.zeros = np.zeros(self.n_wt) def plot(self, ax=None): ax = ax or plt.gca() circle = Circle(self.center, self.radius, color='k', fill=False) ax.add_artist(circle) def distances(self, x, y): return self.radius - np.sqrt((x - self.center[0])**2 + (y - self.center[1])**2) def gradients(self, x, y): theta = np.arctan2(y - self.center[1], x - self.center[0]) dx = -1 * np.ones_like(x) dy = -1 * np.ones_like(x) dist = self.radius - np.sqrt((x - self.center[0])**2 + (y - self.center[1])**2) not_center = dist != self.radius dx[not_center], dy[not_center] = -np.cos(theta[not_center]), -np.sin(theta[not_center]) return np.diagflat(dx), np.diagflat(dy) class Zone(object): def __init__(self, boundary, dist2wt, geometry_type, incl, name): self.name = name self.boundary = boundary self.dist2wt = dist2wt self.geometry_type = geometry_type self.incl = incl class InclusionZone(Zone): def __init__(self, boundary, dist2wt=None, geometry_type='polygon', name=''): super().__init__(np.asarray(boundary), dist2wt, geometry_type, incl=1, name=name) class ExclusionZone(Zone): def __init__(self, boundary, dist2wt=None, geometry_type='polygon', name=''): super().__init__(np.asarray(boundary), dist2wt, geometry_type, incl=0, name=name) class MultiPolygonBoundaryComp(PolygonBoundaryComp): def __init__(self, n_wt, zones, const_id=None, units=None, relaxation=False, method='nearest', simplify_geometry=False): ''' Parameters ---------- n_wt : TYPE DESCRIPTION. zones : list list of InclusionZone and ExclusionZone objects const_id : TYPE, optional DESCRIPTION. The default is None. units : TYPE, optional DESCRIPTION. The default is None. method : {'nearest' or 'smooth_min'}, optional 'nearest' calculate the distance to the nearest edge or point'smooth_min' calculates the weighted minimum distance to all edges/points. The default is 'nearest'. simplify : float or dict if float, simplification tolerance. if dict, shapely.simplify keyword arguments Returns ------- None. ''' self.zones = zones self.bounds_poly, xy_boundaries = self.get_xy_boundaries() PolygonBoundaryComp.__init__(self, n_wt, xy_boundary=xy_boundaries[0], const_id=const_id, units=units, relaxation=relaxation) # self.bounds_poly = [Polygon(x) for x in xy_boundaries] self.incl_excls = [x.incl for x in zones] self._setup_boundaries(self.bounds_poly, self.incl_excls) self.relaxation = relaxation self.method = method if simplify_geometry: self.simplify(simplify_geometry) def simplify(self, simplify_geometry): bounds = [bi[0] for bi in self.boundaries] self.incl_excls = [bi[1] for bi in self.boundaries] polygons = [Polygon(b) for b in bounds] if isinstance(simplify_geometry, dict): self.bounds_poly = [rp.simplify(**simplify_geometry) for rp in polygons] else: self.bounds_poly = [rp.simplify(simplify_geometry) for rp in polygons] self._setup_boundaries(self.bounds_poly, self.incl_excls) # def line_to_xy_boundary(self, line, buffer): # return np.asarray(Polygon(LineString(line).buffer(buffer, join_style=2).exterior).exterior.coords) def get_xy_boundaries(self): polygons = [] bounds = [] for z in self.zones: # hardcoded values passed to this function... it does not impact the # result because this only happends due to turbine specific component # calling into parent constructor; poly = self._zone2poly(z, D=-1, H=-1) polygons.append(poly) bounds.append(np.asarray(poly.exterior.coords)) return polygons, bounds def _setup_boundaries(self, bounds_poly, incl_excl): self.res_poly = self._calc_resulting_polygons(bounds_poly, incl_excl) self.boundaries = self._poly_to_bound(self.res_poly) boundary_properties_list_all = list(zip(*[self.get_boundary_properties(bound, incl_excl)[1:] for bound, incl_excl in self.boundaries])) self.boundary_properties_list_all = [np.concatenate(v, -1) for v in boundary_properties_list_all] def _poly_to_bound(self, polygons): boundaries = [] for bound in polygons: x, y = bound.exterior.xy boundaries.append((np.asarray([x, y]).T[:-1, :], 1)) for interior in bound.interiors: x, y = interior.xy boundaries.append((np.asarray([x, y]).T[:-1, :], 0)) return boundaries @staticmethod def _calc_resulting_polygons(boundary_polygons, incl_excls): """ Parameters ---------- boundary_polygons : list[shapely.Polygon] list of shapely polygons as specifed or inferred from user input incl_excls : list[bool] list of boolean values specifying whether the polygon is an inclusion or exclusion Returns ------- list of merged shapely polygons. Resolves issues arrising if any are overlapping, touching or contained in each other """ included_polygons = [ boundary_polygons[i] for i, x in enumerate(incl_excls) if x == 1 ] excluded_polygons = [ boundary_polygons[i] for i, x in enumerate(incl_excls) if x == 0 ] included_polygons = unary_union(included_polygons) excluded_polygons = unary_union(excluded_polygons) remain_polygons = included_polygons.difference(excluded_polygons) if isinstance(remain_polygons, Polygon): return [remain_polygons] if remain_polygons.area > 1e-3 else [] if isinstance(remain_polygons, MultiPolygon): return [ poly for poly in remain_polygons.geoms if poly.area > 1e-3 ] return [] def sign(self, Dist_ij): return np.sign(Dist_ij[np.arange(Dist_ij.shape[0]), np.argmin(abs(Dist_ij), axis=1)]) def calc_distance_and_gradients(self, x, y): ''' Parameters ---------- x : 1d array Array of x-positions. y : 1d array Array of y-positions. Returns ------- D_ij : 2d array Array of point-edge distances. index 'i' is points and index 'j' is total number of edges. sign_i : 1d array Array of signs of the governing distance. dDdk_jk : 2d array Jacobian of the distance matrix D_ij with respect to x and y. ''' if not np.shape([x, y]) == np.shape(self._cache_input): pass elif np.all(np.array([x, y]) == self._cache_input) & (not self.relaxation): return self._cache_output Dist_ij, ddist_dX, ddist_dY = self._calc_distance_and_gradients(x, y, self.boundary_properties_list_all) dDdk_ijk = np.moveaxis([ddist_dX, ddist_dY], 0, -1) sign_i = self.sign(Dist_ij) self._cache_input = np.array([x, y]) self._cache_output = [Dist_ij, dDdk_ijk, sign_i] return self._cache_output def calc_relaxation(self, iteration_no=None): ''' The tupple relaxation contains a first term for the penalty constant and a second term for the n first iterations to apply relaxation. ''' if iteration_no is None: iteration_no = self.problem.cost_comp.n_grad_eval + 1 return max(0, self.relaxation[0] * (self.relaxation[1] - iteration_no)) def distances(self, x, y): Dist_ij, _, sign_i = self.calc_distance_and_gradients(x, y) if self.method == 'smooth_min': Dist_i = smooth_max(np.abs(Dist_ij), -np.abs(Dist_ij).max(), axis=1) * sign_i elif self.method == 'nearest': Dist_i = Dist_ij[np.arange(x.size), np.argmin(np.abs(Dist_ij), axis=1)] else: warning = f'method: {self.method} is not implemented. Available options are smooth_min and nearest.' warnings.warn(warning) if self.relaxation: Dist_i += self.calc_relaxation() return Dist_i def gradients(self, x, y): ''' The derivate of the smooth maximum with respect to x and y is calculated with the chain rule: dS/dk = dS/dD * dD/dk where S is smooth maximum, D is distance to edge and k is the spacial dimension ''' Dist_ij, dDdk_ijk, _ = self.calc_distance_and_gradients(x, y) if self.relaxation: Dist_ij += self.calc_relaxation() # dDdt = -self.relaxation[1] if self.method == 'smooth_min': dSdDist_ij = smooth_max_gradient(np.abs(Dist_ij), -np.abs(Dist_ij).max(), axis=1) dSdkx_i, dSdky_i = (dSdDist_ij[:, :, na] * dDdk_ijk).sum(axis=1).T elif self.method == 'nearest': dSdkx_i, dSdky_i = dDdk_ijk[np.arange(x.size), np.argmin(np.abs(Dist_ij), axis=1), :].T if self.relaxation: # as relaxed distance is relaxation + distance, the gradient with respect to x and y is unchanged gradients = np.diagflat(dSdkx_i), np.diagflat(dSdky_i), np.ones(self.n_wt) * self.relaxation[1] else: gradients = np.diagflat(dSdkx_i), np.diagflat(dSdky_i) return gradients def relaxed_polygons(self, iteration_no=None): poly = [Polygon(x.boundary) for x in self.zones] booleans = [x.incl for x in self.zones] relaxed_poly = [] for i, p in enumerate(poly): if booleans[i] == 0: pb = p.buffer(-self.calc_relaxation(iteration_no), join_style=2) relaxed_poly.append(pb) else: pb = p.buffer(self.calc_relaxation(iteration_no), join_style=2) relaxed_poly.append(pb) merged_poly = self._calc_resulting_polygons(relaxed_poly, booleans) return self._poly_to_bound(merged_poly) @staticmethod def _zone2poly(z: Zone, **kwargs): buffer = 0 if hasattr(z.dist2wt, "__code__"): buffer = z.dist2wt( **{k: kwargs[k] for k in z.dist2wt.__code__.co_varnames} ) if not is_number(buffer): raise ValueError(f"dist2wt must return a float, not {type(buffer)}") elif is_number(z.dist2wt): buffer = z.dist2wt elif z.dist2wt is not None: warnings.warn( f"dist2wt is not a function or a float and is ignored. Zone buffer is set to 0." ) buf_direction = -1 if z.incl else 1 buffer = abs(buffer) * buf_direction if z.geometry_type == "line": poly = Polygon(LineString(z.boundary).buffer(buffer, join_style=2).exterior) elif z.geometry_type == "polygon": poly = Polygon(z.boundary).buffer(buffer, join_style=2) else: raise NotImplementedError( f"Geometry type '{z.geometry_type}' is not implemented" ) return poly def plot(self, ax): """Plot original and buffered boundaries""" legend_mask = [0, 0] for i, (zone, buffered_poly) in enumerate(zip(self.zones, self.bounds_poly)): original_coords = zone.boundary if zone.geometry_type == "line": ax.plot( original_coords[:, 0], original_coords[:, 1], "k--", linewidth=1, alpha=0.5, label="Original line" if i == 0 else "", ) else: original_poly = np.vstack([original_coords, original_coords[0]]) ax.plot( original_poly[:, 0], original_poly[:, 1], "k--", linewidth=1, alpha=0.5, label="Original boundary" if i == 0 else "", ) x, y = buffered_poly.exterior.xy color = "green" if zone.incl else "red" label = "Inclusion" if zone.incl else "Exclusion" if legend_mask[zone.incl]: label = "" legend_mask[zone.incl] = 1 ax.plot( x, y, color=color, linewidth=1, linestyle="-", label=label, ) ax.fill(x, y, color=color, alpha=0.1) ax.legend() class TurbineSpecificBoundaryComp(MultiPolygonBoundaryComp): def __init__(self, n_wt, wind_turbines, zones, const_id=None, units=None, relaxation=False, method='nearest', simplify_geometry=False): # self.dependencies = [d or {'type': None, 'multiplier': None, 'ref': None} for d in dependencies] # self.multi_boundary = xy_boundaries = self.get_xy_boundaries(boundaries, geometry_types, incl_excls) self.wind_turbines = wind_turbines self.types = wind_turbines.types() # self.incl_excls = incl_excls self.n_wt = n_wt self.zones = zones self.ts_polygon_boundaries, ts_xy_boundaries = self.get_ts_boundaries() MultiPolygonBoundaryComp.__init__(self, n_wt=n_wt, zones=zones, const_id=const_id, units=units, relaxation=relaxation, method=method, simplify_geometry=simplify_geometry) # self.polygon_boundaries = [Polygon(x) for x, _ in xy_boundaries] # self.ts_polygon_boundaries = self.get_ts_polygon_boundaries(self.types) self.ts_merged_polygon_boundaries = self.merge_boundaries() self.ts_merged_xy_boundaries = self.get_ts_xy_boundaries() self.ts_boundary_properties = self.get_ts_boundary_properties() self.ts_item_indices = self.get_ts_item_indices() def get_ts_boundaries(self): polygons = [] bounds = [] for t in set(self.types): temp1 = [] temp2 = [] dist2wt_input = dict( D=self.wind_turbines.diameter(t), H=self.wind_turbines.hub_height(t), ) for z in self.zones: poly = self._zone2poly(z, **dist2wt_input) bound = np.asarray(poly) temp1.append(poly) temp2.append(bound) polygons.append(temp1) bounds.append(temp2) return polygons, bounds # for wt # temp = [] # for n, (b, t, ie) in enumerate(zip(boundaries, geometry_types, incl_excls)): # if t == 'line': # bound = np.asarray(Polygon(LineString(b).buffer(default_ref, join_style=2).exterior).exterior.coords) # self.dependencies[n]['ref'] = default_ref # elif t == 'polygon': # bound = b # else: # raise NotImplementedError("Geometry type '%s' is not implemented" % b) # temp.append((bound, ie)) # def get_ts_polygon_boundaries(self, types): # temp = [] # for t in set(types): # d = self.wind_turbines.diameter(t) # h = self.wind_turbines.hub_height(t) # temp.append(self.get_ts_polygon_boundary(d, h)) # return temp def get_ts_xy_boundaries(self): return [self._poly_to_bound(b) for b in self.ts_merged_polygon_boundaries] # def get_ts_polygon_boundary(self, d=None, h=None): # temp = [] # for bound, dep in zip(self.polygon_boundaries, self.dependencies): # ref = dep['ref'] or 0 # if dep['type'] == 'D': # ts_polygon_boundary = bound.buffer(dep['multiplier']*d-ref, join_style=2) # elif dep['type'] == 'H': # ts_polygon_boundary = bound.buffer(dep['multiplier']*h-ref, join_style=2) # else: # ts_polygon_boundary = bound # temp.append(ts_polygon_boundary) # return temp def merge_boundaries(self): return [self._calc_resulting_polygons(bounds, self.incl_excls) for bounds in self.ts_polygon_boundaries] def get_ts_boundary_properties(self,): return [[self.get_boundary_properties(bound) for bound, _ in bounds] for bounds in self.ts_merged_xy_boundaries] def get_ts_item_indices(self): temp = [] for bounds in self.ts_merged_xy_boundaries: n_edges = np.asarray([len(bound) for bound, _ in bounds]) n_edges_tot = np.sum(n_edges) start_at = np.cumsum(n_edges) - n_edges end_at = start_at + n_edges item_indices = [n_edges_tot, start_at, end_at] temp.append(item_indices) return temp def calc_distance_and_gradients(self, x, y, types=None): if self._cache_input is None: pass elif not np.shape([x, y]) == np.shape(self._cache_input[0]) or not np.shape(types) == np.shape(self._cache_input[1]): pass elif np.all(np.array([x, y]) == self._cache_input[0]) & (not self.relaxation) & np.all(np.asarray([types]) == self._cache_input[1]): return self._cache_output if types is None: types = np.zeros(self.n_wt) Dist_i = np.zeros(self.n_wt) sign_i = np.zeros(self.n_wt) dDdx_i = np.zeros(self.n_wt) dDdy_i = np.zeros(self.n_wt) for t in set(types): t = int(t) idx = (types == t) n_edges_tot, start_at, end_at = self.ts_item_indices[t] Dist_ij = np.zeros((sum(idx), n_edges_tot)) dDdk_ijk = np.zeros((sum(idx), n_edges_tot, 2)) for n, (bound, bound_type) in enumerate(self.ts_merged_xy_boundaries[t]): sa = start_at[n] ea = end_at[n] distance, ddist_dX, ddist_dY = self._calc_distance_and_gradients(x[idx], y[idx], self.ts_boundary_properties[t][n][1:]) if bound_type == 0: distance *= -1 ddist_dX *= -1 ddist_dY *= -1 Dist_ij[:, sa:ea] = distance dDdk_ijk[:, sa:ea, 0] = ddist_dX dDdk_ijk[:, sa:ea, 1] = ddist_dY sign_i[idx] = self.sign(Dist_ij) Dist_i[idx] = Dist_ij[np.arange(sum(idx)), np.argmin(np.abs(Dist_ij), axis=1)] dDdx_i[idx], dDdy_i[idx] = dDdk_ijk[np.arange(sum(idx)), np.argmin(np.abs(Dist_ij), axis=1), :].T self._cache_input = (np.array([x, y]), np.asarray(types)) self._cache_output = [Dist_i, dDdx_i, dDdy_i, sign_i] return self._cache_output def distances(self, x, y, type=None): Dist_i, _, _, _ = self.calc_distance_and_gradients(x, y, types=type) if self.relaxation: Dist_i += self.calc_relaxation() return Dist_i def gradients(self, x, y, type=None): Dist_i, dDdx_i, dDdy_i, _ = self.calc_distance_and_gradients(x, y, types=type) if self.relaxation: Dist_i += self.calc_relaxation() dDdt = -self.relaxation[0] if self.relaxation: gradients = np.diagflat(dDdx_i), np.diagflat(dDdy_i), np.ones(self.n_wt) * dDdt else: gradients = np.diagflat(dDdx_i), np.diagflat(dDdy_i) return gradients def plot(self, ax): linestyles = ["--", "-"] colors = ["b", "r", "m", "c", "g", "y", "orange", "indigo", "grey"] for n, t in enumerate(self.types): _ = ax.plot( *self.ts_merged_xy_boundaries[n][0][0][0, :], color=colors[t % len(colors)], linewidth=1, label=f"{self.wind_turbines._names[n]} boundary", ) for bound, io in self.ts_merged_xy_boundaries[n]: ax.plot( np.asarray(bound)[:, 0].tolist() + [np.asarray(bound)[0, 0]], np.asarray(bound)[:, 1].tolist() + [np.asarray(bound)[0, 1]], color=colors[t % len(colors)], linewidth=1, linestyle=linestyles[io], ) for i, zone in enumerate(self.zones): original_coords = zone.boundary if zone.geometry_type == "line": ax.plot( original_coords[:, 0], original_coords[:, 1], "k-.", linewidth=0.8, alpha=0.7, label=f"Original" if (i == 0) else "", ) else: original_poly = np.vstack([original_coords, original_coords[0]]) ax.plot( original_poly[:, 0], original_poly[:, 1], "k-.", linewidth=0.8, alpha=0.7, label=f"Original" if (i == 0) else "", ) ax.legend() class MultiCircleBoundaryComp(PolygonBoundaryComp): def __init__(self, n_wt, geometry, wt_groups, const_id=None, units=None): self.__validate_input(geometry, wt_groups) self.center = [g[0] for g in geometry] self.radius = [g[1] for g in geometry] # ones where the indices in each wt_group are active, zeros elsewhere self.masks = [np.isin(np.arange(n_wt), g) for g in wt_groups] BoundaryBaseComp.__init__(self, n_wt, None, const_id, units) self.zeros = np.zeros(self.n_wt) def __validate_input(self, geometry, wt_groups): does_len_match = len(geometry) == len(wt_groups) centers_and_radii_given = all(len(g) == 2 for g in geometry) from topfarm.utils import _np2scalar # fmt:skip are_radii_scalar = True try: _ = [_np2scalar(g[1]) for g in geometry] except BaseException: are_radii_scalar = False are_centers_all_2x_coords = all( hasattr(g[0], "__iter__") and len(g[0]) == 2 for g in geometry ) assert all( [ does_len_match, centers_and_radii_given, are_radii_scalar, are_centers_all_2x_coords, ] ), ( "Invalid input for Circle: Ensure the number of geometries matches the number of wt_groups, " "each geometry is a tuple of center and radius, and the center is a tuple of x and y " "with the radius being a scalar. For instance, geometry = [((0, 0), 100), ...] and " "wt_groups = [[0, 1], ...]; Here, the first geometry is a circle with center at (0, 0) " "and radius 100, and turbines 0 and 1 are assigned for the circle constraint 0." ) def plot(self, ax=None): ax = ax or plt.gca() for center, radius in zip(self.center, self.radius): circle = Circle(center, radius, color="k", fill=False) ax.add_artist(circle) def distances(self, x, y): assert ( x.shape == y.shape == self.masks[0].shape ), f"{x.shape} != {y.shape} != {self.masks[0].shape}" distances = np.zeros_like(x) for center, radius, mask in zip(self.center, self.radius, self.masks): distances += mask * ( radius - np.sqrt((x - center[0]) ** 2 + (y - center[1]) ** 2) ) return distances def gradients(self, x, y): dx = np.zeros_like(x) dy = np.zeros_like(x) for center, radius, mask in zip(self.center, self.radius, self.masks): theta = np.arctan2(y - center[1], x - center[0]) dx_tmp = -1 * np.ones_like(x) dy_tmp = -1 * np.ones_like(x) dist = radius - np.sqrt((x - center[0]) ** 2 + (y - center[1]) ** 2) not_center = dist != radius dx_tmp[not_center], dy_tmp[not_center] = mask[not_center] * -np.cos( theta[not_center] ), mask[not_center] * -np.sin(theta[not_center]) dx += dx_tmp dy += dy_tmp return np.diagflat(dx), np.diagflat(dy) @dataclass class Boundary(object): _vertices: np.ndarray design_var_mask: np.ndarray normals: np.ndarray = None @property def n_turbines(self): return self.design_var_mask.sum() @property def n_vertices(self): if np.all(self.vertices[0] == self.vertices[-1]): return self.vertices.shape[0] - 1 return self.vertices.shape[0] @property def vertices(self): return self._vertices @vertices.setter def vertices(self, v): self._vertices = v def __post_init__(self): self.__validate() def __validate(self): self.vertices = np.asarray(self.vertices) assert self.vertices.ndim == 2, "Boundary must be a 2D array" assert any( [x for x in self.vertices.shape if x == 2] ), "Boundary must have shape (n, 2) or (2, n)" self.vertices = self.vertices.reshape(-1, 2) assert self.vertices.shape[0] > 2, "Boundary must have at least 3 vertices" assert self.design_var_mask.ndim == 1, "design_var_mask must 1 dimensional" self.design_var_mask = np.asarray(self.design_var_mask, dtype=bool) class MultiConvexBoundaryComp(BoundaryBaseComp): def __init__( self, n_wt, geometry, wt_groups, const_id=None, units=None, ): xy_boundaries = [ Boundary(g, np.isin(np.arange(n_wt), mask)) for g, mask in zip(geometry, wt_groups) ] self.xy_boundaries = self.sort_boundaries(xy_boundaries) self.xy_boundaries = self.calculate_boundary_and_normals(self.xy_boundaries) super().__init__(n_wt, None, const_id, units) self.turbine_vertice_prod = 0 total_n_active_turbines = 0 for b in self.xy_boundaries: if np.any(b.vertices[0] != b.vertices[-1]): b.vertices = np.r_[b.vertices, b.vertices[:1]] self.turbine_vertice_prod += b.n_turbines * b.n_vertices total_n_active_turbines += b.n_turbines assert ( total_n_active_turbines == n_wt ), "Number of active turbines in boundaries must match number of total turbines; Check that masks sum up to n_wt i.e. np.concatenate(all_masks).sum() == n_wt." self.calculate_gradients() self.zeros = np.zeros(self.turbine_vertice_prod) def sort_boundaries(self, boundaries): def centroid(boundary): # fmt: skip return tuple(np.mean(boundary.vertices, axis=0)) return sorted(boundaries, key=lambda b: centroid(b)) def calculate_boundary_and_normals( self, xy_boundaries: list[Boundary] ) -> list[Boundary]: def __compute_normal(boundary_pts, ii, jj): """Calculate the unit normal vector of the current face (taking points CCW)""" normal = np.array( [ boundary_pts[ii, 1] - boundary_pts[jj, 1], -(boundary_pts[ii, 0] - boundary_pts[jj, 0]), ] ) return normal / np.linalg.norm(normal) for boundary in xy_boundaries: hull = ConvexHull(list(boundary.vertices)) # keep only vertices that actually comprise a convex hull and arrange in CCW order boundary.vertices = boundary.vertices[hull.vertices] # initialize normals array unit_normals = np.zeros([boundary.n_vertices, 2]) # determine if point is inside or outside and distances from each face nvtm1 = boundary.n_vertices - 1 for j in range(0, nvtm1): # all but the points that close the shape unit_normals[j] = __compute_normal(boundary.vertices, j + 1, j) # points that close the shape unit_normals[nvtm1] = __compute_normal(boundary.vertices, 0, nvtm1) boundary.normals = unit_normals return xy_boundaries def calculate_gradients(self): # this is flawed if the order of boundaries is switched; # test with arbitrary number of vertices and arbitrary number # of turbines in a boundary; For now it's sorted at the top.. final_dx = np.zeros([self.turbine_vertice_prod, self.n_wt]) final_dy = np.zeros([self.turbine_vertice_prod, self.n_wt]) for bi, boundary in enumerate(self.xy_boundaries): assert ( boundary.design_var_mask is not None ), "Design variable mask must be provided" assert self.n_wt == len( boundary.design_var_mask ), "Design variable mask must must be the same length as the number of turbines" unit_normals = boundary.normals n_vertices = boundary.n_vertices n_turbines = boundary.n_turbines dfaceDistance_dx = np.zeros([n_turbines * n_vertices, n_turbines]) dfaceDistance_dy = np.zeros([n_turbines * n_vertices, n_turbines]) for i in range(0, n_turbines): # determine if point is inside or outside of each face, and distances from each face for j in range(0, n_vertices): # define the derivative vectors from the point of interest to the first point of the face dpa_dx = np.array([-1.0, 0.0]) dpa_dy = np.array([0.0, -1.0]) # find perpendicular distances derivatives from point to current surface (vector projection) ddistanceVec_dx = np.vdot(dpa_dx, unit_normals[j]) * unit_normals[j] ddistanceVec_dy = np.vdot(dpa_dy, unit_normals[j]) * unit_normals[j] # calculate derivatives for the sign of perpendicular distances from point to current face dfaceDistance_dx[i * n_vertices + j, i] = np.vdot( ddistanceVec_dx, unit_normals[j] ) dfaceDistance_dy[i * n_vertices + j, i] = np.vdot( ddistanceVec_dy, unit_normals[j] ) seek = sum([(b.n_vertices * b.n_turbines) for b in self.xy_boundaries[:bi]]) final_dx[ seek: seek + (n_turbines * n_vertices), boundary.design_var_mask ] = dfaceDistance_dx final_dy[ seek: seek + (n_turbines * n_vertices), boundary.design_var_mask ] = dfaceDistance_dy dfaceDistance_dx = final_dx dfaceDistance_dy = final_dy def __wrap_sparse(m): # fmt: skip if m.size < 1e4: return m from scipy.sparse import csr_matrix # fmt: skip return csr_matrix(m) # store Jacobians self.dfaceDistance_dx = __wrap_sparse(dfaceDistance_dx) self.dfaceDistance_dy = __wrap_sparse(dfaceDistance_dy) def distances(self, x, y): """ :param points: points that you want to calculate the distances from to the faces of the convex hull :return face_distace: signed perpendicular distances from each point to each face; + is inside """ points = np.array([x, y]).T face_distances = np.zeros(self.turbine_vertice_prod) for bi, boundary in enumerate(self.xy_boundaries): mask = boundary.design_var_mask vertices = boundary.vertices[:-1] n_vertices = boundary.n_vertices PA = vertices[:, na] - points[mask][na] dist = np.sum(PA * boundary.normals[:, na], axis=2) d_vec = dist[:, :, na] * boundary.normals[:, na] seek = sum([(b.n_vertices * b.n_turbines) for b in self.xy_boundaries[:bi]]) face_distances[seek: seek + (boundary.n_turbines * n_vertices)] = np.sum( d_vec * boundary.normals[:, na], axis=2 ).T.reshape(-1) return face_distances def gradients(self, x, y): return self.dfaceDistance_dx, self.dfaceDistance_dy def satisfy(self, state): raise NotImplementedError("Not implemented for MultiConvexBoundaryComp") def plot(self, ax): for b in self.xy_boundaries: ax.plot( b.vertices[:, 0].tolist(), b.vertices[:, 1].tolist(), "k", linewidth=1, ) class MultiWFPolygonBoundaryComp(PolygonBoundaryComp): def __init__( self, n_wt: int, geometry, wt_groups, **kwargs, ): boundaries = {i: geom for i, geom in enumerate(geometry)} turbine_groups = {i: group for i, group in enumerate(wt_groups)} self.boundaries = {} # group_id: boundary_coords for group_id, boundary_coords in boundaries.items(): boundary_coords = self.__validate_boundary_coords(boundary_coords) # close the boundary if needed if not np.all(boundary_coords[0] == boundary_coords[-1]): boundary_coords = np.vstack([boundary_coords, boundary_coords[0]]) self.boundaries[group_id] = boundary_coords self.__validate_group_assignments(turbine_groups, n_wt) self.turbine_groups = {i: -1 for i in range(n_wt)} # turbine_idx: group_id for group_id, indices in turbine_groups.items(): if group_id not in self.boundaries: raise ValueError(f"No boundary defined for group {group_id}") for idx in indices: self.turbine_groups[idx] = group_id # check that all turbines are assigned to a group if -1 in self.turbine_groups.values(): raise ValueError(f"All turbines must be assigned to a group; All the -1 should be filled\n{self.turbine_groups}") super().__init__( n_wt, np.array(list(boundaries.values())[0]).reshape(-1, 2), **kwargs ) def __validate_boundary_coords(self, boundary_coords: np.ndarray) -> None: if not isinstance(boundary_coords, (np.ndarray, list)): raise TypeError( "Boundary coordinates must be a numpy array or a list of lists" ) try: boundary_coords = np.array(boundary_coords).reshape(-1, 2) except BaseException: raise ValueError( "Boundary coordinates must be a 2D array with shape (n,2)" ) if boundary_coords.ndim != 2 or boundary_coords.shape[1] != 2: raise ValueError("Boundary coordinates must be a 2D array with shape (n,2)") if len(boundary_coords) < 3: raise ValueError("Boundary must have at least 3 points") return boundary_coords def __validate_group_assignments( self, groups: Dict[int, List[int]], num_turbines: int ) -> None: if not isinstance(groups, dict): raise TypeError("Groups must be a dictionary") valid_types = (int, np.integer) for group_id, turbine_indices in groups.items(): if not isinstance(group_id, valid_types) or group_id < 0: raise ValueError( f"Invalid group ID: {group_id}; Should be an integer >= 0" ) if not all( isinstance(idx, valid_types) and 0 <= idx < num_turbines for idx in turbine_indices ): raise ValueError( f"Invalid turbine indices in group {group_id}; Should be integers in range [0, {num_turbines})" ) def __dist_grad_wrapper(self, x, y, boundary_prop): if not np.shape([x, y]) == np.shape(self._cache_input): pass elif np.all(np.array([x, y]) == self._cache_input): return self._cache_output distance, ddist_dX, ddist_dY = self._calc_distance_and_gradients( x, y, boundary_prop ) closest_edge_index = np.argmin(np.abs(distance), 1) self._cache_input = np.array([x, y]) self._cache_output = [ # pick only closest edge v[np.arange(len(closest_edge_index)), closest_edge_index] for v in [distance, ddist_dX, ddist_dY] ] return self._cache_output def __calculate_group_distances_and_gradients(self, x, y): """Helper method to calculate distances and gradients for all groups.""" n = len(x) ds = np.zeros(n) dx = np.zeros(n) dy = np.zeros(n) for group_id, boundary in self.boundaries.items(): group_turbines = [ i for i in range(n) if self.turbine_groups.get(i) == group_id ] if not group_turbines: continue x_group = x[group_turbines] y_group = y[group_turbines] boundary_properties = self.get_boundary_properties(boundary)[1:] distances, dx_group, dy_group = self.__dist_grad_wrapper( x_group, y_group, boundary_properties ) ds[group_turbines] = distances dx[group_turbines] = dx_group dy[group_turbines] = dy_group return ds, dx, dy def distances(self, x, y): ds, _, _ = self.__calculate_group_distances_and_gradients(x, y) return ds def gradients(self, x, y): _, dx, dy = self.__calculate_group_distances_and_gradients(x, y) return np.diagflat(dx), np.diagflat(dy) def plot(self, ax=None): colors = plt.cm.viridis(np.linspace(0, .8, len(self.boundaries))) for ii, (group_id, boundary) in enumerate(self.boundaries.items()): ax.plot(*boundary.T, c=colors[ii], label=f"Group {group_id}", linewidth=1) ax.legend() class BoundaryType(Enum): CIRCLE = "circle" CONVEX_HULL = "convex_hull" POLYGON = "polygon" class MultiWFBoundaryConstraint(XYBoundaryConstraint): def __init__(self, geometry, wt_groups, boundtype, units=None): """Entry point for creating boundary constraints for joint multi-wind-farm optimization. Parameters ---------- geometry : Iterable Geometry input depends on a specific boundary type: CIRCLE - it is a list of tuples [(center1, radius1), (center2, radius2), ...], where center itself is a tuple (x, y) and radius is a scalar. CONVEX_HULL - it is a list of numpy arrays, where each array represents a convex hull boundary. Each array has shape (n, 2) where n is the number of vertices. POLYGON - it is a list of numpy arrays, where each array represents a polygon boundary. Each array has shape (n, 2) where n is the number of vertices. !!! Input length must match the number of groups in wt_groups. !!! wt_groups : Iterable List of lists where each list contains indices of turbines assigned to a specific boundary. For instance, [[0, 1], [2, 3, 4], ...] assigns turbines 0 and 1 to boundary 0, and turbines 2, 3, and 4 to boundary 1. !!! Input length must match the number of geometries in geometry parameter. !!! boundtype : BoundaryType One of BoundaryType.CIRCLE, BoundaryType.CONVEX_HULL, BoundaryType.POLYGON. The argument specifies the type of boundary constraint to be enforced. units : str, optional Units for the boundary, by default None """ self.geometry = geometry self.boundtype = boundtype self.const_id = f"wf_boundary_comp_{id(self)}" self.units = units self.wt_groups = wt_groups assert len(geometry) == len( wt_groups ), "Number of geometries and groups must match" BD2COMP = { BoundaryType.CIRCLE: MultiCircleBoundaryComp, BoundaryType.CONVEX_HULL: MultiConvexBoundaryComp, BoundaryType.POLYGON: MultiWFPolygonBoundaryComp, } def get_comp(self, n_wt): assert n_wt > 0, "Number of turbines must be greater than 0" assert n_wt >= len( self.wt_groups ), "Number of turbines must be greater than or equal to the number of groups" if hasattr(self, "boundary_comp"): return self.boundary_comp if (comp := self.BD2COMP.get(self.boundtype, None)) is None: raise NotImplementedError(f"Invalid boundary type {self.boundtype}") return comp( n_wt, self.geometry, self.wt_groups, const_id=self.const_id, units=self.units, ) def set_design_var_limits(self, design_vars): _ = design_vars def main(): if __name__ == "__main__": from py_wake.wind_turbines import WindTurbines from py_wake.wind_turbines.power_ct_functions import CubePowerSimpleCt plt.close('all') i1 = np.array([[2, 17], [6, 23], [16, 23], [26, 15], [19, 0], [14, 4], [4, 4]]) e1 = np.array([[0, 10], [20, 21], [22, 12], [10, 12], [9, 6], [2, 7]]) i2 = np.array([[12, 13], [14, 17], [18, 15], [17, 10], [15, 11]]) e2 = np.array([[5, 17], [5, 18], [8, 19], [8, 18]]) i3 = np.array([[5, 0], [5, 1], [10, 3], [10, 0]]) e3 = np.array([[6, -1], [6, 18], [7, 18], [7, -1]]) e4 = np.array([[15, 9], [15, 11], [20, 11], [20, 9]]) e5 = np.array([[10, 25], [20, 0]]) zones = [ InclusionZone(i1, name='i1'), InclusionZone(i2, name='i2'), InclusionZone(i3, name='i3'), ExclusionZone(e1, name='e1'), ExclusionZone(e2, name='e2'), ExclusionZone(e3, name='e3'), ExclusionZone(e4, name='e4'), ExclusionZone(e5, name='e5', dist2wt=lambda: 1, geometry_type='line'), ] N_points = 50 xs = np.linspace(-1, 30, N_points) ys = np.linspace(-1, 30, N_points) y_grid, x_grid = np.meshgrid(xs, ys) x = x_grid.ravel() y = y_grid.ravel() n_wt = len(x) MPBC = MultiPolygonBoundaryComp(n_wt, zones, method='nearest') distances = MPBC.distances(x, y) delta = 1e-9 distances2 = MPBC.distances(x + delta, y) dx_fd = (distances2 - distances) / delta dx = np.diag(MPBC.gradients(x + delta / 2, y)[0]) plt.figure() plt.plot(dx_fd, dx, '.') plt.figure() for n, bound in enumerate(MPBC.boundaries): x_bound, y_bound = bound[0].T x_bound = np.append(x_bound, x_bound[0]) y_bound = np.append(y_bound, y_bound[0]) line, = plt.plot(x_bound, y_bound, label=f'{n}') plt.plot(x_bound[0], y_bound[0], color=line.get_color(), marker='o') plt.legend() plt.grid() plt.axis('square') plt.contourf(x_grid, y_grid, distances.reshape(N_points, N_points), np.linspace(-10, 10, 100), cmap='seismic') plt.colorbar() plt.figure() ax = plt.axes(projection='3d') ax.contour3D( x.reshape( N_points, N_points), y.reshape( N_points, N_points), distances.reshape( N_points, N_points), np.linspace(-10, 10, 100), cmap='seismic') ax.set_xlabel('x') ax.set_ylabel('y') ax.set_zlabel('z') if 0: for smpl in [0, 1, 2, 3, 4, 5, 6, 7, 8]: MPBC = MultiPolygonBoundaryComp(n_wt, zones, simplify_geometry=smpl) plt.figure() ax = plt.gca() MPBC.plot(ax) wind_turbines = WindTurbines(names=['tb1', 'tb2'], diameters=[80, 120], hub_heights=[70, 110], powerCtFunctions=[ CubePowerSimpleCt(ws_cutin=3, ws_cutout=25, ws_rated=12, power_rated=2000, power_unit='kW', ct=8 / 9, additional_models=[]), CubePowerSimpleCt(ws_cutin=3, ws_cutout=25, ws_rated=12, power_rated=3000, power_unit='kW', ct=8 / 9, additional_models=[])]) x1 = [0, 3000, 3000, 0] y1 = [0, 0, 3000, 3000] b1 = np.transpose((x1, y1)) # Buildings x2 = [600, 1400, 1400, 600] y2 = [1700, 1700, 2500, 2500] b2 = np.transpose((x2, y2)) # River x3 = np.linspace(520, 2420, 16) y3 = [0, 133, 266, 400, 500, 600, 700, 733, 866, 1300, 1633, 2100, 2400, 2533, 2700, 3000] b3 = np.transpose((x3, y3)) # Roads x4 = np.linspace(0, 3000, 16) y4 = [1095, 1038, 1110, 1006, 1028, 992, 977, 1052, 1076, 1064, 1073, 1027, 964, 981, 1015, 1058] b4 = np.transpose((x4, y4)) zones = [ InclusionZone(b1, name='i1'), ExclusionZone(b2, dist2wt=lambda H: 4 * H - 360, name='building'), ExclusionZone(b3, geometry_type='line', dist2wt=lambda D: 3 * D, name='river'), ExclusionZone(b4, geometry_type='line', dist2wt=lambda D, H: max(D * 2, H * 3), name='road'), ] N_points = 50 xs = np.linspace(0, 3000, N_points) ys = np.linspace(0, 3000, N_points) y_grid, x_grid = np.meshgrid(xs, ys) x = x_grid.ravel() y = y_grid.ravel() n_wt = len(x) types = np.zeros(n_wt) TSBC = TurbineSpecificBoundaryComp(n_wt, wind_turbines, zones) distances = TSBC.distances(x, y, type=types) delta = 1e-9 distances2 = TSBC.distances(x + delta, y, type=types) dx_fd = (distances2 - distances) / delta dx = np.diag(TSBC.gradients(x + delta / 2, y, type=types)[0]) plt.figure() plt.plot(dx_fd, dx, '.') plt.figure() for ll, t in enumerate(TSBC.types): line, = plt.plot(*TSBC.ts_merged_xy_boundaries[ll][0][0][0, :], label=f'type {ll}') for n, bound in enumerate(TSBC.ts_merged_xy_boundaries[ll]): x_bound, y_bound = bound[0].T x_bound = np.append(x_bound, x_bound[0]) y_bound = np.append(y_bound, y_bound[0]) plt.plot(x_bound, y_bound, color=line.get_color()) plt.legend() plt.grid() plt.axis('square') for ll, t in enumerate(TSBC.types): plt.figure() for n, bound in enumerate(TSBC.ts_merged_xy_boundaries[ll]): x_bound, y_bound = bound[0].T x_bound = np.append(x_bound, x_bound[0]) y_bound = np.append(y_bound, y_bound[0]) plt.plot(x_bound, y_bound, 'b') plt.grid() plt.title(f'type {ll}') plt.axis('square') plt.contourf(x_grid, y_grid, TSBC.distances(x, y, type=t * np.ones(n_wt)).reshape(N_points, N_points), 50) plt.colorbar() main()