Source code for topfarm.plotting

import matplotlib
import os
from openmdao.api import ExplicitComponent
import matplotlib.pyplot as plt
import numpy as np
import topfarm
import sys


def mypause(interval):
    # pause without show
    backend = plt.rcParams['backend']
    if backend in matplotlib.rcsetup.interactive_bk:
        figManager = matplotlib._pylab_helpers.Gcf.get_active()
        if figManager is not None:
            canvas = figManager.canvas
            if canvas.figure.stale:
                canvas.draw()
            canvas.start_event_loop(interval)
            return


[docs]class NoPlot(): """Plotting component for no plotting""" def show(self): pass
[docs]class XYPlotComp(ExplicitComponent): """Plotting component for turbine locations""" # colors = ['b', 'r', 'm', 'c', 'g', 'y', 'orange', 'indigo', 'grey'] * 100 colors = [c['color'] for c in iter(matplotlib.rcParams['axes.prop_cycle'])] * 100
[docs] def __init__(self, memory=10, delay=0.001, plot_initial=True, plot_improvements_only=False, ax=None, legendloc=1, save_plot_per_iteration=False): """Initialize component for plotting turbine locations Parameters ---------- memory : int, optional Number of previous iterations to remember delay : float, optional Time delay in seconds between plotting updates plot_initial : bool, optional Flag to plot the initial turbine locations plot_improvements_only : bool, optional Flag to plot only improvements in cost ax : matplotlib axes, optional Axes into which to make the plot legendloc : int Location of the legend in the plot """ ExplicitComponent.__init__(self) self.delay = delay self.plot_improvements_only = plot_improvements_only self._ax = ax self.memory = memory self.delay = max([delay, 1e-6]) self.plot_initial = plot_initial self.history = [] self.counter = 0 self.by_pass = False self.legendloc = legendloc self.save_plot_per_iteration = save_plot_per_iteration
@property def ax(self): return self._ax or plt.gca() def show(self): plt.show() def setup(self): if topfarm.x_key in self.problem.design_vars: units_x = self.problem.design_vars[topfarm.x_key][-1] else: units_x = None if topfarm.y_key in self.problem.design_vars: units_y = self.problem.design_vars[topfarm.y_key][-1] else: units_y = None self.add_input(topfarm.x_key, np.zeros(self.n_wt), units=units_x) self.add_input(topfarm.y_key, np.zeros(self.n_wt), units=units_y) if hasattr(self.problem, 'xy_boundary'): self.xy_boundary = self.problem.xy_boundary if hasattr(self.problem.cost_comp, 'output_key'): self.cost_key = self.problem.cost_comp.output_key self.cost_unit = self.problem.cost_comp.output_unit else: self.cost_key = "Cost" self.cost_unit = "" self.add_input(self.cost_key, 0.) self.add_output('plot_counter') def init_plot(self, limits): self.ax.cla() self.ax.axis('equal') mi = limits.min(0) ma = limits.max(0) ra = ma - mi + 1 ext = .1 xlim, ylim = np.array([mi - ext * ra, ma + ext * ra]).T self.ax.set_xlim(xlim) self.ax.set_ylim(ylim) # def plot_boundary(self): # b = np.r_[self.xy_boundary[:], self.xy_boundary[:1]] # plt.plot(b[:, 0], b[:, 1], 'k') def plot_constraints(self): for constr in self.problem.model.constraint_components: constr.plot(self.ax) def plot_history(self, x, y): rec = self.problem.recorder if rec.num_cases > 0: def get(xy, xy_key, pw): rec_xy = pw[xy_key][-self.memory:] if len(rec_xy.shape) == 1: rec_xy = rec_xy[:, np.newaxis] return np.r_[rec_xy, [xy]] pw = self.problem.get_vars_from_recorder() x = get(x, topfarm.x_key, pw) y = get(y, topfarm.y_key, pw) for c, x_, y_ in zip(self.colors, x.T, y.T): self.ax.plot(x_, y_, '--', color=c) def plot_initial2current(self, x0, y0, x, y): rec = self.problem.recorder if rec.num_cases > 0: pw = self.problem.get_vars_from_recorder() x0 = np.atleast_1d(pw['x0']) y0 = np.atleast_1d(pw['y0']) for c, x0_, y0_, x_, y_ in zip(self.colors, x0, y0, x, y): self.ax.plot(x0_, y0_, '>', markerfacecolor=c, markeredgecolor='k') self.ax.plot((x0_, x_), (y0_, y_), '-', color=c) self.ax.plot([], [], '>k', markerfacecolor="#00000000", markeredgecolor='k', label='Initial position') def plot_current_position(self, x, y): for c, x_, y_ in zip(self.colors, x, y): self.ax.plot(x_, y_, 'o', color=c, ms=5) self.ax.plot(x_, y_, 'xk', ms=4) self.ax.plot([], [], 'xk', label='Current position') def set_title(self, cost0, cost): rec = self.problem.recorder if hasattr(self.problem.cost_comp, 'inc_or_exp'): inc_or_exp = self.problem.cost_comp.inc_or_exp else: inc_or_exp = 1.0 title = "%d) %s %f %s" % (rec.num_cases, self.cost_key, cost * inc_or_exp, self.cost_unit) if cost0 != 0: title += " (%+.2f%%)" % ((cost - cost0) / cost0 * 100) self.ax.set_title(title) def get_initial(self): rec = self.problem.recorder if rec.num_cases > 0: pw = self.problem.get_vars_from_recorder() cost0 = self.problem.recorder[self.cost_key][0] # cost0 = pw['cost0'] return pw['x0'], pw['y0'], cost0 def compute(self, inputs, outputs): if self.by_pass is False: cost = inputs[self.cost_key][0] if (self.plot_improvements_only and 'cost' in self.problem.recorder.driver_iteration_dict and len(self.problem.recorder['cost']) and cost > self.problem.recorder['cost'].min()): return # find limits def get_lim(key): if (key in self.problem.design_vars and isinstance(self.problem.design_vars[key], tuple) and len(self.problem.design_vars[key]) == 4): return np.min(self.problem.design_vars[key][1]), np.max(np.min(self.problem.design_vars[key][2])) else: return min(inputs[key]), max(inputs[key]) min_x, max_x = get_lim(topfarm.x_key) min_y, max_y = get_lim(topfarm.y_key) self.init_plot(np.array([[min_x, min_y], [max_x, max_y]])) self.plot_constraints() initial = self.get_initial() x = inputs[topfarm.x_key] y = inputs[topfarm.y_key] if initial is not None: x0, y0, cost0 = initial if self.plot_initial: self.plot_initial2current(x0, y0, x, y) if self.memory > 0: self.plot_history(x, y) else: cost0 = cost self.plot_current_position(x, y) self.set_title(cost0, cost) self.ax.legend(loc=self.legendloc) if self.counter == 0: plt.pause(1e-6) mypause(self.delay) self.counter += 1 outputs['plot_counter'] = self.counter if self.save_plot_per_iteration: fig = self.ax if not os.path.exists('Figures'): os.makedirs('Figures') plt.savefig('Figures/iteration_%s.png' % self.counter)
class PlotComp(XYPlotComp): def __init__(self, memory=10, delay=0.001, plot_initial=True, plot_improvements_only=False, ax=None): XYPlotComp.__init__(self, memory=memory, delay=delay, plot_initial=plot_initial, plot_improvements_only=plot_improvements_only, ax=ax) sys.stderr.write("%s is deprecated. Use XYPlotComp instead\n" % self.__class__.__name__)
[docs]class TurbineTypePlotComponent(XYPlotComp): """Plotting component for turbine types""" colors = np.array(['b', 'r', 'm', 'c', 'g', 'y', 'orange', 'indigo', 'grey'] * 10) markers = np.array(list("123v^<>.o48spP*hH+xXDd|_"))
[docs] def __init__(self, turbine_type_names, **kwargs): """Initialize component for plotting turbine types Parameters ---------- turbine_type_names : list of str Names of turbine types for legend **kwargs : keyword arguments, optional Keyword arguments that can be passed into XYPlotComp """ self.turbine_type_names = turbine_type_names XYPlotComp.__init__(self, **kwargs)
def setup(self): XYPlotComp.setup(self) self.add_input(topfarm.type_key, np.zeros(self.n_wt, dtype=int)) def compute(self, inputs, outputs): self.types = np.asarray(inputs[topfarm.type_key], dtype=int) XYPlotComp.compute(self, inputs, outputs) def init_plot(self, limits): XYPlotComp.init_plot(self, limits) for m, n, c in zip(self.markers, self.turbine_type_names, self.colors): self.ax.plot([], [], m + c, label=n) self.ax.legend() def plot_initial2current(self, x0, y0, x, y): rec = self.problem.recorder if rec.num_cases > 0: pw = self.problem.get_vars_from_recorder() x0 = np.atleast_1d(pw['x0']) y0 = np.atleast_1d(pw['y0']) for c, x0_, y0_, x_, y_ in zip(self.colors[self.types], x0, y0, x, y): self.ax.plot(x0_, y0_, '>', markerfacecolor=c, markeredgecolor='k') self.ax.plot((x0_, x_), (y0_, y_), '-', color=c) self.ax.plot([], [], '>k', markerfacecolor="#00000000", markeredgecolor='k', label='Initial position') def plot_history(self, x, y): rec = self.problem.recorder if rec.num_cases > 0: def get(xy, xy_key, pw): rec_xy = pw[xy_key][-self.memory:] if len(rec_xy.shape) == 1: rec_xy = rec_xy[:, np.newaxis] return np.r_[rec_xy, [xy]] pw = self.problem.get_vars_from_recorder() x = get(x, topfarm.x_key, pw) y = get(y, topfarm.y_key, pw) for c, x_, y_ in zip(self.colors[self.types], x.T, y.T): self.ax.plot(x_, y_, '--', color=c) def plot_current_position(self, x, y): for m, c, x_, y_ in zip(self.markers[self.types], self.colors[self.types], x, y): # self.ax.plot(x_, y_, 'o', color=c, ms=5) self.ax.plot(x_, y_, m + 'k', markeredgecolor=c, markeredgewidth=1, ms=20)
# class TurbineCablePlotComponent(XYPlotComp): # """Plotting component for electrical colletion system""" # colors = ['b', 'r', 'm', 'c', 'g', 'y', 'orange', 'indigo', 'grey'] * 100 # def __init__(self, ecsga, **kwargs): # """Initialize component for plotting turbine types # Parameters # ---------- # turbine_type_names : list of str # Names of turbine types for legend # **kwargs : keyword arguments, optional # Keyword arguments that can be passed into XYPlotComp # """ # self.ecsga = ecsga # XYPlotComp.__init__(self, **kwargs) # def setup(self): # XYPlotComp.setup(self) # self.add_input('tree', np.zeros((self.n_wt, 5))) # def compute(self, inputs, outputs): # self.tree = np.asarray(inputs['tree']) # XYPlotComp.compute(self, inputs, outputs) # def init_plot(self, limits): # XYPlotComp.init_plot(self, limits) # for n, cable_type in enumerate(self.ecsga.Cable.ID): # index = self.tree[:, 3] == n # if index.any(): # self.ax.plot([], [], self.colors[n], label='Cable: {} mm2'.format(self.ecsga.Cable.CrossSection[n])) # self.ax.legend() # def plot_current_position(self, x, y): # CoordX = self.ecsga.CoordX # CoordY = self.ecsga.CoordY # CoordX[1:] = x # CoordY[1:] = y # self.ax.plot(CoordX[0], CoordY[0], 'ro', markersize=10, label='OSS') # for n, cable_type in enumerate(self.ecsga.Cable.ID): # index = self.tree[:, 3].astype(int) == n # if index.any(): # xs = CoordX[(self.tree[index].T[:2] - 1).astype(int)] # ys = CoordY[(self.tree[index].T[:2] - 1).astype(int)] # self.ax.plot(xs, ys, self.colors[n]) # class AggregatedConstraintsPlotComponent(XYPlotComp): # colors = np.array(['b', 'r', 'm', 'c', 'g', 'y', 'orange', 'indigo', 'grey'] * 10) # markers = np.array(list("123v^<>.o48spP*hH+xXDd|_")) # def show(self): # pass # def plot_constraints(self): # # if len(self.problem.model.aggr_comp.constraints) != 0: # # for constr in self.problem.model.aggr_comp.constraints[0].constraints: # # constr.constraintComponent.plot(self.ax) # # else: # for constr in self.problem.model.constraint_components: # constr.plot(self.ax) # def plot_history(self, x, y): # rec = self.problem.recorder # if rec.num_cases > 0: # def get(xy, xy_key, pw): # rec_xy = pw[xy_key][-self.memory:] # if len(rec_xy.shape) == 1: # rec_xy = rec_xy[:, np.newaxis] # return np.r_[rec_xy, [xy]] # pw = self.problem.get_vars_from_recorder() # x = get(x, topfarm.x_key, pw) # y = get(y, topfarm.y_key, pw) # for c, x_, y_ in zip(self.colors, x.T, y.T): # self.ax.plot(x_, y_, '--', color=c) # def plot_initial2current(self, x0, y0, x, y): # rec = self.problem.recorder # if rec.num_cases > 0: # pw = self.problem.get_vars_from_recorder() # x0 = np.atleast_1d(pw['x0']) # y0 = np.atleast_1d(pw['y0']) # for c, x0_, y0_, x_, y_ in zip(self.colors, x0, y0, x, y): # self.ax.plot(x0_, y0_, '>', markerfacecolor=c, markeredgecolor='k') # self.ax.plot((x0_, x_), (y0_, y_), '-', color=c) # self.ax.plot([], [], '>k', markerfacecolor="#00000000", markeredgecolor='k', label='Initial position') # def plot_current_position(self, x, y): # for c, x_, y_ in zip(self.colors, x, y): # self.ax.plot(x_, y_, 'o', color=c, ms=5) # self.ax.plot(x_, y_, 'xk', ms=4) # self.ax.plot([], [], 'xk', label='Current position')