import os
import numpy as np
from scipy.signal import firwin # To create FIR kernels
from tqdm import tqdm
from modulo_vki.utils._utils import conv_m, switch_eigs
[docs]
def temporal_basis_mPOD(K, Nf, Ex, F_V, Keep, boundaries, MODE='reduced', dt=1,FOLDER_OUT: str = "./", MEMORY_SAVING: bool = False,SAT: int = 100,n_Modes=10, eig_solver: str = 'svd_sklearn_randomized'):
'''
This function computes the PSIs for the mPOD. In this implementation, a "dft-trick" is proposed, in order to avoid
expansive SVDs. Randomized SVD is used by default for the diagonalization.
:param K:
np.array Temporal correlation matrix
:param dt: float.
1/fs, the dt between snapshots. Units in seconds.
:param Nf:
np.array. Vector collecting the order of the FIR filters used in each scale.
:param Ex: int.
Extension at the boundaries of K to impose the boundary conditions (see boundaries). It must be at least as Nf.
:param F_V: np.array.
Frequency splitting vector, containing the frequencies of each scale (see article). If the time axis is in seconds, these frequencies are in Hz.
:param Keep: np.array.
Vector defining which scale to keep.
:param boundaries: str -> {'nearest', 'reflect', 'wrap' or 'extrap'}.
In order to avoid 'edge effects' if the time correlation matrix is not periodic, several boundary conditions can be used. Options are (from scipy.ndimage.convolve):
‘reflect’ (d c b a | a b c d | d c b a) The input is extended by reflecting about the edge of the last pixel.
‘nearest’ (a a a a | a b c d | d d d d) The input is extended by replicating the last pixel.
‘wrap’ (a b c d | a b c d | a b c d) The input is extended by wrapping around to the opposite edge.
:param MODE: tr -> {‘reduced’, ‘complete’, ‘r’, ‘raw’}
As a final step of this algorithm, the orthogonality is imposed via a QR-factorization. This parameterd define how to perform such factorization, according to numpy.
Options: this is a wrapper to np.linalg.qr(_, mode=MODE). Check numpy's documentation.
if ‘reduced’ The final basis will not necessarely be full. If ‘complete’ The final basis will always be full
:param FOLDER_OUT: str.
This is the directory where intermediate results will be stored if the memory saving is active.It will be ignored if MEMORY_SAVING=False.
:param MEMORY_SAVING: Bool.
If memory saving is active, the results will be saved locally. Nevertheless, since Psi_M is usually not expensive, it will be returned.
:param SAT: int.
Maximum number of modes per scale. The user can decide how many modes to compute; otherwise, modulo set the default SAT=100.
:param n_Modes: int.
Total number of modes that will be finally exported
:param eig_solver: str.
This is the eigenvalue solver that will be used. Refer to eigs_swith for the options.
:return PSI_M: np.array.
The mPOD PSIs. Yet to be sorted !
'''
if Ex < np.max(Nf):
raise RuntimeError("For the mPOD temporal basis computation Ex must be larger than or equal to Nf")
#Converting F_V in radiants and initialise number of scales M
Fs = 1 / dt
F_Bank_r = F_V * 2 / Fs
M = len(F_Bank_r)
# Loop over the scales to show the transfer functions
Psi_M = np.array([])
Lambda_M = np.array([])
n_t = K.shape[1]
# if K_S:
# Ks = np.zeros((n_t, n_t, M + 1))
#DFT-trick below: computing frequency bins.
Freqs = np.fft.fftfreq(n_t) * Fs
print("Filtering and Diagonalizing H scale: \n")
#Filtering and computing eigenvectors
for m in tqdm(range(0, M)):
# Generate the 1d filter for this
if m < 1:
if Keep[m] == 1:
# Low Pass Filter
h_A = firwin(Nf[m], F_Bank_r[m], window='hamming')
# Filter K_LP
print('\n Filtering Largest Scale')
K_L = conv_m(K=K, h=h_A, Ex=Ex, boundaries=boundaries)
# R_K = np.linalg.matrix_rank(K_L, tol=None, hermitian=True)
'''We replace it with an estimation based on the non-zero freqs the cut off frequency of the scale is '''
F_CUT = F_Bank_r[m] * Fs / 2
Indices = np.argwhere(np.abs(Freqs) < F_CUT)
R_K = np.min([len(Indices), SAT])
print(str(len(Indices)) + ' Modes Estimated')
print('\n Diagonalizing Largest Scale')
Psi_P, Lambda_P = switch_eigs(K_L, R_K, eig_solver) #svds_RND(K_L, R_K)
Psi_M=Psi_P; Lambda_M=Lambda_P
else:
print('\n Scale '+str(m)+' jumped (keep['+str(m)+']=0)')
# if K_S:
# Ks[:, :, m] = K_L # First large scale
# method = signal.choose_conv_method(K, h2d, mode='same')
elif m > 0 and m < M - 1:
if Keep[m] == 1:
# print(m)
print('\n Working on Scale '+str(m)+'/'+str(M))
# This is the 1d Kernel for Band pass
h1d_H = firwin(Nf[m], [F_Bank_r[m], F_Bank_r[m + 1]], pass_zero=False) # Band-pass
F_CUT1 = F_Bank_r[m] * Fs / 2
F_CUT2 = F_Bank_r[m + 1] * Fs / 2
Indices = np.argwhere((np.abs(Freqs) > F_CUT1) & (np.abs(Freqs) < F_CUT2))
R_K = np.min([len(Indices), SAT]) # number of frequencies here
print(str(len(Indices)) + ' Modes Estimated')
# print('Filtering H Scale ' + str(m + 1) + '/' + str(M))
K_H = conv_m(K, h1d_H, Ex, boundaries)
# Ks[:, :, m + 1] = K_H # Intermediate band-pass
print('Diagonalizing H Scale ' + str(m + 1) + '/' + str(M))
# R_K = np.linalg.matrix_rank(K_H, tol=None, hermitian=True)
Psi_P, Lambda_P = switch_eigs(K_H, R_K, eig_solver) #svds_RND(K_H, R_K) # Diagonalize scale
if np.shape(Psi_M)[0]==0: # if this is the first contribute to the basis
Psi_M=Psi_P; Lambda_M=Lambda_P
else:
Psi_M = np.concatenate((Psi_M, Psi_P), axis=1) # append to the previous
Lambda_M = np.concatenate((Lambda_M, Lambda_P), axis=0)
else:
print('\n Scale '+str(m)+' jumped (keep['+str(m)+']=0)')
else: # this is the case m=M: this is a high pass
if Keep[m] == 1:
print('Working on Scale '+str(m)+'/'+str(M))
# This is the 1d Kernel for High Pass (last scale)
h1d_H = firwin(Nf[m], F_Bank_r[m], pass_zero=False)
F_CUT1 = F_Bank_r[m] * Fs / 2
Indices = np.argwhere((np.abs(Freqs) > F_CUT1))
R_K = len(Indices)
R_K = np.min([len(Indices), SAT]) # number of frequencies here
print(str(len(Indices)) + ' Modes Estimated')
print('Filtering H Scale ' + str(m + 1) + '/ ' + str(M))
K_H = conv_m(K, h1d_H, Ex, boundaries)
# Ks[:, :, m + 1] = K_H # Last (high pass) scale
print('Diagonalizing H Scale ' + str(m + 1) + '/ ' + str(M))
# R_K = np.linalg.matrix_rank(K_H, tol=None, hermitian=True)
Psi_P, Lambda_P = switch_eigs(K_H, R_K, eig_solver) #svds_RND(K_H, R_K) # Diagonalize scale
Psi_M = np.concatenate((Psi_M, Psi_P), axis=1) # append to the previous
Lambda_M = np.concatenate((Lambda_M, Lambda_P), axis=0)
else:
print('\n Scale '+str(m)+' jumped (keep['+str(m)+']=0)')
# Now Order the Scales
Indices = np.flip(np.argsort(Lambda_M)) # find indices for sorting in decreasing order
Psi_M = Psi_M[:, Indices] # Sort the temporal structures
#print(f"Size psis in mpodtime = {np.shape(Psi_M)}")
# Now we complete the basis via re-orghotonalization
print('\n QR Polishing...')
PSI_M, R = np.linalg.qr(Psi_M, mode=MODE)
print('Done!')
if MEMORY_SAVING:
os.makedirs(FOLDER_OUT + '/mPOD', exist_ok=True)
np.savez(FOLDER_OUT + '/mPOD/Psis', Psis=PSI_M)
return PSI_M[:,0:n_Modes]