import numpy as np
from numpy import newaxis as na
from scipy.spatial import ConvexHull
from topfarm.constraint_components import Constraint, ConstraintComponent
from topfarm.utils import smooth_max, smooth_max_gradient
import topfarm
from shapely.geometry import Polygon, MultiPolygon, LineString
from shapely.ops import unary_union
import warnings
from tqdm import tqdm
[docs]class XYBoundaryConstraint(Constraint):
[docs] def __init__(self, boundary, boundary_type='convex_hull', units=None, relaxation=False, **kwargs):
"""Initialize XYBoundaryConstraint
Parameters
----------
boundary : array_like (n,2) or list of tuples (array_like (n,2), boolean)
boundary coordinates. If boundary is array_like (n,2) it indicates a single boundary and can be used with
boundary types: 'convex_hull', 'polygon', 'rectangle','square'. If boundary is list of tuples (array_like (n,2), boolean),
it is multiple boundaries where the boolean is 1 for inclusion zones and 0 for exclusion zones and can be used with the
boundary type: 'multi_polygon'.
boundary_type : 'convex_hull', 'polygon', 'rectangle','square'
- 'convex_hull' (default): Convex hul around boundary points\n
- 'polygon': Polygon boundary (may be non convex). Less suitable for gradient-based optimization\n
- 'rectangle': Smallest axis-aligned rectangle covering the boundary points\n
- 'square': Smallest axis-aligned square covering the boundary points
- 'multi_polygon': Mulitple polygon boundaries incl. exclusion zones (may be non convex).\n
- 'turbine_specific': Set of multiple polygon boundaries that depend on the wind turbine type. \n
"""
if boundary_type == 'multi_polygon':
self.zones = boundary
self.boundary = np.asarray(self.zones[0].boundary)
elif boundary_type == 'turbine_specific':
self.zones = boundary
assert 'turbines' in list(kwargs)
self.turbines = kwargs['turbines']
self.boundary = np.asarray(self.zones[0].boundary)
else:
self.boundary = np.asarray(boundary)
self.boundary_type = boundary_type
self.const_id = 'xyboundary_comp_{}'.format(boundary_type)
self.units = units
self.relaxation = relaxation
def get_comp(self, n_wt):
if not hasattr(self, 'boundary_comp'):
if self.boundary_type == 'polygon':
self.boundary_comp = PolygonBoundaryComp(
n_wt, self.boundary, self.const_id, self.units, self.relaxation)
elif self.boundary_type == 'multi_polygon':
self.boundary_comp = MultiPolygonBoundaryComp(n_wt, self.zones, const_id=self.const_id, units=self.units, relaxation=self.relaxation)
elif self.boundary_type == 'turbine_specific':
self.boundary_comp = TurbineSpecificBoundaryComp(n_wt, self.turbines, self.zones, const_id=self.const_id, units=self.units, relaxation=self.relaxation)
else:
self.boundary_comp = ConvexBoundaryComp(n_wt, self.boundary, self.boundary_type, self.const_id, self.units)
return self.boundary_comp
@property
def constraintComponent(self):
return self.boundary_comp
def set_design_var_limits(self, design_vars):
if self.boundary_type in ['multi_polygon', 'turbine_specific']:
bound_min = np.vstack([(bound).min(0) for bound, _ in self.boundary_comp.boundaries]).min(0)
bound_max = np.vstack([(bound).max(0) for bound, _ in self.boundary_comp.boundaries]).max(0)
else:
bound_min = self.boundary_comp.xy_boundary.min(0)
bound_max = self.boundary_comp.xy_boundary.max(0)
for k, l, u in zip([topfarm.x_key, topfarm.y_key], bound_min, bound_max):
if k in design_vars:
if len(design_vars[k]) == 4:
design_vars[k] = (design_vars[k][0], np.maximum(design_vars[k][1], l),
np.minimum(design_vars[k][2], u), design_vars[k][-1])
else:
design_vars[k] = (design_vars[k][0], l, u, design_vars[k][-1])
def _setup(self, problem, group='constraint_group'):
n_wt = problem.n_wt
self.boundary_comp = self.get_comp(n_wt)
self.boundary_comp.problem = problem
self.set_design_var_limits(problem.design_vars)
# problem.xy_boundary = np.r_[self.boundary_comp.xy_boundary, self.boundary_comp.xy_boundary[:1]]
problem.indeps.add_output('xy_boundary', self.boundary_comp.xy_boundary)
getattr(problem.model, group).add_subsystem('xy_bound_comp', self.boundary_comp, promotes=['*'])
def setup_as_constraint(self, problem, group='constraint_group'):
self._setup(problem, group=group)
if problem.n_wt == 1:
lower = 0
else:
lower = self.boundary_comp.zeros
problem.model.add_constraint('boundaryDistances', lower=lower)
def setup_as_penalty(self, problem, group='constraint_group'):
self._setup(problem, group=group)
[docs]class CircleBoundaryConstraint(XYBoundaryConstraint):
[docs] def __init__(self, center, radius):
"""Initialize CircleBoundaryConstraint
Parameters
----------
center : (float, float)
center position (x,y)
radius : int or float
circle radius
"""
self.center = np.array(center)
self.radius = radius
self.const_id = 'circle_boundary_comp_{}_{}'.format(
'_'.join([str(int(c)) for c in center]), int(radius)).replace('.', '_')
def get_comp(self, n_wt):
if not hasattr(self, 'boundary_comp'):
self.boundary_comp = CircleBoundaryComp(n_wt, self.center, self.radius, self.const_id)
return self.boundary_comp
def set_design_var_limits(self, design_vars):
for k, l, u in zip([topfarm.x_key, topfarm.y_key],
self.center - self.radius,
self.center + self.radius):
if len(design_vars[k]) == 4:
design_vars[k] = (design_vars[k][0], np.maximum(design_vars[k][1], l),
np.minimum(design_vars[k][2], u), design_vars[k][-1])
else:
design_vars[k] = (design_vars[k][0], l, u, design_vars[k][-1])
class BoundaryBaseComp(ConstraintComponent):
def __init__(self, n_wt, xy_boundary=None, const_id=None, units=None, relaxation=False, **kwargs):
super().__init__(**kwargs)
self.n_wt = n_wt
self.xy_boundary = np.array(xy_boundary)
self.const_id = const_id
self.units = units
self.relaxation = relaxation
if np.any(self.xy_boundary[0] != self.xy_boundary[-1]):
self.xy_boundary = np.r_[self.xy_boundary, self.xy_boundary[:1]]
def setup(self):
# Explicitly size input arrays
self.add_input(topfarm.x_key, np.zeros(self.n_wt),
desc='x coordinates of turbines in global ref. frame', units=self.units)
self.add_input(topfarm.y_key, np.zeros(self.n_wt),
desc='y coordinates of turbines in global ref. frame', units=self.units)
if self.relaxation:
self.add_input('time', 0)
if hasattr(self, 'types'):
self.add_input('type', np.zeros(self.n_wt))
# self.add_output('constraint_violation_' + self.const_id, val=0.0)
# Explicitly size output array
# (vector with positive elements if turbines outside of hull)
self.add_output('boundaryDistances', self.zeros,
desc="signed perpendicular distances from each turbine to each face CCW; + is inside")
self.declare_partials('boundaryDistances', [topfarm.x_key, topfarm.y_key])
if self.relaxation:
self.declare_partials('boundaryDistances', 'time')
# self.declare_partials('boundaryDistances', ['boundaryVertices', 'boundaryNormals'], method='fd')
def compute(self, inputs, outputs):
# calculate distances from each point to each face
args = {x: inputs[x] for x in [topfarm.x_key, topfarm.y_key, topfarm.type_key] if x in inputs}
boundaryDistances = self.distances(**args)
outputs['boundaryDistances'] = boundaryDistances
# outputs['constraint_violation_' + self.const_id] = np.sum(np.minimum(boundaryDistances, 0) ** 2)
def compute_partials(self, inputs, partials):
# return Jacobian dict
if not self.relaxation:
dx, dy = self.gradients(**{xy: inputs[k] for xy, k in zip('xy', [topfarm.x_key, topfarm.y_key])})
else:
dx, dy, dt = self.gradients(**{xy: inputs[k] for xy, k in zip('xy', [topfarm.x_key, topfarm.y_key])})
partials['boundaryDistances', topfarm.x_key] = dx
partials['boundaryDistances', topfarm.y_key] = dy
if self.relaxation:
partials['boundaryDistances', 'time'] = dt
def plot(self, ax):
"""Plot boundary"""
if isinstance(self, TurbineSpecificBoundaryComp):
linestyles = ['--', '-']
colors = np.array(['b', 'r', 'm', 'c', 'g', 'y', 'orange', 'indigo', 'grey'] * 10)
for n, t in enumerate(self.types):
line, = ax.plot(*self.ts_merged_xy_boundaries[n][0][0][0, :], color=colors[t], linewidth=1, label=f'{self.wind_turbines._names[n]} boundary')
for bound, io in self.ts_merged_xy_boundaries[n]:
ax.plot(np.asarray(bound)[:, 0].tolist() + [np.asarray(bound)[0, 0]],
np.asarray(bound)[:, 1].tolist() + [np.asarray(bound)[0, 1]], color=colors[t], linewidth=1, linestyle=linestyles[io])
elif isinstance(self, MultiPolygonBoundaryComp):
colors = ['--k', 'k']
if self.relaxation != 0:
for bound, io in self.relaxed_polygons():
ax.plot(np.asarray(bound)[:, 0].tolist() + [np.asarray(bound)[0, 0]],
np.asarray(bound)[:, 1].tolist() + [np.asarray(bound)[0, 1]], c='r', linewidth=1, linestyle='--')
ax.plot([], c='r', linewidth=1, linestyle='--', label='Relaxed boundaries')
for bound, io in self.boundaries:
ax.plot(np.asarray(bound)[:, 0].tolist() + [np.asarray(bound)[0, 0]],
np.asarray(bound)[:, 1].tolist() + [np.asarray(bound)[0, 1]], colors[io], linewidth=1)
else:
ax.plot(self.xy_boundary[:, 0].tolist() + [self.xy_boundary[0, 0]],
self.xy_boundary[:, 1].tolist() + [self.xy_boundary[0, 1]], 'k', linewidth=1)
class ConvexBoundaryComp(BoundaryBaseComp):
def __init__(self, n_wt, xy_boundary=None, boundary_type='convex_hull', const_id=None, units=None):
self.boundary_type = boundary_type
# self.const_id = const_id
self.calculate_boundary_and_normals(xy_boundary)
super().__init__(n_wt, self.xy_boundary, const_id, units)
self.calculate_gradients()
self.zeros = np.zeros([self.n_wt, self.nVertices])
# self.units = units
def calculate_boundary_and_normals(self, xy_boundary):
xy_boundary = np.asarray(xy_boundary)
if self.boundary_type == 'convex_hull':
# find the points that actually comprise a convex hull
hull = ConvexHull(list(xy_boundary))
# keep only xy_vertices that actually comprise a convex hull and arrange in CCW order
self.xy_boundary = xy_boundary[hull.vertices]
elif self.boundary_type == 'square':
min_ = xy_boundary.min(0)
max_ = xy_boundary.max(0)
range_ = (max_ - min_)
x_c, y_c = min_ + range_ / 2
r = range_.max() / 2
self.xy_boundary = np.array([(x_c - r, y_c - r), (x_c + r, y_c - r),
(x_c + r, y_c + r), (x_c - r, y_c + r)])
elif self.boundary_type == 'rectangle':
min_ = xy_boundary.min(0)
max_ = xy_boundary.max(0)
range_ = (max_ - min_)
x_c, y_c = min_ + range_ / 2
r = range_ / 2
self.xy_boundary = np.array([(x_c - r[0], y_c - r[1]), (x_c + r[0], y_c - r[1]),
(x_c + r[0], y_c + r[1]), (x_c - r[0], y_c + r[1])])
else:
raise NotImplementedError("Boundary type '%s' is not implemented" % self.boundary_type)
# get the real number of xy_vertices
self.nVertices = self.xy_boundary.shape[0]
# initialize normals array
unit_normals = np.zeros([self.nVertices, 2])
# determine if point is inside or outside of each face, and distances from each face
for j in range(0, self.nVertices):
# calculate the unit normal vector of the current face (taking points CCW)
if j < self.nVertices - 1: # all but the set of point that close the shape
normal = np.array([self.xy_boundary[j + 1, 1] - self.xy_boundary[j, 1],
-(self.xy_boundary[j + 1, 0] - self.xy_boundary[j, 0])])
unit_normals[j] = normal / np.linalg.norm(normal)
else: # the set of points that close the shape
normal = np.array([self.xy_boundary[0, 1] - self.xy_boundary[j, 1],
-(self.xy_boundary[0, 0] - self.xy_boundary[j, 0])])
unit_normals[j] = normal / np.linalg.norm(normal)
self.unit_normals = unit_normals
def calculate_gradients(self):
unit_normals = self.unit_normals
# initialize array to hold distances from each point to each face
dfaceDistance_dx = np.zeros([self.n_wt * self.nVertices, self.n_wt])
dfaceDistance_dy = np.zeros([self.n_wt * self.nVertices, self.n_wt])
for i in range(0, self.n_wt):
# determine if point is inside or outside of each face, and distances from each face
for j in range(0, self.nVertices):
# define the derivative vectors from the point of interest to the first point of the face
dpa_dx = np.array([-1.0, 0.0])
dpa_dy = np.array([0.0, -1.0])
# find perpendicular distances derivatives from point to current surface (vector projection)
ddistanceVec_dx = np.vdot(dpa_dx, unit_normals[j]) * unit_normals[j]
ddistanceVec_dy = np.vdot(dpa_dy, unit_normals[j]) * unit_normals[j]
# calculate derivatives for the sign of perpendicular distances from point to current face
dfaceDistance_dx[i * self.nVertices + j, i] = np.vdot(ddistanceVec_dx, unit_normals[j])
dfaceDistance_dy[i * self.nVertices + j, i] = np.vdot(ddistanceVec_dy, unit_normals[j])
# return Jacobian dict
self.dfaceDistance_dx = dfaceDistance_dx
self.dfaceDistance_dy = dfaceDistance_dy
def calculate_distance_to_boundary(self, points):
"""
:param points: points that you want to calculate the distances from to the faces of the convex hull
:return face_distace: signed perpendicular distances from each point to each face; + is inside
"""
nPoints = np.array(points).shape[0]
xy_boundary = self.xy_boundary[:-1]
nVertices = xy_boundary.shape[0]
vertices = xy_boundary
unit_normals = self.unit_normals
# initialize array to hold distances from each point to each face
face_distance = np.zeros([nPoints, nVertices])
from numpy import newaxis as na
# define the vector from the point of interest to the first point of the face
PA = (vertices[:, na] - points[na])
# find perpendicular distances from point to current surface (vector projection)
dist = np.sum(PA * unit_normals[:, na], 2)
# calculate the sign of perpendicular distances from point to current face (+ is inside, - is outside)
d_vec = dist[:, :, na] * unit_normals[:, na]
face_distance = np.sum(d_vec * unit_normals[:, na], 2)
return face_distance.T
def distances(self, x, y):
return self.calculate_distance_to_boundary(np.array([x, y]).T)
def gradients(self, x, y):
return self.dfaceDistance_dx, self.dfaceDistance_dy
def satisfy(self, state, pad=1.1):
x, y = [np.asarray(state[xyz], dtype=float) for xyz in [topfarm.x_key, topfarm.y_key]]
dist = self.distances(x, y)
dist = np.where(dist < 0, np.minimum(dist, -.01), dist)
dx, dy = self.gradients(x, y) # independent of position
dx = dx[:self.nVertices, 0]
dy = dy[:self.nVertices, 0]
for i in np.where(dist.min(1) < 0)[0]: # loop over turbines that violate edges
# find smallest movement that where the constraints are satisfied
d = dist[i]
v = np.linspace(-np.abs(d.min()), np.abs(d.min()), 100)
X, Y = np.meshgrid(v, v)
m = np.ones_like(X)
for dx_, dy_, d in zip(dx, dy, dist.T):
m = np.logical_and(m, X * dx_ + Y * dy_ >= -d[i])
index = np.argmin(X[m]**2 + Y[m]**2)
x[i] += X[m][index]
y[i] += Y[m][index]
state[topfarm.x_key] = x
state[topfarm.y_key] = y
return state
class PolygonBoundaryComp(BoundaryBaseComp):
def __init__(self, n_wt, xy_boundary, const_id=None, units=None, relaxation=False):
self.nTurbines = n_wt
self.const_id = const_id
self.zeros = np.zeros(self.nTurbines)
self.units = units
self.boundary_properties = self.get_boundary_properties(xy_boundary)
BoundaryBaseComp.__init__(self, n_wt, xy_boundary=self.boundary_properties[0], const_id=const_id,
units=units, relaxation=relaxation)
self._cache_input = None
self._cache_output = None
self.relaxation = relaxation
def get_boundary_properties(self, xy_boundary, inclusion_zone=True):
vertices = np.array(xy_boundary)
def get_edges(vertices, counter_clockwise):
if np.any(vertices[0] != vertices[-1]):
vertices = np.r_[vertices, vertices[:1]]
x1, y1 = A = vertices[:-1].T
x2, y2 = B = vertices[1:].T
double_area = np.sum((x1 - x2) * (y1 + y2)) # 2 x Area (+: counterclockwise
assert double_area != 0, "Area must be non-zero"
if (counter_clockwise and double_area < 0) or (not counter_clockwise and double_area > 0): #
return get_edges(vertices[::-1], counter_clockwise)
else:
return vertices[:-1], A, B
# inclusion zones are defined counter clockwise (unit-normal vector pointing in) while
# exclusion zones are defined clockwise (unit-normal vector pointing out)
xy_boundary, A, B = get_edges(vertices, inclusion_zone)
dx, dy = AB = B - A
AB_len = np.linalg.norm(AB, axis=0)
edge_unit_normal = (np.array([-dy, dx]) / AB_len)
# A_normal and B_normal are the normal vectors at the nodes A,B (the mean of the adjacent edge normal vectors
A_normal = (edge_unit_normal + np.roll(edge_unit_normal, 1, 1)) / 2
B_normal = np.roll(A_normal, -1, 1)
# import matplotlib.pyplot as plt
# for (x, y), (dx, dy), (unx, uny) in zip(A.T, AB.T, edge_unit_normal.T):
# plt.arrow(x, y, dx, dy, color='k', head_width=.2)
# plt.arrow(x, y, unx, uny, color='r', head_width=.2)
# for (x, y), (nx, ny) in zip(A.T, A_normal.T):
# plt.arrow(x, y, nx, ny, color='b', head_width=.2)
# for (x, y), (nx, ny) in zip(B.T, B_normal.T):
# plt.arrow(x, y, nx / 2, ny / 2, color='g', head_width=.2)
return (xy_boundary, A, B, AB, AB_len, edge_unit_normal, A_normal, B_normal)
def _calc_distance_and_gradients(self, x, y, boundary_properties=None):
"""
distances point, P=(x,y) to edge(A->B)
+/-: inside/outside
"""
def vec_len(vec):
return np.linalg.norm(vec, axis=0)
boundary_properties = boundary_properties or self.boundary_properties[1:]
A, B, AB, AB_len, edge_unit_normal, A_normal, B_normal = boundary_properties
"""
A: edge start point
B: edge end point
edge_unit_normal: unit vector perpendicular to edge pointing to the good side
(i.e. inside for inclusion zones and outside for exclusion zones)
AB: Vector from A to B (edge)
AB_len: length of AB (edge)
A_normal: mean of edge unit normal vectors adjacent to A
B_normal: mean of edge unit normal vectors adjacent to B
"""
# Add dim to match (2, #P, #Edges), where the first dimension is (x,y)
P = np.array([x, y])[:, :, na]
A, B, AB = A[:, na], B[:, na], AB[:, na]
edge_unit_normal, A_normal, B_normal = edge_unit_normal[:, na], A_normal[:, na], B_normal[:, na]
AB_len = AB_len[na]
# ===============================================================================================================
# Determine if P is closer to A, B or the edge (between A and B)
# ===============================================================================================================
AP = P - A # vector from edge start to point
BP = P - B # vector from edge end to point
# signed component of AP on the edge vector
a_tilde = np.sum(AP * AB, axis=0) / AB_len
# a_tilde < 0: closer to A
# a_tilde > |AB|: closer to B
# else: closer to edge (between A and B)
use_A = 0 > a_tilde
use_B = a_tilde > AB_len
# ===============================================================================================================
# Calculate distance from P to closer point on edge
# ===============================================================================================================
# Perpendicular distances to edge (AP dot edge_unit_normal product).
# This is the distance to the edge if not use_A or use_B
distance = np.sum((AP) * edge_unit_normal, 0)
# Update distance for points closer to A
good_side_of_A = (np.sum((AP * A_normal)[:, use_A], 0) > 0)
sign_use_A = np.where(good_side_of_A, 1, -1)
distance[use_A] = (vec_len(AP[:, use_A]) * sign_use_A)
# Update distance for points closer to B
good_side_of_B = np.sum((BP * B_normal)[:, use_B], 0) > 0
sign_use_B = np.where(good_side_of_B, 1, -1)
distance[use_B] = (vec_len(BP[:, use_B]) * sign_use_B)
# ===============================================================================================================
# Calculate gradient of distance from P to closer point on edge wrt. x and y
# ===============================================================================================================
# Gradient of perpendicular distances to edge.
# This is the gradient if not use_A or use_B
ddist_dxy = np.tile(edge_unit_normal, (1, len(x), 1))
# Update gradient for points closer to A or B
ddist_dxy[:, use_A] = sign_use_A * (AP[:, use_A] / vec_len(AP[:, use_A]))
ddist_dxy[:, use_B] = sign_use_B * (BP[:, use_B] / vec_len(BP[:, use_B]))
ddist_dX, ddist_dY = ddist_dxy
return distance, ddist_dX, ddist_dY
def calc_distance_and_gradients(self, x, y):
if not np.shape([x, y]) == np.shape(self._cache_input):
pass
elif np.all(np.array([x, y]) == self._cache_input):
return self._cache_output
distance, ddist_dX, ddist_dY = self._calc_distance_and_gradients(x, y)
closest_edge_index = np.argmin(np.abs(distance), 1)
self._cache_input = np.array([x, y])
self._cache_output = [np.choose(closest_edge_index, v.T) for v in [distance, ddist_dX, ddist_dY]]
return self._cache_output
def distances(self, x, y):
return self.calc_distance_and_gradients(x, y)[0]
def gradients(self, x, y):
_, dx, dy = self.calc_distance_and_gradients(x, y)
return np.diagflat(dx), np.diagflat(dy)
def satisfy(self, state, pad=1.1):
x, y = [np.asarray(state[xy], dtype=float) for xy in [topfarm.x_key, topfarm.y_key]]
dist = self.distances(x, y)
dx, dy = map(np.diag, self.gradients(x, y))
m = dist < 0
x[m] -= dx[m] * dist[m] * pad
y[m] -= dy[m] * dist[m] * pad
state[topfarm.x_key] = x
state[topfarm.y_key] = y
return state
class CircleBoundaryComp(PolygonBoundaryComp):
def __init__(self, n_wt, center, radius, const_id=None, units=None):
self.center = center
self.radius = radius
t = np.linspace(0, 2 * np.pi, 100)
xy_boundary = self.center + np.array([np.cos(t), np.sin(t)]).T * self.radius
BoundaryBaseComp.__init__(self, n_wt, xy_boundary, const_id, units)
self.zeros = np.zeros(self.n_wt)
def plot(self, ax=None):
from matplotlib.pyplot import Circle
import matplotlib.pyplot as plt
ax = ax or plt.gca()
circle = Circle(self.center, self.radius, color='k', fill=False)
ax.add_artist(circle)
def distances(self, x, y):
return self.radius - np.sqrt((x - self.center[0])**2 + (y - self.center[1])**2)
def gradients(self, x, y):
theta = np.arctan2(y - self.center[1], x - self.center[0])
dx = -1 * np.ones_like(x)
dy = -1 * np.ones_like(x)
dist = self.radius - np.sqrt((x - self.center[0])**2 + (y - self.center[1])**2)
not_center = dist != self.radius
dx[not_center], dy[not_center] = -np.cos(theta[not_center]), -np.sin(theta[not_center])
return np.diagflat(dx), np.diagflat(dy)
class Zone(object):
def __init__(self, boundary, dist2wt, geometry_type, incl, name):
self.name = name
self.boundary = boundary
self.dist2wt = dist2wt
self.geometry_type = geometry_type
self.incl = incl
class InclusionZone(Zone):
def __init__(self, boundary, dist2wt=None, geometry_type='polygon', name=''):
super().__init__(np.asarray(boundary), dist2wt, geometry_type, incl=1, name=name)
class ExclusionZone(Zone):
def __init__(self, boundary, dist2wt=None, geometry_type='polygon', name=''):
super().__init__(np.asarray(boundary), dist2wt, geometry_type, incl=0, name=name)
class MultiPolygonBoundaryComp(PolygonBoundaryComp):
def __init__(self, n_wt, zones, const_id=None, units=None, relaxation=False, method='nearest',
simplify_geometry=False):
'''
Parameters
----------
n_wt : TYPE
DESCRIPTION.
zones : list
list of InclusionZone and ExclusionZone objects
const_id : TYPE, optional
DESCRIPTION. The default is None.
units : TYPE, optional
DESCRIPTION. The default is None.
method : {'nearest' or 'smooth_min'}, optional
'nearest' calculate the distance to the nearest edge or point'smooth_min'
calculates the weighted minimum distance to all edges/points. The default is 'nearest'.
simplify : float or dict
if float, simplification tolerance. if dict, shapely.simplify keyword arguments
Returns
-------
None.
'''
self.zones = zones
self.bounds_poly, xy_boundaries = self.get_xy_boundaries()
PolygonBoundaryComp.__init__(self, n_wt, xy_boundary=xy_boundaries[0], const_id=const_id, units=units, relaxation=relaxation)
# self.bounds_poly = [Polygon(x) for x in xy_boundaries]
self.incl_excls = [x.incl for x in zones]
self._setup_boundaries(self.bounds_poly, self.incl_excls)
self.relaxation = relaxation
self.method = method
if simplify_geometry:
self.simplify(simplify_geometry)
def simplify(self, simplify_geometry):
bounds = [bi[0] for bi in self.boundaries]
self.incl_excls = [bi[1] for bi in self.boundaries]
polygons = [Polygon(b) for b in bounds]
if isinstance(simplify_geometry, dict):
self.bounds_poly = [rp.simplify(**simplify_geometry) for rp in polygons]
else:
self.bounds_poly = [rp.simplify(simplify_geometry) for rp in polygons]
self._setup_boundaries(self.bounds_poly, self.incl_excls)
# def line_to_xy_boundary(self, line, buffer):
# return np.asarray(Polygon(LineString(line).buffer(buffer, join_style=2).exterior).exterior.coords)
def get_xy_boundaries(self):
polygons = []
bounds = []
for z in self.zones:
if hasattr(z.dist2wt, '__code__'):
buffer = z.dist2wt(**{k: 100 for k in z.dist2wt.__code__.co_varnames})
else:
buffer = 0
if z.geometry_type == 'line':
poly = Polygon(LineString(z.boundary).buffer(buffer, join_style=2).exterior)
elif z.geometry_type == 'polygon':
poly = Polygon(z.boundary).buffer(buffer, join_style=2)
polygons.append(poly)
bounds.append(np.asarray(poly.exterior.coords))
return polygons, bounds
def _setup_boundaries(self, bounds_poly, incl_excl):
self.res_poly = self._calc_resulting_polygons(bounds_poly, incl_excl)
self.boundaries = self._poly_to_bound(self.res_poly)
boundary_properties_list_all = list(zip(*[self.get_boundary_properties(bound, incl_excl)[1:]
for bound, incl_excl in self.boundaries]))
self.boundary_properties_list_all = [np.concatenate(v, -1)
for v in boundary_properties_list_all]
def _poly_to_bound(self, polygons):
boundaries = []
for bound in polygons:
x, y = bound.exterior.xy
boundaries.append((np.asarray([x, y]).T[:-1, :], 1))
for interior in bound.interiors:
x, y = interior.xy
boundaries.append((np.asarray([x, y]).T[:-1, :], 0))
return boundaries
def _calc_resulting_polygons(self, boundary_polygons, incl_excls):
'''
Parameters
----------
boundary_polygons : list
list of shapely polygons as specifed or inferred from user input
Returns
-------
list of merged shapely polygons. Resolves issues arrising if any are overlapping, touching or contained in each other
'''
domain = []
for i in tqdm(range(len(boundary_polygons))):
b = boundary_polygons[i]
if len(domain) == 0:
if incl_excls[i]:
domain.append(b)
else:
warnings.warn("First boundary should be an inclusion zone or it will be ignored")
pass
else:
if incl_excls[i]:
temp = []
for j, d in enumerate(domain):
if d.intersects(b):
b = unary_union([d, b])
else:
if d.contains(b):
warnings.warn("Boundary is fully contained preceding polygon and will be ignored")
pass
elif b.contains(d):
b = d
warnings.warn("Boundary is fully containing preceding polygon and will override it")
pass
else:
if b.area > 1e-3:
temp.append(d)
if j == len(domain) - 1:
if b.area > 1e-3:
temp.append(b)
domain = temp
else:
temp = []
for j, d in enumerate(domain):
if d.intersects(b):
nonoverlap = (d.symmetric_difference(b)).difference(b)
if isinstance(nonoverlap, type(Polygon())):
temp.append(nonoverlap)
elif isinstance(nonoverlap, type(MultiPolygon())):
for x in nonoverlap.geoms:
if x.area > 1e-3:
temp.append(x)
else:
if b.contains(d):
warnings.warn("Exclusion boundary fully consumes preceding polygon")
pass
else:
if d.contains(b):
d = Polygon(d.exterior.coords, [b.exterior.coords])
if d.area > 1e-3:
temp.append(d)
domain = temp
return domain
def sign(self, Dist_ij):
return np.sign(Dist_ij[np.arange(Dist_ij.shape[0]), np.argmin(abs(Dist_ij), axis=1)])
def calc_distance_and_gradients(self, x, y):
'''
Parameters
----------
x : 1d array
Array of x-positions.
y : 1d array
Array of y-positions.
Returns
-------
D_ij : 2d array
Array of point-edge distances. index 'i' is points and index 'j' is total number of edges.
sign_i : 1d array
Array of signs of the governing distance.
dDdk_jk : 2d array
Jacobian of the distance matrix D_ij with respect to x and y.
'''
if not np.shape([x, y]) == np.shape(self._cache_input):
pass
elif np.all(np.array([x, y]) == self._cache_input) & (not self.relaxation):
return self._cache_output
Dist_ij, ddist_dX, ddist_dY = self._calc_distance_and_gradients(x, y, self.boundary_properties_list_all)
dDdk_ijk = np.moveaxis([ddist_dX, ddist_dY], 0, -1)
sign_i = self.sign(Dist_ij)
self._cache_input = np.array([x, y])
self._cache_output = [Dist_ij, dDdk_ijk, sign_i]
return self._cache_output
def calc_relaxation(self, iteration_no=None):
'''
The tupple relaxation contains a first term for the penalty constant
and a second term for the n first iterations to apply relaxation.
'''
if iteration_no is None:
iteration_no = self.problem.cost_comp.n_grad_eval + 1
return max(0, self.relaxation[0] * (self.relaxation[1] - iteration_no))
def distances(self, x, y):
Dist_ij, _, sign_i = self.calc_distance_and_gradients(x, y)
if self.method == 'smooth_min':
Dist_i = smooth_max(np.abs(Dist_ij), -np.abs(Dist_ij).max(), axis=1) * sign_i
elif self.method == 'nearest':
Dist_i = Dist_ij[np.arange(x.size), np.argmin(np.abs(Dist_ij), axis=1)]
else:
warning = f'method: {self.method} is not implemented. Available options are smooth_min and nearest.'
warnings.warn(warning)
if self.relaxation:
Dist_i += self.calc_relaxation()
return Dist_i
def gradients(self, x, y):
'''
The derivate of the smooth maximum with respect to x and y is calculated with the chain rule:
dS/dk = dS/dD * dD/dk
where S is smooth maximum, D is distance to edge and k is the spacial dimension
'''
Dist_ij, dDdk_ijk, _ = self.calc_distance_and_gradients(x, y)
if self.relaxation:
Dist_ij += self.calc_relaxation()
# dDdt = -self.relaxation[1]
if self.method == 'smooth_min':
dSdDist_ij = smooth_max_gradient(np.abs(Dist_ij), -np.abs(Dist_ij).max(), axis=1)
dSdkx_i, dSdky_i = (dSdDist_ij[:, :, na] * dDdk_ijk).sum(axis=1).T
elif self.method == 'nearest':
dSdkx_i, dSdky_i = dDdk_ijk[np.arange(x.size), np.argmin(np.abs(Dist_ij), axis=1), :].T
if self.relaxation:
# as relaxed distance is relaxation + distance, the gradient with respect to x and y is unchanged
gradients = np.diagflat(dSdkx_i), np.diagflat(dSdky_i), np.ones(self.n_wt) * self.relaxation[1]
else:
gradients = np.diagflat(dSdkx_i), np.diagflat(dSdky_i)
return gradients
def relaxed_polygons(self, iteration_no=None):
poly = [Polygon(x.boundary) for x in self.zones]
booleans = [x.incl for x in self.zones]
relaxed_poly = []
for i, p in enumerate(poly):
if booleans[i] == 0:
pb = p.buffer(-self.calc_relaxation(iteration_no), join_style=2)
relaxed_poly.append(pb)
else:
pb = p.buffer(self.calc_relaxation(iteration_no), join_style=2)
relaxed_poly.append(pb)
merged_poly = self._calc_resulting_polygons(relaxed_poly, booleans)
return self._poly_to_bound(merged_poly)
class TurbineSpecificBoundaryComp(MultiPolygonBoundaryComp):
def __init__(self, n_wt, wind_turbines, zones, const_id=None,
units=None, relaxation=False, method='nearest', simplify_geometry=False):
# self.dependencies = [d or {'type': None, 'multiplier': None, 'ref': None} for d in dependencies]
# self.multi_boundary = xy_boundaries = self.get_xy_boundaries(boundaries, geometry_types, incl_excls)
self.wind_turbines = wind_turbines
self.types = wind_turbines.types()
# self.incl_excls = incl_excls
self.n_wt = n_wt
self.zones = zones
self.ts_polygon_boundaries, ts_xy_boundaries = self.get_ts_boundaries()
MultiPolygonBoundaryComp.__init__(self, n_wt=n_wt, zones=zones, const_id=const_id, units=units,
relaxation=relaxation, method=method, simplify_geometry=simplify_geometry)
# self.polygon_boundaries = [Polygon(x) for x, _ in xy_boundaries]
# self.ts_polygon_boundaries = self.get_ts_polygon_boundaries(self.types)
self.ts_merged_polygon_boundaries = self.merge_boundaries()
self.ts_merged_xy_boundaries = self.get_ts_xy_boundaries()
self.ts_boundary_properties = self.get_ts_boundary_properties()
self.ts_item_indices = self.get_ts_item_indices()
def get_ts_boundaries(self):
polygons = []
bounds = []
for t in set(self.types):
temp1 = []
temp2 = []
dist2wt_input = dict(D=self.wind_turbines.diameter(t),
H=self.wind_turbines.hub_height(t))
for z in self.zones:
if hasattr(z.dist2wt, '__code__'):
buffer = z.dist2wt(**{k: dist2wt_input[k] for k in z.dist2wt.__code__.co_varnames})
else:
buffer = 0
if z.geometry_type == 'line':
poly = Polygon(LineString(z.boundary).buffer(buffer, join_style=2).exterior)
elif z.geometry_type == 'polygon':
poly = Polygon(z.boundary).buffer(buffer, join_style=2)
bound = np.asarray(poly)
temp1.append(poly)
temp2.append(bound)
polygons.append(temp1)
bounds.append(temp2)
return polygons, bounds
# for wt
# temp = []
# for n, (b, t, ie) in enumerate(zip(boundaries, geometry_types, incl_excls)):
# if t == 'line':
# bound = np.asarray(Polygon(LineString(b).buffer(default_ref, join_style=2).exterior).exterior.coords)
# self.dependencies[n]['ref'] = default_ref
# elif t == 'polygon':
# bound = b
# else:
# raise NotImplementedError("Geometry type '%s' is not implemented" % b)
# temp.append((bound, ie))
# def get_ts_polygon_boundaries(self, types):
# temp = []
# for t in set(types):
# d = self.wind_turbines.diameter(t)
# h = self.wind_turbines.hub_height(t)
# temp.append(self.get_ts_polygon_boundary(d, h))
# return temp
def get_ts_xy_boundaries(self):
return [self._poly_to_bound(b) for b in self.ts_merged_polygon_boundaries]
# def get_ts_polygon_boundary(self, d=None, h=None):
# temp = []
# for bound, dep in zip(self.polygon_boundaries, self.dependencies):
# ref = dep['ref'] or 0
# if dep['type'] == 'D':
# ts_polygon_boundary = bound.buffer(dep['multiplier']*d-ref, join_style=2)
# elif dep['type'] == 'H':
# ts_polygon_boundary = bound.buffer(dep['multiplier']*h-ref, join_style=2)
# else:
# ts_polygon_boundary = bound
# temp.append(ts_polygon_boundary)
# return temp
def merge_boundaries(self):
return [self._calc_resulting_polygons(bounds, self.incl_excls) for bounds in self.ts_polygon_boundaries]
def get_ts_boundary_properties(self,):
return [[self.get_boundary_properties(bound) for bound, _ in bounds] for bounds in self.ts_merged_xy_boundaries]
def get_ts_item_indices(self):
temp = []
for bounds in self.ts_merged_xy_boundaries:
n_edges = np.asarray([len(bound) for bound, _ in bounds])
n_edges_tot = np.sum(n_edges)
start_at = np.cumsum(n_edges) - n_edges
end_at = start_at + n_edges
item_indices = [n_edges_tot, start_at, end_at]
temp.append(item_indices)
return temp
def calc_distance_and_gradients(self, x, y, types=None):
if self._cache_input is None:
pass
elif not np.shape([x, y]) == np.shape(self._cache_input[0]) or not np.shape(types) == np.shape(self._cache_input[1]):
pass
elif np.all(np.array([x, y]) == self._cache_input[0]) & (not self.relaxation) & np.all(np.asarray([types]) == self._cache_input[1]):
return self._cache_output
if types is None:
types = np.zeros(self.n_wt)
Dist_i = np.zeros(self.n_wt)
sign_i = np.zeros(self.n_wt)
dDdx_i = np.zeros(self.n_wt)
dDdy_i = np.zeros(self.n_wt)
for t in set(types):
t = int(t)
idx = (types == t)
n_edges_tot, start_at, end_at = self.ts_item_indices[t]
Dist_ij = np.zeros((sum(idx), n_edges_tot))
dDdk_ijk = np.zeros((sum(idx), n_edges_tot, 2))
for n, (bound, bound_type) in enumerate(self.ts_merged_xy_boundaries[t]):
sa = start_at[n]
ea = end_at[n]
distance, ddist_dX, ddist_dY = self._calc_distance_and_gradients(x[idx], y[idx], self.ts_boundary_properties[t][n][1:])
if bound_type == 0:
distance *= -1
ddist_dX *= -1
ddist_dY *= -1
Dist_ij[:, sa:ea] = distance
dDdk_ijk[:, sa:ea, 0] = ddist_dX
dDdk_ijk[:, sa:ea, 1] = ddist_dY
sign_i[idx] = self.sign(Dist_ij)
Dist_i[idx] = Dist_ij[np.arange(sum(idx)), np.argmin(np.abs(Dist_ij), axis=1)]
dDdx_i[idx], dDdy_i[idx] = dDdk_ijk[np.arange(sum(idx)), np.argmin(np.abs(Dist_ij), axis=1), :].T
self._cache_input = (np.array([x, y]), np.asarray(types))
self._cache_output = [Dist_i, dDdx_i, dDdy_i, sign_i]
return self._cache_output
def distances(self, x, y, type=None):
Dist_i, _, _, _ = self.calc_distance_and_gradients(x, y, types=type)
if self.relaxation:
Dist_i += self.calc_relaxation()
return Dist_i
def gradients(self, x, y, type=None):
Dist_i, dDdx_i, dDdy_i, _ = self.calc_distance_and_gradients(x, y, types=type)
if self.relaxation:
Dist_i += self.calc_relaxation()
dDdt = -self.relaxation[0]
if self.relaxation:
gradients = np.diagflat(dDdx_i), np.diagflat(dDdy_i), np.ones(self.n_wt) * dDdt
else:
gradients = np.diagflat(dDdx_i), np.diagflat(dDdy_i)
return gradients
def main():
if __name__ == '__main__':
import matplotlib.pyplot as plt
from py_wake.wind_turbines import WindTurbines
from py_wake.wind_turbines.power_ct_functions import CubePowerSimpleCt
plt.close('all')
i1 = np.array([[2, 17], [6, 23], [16, 23], [26, 15], [19, 0], [14, 4], [4, 4]])
e1 = np.array([[0, 10], [20, 21], [22, 12], [10, 12], [9, 6], [2, 7]])
i2 = np.array([[12, 13], [14, 17], [18, 15], [17, 10], [15, 11]])
e2 = np.array([[5, 17], [5, 18], [8, 19], [8, 18]])
i3 = np.array([[5, 0], [5, 1], [10, 3], [10, 0]])
e3 = np.array([[6, -1], [6, 18], [7, 18], [7, -1]])
e4 = np.array([[15, 9], [15, 11], [20, 11], [20, 9]])
e5 = np.array([[10, 25], [20, 0]])
zones = [
InclusionZone(i1, name='i1'),
InclusionZone(i2, name='i2'),
InclusionZone(i3, name='i3'),
ExclusionZone(e1, name='e1'),
ExclusionZone(e2, name='e2'),
ExclusionZone(e3, name='e3'),
ExclusionZone(e4, name='e4'),
ExclusionZone(e5, name='e5', dist2wt=lambda: 1, geometry_type='line'),
]
N_points = 50
xs = np.linspace(-1, 30, N_points)
ys = np.linspace(-1, 30, N_points)
y_grid, x_grid = np.meshgrid(xs, ys)
x = x_grid.ravel()
y = y_grid.ravel()
n_wt = len(x)
MPBC = MultiPolygonBoundaryComp(n_wt, zones, method='nearest')
distances = MPBC.distances(x, y)
delta = 1e-9
distances2 = MPBC.distances(x + delta, y)
dx_fd = (distances2 - distances) / delta
dx = np.diag(MPBC.gradients(x + delta / 2, y)[0])
plt.figure()
plt.plot(dx_fd, dx, '.')
plt.figure()
for n, bound in enumerate(MPBC.boundaries):
x_bound, y_bound = bound[0].T
x_bound = np.append(x_bound, x_bound[0])
y_bound = np.append(y_bound, y_bound[0])
line, = plt.plot(x_bound, y_bound, label=f'{n}')
plt.plot(x_bound[0], y_bound[0], color=line.get_color(), marker='o')
plt.legend()
plt.grid()
plt.axis('square')
plt.contourf(x_grid, y_grid, distances.reshape(N_points, N_points), np.linspace(-10, 10, 100), cmap='seismic')
plt.colorbar()
plt.figure()
ax = plt.axes(projection='3d')
ax.contour3D(
x.reshape(
N_points, N_points), y.reshape(
N_points, N_points), distances.reshape(
N_points, N_points), np.linspace(-10, 10, 100), cmap='seismic')
ax.set_xlabel('x')
ax.set_ylabel('y')
ax.set_zlabel('z')
if 0:
for smpl in [0, 1, 2, 3, 4, 5, 6, 7, 8]:
MPBC = MultiPolygonBoundaryComp(n_wt, zones, simplify_geometry=smpl)
plt.figure()
ax = plt.gca()
MPBC.plot(ax)
wind_turbines = WindTurbines(names=['tb1', 'tb2'],
diameters=[80, 120],
hub_heights=[70, 110],
powerCtFunctions=[
CubePowerSimpleCt(ws_cutin=3, ws_cutout=25, ws_rated=12,
power_rated=2000, power_unit='kW',
ct=8 / 9, additional_models=[]),
CubePowerSimpleCt(ws_cutin=3, ws_cutout=25, ws_rated=12,
power_rated=3000, power_unit='kW',
ct=8 / 9, additional_models=[])])
x1 = [0, 3000, 3000, 0]
y1 = [0, 0, 3000, 3000]
b1 = np.transpose((x1, y1))
# Buildings
x2 = [600, 1400, 1400, 600]
y2 = [1700, 1700, 2500, 2500]
b2 = np.transpose((x2, y2))
# River
x3 = np.linspace(520, 2420, 16)
y3 = [0, 133, 266, 400, 500, 600, 700, 733, 866, 1300, 1633,
2100, 2400, 2533, 2700, 3000]
b3 = np.transpose((x3, y3))
# Roads
x4 = np.linspace(0, 3000, 16)
y4 = [1095, 1038, 1110, 1006, 1028, 992, 977, 1052, 1076, 1064, 1073,
1027, 964, 981, 1015, 1058]
b4 = np.transpose((x4, y4))
zones = [
InclusionZone(b1, name='i1'),
ExclusionZone(b2, dist2wt=lambda H: 4 * H - 360, name='building'),
ExclusionZone(b3, geometry_type='line', dist2wt=lambda D: 3 * D, name='river'),
ExclusionZone(b4, geometry_type='line', dist2wt=lambda D, H: max(D * 2, H * 3), name='road'),
]
N_points = 50
xs = np.linspace(0, 3000, N_points)
ys = np.linspace(0, 3000, N_points)
y_grid, x_grid = np.meshgrid(xs, ys)
x = x_grid.ravel()
y = y_grid.ravel()
n_wt = len(x)
types = np.zeros(n_wt)
TSBC = TurbineSpecificBoundaryComp(n_wt, wind_turbines, zones)
distances = TSBC.distances(x, y, type=types)
delta = 1e-9
distances2 = TSBC.distances(x + delta, y, type=types)
dx_fd = (distances2 - distances) / delta
dx = np.diag(TSBC.gradients(x + delta / 2, y, type=types)[0])
plt.figure()
plt.plot(dx_fd, dx, '.')
plt.figure()
for ll, t in enumerate(TSBC.types):
line, = plt.plot(*TSBC.ts_merged_xy_boundaries[ll][0][0][0, :], label=f'type {ll}')
for n, bound in enumerate(TSBC.ts_merged_xy_boundaries[ll]):
x_bound, y_bound = bound[0].T
x_bound = np.append(x_bound, x_bound[0])
y_bound = np.append(y_bound, y_bound[0])
plt.plot(x_bound, y_bound, color=line.get_color())
plt.legend()
plt.grid()
plt.axis('square')
for ll, t in enumerate(TSBC.types):
plt.figure()
for n, bound in enumerate(TSBC.ts_merged_xy_boundaries[ll]):
x_bound, y_bound = bound[0].T
x_bound = np.append(x_bound, x_bound[0])
y_bound = np.append(y_bound, y_bound[0])
plt.plot(x_bound, y_bound, 'b')
plt.grid()
plt.title(f'type {ll}')
plt.axis('square')
plt.contourf(x_grid, y_grid, TSBC.distances(x, y, type=t * np.ones(n_wt)).reshape(N_points, N_points), 50)
plt.colorbar()
main()