import numpy as np
from numpy import newaxis as na
from scipy.spatial import ConvexHull
from topfarm.constraint_components import Constraint, ConstraintComponent
from topfarm.utils import smooth_max, smooth_max_gradient, is_number
import topfarm
from shapely.geometry import Polygon, MultiPolygon, LineString
from shapely.ops import unary_union
import warnings
from dataclasses import dataclass
from typing import Dict, List, Optional
from enum import Enum
import matplotlib.pyplot as plt
from matplotlib.pyplot import Circle
[docs]
class XYBoundaryConstraint(Constraint):
[docs]
def __init__(self, boundary, boundary_type='convex_hull', units=None, relaxation=False, **kwargs):
"""Initialize XYBoundaryConstraint
Parameters
----------
boundary : array_like (n,2) or list of tuples (array_like (n,2), boolean)
boundary coordinates. If boundary is array_like (n,2) it indicates a single boundary and can be used with
boundary types: 'convex_hull', 'polygon', 'rectangle','square'. If boundary is list of tuples (array_like (n,2), boolean),
it is multiple boundaries where the boolean is 1 for inclusion zones and 0 for exclusion zones and can be used with the
boundary type: 'multi_polygon'.
boundary_type : 'convex_hull', 'polygon', 'rectangle','square'
- 'convex_hull' (default): Convex hul around boundary points\n
- 'polygon': Polygon boundary (may be non convex). Less suitable for gradient-based optimization\n
- 'rectangle': Smallest axis-aligned rectangle covering the boundary points\n
- 'square': Smallest axis-aligned square covering the boundary points
- 'multi_polygon': Mulitple polygon boundaries incl. exclusion zones (may be non convex).\n
- 'turbine_specific': Set of multiple polygon boundaries that depend on the wind turbine type. \n
"""
if boundary_type == 'multi_polygon':
self.zones = boundary
self.boundary = np.asarray(self.zones[0].boundary)
elif boundary_type == 'turbine_specific':
self.zones = boundary
assert 'turbines' in list(kwargs)
self.turbines = kwargs['turbines']
self.boundary = np.asarray(self.zones[0].boundary)
else:
self.boundary = np.asarray(boundary)
self.boundary_type = boundary_type
self.const_id = 'xyboundary_comp_{}'.format(boundary_type)
self.units = units
self.relaxation = relaxation
def get_comp(self, n_wt):
if not hasattr(self, 'boundary_comp'):
if self.boundary_type == 'polygon':
self.boundary_comp = PolygonBoundaryComp(
n_wt, self.boundary, self.const_id, self.units, self.relaxation)
elif self.boundary_type == 'multi_polygon':
self.boundary_comp = MultiPolygonBoundaryComp(n_wt, self.zones, const_id=self.const_id, units=self.units, relaxation=self.relaxation)
elif self.boundary_type == 'turbine_specific':
self.boundary_comp = TurbineSpecificBoundaryComp(n_wt, self.turbines, self.zones, const_id=self.const_id, units=self.units, relaxation=self.relaxation)
else:
self.boundary_comp = ConvexBoundaryComp(n_wt, self.boundary, self.boundary_type, self.const_id, self.units)
return self.boundary_comp
@property
def constraintComponent(self):
assert hasattr(
self, "boundary_comp"
), "Boundary component not initialized, call setup first"
return self.boundary_comp
def set_design_var_limits(self, design_vars):
if self.boundary_type in ['multi_polygon', 'turbine_specific']:
bound_min = np.vstack([(bound).min(0) for bound, _ in self.boundary_comp.boundaries]).min(0)
bound_max = np.vstack([(bound).max(0) for bound, _ in self.boundary_comp.boundaries]).max(0)
else:
bound_min = self.boundary_comp.xy_boundary.min(0)
bound_max = self.boundary_comp.xy_boundary.max(0)
for k, l, u in zip([topfarm.x_key, topfarm.y_key], bound_min, bound_max):
if k in design_vars:
if len(design_vars[k]) == 4:
design_vars[k] = (design_vars[k][0], np.maximum(design_vars[k][1], l),
np.minimum(design_vars[k][2], u), design_vars[k][-1])
else:
design_vars[k] = (design_vars[k][0], l, u, design_vars[k][-1])
def _setup(self, problem, group='constraint_group'):
n_wt = problem.n_wt
self.boundary_comp = self.get_comp(n_wt)
self.boundary_comp.problem = problem
self.set_design_var_limits(problem.design_vars)
# problem.xy_boundary = np.r_[self.boundary_comp.xy_boundary, self.boundary_comp.xy_boundary[:1]]
problem.indeps.add_output('xy_boundary', self.boundary_comp.xy_boundary)
getattr(problem.model, group).add_subsystem('xy_bound_comp', self.boundary_comp, promotes=['*'])
def setup_as_constraint(self, problem, group='constraint_group'):
self._setup(problem, group=group)
if problem.n_wt == 1:
lower = 0
else:
lower = self.boundary_comp.zeros
problem.model.add_constraint('boundaryDistances', lower=lower)
def setup_as_penalty(self, problem, group='constraint_group'):
self._setup(problem, group=group)
[docs]
class CircleBoundaryConstraint(XYBoundaryConstraint):
[docs]
def __init__(self, center, radius):
"""Initialize CircleBoundaryConstraint
Parameters
----------
center : (float, float)
center position (x,y)
radius : int or float
circle radius
"""
self.center = np.array(center)
self.radius = radius
self.const_id = 'circle_boundary_comp_{}_{}'.format(
'_'.join([str(int(c)) for c in center]), int(radius)).replace('.', '_')
def get_comp(self, n_wt):
if not hasattr(self, 'boundary_comp'):
self.boundary_comp = CircleBoundaryComp(n_wt, self.center, self.radius, self.const_id)
return self.boundary_comp
def set_design_var_limits(self, design_vars):
for k, l, u in zip([topfarm.x_key, topfarm.y_key],
self.center - self.radius,
self.center + self.radius):
if len(design_vars[k]) == 4:
design_vars[k] = (design_vars[k][0], np.maximum(design_vars[k][1], l),
np.minimum(design_vars[k][2], u), design_vars[k][-1])
else:
design_vars[k] = (design_vars[k][0], l, u, design_vars[k][-1])
class BoundaryBaseComp(ConstraintComponent):
def __init__(self, n_wt, xy_boundary=None, const_id=None, units=None, relaxation=False, **kwargs):
super().__init__(**kwargs)
self.n_wt = n_wt
self.xy_boundary = np.array(xy_boundary)
self.const_id = const_id
self.units = units
self.relaxation = relaxation
if xy_boundary is not None and np.any(self.xy_boundary[0] != self.xy_boundary[-1]):
self.xy_boundary = np.r_[self.xy_boundary, self.xy_boundary[:1]]
def setup(self):
# Explicitly size input arrays
self.add_input(topfarm.x_key, np.zeros(self.n_wt),
desc='x coordinates of turbines in global ref. frame', units=self.units)
self.add_input(topfarm.y_key, np.zeros(self.n_wt),
desc='y coordinates of turbines in global ref. frame', units=self.units)
if self.relaxation:
self.add_input('time', 0)
if hasattr(self, 'types'):
self.add_input('type', np.zeros(self.n_wt))
# self.add_output('constraint_violation_' + self.const_id, val=0.0)
# Explicitly size output array
# (vector with positive elements if turbines outside of hull)
self.add_output('boundaryDistances', self.zeros,
desc="signed perpendicular distances from each turbine to each face CCW; + is inside")
self.declare_partials('boundaryDistances', [topfarm.x_key, topfarm.y_key])
if self.relaxation:
self.declare_partials('boundaryDistances', 'time')
# self.declare_partials('boundaryDistances', ['boundaryVertices', 'boundaryNormals'], method='fd')
def compute(self, inputs, outputs):
# calculate distances from each point to each face
args = {x: inputs[x] for x in [topfarm.x_key, topfarm.y_key, topfarm.type_key] if x in inputs}
boundaryDistances = self.distances(**args)
outputs['boundaryDistances'] = boundaryDistances
# outputs['constraint_violation_' + self.const_id] = np.sum(np.minimum(boundaryDistances, 0) ** 2)
def compute_partials(self, inputs, partials):
# return Jacobian dict
if not self.relaxation:
dx, dy = self.gradients(**{xy: inputs[k] for xy, k in zip('xy', [topfarm.x_key, topfarm.y_key])})
else:
dx, dy, dt = self.gradients(**{xy: inputs[k] for xy, k in zip('xy', [topfarm.x_key, topfarm.y_key])})
partials['boundaryDistances', topfarm.x_key] = dx
partials['boundaryDistances', topfarm.y_key] = dy
if self.relaxation:
partials['boundaryDistances', 'time'] = dt
def plot(self, ax):
"""Plot boundary"""
ax.plot(
self.xy_boundary[:, 0].tolist() + [self.xy_boundary[0, 0]],
self.xy_boundary[:, 1].tolist() + [self.xy_boundary[0, 1]],
"k",
linewidth=1,
)
class ConvexBoundaryComp(BoundaryBaseComp):
def __init__(self, n_wt, xy_boundary=None, boundary_type='convex_hull', const_id=None, units=None):
self.boundary_type = boundary_type
self.calculate_boundary_and_normals(xy_boundary)
super().__init__(n_wt, self.xy_boundary, const_id, units)
self.calculate_gradients()
self.zeros = np.zeros([self.n_wt, self.nVertices])
def calculate_boundary_and_normals(self, xy_boundary):
xy_boundary = np.asarray(xy_boundary)
if self.boundary_type == 'convex_hull':
# find the points that actually comprise a convex hull
hull = ConvexHull(list(xy_boundary))
# keep only xy_vertices that actually comprise a convex hull and arrange in CCW order
self.xy_boundary = xy_boundary[hull.vertices]
elif self.boundary_type == 'square':
min_ = xy_boundary.min(0)
max_ = xy_boundary.max(0)
range_ = (max_ - min_)
x_c, y_c = min_ + range_ / 2
r = range_.max() / 2
self.xy_boundary = np.array([(x_c - r, y_c - r), (x_c + r, y_c - r),
(x_c + r, y_c + r), (x_c - r, y_c + r)])
elif self.boundary_type == 'rectangle':
min_ = xy_boundary.min(0)
max_ = xy_boundary.max(0)
range_ = (max_ - min_)
x_c, y_c = min_ + range_ / 2
r = range_ / 2
self.xy_boundary = np.array([(x_c - r[0], y_c - r[1]), (x_c + r[0], y_c - r[1]),
(x_c + r[0], y_c + r[1]), (x_c - r[0], y_c + r[1])])
else:
raise NotImplementedError("Boundary type '%s' is not implemented" % self.boundary_type)
# get the real number of xy_vertices
self.nVertices = self.xy_boundary.shape[0]
# initialize normals array
unit_normals = np.zeros([self.nVertices, 2])
# determine if point is inside or outside of each face, and distances from each face
for j in range(0, self.nVertices):
# calculate the unit normal vector of the current face (taking points CCW)
if j < self.nVertices - 1: # all but the set of point that close the shape
normal = np.array([self.xy_boundary[j + 1, 1] - self.xy_boundary[j, 1],
-(self.xy_boundary[j + 1, 0] - self.xy_boundary[j, 0])])
unit_normals[j] = normal / np.linalg.norm(normal)
else: # the set of points that close the shape
normal = np.array([self.xy_boundary[0, 1] - self.xy_boundary[j, 1],
-(self.xy_boundary[0, 0] - self.xy_boundary[j, 0])])
unit_normals[j] = normal / np.linalg.norm(normal)
self.unit_normals = unit_normals
def calculate_gradients(self):
unit_normals = self.unit_normals
# initialize array to hold distances from each point to each face
dfaceDistance_dx = np.zeros([self.n_wt * self.nVertices, self.n_wt])
dfaceDistance_dy = np.zeros([self.n_wt * self.nVertices, self.n_wt])
for i in range(0, self.n_wt):
# determine if point is inside or outside of each face, and distances from each face
for j in range(0, self.nVertices):
# define the derivative vectors from the point of interest to the first point of the face
dpa_dx = np.array([-1.0, 0.0])
dpa_dy = np.array([0.0, -1.0])
# find perpendicular distances derivatives from point to current surface (vector projection)
ddistanceVec_dx = np.vdot(dpa_dx, unit_normals[j]) * unit_normals[j]
ddistanceVec_dy = np.vdot(dpa_dy, unit_normals[j]) * unit_normals[j]
# calculate derivatives for the sign of perpendicular distances from point to current face
dfaceDistance_dx[i * self.nVertices + j, i] = np.vdot(ddistanceVec_dx, unit_normals[j])
dfaceDistance_dy[i * self.nVertices + j, i] = np.vdot(ddistanceVec_dy, unit_normals[j])
# return Jacobian dict
self.dfaceDistance_dx = dfaceDistance_dx
self.dfaceDistance_dy = dfaceDistance_dy
def calculate_distance_to_boundary(self, points):
"""
:param points: points that you want to calculate the distances from to the faces of the convex hull
:return face_distace: signed perpendicular distances from each point to each face; + is inside
"""
nPoints = np.array(points).shape[0]
xy_boundary = self.xy_boundary[:-1]
nVertices = xy_boundary.shape[0]
vertices = xy_boundary
unit_normals = self.unit_normals
# initialize array to hold distances from each point to each face
face_distance = np.zeros([nPoints, nVertices])
from numpy import newaxis as na
# define the vector from the point of interest to the first point of the face
PA = (vertices[:, na] - points[na])
# find perpendicular distances from point to current surface (vector projection)
dist = np.sum(PA * unit_normals[:, na], 2)
# calculate the sign of perpendicular distances from point to current face (+ is inside, - is outside)
d_vec = dist[:, :, na] * unit_normals[:, na]
face_distance = np.sum(d_vec * unit_normals[:, na], 2)
return face_distance.T
def distances(self, x, y):
return self.calculate_distance_to_boundary(np.array([x, y]).T)
def gradients(self, x, y):
return self.dfaceDistance_dx, self.dfaceDistance_dy
def satisfy(self, state, pad=1.1):
x, y = [np.asarray(state[xyz], dtype=float) for xyz in [topfarm.x_key, topfarm.y_key]]
dist = self.distances(x, y)
dist = np.where(dist < 0, np.minimum(dist, -.01), dist)
dx, dy = self.gradients(x, y) # independent of position
dx = dx[:self.nVertices, 0]
dy = dy[:self.nVertices, 0]
for i in np.where(dist.min(1) < 0)[0]: # loop over turbines that violate edges
# find smallest movement that where the constraints are satisfied
d = dist[i]
v = np.linspace(-np.abs(d.min()), np.abs(d.min()), 100)
X, Y = np.meshgrid(v, v)
m = np.ones_like(X)
for dx_, dy_, d in zip(dx, dy, dist.T):
m = np.logical_and(m, X * dx_ + Y * dy_ >= -d[i])
index = np.argmin(X[m]**2 + Y[m]**2)
x[i] += X[m][index]
y[i] += Y[m][index]
state[topfarm.x_key] = x
state[topfarm.y_key] = y
return state
class PolygonBoundaryComp(BoundaryBaseComp):
def __init__(self, n_wt, xy_boundary, const_id=None, units=None, relaxation=False):
self.nTurbines = n_wt
self.const_id = const_id
self.zeros = np.zeros(self.nTurbines)
self.units = units
self.boundary_properties = self.get_boundary_properties(xy_boundary)
BoundaryBaseComp.__init__(self, n_wt, xy_boundary=self.boundary_properties[0], const_id=const_id,
units=units, relaxation=relaxation)
self._cache_input = None
self._cache_output = None
self.relaxation = relaxation
def get_boundary_properties(self, xy_boundary, inclusion_zone=True):
vertices = np.array(xy_boundary)
def get_edges(vertices, counter_clockwise):
if np.any(vertices[0] != vertices[-1]):
vertices = np.r_[vertices, vertices[:1]]
x1, y1 = A = vertices[:-1].T
x2, y2 = B = vertices[1:].T
double_area = np.sum((x1 - x2) * (y1 + y2)) # 2 x Area (+: counterclockwise
assert double_area != 0, "Area must be non-zero"
if (counter_clockwise and double_area < 0) or (not counter_clockwise and double_area > 0): #
return get_edges(vertices[::-1], counter_clockwise)
else:
return vertices[:-1], A, B
# inclusion zones are defined counter clockwise (unit-normal vector pointing in) while
# exclusion zones are defined clockwise (unit-normal vector pointing out)
xy_boundary, A, B = get_edges(vertices, inclusion_zone)
dx, dy = AB = B - A
AB_len = np.linalg.norm(AB, axis=0)
edge_unit_normal = (np.array([-dy, dx]) / AB_len)
# A_normal and B_normal are the normal vectors at the nodes A,B (the mean of the adjacent edge normal vectors
A_normal = (edge_unit_normal + np.roll(edge_unit_normal, 1, 1)) / 2
B_normal = np.roll(A_normal, -1, 1)
# for (x, y), (dx, dy), (unx, uny) in zip(A.T, AB.T, edge_unit_normal.T):
# plt.arrow(x, y, dx, dy, color='k', head_width=.2)
# plt.arrow(x, y, unx, uny, color='r', head_width=.2)
# for (x, y), (nx, ny) in zip(A.T, A_normal.T):
# plt.arrow(x, y, nx, ny, color='b', head_width=.2)
# for (x, y), (nx, ny) in zip(B.T, B_normal.T):
# plt.arrow(x, y, nx / 2, ny / 2, color='g', head_width=.2)
return (xy_boundary, A, B, AB, AB_len, edge_unit_normal, A_normal, B_normal)
def _calc_distance_and_gradients(self, x, y, boundary_properties=None):
"""
distances point, P=(x,y) to edge(A->B)
+/-: inside/outside
"""
def vec_len(vec):
return np.linalg.norm(vec, axis=0)
boundary_properties = boundary_properties or self.boundary_properties[1:]
A, B, AB, AB_len, edge_unit_normal, A_normal, B_normal = boundary_properties
"""
A: edge start point
B: edge end point
edge_unit_normal: unit vector perpendicular to edge pointing to the good side
(i.e. inside for inclusion zones and outside for exclusion zones)
AB: Vector from A to B (edge)
AB_len: length of AB (edge)
A_normal: mean of edge unit normal vectors adjacent to A
B_normal: mean of edge unit normal vectors adjacent to B
"""
# Add dim to match (2, #P, #Edges), where the first dimension is (x,y)
P = np.array([x, y])[:, :, na]
A, B, AB = A[:, na], B[:, na], AB[:, na]
edge_unit_normal, A_normal, B_normal = edge_unit_normal[:, na], A_normal[:, na], B_normal[:, na]
AB_len = AB_len[na]
# ===============================================================================================================
# Determine if P is closer to A, B or the edge (between A and B)
# ===============================================================================================================
AP = P - A # vector from edge start to point
BP = P - B # vector from edge end to point
# signed component of AP on the edge vector
a_tilde = np.sum(AP * AB, axis=0) / AB_len
# a_tilde < 0: closer to A
# a_tilde > |AB|: closer to B
# else: closer to edge (between A and B)
use_A = 0 > a_tilde
use_B = a_tilde > AB_len
# ===============================================================================================================
# Calculate distance from P to closer point on edge
# ===============================================================================================================
# Perpendicular distances to edge (AP dot edge_unit_normal product).
# This is the distance to the edge if not use_A or use_B
distance = np.sum((AP) * edge_unit_normal, 0)
# Update distance for points closer to A
good_side_of_A = (np.sum((AP * A_normal)[:, use_A], 0) > 0)
sign_use_A = np.where(good_side_of_A, 1, -1)
distance[use_A] = (vec_len(AP[:, use_A]) * sign_use_A)
# Update distance for points closer to B
good_side_of_B = np.sum((BP * B_normal)[:, use_B], 0) > 0
sign_use_B = np.where(good_side_of_B, 1, -1)
distance[use_B] = (vec_len(BP[:, use_B]) * sign_use_B)
# ===============================================================================================================
# Calculate gradient of distance from P to closer point on edge wrt. x and y
# ===============================================================================================================
# Gradient of perpendicular distances to edge.
# This is the gradient if not use_A or use_B
ddist_dxy = np.tile(edge_unit_normal, (1, len(x), 1))
# Update gradient for points closer to A or B
eps = 1e-7 # avoid division by zero
ddist_dxy[:, use_A] = sign_use_A * (AP[:, use_A] / (vec_len(AP[:, use_A]) + eps))
ddist_dxy[:, use_B] = sign_use_B * (BP[:, use_B] / (vec_len(BP[:, use_B]) + eps))
ddist_dX, ddist_dY = ddist_dxy
return distance, ddist_dX, ddist_dY
def calc_distance_and_gradients(self, x, y):
if not np.shape([x, y]) == np.shape(self._cache_input):
pass
elif np.all(np.array([x, y]) == self._cache_input):
return self._cache_output
distance, ddist_dX, ddist_dY = self._calc_distance_and_gradients(x, y)
closest_edge_index = np.argmin(np.abs(distance), 1)
self._cache_input = np.array([x, y])
self._cache_output = [ # pick only closest edge
v[np.arange(len(closest_edge_index)), closest_edge_index] for v in [distance, ddist_dX, ddist_dY]
]
return self._cache_output
def distances(self, x, y):
return self.calc_distance_and_gradients(x, y)[0]
def gradients(self, x, y):
_, dx, dy = self.calc_distance_and_gradients(x, y)
return np.diagflat(dx), np.diagflat(dy)
def satisfy(self, state, pad=1.1):
x, y = [np.asarray(state[xy], dtype=float) for xy in [topfarm.x_key, topfarm.y_key]]
dist = self.distances(x, y)
dx, dy = map(np.diag, self.gradients(x, y))
m = dist < 0
x[m] -= dx[m] * dist[m] * pad
y[m] -= dy[m] * dist[m] * pad
state[topfarm.x_key] = x
state[topfarm.y_key] = y
return state
class CircleBoundaryComp(PolygonBoundaryComp):
def __init__(self, n_wt, center, radius, const_id=None, units=None):
self.center = center
self.radius = radius
t = np.linspace(0, 2 * np.pi, 100)
xy_boundary = self.center + np.array([np.cos(t), np.sin(t)]).T * self.radius
BoundaryBaseComp.__init__(self, n_wt, xy_boundary, const_id, units)
self.zeros = np.zeros(self.n_wt)
def plot(self, ax=None):
ax = ax or plt.gca()
circle = Circle(self.center, self.radius, color='k', fill=False)
ax.add_artist(circle)
def distances(self, x, y):
return self.radius - np.sqrt((x - self.center[0])**2 + (y - self.center[1])**2)
def gradients(self, x, y):
theta = np.arctan2(y - self.center[1], x - self.center[0])
dx = -1 * np.ones_like(x)
dy = -1 * np.ones_like(x)
dist = self.radius - np.sqrt((x - self.center[0])**2 + (y - self.center[1])**2)
not_center = dist != self.radius
dx[not_center], dy[not_center] = -np.cos(theta[not_center]), -np.sin(theta[not_center])
return np.diagflat(dx), np.diagflat(dy)
class Zone(object):
def __init__(self, boundary, dist2wt, geometry_type, incl, name):
self.name = name
self.boundary = boundary
self.dist2wt = dist2wt
self.geometry_type = geometry_type
self.incl = incl
class InclusionZone(Zone):
def __init__(self, boundary, dist2wt=None, geometry_type='polygon', name=''):
super().__init__(np.asarray(boundary), dist2wt, geometry_type, incl=1, name=name)
class ExclusionZone(Zone):
def __init__(self, boundary, dist2wt=None, geometry_type='polygon', name=''):
super().__init__(np.asarray(boundary), dist2wt, geometry_type, incl=0, name=name)
class MultiPolygonBoundaryComp(PolygonBoundaryComp):
def __init__(self, n_wt, zones, const_id=None, units=None, relaxation=False, method='nearest',
simplify_geometry=False):
'''
Parameters
----------
n_wt : TYPE
DESCRIPTION.
zones : list
list of InclusionZone and ExclusionZone objects
const_id : TYPE, optional
DESCRIPTION. The default is None.
units : TYPE, optional
DESCRIPTION. The default is None.
method : {'nearest' or 'smooth_min'}, optional
'nearest' calculate the distance to the nearest edge or point'smooth_min'
calculates the weighted minimum distance to all edges/points. The default is 'nearest'.
simplify : float or dict
if float, simplification tolerance. if dict, shapely.simplify keyword arguments
Returns
-------
None.
'''
self.zones = zones
self.bounds_poly, xy_boundaries = self.get_xy_boundaries()
PolygonBoundaryComp.__init__(self, n_wt, xy_boundary=xy_boundaries[0], const_id=const_id, units=units, relaxation=relaxation)
# self.bounds_poly = [Polygon(x) for x in xy_boundaries]
self.incl_excls = [x.incl for x in zones]
self._setup_boundaries(self.bounds_poly, self.incl_excls)
self.relaxation = relaxation
self.method = method
if simplify_geometry:
self.simplify(simplify_geometry)
def simplify(self, simplify_geometry):
bounds = [bi[0] for bi in self.boundaries]
self.incl_excls = [bi[1] for bi in self.boundaries]
polygons = [Polygon(b) for b in bounds]
if isinstance(simplify_geometry, dict):
self.bounds_poly = [rp.simplify(**simplify_geometry) for rp in polygons]
else:
self.bounds_poly = [rp.simplify(simplify_geometry) for rp in polygons]
self._setup_boundaries(self.bounds_poly, self.incl_excls)
# def line_to_xy_boundary(self, line, buffer):
# return np.asarray(Polygon(LineString(line).buffer(buffer, join_style=2).exterior).exterior.coords)
def get_xy_boundaries(self):
polygons = []
bounds = []
for z in self.zones:
# hardcoded values passed to this function... it does not impact the
# result because this only happends due to turbine specific component
# calling into parent constructor;
poly = self._zone2poly(z, D=-1, H=-1)
polygons.append(poly)
bounds.append(np.asarray(poly.exterior.coords))
return polygons, bounds
def _setup_boundaries(self, bounds_poly, incl_excl):
self.res_poly = self._calc_resulting_polygons(bounds_poly, incl_excl)
self.boundaries = self._poly_to_bound(self.res_poly)
boundary_properties_list_all = list(zip(*[self.get_boundary_properties(bound, incl_excl)[1:]
for bound, incl_excl in self.boundaries]))
self.boundary_properties_list_all = [np.concatenate(v, -1)
for v in boundary_properties_list_all]
def _poly_to_bound(self, polygons):
boundaries = []
for bound in polygons:
x, y = bound.exterior.xy
boundaries.append((np.asarray([x, y]).T[:-1, :], 1))
for interior in bound.interiors:
x, y = interior.xy
boundaries.append((np.asarray([x, y]).T[:-1, :], 0))
return boundaries
@staticmethod
def _calc_resulting_polygons(boundary_polygons, incl_excls):
"""
Parameters
----------
boundary_polygons : list[shapely.Polygon]
list of shapely polygons as specifed or inferred from user input
incl_excls : list[bool]
list of boolean values specifying whether the polygon is an inclusion or exclusion
Returns
-------
list of merged shapely polygons. Resolves issues arrising if any are overlapping, touching or contained in each other
"""
included_polygons = [
boundary_polygons[i] for i, x in enumerate(incl_excls) if x == 1
]
excluded_polygons = [
boundary_polygons[i] for i, x in enumerate(incl_excls) if x == 0
]
included_polygons = unary_union(included_polygons)
excluded_polygons = unary_union(excluded_polygons)
remain_polygons = included_polygons.difference(excluded_polygons)
if isinstance(remain_polygons, Polygon):
return [remain_polygons] if remain_polygons.area > 1e-3 else []
if isinstance(remain_polygons, MultiPolygon):
return [
poly for poly in remain_polygons.geoms if poly.area > 1e-3
]
return []
def sign(self, Dist_ij):
return np.sign(Dist_ij[np.arange(Dist_ij.shape[0]), np.argmin(abs(Dist_ij), axis=1)])
def calc_distance_and_gradients(self, x, y):
'''
Parameters
----------
x : 1d array
Array of x-positions.
y : 1d array
Array of y-positions.
Returns
-------
D_ij : 2d array
Array of point-edge distances. index 'i' is points and index 'j' is total number of edges.
sign_i : 1d array
Array of signs of the governing distance.
dDdk_jk : 2d array
Jacobian of the distance matrix D_ij with respect to x and y.
'''
if not np.shape([x, y]) == np.shape(self._cache_input):
pass
elif np.all(np.array([x, y]) == self._cache_input) & (not self.relaxation):
return self._cache_output
Dist_ij, ddist_dX, ddist_dY = self._calc_distance_and_gradients(x, y, self.boundary_properties_list_all)
dDdk_ijk = np.moveaxis([ddist_dX, ddist_dY], 0, -1)
sign_i = self.sign(Dist_ij)
self._cache_input = np.array([x, y])
self._cache_output = [Dist_ij, dDdk_ijk, sign_i]
return self._cache_output
def calc_relaxation(self, iteration_no=None):
'''
The tupple relaxation contains a first term for the penalty constant
and a second term for the n first iterations to apply relaxation.
'''
if iteration_no is None:
iteration_no = self.problem.cost_comp.n_grad_eval + 1
return max(0, self.relaxation[0] * (self.relaxation[1] - iteration_no))
def distances(self, x, y):
Dist_ij, _, sign_i = self.calc_distance_and_gradients(x, y)
if self.method == 'smooth_min':
Dist_i = smooth_max(np.abs(Dist_ij), -np.abs(Dist_ij).max(), axis=1) * sign_i
elif self.method == 'nearest':
Dist_i = Dist_ij[np.arange(x.size), np.argmin(np.abs(Dist_ij), axis=1)]
else:
warning = f'method: {self.method} is not implemented. Available options are smooth_min and nearest.'
warnings.warn(warning)
if self.relaxation:
Dist_i += self.calc_relaxation()
return Dist_i
def gradients(self, x, y):
'''
The derivate of the smooth maximum with respect to x and y is calculated with the chain rule:
dS/dk = dS/dD * dD/dk
where S is smooth maximum, D is distance to edge and k is the spacial dimension
'''
Dist_ij, dDdk_ijk, _ = self.calc_distance_and_gradients(x, y)
if self.relaxation:
Dist_ij += self.calc_relaxation()
# dDdt = -self.relaxation[1]
if self.method == 'smooth_min':
dSdDist_ij = smooth_max_gradient(np.abs(Dist_ij), -np.abs(Dist_ij).max(), axis=1)
dSdkx_i, dSdky_i = (dSdDist_ij[:, :, na] * dDdk_ijk).sum(axis=1).T
elif self.method == 'nearest':
dSdkx_i, dSdky_i = dDdk_ijk[np.arange(x.size), np.argmin(np.abs(Dist_ij), axis=1), :].T
if self.relaxation:
# as relaxed distance is relaxation + distance, the gradient with respect to x and y is unchanged
gradients = np.diagflat(dSdkx_i), np.diagflat(dSdky_i), np.ones(self.n_wt) * self.relaxation[1]
else:
gradients = np.diagflat(dSdkx_i), np.diagflat(dSdky_i)
return gradients
def relaxed_polygons(self, iteration_no=None):
poly = [Polygon(x.boundary) for x in self.zones]
booleans = [x.incl for x in self.zones]
relaxed_poly = []
for i, p in enumerate(poly):
if booleans[i] == 0:
pb = p.buffer(-self.calc_relaxation(iteration_no), join_style=2)
relaxed_poly.append(pb)
else:
pb = p.buffer(self.calc_relaxation(iteration_no), join_style=2)
relaxed_poly.append(pb)
merged_poly = self._calc_resulting_polygons(relaxed_poly, booleans)
return self._poly_to_bound(merged_poly)
@staticmethod
def _zone2poly(z: Zone, **kwargs):
buffer = 0
if hasattr(z.dist2wt, "__code__"):
buffer = z.dist2wt(
**{k: kwargs[k] for k in z.dist2wt.__code__.co_varnames}
)
if not is_number(buffer):
raise ValueError(f"dist2wt must return a float, not {type(buffer)}")
elif is_number(z.dist2wt):
buffer = z.dist2wt
elif z.dist2wt is not None:
warnings.warn(
f"dist2wt is not a function or a float and is ignored. Zone buffer is set to 0."
)
buf_direction = -1 if z.incl else 1
buffer = abs(buffer) * buf_direction
if z.geometry_type == "line":
poly = Polygon(LineString(z.boundary).buffer(buffer, join_style=2).exterior)
elif z.geometry_type == "polygon":
poly = Polygon(z.boundary).buffer(buffer, join_style=2)
else:
raise NotImplementedError(
f"Geometry type '{z.geometry_type}' is not implemented"
)
return poly
def plot(self, ax):
"""Plot original and buffered boundaries"""
legend_mask = [0, 0]
for i, (zone, buffered_poly) in enumerate(zip(self.zones, self.bounds_poly)):
original_coords = zone.boundary
if zone.geometry_type == "line":
ax.plot(
original_coords[:, 0],
original_coords[:, 1],
"k--",
linewidth=1,
alpha=0.5,
label="Original line" if i == 0 else "",
)
else:
original_poly = np.vstack([original_coords, original_coords[0]])
ax.plot(
original_poly[:, 0],
original_poly[:, 1],
"k--",
linewidth=1,
alpha=0.5,
label="Original boundary" if i == 0 else "",
)
x, y = buffered_poly.exterior.xy
color = "green" if zone.incl else "red"
label = "Inclusion" if zone.incl else "Exclusion"
if legend_mask[zone.incl]:
label = ""
legend_mask[zone.incl] = 1
ax.plot(
x,
y,
color=color,
linewidth=1,
linestyle="-",
label=label,
)
ax.fill(x, y, color=color, alpha=0.1)
ax.legend()
class TurbineSpecificBoundaryComp(MultiPolygonBoundaryComp):
def __init__(self, n_wt, wind_turbines, zones, const_id=None,
units=None, relaxation=False, method='nearest', simplify_geometry=False):
# self.dependencies = [d or {'type': None, 'multiplier': None, 'ref': None} for d in dependencies]
# self.multi_boundary = xy_boundaries = self.get_xy_boundaries(boundaries, geometry_types, incl_excls)
self.wind_turbines = wind_turbines
self.types = wind_turbines.types()
# self.incl_excls = incl_excls
self.n_wt = n_wt
self.zones = zones
self.ts_polygon_boundaries, ts_xy_boundaries = self.get_ts_boundaries()
MultiPolygonBoundaryComp.__init__(self, n_wt=n_wt, zones=zones, const_id=const_id, units=units,
relaxation=relaxation, method=method, simplify_geometry=simplify_geometry)
# self.polygon_boundaries = [Polygon(x) for x, _ in xy_boundaries]
# self.ts_polygon_boundaries = self.get_ts_polygon_boundaries(self.types)
self.ts_merged_polygon_boundaries = self.merge_boundaries()
self.ts_merged_xy_boundaries = self.get_ts_xy_boundaries()
self.ts_boundary_properties = self.get_ts_boundary_properties()
self.ts_item_indices = self.get_ts_item_indices()
def get_ts_boundaries(self):
polygons = []
bounds = []
for t in set(self.types):
temp1 = []
temp2 = []
dist2wt_input = dict(
D=self.wind_turbines.diameter(t),
H=self.wind_turbines.hub_height(t),
)
for z in self.zones:
poly = self._zone2poly(z, **dist2wt_input)
bound = np.asarray(poly)
temp1.append(poly)
temp2.append(bound)
polygons.append(temp1)
bounds.append(temp2)
return polygons, bounds
# for wt
# temp = []
# for n, (b, t, ie) in enumerate(zip(boundaries, geometry_types, incl_excls)):
# if t == 'line':
# bound = np.asarray(Polygon(LineString(b).buffer(default_ref, join_style=2).exterior).exterior.coords)
# self.dependencies[n]['ref'] = default_ref
# elif t == 'polygon':
# bound = b
# else:
# raise NotImplementedError("Geometry type '%s' is not implemented" % b)
# temp.append((bound, ie))
# def get_ts_polygon_boundaries(self, types):
# temp = []
# for t in set(types):
# d = self.wind_turbines.diameter(t)
# h = self.wind_turbines.hub_height(t)
# temp.append(self.get_ts_polygon_boundary(d, h))
# return temp
def get_ts_xy_boundaries(self):
return [self._poly_to_bound(b) for b in self.ts_merged_polygon_boundaries]
# def get_ts_polygon_boundary(self, d=None, h=None):
# temp = []
# for bound, dep in zip(self.polygon_boundaries, self.dependencies):
# ref = dep['ref'] or 0
# if dep['type'] == 'D':
# ts_polygon_boundary = bound.buffer(dep['multiplier']*d-ref, join_style=2)
# elif dep['type'] == 'H':
# ts_polygon_boundary = bound.buffer(dep['multiplier']*h-ref, join_style=2)
# else:
# ts_polygon_boundary = bound
# temp.append(ts_polygon_boundary)
# return temp
def merge_boundaries(self):
return [self._calc_resulting_polygons(bounds, self.incl_excls) for bounds in self.ts_polygon_boundaries]
def get_ts_boundary_properties(self,):
return [[self.get_boundary_properties(bound) for bound, _ in bounds] for bounds in self.ts_merged_xy_boundaries]
def get_ts_item_indices(self):
temp = []
for bounds in self.ts_merged_xy_boundaries:
n_edges = np.asarray([len(bound) for bound, _ in bounds])
n_edges_tot = np.sum(n_edges)
start_at = np.cumsum(n_edges) - n_edges
end_at = start_at + n_edges
item_indices = [n_edges_tot, start_at, end_at]
temp.append(item_indices)
return temp
def calc_distance_and_gradients(self, x, y, types=None):
if self._cache_input is None:
pass
elif not np.shape([x, y]) == np.shape(self._cache_input[0]) or not np.shape(types) == np.shape(self._cache_input[1]):
pass
elif np.all(np.array([x, y]) == self._cache_input[0]) & (not self.relaxation) & np.all(np.asarray([types]) == self._cache_input[1]):
return self._cache_output
if types is None:
types = np.zeros(self.n_wt)
Dist_i = np.zeros(self.n_wt)
sign_i = np.zeros(self.n_wt)
dDdx_i = np.zeros(self.n_wt)
dDdy_i = np.zeros(self.n_wt)
for t in set(types):
t = int(t)
idx = (types == t)
n_edges_tot, start_at, end_at = self.ts_item_indices[t]
Dist_ij = np.zeros((sum(idx), n_edges_tot))
dDdk_ijk = np.zeros((sum(idx), n_edges_tot, 2))
for n, (bound, bound_type) in enumerate(self.ts_merged_xy_boundaries[t]):
sa = start_at[n]
ea = end_at[n]
distance, ddist_dX, ddist_dY = self._calc_distance_and_gradients(x[idx], y[idx], self.ts_boundary_properties[t][n][1:])
if bound_type == 0:
distance *= -1
ddist_dX *= -1
ddist_dY *= -1
Dist_ij[:, sa:ea] = distance
dDdk_ijk[:, sa:ea, 0] = ddist_dX
dDdk_ijk[:, sa:ea, 1] = ddist_dY
sign_i[idx] = self.sign(Dist_ij)
Dist_i[idx] = Dist_ij[np.arange(sum(idx)), np.argmin(np.abs(Dist_ij), axis=1)]
dDdx_i[idx], dDdy_i[idx] = dDdk_ijk[np.arange(sum(idx)), np.argmin(np.abs(Dist_ij), axis=1), :].T
self._cache_input = (np.array([x, y]), np.asarray(types))
self._cache_output = [Dist_i, dDdx_i, dDdy_i, sign_i]
return self._cache_output
def distances(self, x, y, type=None):
Dist_i, _, _, _ = self.calc_distance_and_gradients(x, y, types=type)
if self.relaxation:
Dist_i += self.calc_relaxation()
return Dist_i
def gradients(self, x, y, type=None):
Dist_i, dDdx_i, dDdy_i, _ = self.calc_distance_and_gradients(x, y, types=type)
if self.relaxation:
Dist_i += self.calc_relaxation()
dDdt = -self.relaxation[0]
if self.relaxation:
gradients = np.diagflat(dDdx_i), np.diagflat(dDdy_i), np.ones(self.n_wt) * dDdt
else:
gradients = np.diagflat(dDdx_i), np.diagflat(dDdy_i)
return gradients
def plot(self, ax):
linestyles = ["--", "-"]
colors = ["b", "r", "m", "c", "g", "y", "orange", "indigo", "grey"]
for n, t in enumerate(self.types):
_ = ax.plot(
*self.ts_merged_xy_boundaries[n][0][0][0, :],
color=colors[t % len(colors)],
linewidth=1,
label=f"{self.wind_turbines._names[n]} boundary",
)
for bound, io in self.ts_merged_xy_boundaries[n]:
ax.plot(
np.asarray(bound)[:, 0].tolist() + [np.asarray(bound)[0, 0]],
np.asarray(bound)[:, 1].tolist() + [np.asarray(bound)[0, 1]],
color=colors[t % len(colors)],
linewidth=1,
linestyle=linestyles[io],
)
for i, zone in enumerate(self.zones):
original_coords = zone.boundary
if zone.geometry_type == "line":
ax.plot(
original_coords[:, 0],
original_coords[:, 1],
"k-.",
linewidth=0.8,
alpha=0.7,
label=f"Original" if (i == 0) else "",
)
else:
original_poly = np.vstack([original_coords, original_coords[0]])
ax.plot(
original_poly[:, 0],
original_poly[:, 1],
"k-.",
linewidth=0.8,
alpha=0.7,
label=f"Original" if (i == 0) else "",
)
ax.legend()
class MultiCircleBoundaryComp(PolygonBoundaryComp):
def __init__(self, n_wt, geometry, wt_groups, const_id=None, units=None):
self.__validate_input(geometry, wt_groups)
self.center = [g[0] for g in geometry]
self.radius = [g[1] for g in geometry]
# ones where the indices in each wt_group are active, zeros elsewhere
self.masks = [np.isin(np.arange(n_wt), g) for g in wt_groups]
BoundaryBaseComp.__init__(self, n_wt, None, const_id, units)
self.zeros = np.zeros(self.n_wt)
def __validate_input(self, geometry, wt_groups):
does_len_match = len(geometry) == len(wt_groups)
centers_and_radii_given = all(len(g) == 2 for g in geometry)
from topfarm.utils import _np2scalar # fmt:skip
are_radii_scalar = True
try:
_ = [_np2scalar(g[1]) for g in geometry]
except BaseException:
are_radii_scalar = False
are_centers_all_2x_coords = all(
hasattr(g[0], "__iter__") and len(g[0]) == 2 for g in geometry
)
assert all(
[
does_len_match,
centers_and_radii_given,
are_radii_scalar,
are_centers_all_2x_coords,
]
), (
"Invalid input for Circle: Ensure the number of geometries matches the number of wt_groups, "
"each geometry is a tuple of center and radius, and the center is a tuple of x and y "
"with the radius being a scalar. For instance, geometry = [((0, 0), 100), ...] and "
"wt_groups = [[0, 1], ...]; Here, the first geometry is a circle with center at (0, 0) "
"and radius 100, and turbines 0 and 1 are assigned for the circle constraint 0."
)
def plot(self, ax=None):
ax = ax or plt.gca()
for center, radius in zip(self.center, self.radius):
circle = Circle(center, radius, color="k", fill=False)
ax.add_artist(circle)
def distances(self, x, y):
assert (
x.shape == y.shape == self.masks[0].shape
), f"{x.shape} != {y.shape} != {self.masks[0].shape}"
distances = np.zeros_like(x)
for center, radius, mask in zip(self.center, self.radius, self.masks):
distances += mask * (
radius - np.sqrt((x - center[0]) ** 2 + (y - center[1]) ** 2)
)
return distances
def gradients(self, x, y):
dx = np.zeros_like(x)
dy = np.zeros_like(x)
for center, radius, mask in zip(self.center, self.radius, self.masks):
theta = np.arctan2(y - center[1], x - center[0])
dx_tmp = -1 * np.ones_like(x)
dy_tmp = -1 * np.ones_like(x)
dist = radius - np.sqrt((x - center[0]) ** 2 + (y - center[1]) ** 2)
not_center = dist != radius
dx_tmp[not_center], dy_tmp[not_center] = mask[not_center] * -np.cos(
theta[not_center]
), mask[not_center] * -np.sin(theta[not_center])
dx += dx_tmp
dy += dy_tmp
return np.diagflat(dx), np.diagflat(dy)
@dataclass
class Boundary(object):
_vertices: np.ndarray
design_var_mask: np.ndarray
normals: np.ndarray = None
@property
def n_turbines(self):
return self.design_var_mask.sum()
@property
def n_vertices(self):
if np.all(self.vertices[0] == self.vertices[-1]):
return self.vertices.shape[0] - 1
return self.vertices.shape[0]
@property
def vertices(self):
return self._vertices
@vertices.setter
def vertices(self, v):
self._vertices = v
def __post_init__(self):
self.__validate()
def __validate(self):
self.vertices = np.asarray(self.vertices)
assert self.vertices.ndim == 2, "Boundary must be a 2D array"
assert any(
[x for x in self.vertices.shape if x == 2]
), "Boundary must have shape (n, 2) or (2, n)"
self.vertices = self.vertices.reshape(-1, 2)
assert self.vertices.shape[0] > 2, "Boundary must have at least 3 vertices"
assert self.design_var_mask.ndim == 1, "design_var_mask must 1 dimensional"
self.design_var_mask = np.asarray(self.design_var_mask, dtype=bool)
class MultiConvexBoundaryComp(BoundaryBaseComp):
def __init__(
self,
n_wt,
geometry,
wt_groups,
const_id=None,
units=None,
):
xy_boundaries = [
Boundary(g, np.isin(np.arange(n_wt), mask))
for g, mask in zip(geometry, wt_groups)
]
self.xy_boundaries = self.sort_boundaries(xy_boundaries)
self.xy_boundaries = self.calculate_boundary_and_normals(self.xy_boundaries)
super().__init__(n_wt, None, const_id, units)
self.turbine_vertice_prod = 0
total_n_active_turbines = 0
for b in self.xy_boundaries:
if np.any(b.vertices[0] != b.vertices[-1]):
b.vertices = np.r_[b.vertices, b.vertices[:1]]
self.turbine_vertice_prod += b.n_turbines * b.n_vertices
total_n_active_turbines += b.n_turbines
assert (
total_n_active_turbines == n_wt
), "Number of active turbines in boundaries must match number of total turbines; Check that masks sum up to n_wt i.e. np.concatenate(all_masks).sum() == n_wt."
self.calculate_gradients()
self.zeros = np.zeros(self.turbine_vertice_prod)
def sort_boundaries(self, boundaries):
def centroid(boundary): # fmt: skip
return tuple(np.mean(boundary.vertices, axis=0))
return sorted(boundaries, key=lambda b: centroid(b))
def calculate_boundary_and_normals(
self, xy_boundaries: list[Boundary]
) -> list[Boundary]:
def __compute_normal(boundary_pts, ii, jj):
"""Calculate the unit normal vector of the current face (taking points CCW)"""
normal = np.array(
[
boundary_pts[ii, 1] - boundary_pts[jj, 1],
-(boundary_pts[ii, 0] - boundary_pts[jj, 0]),
]
)
return normal / np.linalg.norm(normal)
for boundary in xy_boundaries:
hull = ConvexHull(list(boundary.vertices))
# keep only vertices that actually comprise a convex hull and arrange in CCW order
boundary.vertices = boundary.vertices[hull.vertices]
# initialize normals array
unit_normals = np.zeros([boundary.n_vertices, 2])
# determine if point is inside or outside and distances from each face
nvtm1 = boundary.n_vertices - 1
for j in range(0, nvtm1):
# all but the points that close the shape
unit_normals[j] = __compute_normal(boundary.vertices, j + 1, j)
# points that close the shape
unit_normals[nvtm1] = __compute_normal(boundary.vertices, 0, nvtm1)
boundary.normals = unit_normals
return xy_boundaries
def calculate_gradients(self):
# this is flawed if the order of boundaries is switched;
# test with arbitrary number of vertices and arbitrary number
# of turbines in a boundary; For now it's sorted at the top..
final_dx = np.zeros([self.turbine_vertice_prod, self.n_wt])
final_dy = np.zeros([self.turbine_vertice_prod, self.n_wt])
for bi, boundary in enumerate(self.xy_boundaries):
assert (
boundary.design_var_mask is not None
), "Design variable mask must be provided"
assert self.n_wt == len(
boundary.design_var_mask
), "Design variable mask must must be the same length as the number of turbines"
unit_normals = boundary.normals
n_vertices = boundary.n_vertices
n_turbines = boundary.n_turbines
dfaceDistance_dx = np.zeros([n_turbines * n_vertices, n_turbines])
dfaceDistance_dy = np.zeros([n_turbines * n_vertices, n_turbines])
for i in range(0, n_turbines):
# determine if point is inside or outside of each face, and distances from each face
for j in range(0, n_vertices):
# define the derivative vectors from the point of interest to the first point of the face
dpa_dx = np.array([-1.0, 0.0])
dpa_dy = np.array([0.0, -1.0])
# find perpendicular distances derivatives from point to current surface (vector projection)
ddistanceVec_dx = np.vdot(dpa_dx, unit_normals[j]) * unit_normals[j]
ddistanceVec_dy = np.vdot(dpa_dy, unit_normals[j]) * unit_normals[j]
# calculate derivatives for the sign of perpendicular distances from point to current face
dfaceDistance_dx[i * n_vertices + j, i] = np.vdot(
ddistanceVec_dx, unit_normals[j]
)
dfaceDistance_dy[i * n_vertices + j, i] = np.vdot(
ddistanceVec_dy, unit_normals[j]
)
seek = sum([(b.n_vertices * b.n_turbines) for b in self.xy_boundaries[:bi]])
final_dx[
seek: seek + (n_turbines * n_vertices), boundary.design_var_mask
] = dfaceDistance_dx
final_dy[
seek: seek + (n_turbines * n_vertices), boundary.design_var_mask
] = dfaceDistance_dy
dfaceDistance_dx = final_dx
dfaceDistance_dy = final_dy
def __wrap_sparse(m): # fmt: skip
if m.size < 1e4:
return m
from scipy.sparse import csr_matrix # fmt: skip
return csr_matrix(m)
# store Jacobians
self.dfaceDistance_dx = __wrap_sparse(dfaceDistance_dx)
self.dfaceDistance_dy = __wrap_sparse(dfaceDistance_dy)
def distances(self, x, y):
"""
:param points: points that you want to calculate the distances from to the faces of the convex hull
:return face_distace: signed perpendicular distances from each point to each face; + is inside
"""
points = np.array([x, y]).T
face_distances = np.zeros(self.turbine_vertice_prod)
for bi, boundary in enumerate(self.xy_boundaries):
mask = boundary.design_var_mask
vertices = boundary.vertices[:-1]
n_vertices = boundary.n_vertices
PA = vertices[:, na] - points[mask][na]
dist = np.sum(PA * boundary.normals[:, na], axis=2)
d_vec = dist[:, :, na] * boundary.normals[:, na]
seek = sum([(b.n_vertices * b.n_turbines) for b in self.xy_boundaries[:bi]])
face_distances[seek: seek + (boundary.n_turbines * n_vertices)] = np.sum(
d_vec * boundary.normals[:, na], axis=2
).T.reshape(-1)
return face_distances
def gradients(self, x, y):
return self.dfaceDistance_dx, self.dfaceDistance_dy
def satisfy(self, state):
raise NotImplementedError("Not implemented for MultiConvexBoundaryComp")
def plot(self, ax):
for b in self.xy_boundaries:
ax.plot(
b.vertices[:, 0].tolist(),
b.vertices[:, 1].tolist(),
"k",
linewidth=1,
)
class MultiWFPolygonBoundaryComp(PolygonBoundaryComp):
def __init__(
self,
n_wt: int,
geometry,
wt_groups,
**kwargs,
):
boundaries = {i: geom for i, geom in enumerate(geometry)}
turbine_groups = {i: group for i, group in enumerate(wt_groups)}
self.boundaries = {} # group_id: boundary_coords
for group_id, boundary_coords in boundaries.items():
boundary_coords = self.__validate_boundary_coords(boundary_coords)
# close the boundary if needed
if not np.all(boundary_coords[0] == boundary_coords[-1]):
boundary_coords = np.vstack([boundary_coords, boundary_coords[0]])
self.boundaries[group_id] = boundary_coords
self.__validate_group_assignments(turbine_groups, n_wt)
self.turbine_groups = {i: -1 for i in range(n_wt)} # turbine_idx: group_id
for group_id, indices in turbine_groups.items():
if group_id not in self.boundaries:
raise ValueError(f"No boundary defined for group {group_id}")
for idx in indices:
self.turbine_groups[idx] = group_id
# check that all turbines are assigned to a group
if -1 in self.turbine_groups.values():
raise ValueError(f"All turbines must be assigned to a group; All the -1 should be filled\n{self.turbine_groups}")
super().__init__(
n_wt, np.array(list(boundaries.values())[0]).reshape(-1, 2), **kwargs
)
def __validate_boundary_coords(self, boundary_coords: np.ndarray) -> None:
if not isinstance(boundary_coords, (np.ndarray, list)):
raise TypeError(
"Boundary coordinates must be a numpy array or a list of lists"
)
try:
boundary_coords = np.array(boundary_coords).reshape(-1, 2)
except BaseException:
raise ValueError(
"Boundary coordinates must be a 2D array with shape (n,2)"
)
if boundary_coords.ndim != 2 or boundary_coords.shape[1] != 2:
raise ValueError("Boundary coordinates must be a 2D array with shape (n,2)")
if len(boundary_coords) < 3:
raise ValueError("Boundary must have at least 3 points")
return boundary_coords
def __validate_group_assignments(
self, groups: Dict[int, List[int]], num_turbines: int
) -> None:
if not isinstance(groups, dict):
raise TypeError("Groups must be a dictionary")
valid_types = (int, np.integer)
for group_id, turbine_indices in groups.items():
if not isinstance(group_id, valid_types) or group_id < 0:
raise ValueError(
f"Invalid group ID: {group_id}; Should be an integer >= 0"
)
if not all(
isinstance(idx, valid_types) and 0 <= idx < num_turbines
for idx in turbine_indices
):
raise ValueError(
f"Invalid turbine indices in group {group_id}; Should be integers in range [0, {num_turbines})"
)
def __dist_grad_wrapper(self, x, y, boundary_prop):
if not np.shape([x, y]) == np.shape(self._cache_input):
pass
elif np.all(np.array([x, y]) == self._cache_input):
return self._cache_output
distance, ddist_dX, ddist_dY = self._calc_distance_and_gradients(
x, y, boundary_prop
)
closest_edge_index = np.argmin(np.abs(distance), 1)
self._cache_input = np.array([x, y])
self._cache_output = [ # pick only closest edge
v[np.arange(len(closest_edge_index)), closest_edge_index] for v in [distance, ddist_dX, ddist_dY]
]
return self._cache_output
def __calculate_group_distances_and_gradients(self, x, y):
"""Helper method to calculate distances and gradients for all groups."""
n = len(x)
ds = np.zeros(n)
dx = np.zeros(n)
dy = np.zeros(n)
for group_id, boundary in self.boundaries.items():
group_turbines = [
i for i in range(n) if self.turbine_groups.get(i) == group_id
]
if not group_turbines:
continue
x_group = x[group_turbines]
y_group = y[group_turbines]
boundary_properties = self.get_boundary_properties(boundary)[1:]
distances, dx_group, dy_group = self.__dist_grad_wrapper(
x_group, y_group, boundary_properties
)
ds[group_turbines] = distances
dx[group_turbines] = dx_group
dy[group_turbines] = dy_group
return ds, dx, dy
def distances(self, x, y):
ds, _, _ = self.__calculate_group_distances_and_gradients(x, y)
return ds
def gradients(self, x, y):
_, dx, dy = self.__calculate_group_distances_and_gradients(x, y)
return np.diagflat(dx), np.diagflat(dy)
def plot(self, ax=None):
colors = plt.cm.viridis(np.linspace(0, .8, len(self.boundaries)))
for ii, (group_id, boundary) in enumerate(self.boundaries.items()):
ax.plot(*boundary.T, c=colors[ii], label=f"Group {group_id}", linewidth=1)
ax.legend()
class BoundaryType(Enum):
CIRCLE = "circle"
CONVEX_HULL = "convex_hull"
POLYGON = "polygon"
class MultiWFBoundaryConstraint(XYBoundaryConstraint):
def __init__(self, geometry, wt_groups, boundtype, units=None):
"""Entry point for creating boundary constraints for joint multi-wind-farm optimization.
Parameters
----------
geometry : Iterable
Geometry input depends on a specific boundary type:
CIRCLE - it is a list of tuples [(center1, radius1), (center2, radius2), ...],
where center itself is a tuple (x, y) and radius is a scalar.
CONVEX_HULL - it is a list of numpy arrays, where each array represents a convex hull boundary.
Each array has shape (n, 2) where n is the number of vertices.
POLYGON - it is a list of numpy arrays, where each array represents a polygon boundary.
Each array has shape (n, 2) where n is the number of vertices.
!!! Input length must match the number of groups in wt_groups. !!!
wt_groups : Iterable
List of lists where each list contains indices of turbines assigned to a specific boundary.
For instance, [[0, 1], [2, 3, 4], ...] assigns turbines 0 and 1 to boundary 0, and turbines 2, 3, and 4 to boundary 1.
!!! Input length must match the number of geometries in geometry parameter. !!!
boundtype : BoundaryType
One of BoundaryType.CIRCLE, BoundaryType.CONVEX_HULL, BoundaryType.POLYGON.
The argument specifies the type of boundary constraint to be enforced.
units : str, optional
Units for the boundary, by default None
"""
self.geometry = geometry
self.boundtype = boundtype
self.const_id = f"wf_boundary_comp_{id(self)}"
self.units = units
self.wt_groups = wt_groups
assert len(geometry) == len(
wt_groups
), "Number of geometries and groups must match"
BD2COMP = {
BoundaryType.CIRCLE: MultiCircleBoundaryComp,
BoundaryType.CONVEX_HULL: MultiConvexBoundaryComp,
BoundaryType.POLYGON: MultiWFPolygonBoundaryComp,
}
def get_comp(self, n_wt):
assert n_wt > 0, "Number of turbines must be greater than 0"
assert n_wt >= len(
self.wt_groups
), "Number of turbines must be greater than or equal to the number of groups"
if hasattr(self, "boundary_comp"):
return self.boundary_comp
if (comp := self.BD2COMP.get(self.boundtype, None)) is None:
raise NotImplementedError(f"Invalid boundary type {self.boundtype}")
return comp(
n_wt,
self.geometry,
self.wt_groups,
const_id=self.const_id,
units=self.units,
)
def set_design_var_limits(self, design_vars):
_ = design_vars
def main():
if __name__ == "__main__":
from py_wake.wind_turbines import WindTurbines
from py_wake.wind_turbines.power_ct_functions import CubePowerSimpleCt
plt.close('all')
i1 = np.array([[2, 17], [6, 23], [16, 23], [26, 15], [19, 0], [14, 4], [4, 4]])
e1 = np.array([[0, 10], [20, 21], [22, 12], [10, 12], [9, 6], [2, 7]])
i2 = np.array([[12, 13], [14, 17], [18, 15], [17, 10], [15, 11]])
e2 = np.array([[5, 17], [5, 18], [8, 19], [8, 18]])
i3 = np.array([[5, 0], [5, 1], [10, 3], [10, 0]])
e3 = np.array([[6, -1], [6, 18], [7, 18], [7, -1]])
e4 = np.array([[15, 9], [15, 11], [20, 11], [20, 9]])
e5 = np.array([[10, 25], [20, 0]])
zones = [
InclusionZone(i1, name='i1'),
InclusionZone(i2, name='i2'),
InclusionZone(i3, name='i3'),
ExclusionZone(e1, name='e1'),
ExclusionZone(e2, name='e2'),
ExclusionZone(e3, name='e3'),
ExclusionZone(e4, name='e4'),
ExclusionZone(e5, name='e5', dist2wt=lambda: 1, geometry_type='line'),
]
N_points = 50
xs = np.linspace(-1, 30, N_points)
ys = np.linspace(-1, 30, N_points)
y_grid, x_grid = np.meshgrid(xs, ys)
x = x_grid.ravel()
y = y_grid.ravel()
n_wt = len(x)
MPBC = MultiPolygonBoundaryComp(n_wt, zones, method='nearest')
distances = MPBC.distances(x, y)
delta = 1e-9
distances2 = MPBC.distances(x + delta, y)
dx_fd = (distances2 - distances) / delta
dx = np.diag(MPBC.gradients(x + delta / 2, y)[0])
plt.figure()
plt.plot(dx_fd, dx, '.')
plt.figure()
for n, bound in enumerate(MPBC.boundaries):
x_bound, y_bound = bound[0].T
x_bound = np.append(x_bound, x_bound[0])
y_bound = np.append(y_bound, y_bound[0])
line, = plt.plot(x_bound, y_bound, label=f'{n}')
plt.plot(x_bound[0], y_bound[0], color=line.get_color(), marker='o')
plt.legend()
plt.grid()
plt.axis('square')
plt.contourf(x_grid, y_grid, distances.reshape(N_points, N_points), np.linspace(-10, 10, 100), cmap='seismic')
plt.colorbar()
plt.figure()
ax = plt.axes(projection='3d')
ax.contour3D(
x.reshape(
N_points, N_points), y.reshape(
N_points, N_points), distances.reshape(
N_points, N_points), np.linspace(-10, 10, 100), cmap='seismic')
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlabel('z')
if 0:
for smpl in [0, 1, 2, 3, 4, 5, 6, 7, 8]:
MPBC = MultiPolygonBoundaryComp(n_wt, zones, simplify_geometry=smpl)
plt.figure()
ax = plt.gca()
MPBC.plot(ax)
wind_turbines = WindTurbines(names=['tb1', 'tb2'],
diameters=[80, 120],
hub_heights=[70, 110],
powerCtFunctions=[
CubePowerSimpleCt(ws_cutin=3, ws_cutout=25, ws_rated=12,
power_rated=2000, power_unit='kW',
ct=8 / 9, additional_models=[]),
CubePowerSimpleCt(ws_cutin=3, ws_cutout=25, ws_rated=12,
power_rated=3000, power_unit='kW',
ct=8 / 9, additional_models=[])])
x1 = [0, 3000, 3000, 0]
y1 = [0, 0, 3000, 3000]
b1 = np.transpose((x1, y1))
# Buildings
x2 = [600, 1400, 1400, 600]
y2 = [1700, 1700, 2500, 2500]
b2 = np.transpose((x2, y2))
# River
x3 = np.linspace(520, 2420, 16)
y3 = [0, 133, 266, 400, 500, 600, 700, 733, 866, 1300, 1633,
2100, 2400, 2533, 2700, 3000]
b3 = np.transpose((x3, y3))
# Roads
x4 = np.linspace(0, 3000, 16)
y4 = [1095, 1038, 1110, 1006, 1028, 992, 977, 1052, 1076, 1064, 1073,
1027, 964, 981, 1015, 1058]
b4 = np.transpose((x4, y4))
zones = [
InclusionZone(b1, name='i1'),
ExclusionZone(b2, dist2wt=lambda H: 4 * H - 360, name='building'),
ExclusionZone(b3, geometry_type='line', dist2wt=lambda D: 3 * D, name='river'),
ExclusionZone(b4, geometry_type='line', dist2wt=lambda D, H: max(D * 2, H * 3), name='road'),
]
N_points = 50
xs = np.linspace(0, 3000, N_points)
ys = np.linspace(0, 3000, N_points)
y_grid, x_grid = np.meshgrid(xs, ys)
x = x_grid.ravel()
y = y_grid.ravel()
n_wt = len(x)
types = np.zeros(n_wt)
TSBC = TurbineSpecificBoundaryComp(n_wt, wind_turbines, zones)
distances = TSBC.distances(x, y, type=types)
delta = 1e-9
distances2 = TSBC.distances(x + delta, y, type=types)
dx_fd = (distances2 - distances) / delta
dx = np.diag(TSBC.gradients(x + delta / 2, y, type=types)[0])
plt.figure()
plt.plot(dx_fd, dx, '.')
plt.figure()
for ll, t in enumerate(TSBC.types):
line, = plt.plot(*TSBC.ts_merged_xy_boundaries[ll][0][0][0, :], label=f'type {ll}')
for n, bound in enumerate(TSBC.ts_merged_xy_boundaries[ll]):
x_bound, y_bound = bound[0].T
x_bound = np.append(x_bound, x_bound[0])
y_bound = np.append(y_bound, y_bound[0])
plt.plot(x_bound, y_bound, color=line.get_color())
plt.legend()
plt.grid()
plt.axis('square')
for ll, t in enumerate(TSBC.types):
plt.figure()
for n, bound in enumerate(TSBC.ts_merged_xy_boundaries[ll]):
x_bound, y_bound = bound[0].T
x_bound = np.append(x_bound, x_bound[0])
y_bound = np.append(y_bound, y_bound[0])
plt.plot(x_bound, y_bound, 'b')
plt.grid()
plt.title(f'type {ll}')
plt.axis('square')
plt.contourf(x_grid, y_grid, TSBC.distances(x, y, type=t * np.ones(n_wt)).reshape(N_points, N_points), 50)
plt.colorbar()
main()