Skip to content

Commit

Permalink
linting
Browse files Browse the repository at this point in the history
  • Loading branch information
ssolson committed Feb 6, 2024
1 parent 13ca14b commit d25369a
Showing 1 changed file with 172 additions and 75 deletions.
247 changes: 172 additions & 75 deletions mhkit/loads/extreme/mler.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
visualization.
"""

from typing import Union, List, Optional, Dict
from typing import Union, List, Optional, Dict, Any

import pandas as pd
import xarray as xr
Expand Down Expand Up @@ -106,15 +106,15 @@ def mler_coefficients(
wave_spectrum = wave_spectrum.to_numpy() / (2 * np.pi)

# get frequency step
dw = 2.0 * np.pi / (len(freq_hz) - 1)
d_w = 2.0 * np.pi / (len(freq_hz) - 1)

# Note: waves.A is "S" in Quon2016; 'waves' naming convention
# matches WEC-Sim conventions (EWQ)
# Response spectrum [(response units)^2-s/rad] -- Quon2016 Eqn. 3
spectrum_r = np.abs(rao_array) ** 2 * (2 * wave_spectrum)

# calculate spectral moments and other important spectral values.
m0 = frequency_moment(pd.Series(spectrum_r, index=freq_hz), 0).iloc[0, 0]
m_0 = frequency_moment(pd.Series(spectrum_r, index=freq_hz), 0).iloc[0, 0]
m1_m2 = (
frequency_moment(pd.Series(spectrum_r, index=freq_hz), 1).iloc[0, 0],
frequency_moment(pd.Series(spectrum_r, index=freq_hz), 2).iloc[0, 0],
Expand All @@ -124,12 +124,12 @@ def mler_coefficients(
# Drummen version. Dietz has negative of this.
_coeff_a_rn = (
np.abs(rao)
* np.sqrt(2 * wave_spectrum * dw)
* np.sqrt(2 * wave_spectrum * d_w)
* (
(m1_m2[1] - freq_hz * m1_m2[0])
+ (m1_m2[0] / m0) * (freq_hz * m0 - m1_m2[0])
+ (m1_m2[0] / m_0) * (freq_hz * m_0 - m1_m2[0])
)
/ (m0 * m1_m2[1] - m1_m2[0] ** 2)
/ (m_0 * m1_m2[1] - m1_m2[0] ** 2)
)
# save the new spectral info to pass out
# Phase delay should be a positive number in this convention (AP)
Expand Down Expand Up @@ -235,8 +235,7 @@ def mler_wave_amp_normalize(
mler: Union[pd.DataFrame, xr.Dataset],
sim: SimulationParameters,
k: Union[NDArray[np.float_], List[float], pd.Series],
frequency_dimension: str = "",
to_pandas: bool = True,
**kwargs: Any,
) -> Union[pd.DataFrame, xr.Dataset]:
"""
Function that renormalizes the incoming amplitude of the MLER wave
Expand Down Expand Up @@ -264,8 +263,11 @@ def mler_wave_amp_normalize(
mler_norm : pandas DataFrame or xarray Dataset
MLER coefficients
"""
if not isinstance(k, np.ndarray):
k = np.array(k, dtype=float)
frequency_dimension = kwargs.get("frequency_dimension", "")
to_pandas = kwargs.get("to_pandas", True)

k_array = np.array(k, dtype=float) if not isinstance(k, np.ndarray) else k

if not isinstance(mler, (pd.DataFrame, xr.Dataset)):
raise TypeError(
f"mler must be of type pd.DataFrame or xr.Dataset. Got: {type(mler)}"
Expand All @@ -274,52 +276,63 @@ def mler_wave_amp_normalize(
raise TypeError(f"wave_amp must be of type int or float. Got: {type(wave_amp)}")
if not isinstance(sim, dict):
raise TypeError(f"sim must be of type dict. Got: {type(sim)}")
if not isinstance(k, np.ndarray):
raise TypeError(f"k must be of type ndarray. Got: {type(k)}")
if not isinstance(frequency_dimension, str):
raise TypeError(
"frequency_dimension must be of type bool."
+ f"Got: {type(frequency_dimension)}"
)
if not isinstance(to_pandas, bool):
raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")

# If input is pandas, convert to xarray
if isinstance(mler, pd.DataFrame):
mler = mler.to_xarray()

if frequency_dimension == "":
frequency_dimension = list(mler.coords)[0]
freq = mler.coords[frequency_dimension].values * 2 * np.pi
dw = (max(freq) - min(freq)) / (len(freq) - 1) # get delta

wave_amp_time = np.zeros((sim["maxIX"], sim["maxIT"]))
for ix, x in enumerate(sim["X"]):
for it, t in enumerate(sim["T"]):
# conditioned wave
wave_amp_time[ix, it] = np.sum(
np.sqrt(2 * mler["WaveSpectrum"] * dw)
* np.cos(freq * (t - sim["T0"]) - k * (x - sim["X0"]) + mler["Phase"])
mler_xr = mler.to_xarray() if isinstance(mler, pd.DataFrame) else mler()

# Determine frequency dimension
freq_dim = frequency_dimension or list(mler_xr.coords)[0]
# freq = mler_xr.coords[freq_dim].values * 2 * np.pi
# d_w = np.diff(freq).mean()

wave_amp_time = np.array(
[
np.sum(
np.sqrt(
2
* mler_xr["WaveSpectrum"].values
* np.diff(mler_xr.coords[freq_dim].values * 2 * np.pi).mean()
)
* np.cos(
mler_xr.coords[freq_dim].values * 2 * np.pi * (t - sim["T0"])
- k_array * (x - sim["X0"])
+ mler_xr["Phase"].values
)
)
for x in np.linspace(sim["startX"], sim["endX"], sim["maxIX"])
for t in np.linspace(sim["startTime"], sim["endTime"], sim["maxIT"])
]
).reshape(sim["maxIX"], sim["maxIT"])

tmp_max_amp = np.max(np.abs(wave_amp_time))

# renormalization of wave amplitudes
rescale_fact = np.abs(wave_amp) / np.abs(tmp_max_amp)

# rescale the wave spectral amplitude coefficients
mler_norm = mler["WaveSpectrum"] * rescale_fact**2
mler_norm = mler_norm.to_dataset()
mler_norm = mler_norm.assign({"Phase": (frequency_dimension, mler["Phase"].data)})

if to_pandas:
mler_norm = mler_norm.to_pandas()
rescale_fact = np.abs(wave_amp) / np.max(np.abs(wave_amp_time))

return mler_norm
# Rescale the wave spectral amplitude coefficients and assign phase
mler_norm = xr.Dataset(
{
"WaveSpectrum": (
["frequency"],
mler_xr["WaveSpectrum"].data * rescale_fact**2,
),
"Phase": (["frequency"], mler_xr["Phase"].data),
},
coords={"frequency": (["frequency"], mler_xr.coords[freq_dim].data)},
)
return mler_norm.to_pandas() if to_pandas else mler_norm


def mler_export_time_series(
rao: Union[NDArray[np.float_], List[float], pd.Series],
mler: Union[pd.DataFrame, xr.Dataset],
sim: SimulationParameters,
k: Union[NDArray[np.float_], List[float], pd.Series],
frequency_dimension: str = "",
to_pandas: bool = True,
**kwargs: Any,
) -> Union[pd.DataFrame, xr.Dataset]:
"""
Generate the wave amplitude time series at X0 from the calculated
Expand Down Expand Up @@ -349,12 +362,17 @@ def mler_export_time_series(
by time [s].
"""
frequency_dimension = kwargs.get("frequency_dimension", "")
to_pandas = kwargs.get("to_pandas", True)

rao_array = np.array(rao, dtype=float) if not isinstance(rao, np.ndarray) else rao
k_array = np.array(k, dtype=float) if not isinstance(k, np.ndarray) else k
# If input is pandas, convert to xarray
mler_xr = mler if isinstance(mler, xr.Dataset) else mler.to_xarray()

if not isinstance(rao_array, np.ndarray):
raise TypeError(f"rao must be of type ndarray. Got: {type(rao_array)}")
if not isinstance(mler, (pd.DataFrame, xr.Dataset)):
if not isinstance(mler_xr, (xr.Dataset)):
raise TypeError(
f"mler must be of type pd.DataFrame or xr.Dataset. Got: {type(mler)}"
)
Expand All @@ -365,42 +383,121 @@ def mler_export_time_series(
if not isinstance(to_pandas, bool):
raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")

# If input is pandas, convert to xarray
if isinstance(mler, pd.DataFrame):
mler = mler.to_xarray()
# Handle optional frequency dimension
freq_dim = frequency_dimension if frequency_dimension else list(mler_xr.coords)[0]
freq = mler_xr.coords[freq_dim].values * 2 * np.pi
dw = np.diff(freq).mean()

if frequency_dimension == "":
frequency_dimension = list(mler.coords)[0]
freq = mler.coords[frequency_dimension].values * 2 * np.pi
dw = (max(freq) - min(freq)) / (len(freq) - 1) # get delta

# calculate the series
wave_amp_time = np.zeros((sim["maxIT"], 2))
xi = sim["X0"]
for i, ti in enumerate(sim["T"]):
# conditioned wave
wave_amp_time[i, 0] = np.sum(
np.sqrt(2 * mler["WaveSpectrum"] * dw)
* np.cos(
freq * (ti - sim["T0"]) + mler["Phase"] - k_array * (xi - sim["X0"])
)
)
# Response calculation
wave_amp_time[i, 1] = np.sum(
np.sqrt(2 * mler["WaveSpectrum"] * dw)
* np.abs(rao_array)
* np.cos(freq * (ti - sim["T0"]) - k_array * (xi - sim["X0"]))
)
# Calculation loop optimized with numpy operations
cos_terms = np.cos(
freq * (sim["T"][:, None] - sim["T0"])
- k_array * (sim["X0"] - sim["X0"])
+ mler_xr["Phase"].values
)
wave_height = np.sum(np.sqrt(2 * mler_xr["WaveSpectrum"] * dw) * cos_terms, axis=1)
linear_response = np.sum(
np.sqrt(2 * mler_xr["WaveSpectrum"] * dw) * np.abs(rao_array) * cos_terms,
axis=1,
)

# Construct the output dataset
mler_ts = xr.Dataset(
data_vars={
"WaveHeight": (["time"], wave_amp_time[:, 0]),
"LinearResponse": (["time"], wave_amp_time[:, 1]),
{
"WaveHeight": ("time", wave_height),
"LinearResponse": ("time", linear_response),
},
coords={"time": sim["T"]},
)

if to_pandas:
mler_ts = mler_ts.to_pandas()

return mler_ts
# Convert to pandas DataFrame if requested
return mler_ts.to_dataframe() if to_pandas else mler_ts


# ORIGINAL TO MATCH
# def mler_export_time_series(rao, mler, sim, k, frequency_dimension="", to_pandas=True):
# """
# Generate the wave amplitude time series at X0 from the calculated
# MLER coefficients

# Parameters
# ----------
# rao: numpy ndarray
# Response amplitude operator.
# mler: pandas DataFrame or xarray Dataset
# MLER coefficients dataframe generated from an MLER function.
# sim: dict
# Simulation parameters formatted by output from
# 'mler_simulation'.
# k: numpy ndarray
# Wave number.
# frequency_dimension: string (optional)
# Name of the xarray dimension corresponding to frequency. If not supplied,
# defaults to the first dimension. Does not affect pandas input.
# to_pandas: bool (optional)
# Flag to output pandas instead of xarray. Default = True.

# Returns
# -------
# mler_ts: pandas DataFrame or xarray Dataset
# Time series of wave height [m] and linear response [*] indexed
# by time [s].

# """
# try:
# rao = np.array(rao)
# except:
# pass
# try:
# k = np.array(k)
# except:
# pass
# if not isinstance(rao, np.ndarray):
# raise TypeError(f"rao must be of type ndarray. Got: {type(rao)}")
# if not isinstance(mler, (pd.DataFrame, xr.Dataset)):
# raise TypeError(
# f"mler must be of type pd.DataFrame or xr.Dataset. Got: {type(mler)}"
# )
# if not isinstance(sim, dict):
# raise TypeError(f"sim must be of type dict. Got: {type(sim)}")
# if not isinstance(k, np.ndarray):
# raise TypeError(f"k must be of type ndarray. Got: {type(k)}")
# if not isinstance(to_pandas, bool):
# raise TypeError(f"to_pandas must be of type bool. Got: {type(to_pandas)}")

# # If input is pandas, convert to xarray
# if isinstance(mler, pd.DataFrame):
# mler = mler.to_xarray()

# if frequency_dimension == "":
# frequency_dimension = list(mler.coords)[0]
# freq = mler.coords[frequency_dimension].values * 2 * np.pi
# dw = (max(freq) - min(freq)) / (len(freq) - 1) # get delta

# # calculate the series
# wave_amp_time = np.zeros((sim["maxIT"], 2))
# xi = sim["X0"]
# for i, ti in enumerate(sim["T"]):
# # conditioned wave
# wave_amp_time[i, 0] = np.sum(
# np.sqrt(2 * mler["WaveSpectrum"] * dw)
# * np.cos(freq * (ti - sim["T0"]) + mler["Phase"] - k * (xi - sim["X0"]))
# )
# # Response calculation
# wave_amp_time[i, 1] = np.sum(
# np.sqrt(2 * mler["WaveSpectrum"] * dw)
# * np.abs(rao)
# * np.cos(freq * (ti - sim["T0"]) - k * (xi - sim["X0"]))
# )

# mler_ts = xr.Dataset(
# data_vars={
# "WaveHeight": (["time"], wave_amp_time[:, 0]),
# "LinearResponse": (["time"], wave_amp_time[:, 1]),
# },
# coords={"time": sim["T"]},
# )

# if to_pandas:
# mler_ts = mler_ts.to_pandas()

# return mler_ts

0 comments on commit d25369a

Please sign in to comment.