Source code for spaudiopy.process

# -*- coding: utf-8 -*-
"""Collection of audio processing tools.

.. plot::
    :context: reset

    import numpy as np
    import matplotlib.pyplot as plt
    plt.rcParams['axes.grid'] = True

    import spaudiopy as spa

**Memory cached functions**

.. autofunction:: spaudiopy.process.resample_hrirs(hrir_l, hrir_r, fs_hrir, fs_target, jobs_count=None)

"""

import numpy as np
import resampy
import pickle
from scipy import signal
from joblib import Memory
import multiprocessing
import logging

from . import utils, sph, sig, grids


# Prepare Caching
cachedir = './.spa_cache_dir'
memory = Memory(cachedir)


@memory.cache
def resample_hrirs(hrir_l, hrir_r, fs_hrir, fs_target, jobs_count=None):
    """ Resample HRIRs to a new sampling rate, using multiprocessing.

    Parameters
    ----------
    hrir_l : (g, h) numpy.ndarray
        h(t) for grid position g.
    hrir_r : (g, h) numpy.ndarray
        h(t) for grid position g.
    fs_hrir : int
        Current fs(t) of hrirs.
    fs_target : int
        Target fs(t) of hrirs.
    jobs_count : int or None, optional
        Number of parallel jobs, 'None' employs 'cpu_count'.

    Returns
    -------
    hrir_l_resampled : (g, h_n) numpy.ndarray
        h_n(t) resampled for grid position g.
    hrir_r_resampled : (g, h_n) numpy.ndarray
        h_n(t) resampled for grid position g.
    fs_hrir : int
        New fs(t) of hrirs.

    """
    if jobs_count is None:
        jobs_count = multiprocessing.cpu_count()
    hrir_l_resampled = np.zeros([hrir_l.shape[0],
                                 int(hrir_l.shape[1] * fs_target / fs_hrir)])
    hrir_r_resampled = np.zeros_like(hrir_l_resampled)

    if jobs_count == 1:
        hrir_l_resampled = resampy.resample(hrir_l, fs_hrir, fs_target,
                                            axis=1)
        hrir_r_resampled = resampy.resample(hrir_r, fs_hrir, fs_target,
                                            axis=1)
    elif jobs_count > 1:
        logging.info("Using %i processes..." % jobs_count)
        with multiprocessing.Pool(processes=jobs_count) as pool:
            results = pool.starmap(resampy.resample,
                                   map(lambda x: (x, fs_hrir, fs_target),
                                       hrir_l))
            hrir_l_resampled = np.array(results)
            results = pool.starmap(resampy.resample,
                                   map(lambda x: (x, fs_hrir, fs_target),
                                       hrir_r))
            hrir_r_resampled = np.array(results)

    fs_hrir = fs_target
    return hrir_l_resampled, hrir_r_resampled, fs_hrir

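# Usage sketch (illustrative only; `_demo_resample_hrirs` is a hypothetical
# helper, not part of the module API): resample a dummy HRIR set from
# 44.1 kHz to 48 kHz. Results are memory-cached by joblib.
def _demo_resample_hrirs():
    rng = np.random.default_rng(0)
    # dummy HRIRs: 100 grid directions, 256 taps each, at 44.1 kHz
    hrir_l = rng.standard_normal((100, 256))
    hrir_r = rng.standard_normal((100, 256))
    hrir_l_rs, hrir_r_rs, fs_new = resample_hrirs(hrir_l, hrir_r,
                                                  44100, 48000)
    return hrir_l_rs, hrir_r_rs, fs_new
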
def resample_signal(s_time, fs_current, fs_target, axis=-1):
    """ Resample time signal.

    Parameters
    ----------
    s_time : numpy.ndarray
        Time signal, or signals stacked.
    fs_current : int
    fs_target : int
    axis : int, optional
        Axis along which to resample. The default is -1.

    Returns
    -------
    s_time_resamp : numpy.ndarray
        Resampled time signal.

    """
    s_time = np.atleast_2d(s_time)
    s_time_resamp = resampy.resample(s_time, fs_current, fs_target, axis=axis)
    return np.squeeze(s_time_resamp)

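# Usage sketch (illustrative only; `_demo_resample_signal` is a hypothetical
# helper, not part of the module API): resample a 1 s test tone from
# 44.1 kHz to 48 kHz; the output has roughly fs_target samples.
def _demo_resample_signal():
    fs_in, fs_out = 44100, 48000
    t = np.arange(fs_in) / fs_in  # 1 s time axis
    tone = np.sin(2 * np.pi * 440 * t)  # 440 Hz test tone
    tone_rs = resample_signal(tone, fs_in, fs_out)
    return tone_rs
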
def resample_spectrum(single_spec, fs_current, fs_target, axis=-1):
    """ Resample single sided spectrum, as e.g. from np.fft.rfft().

    Parameters
    ----------
    single_spec : numpy.ndarray
        Single sided spectrum, or spectra stacked.
    fs_current : int
    fs_target : int
    axis : int, optional
        Axis along which to resample. The default is -1.

    Returns
    -------
    single_spec_resamp : numpy.ndarray

    """
    single_spec = np.atleast_2d(single_spec)
    s_time = np.fft.irfft(single_spec, axis=axis)
    s_time_resamp = resampy.resample(s_time, fs_current, fs_target, axis=axis)
    single_spec_resamp = np.fft.rfft(s_time_resamp, axis=axis)
    return np.squeeze(single_spec_resamp)

def hrirs_ctf(hrirs, MIN_PHASE=True, freq_lims=(125, 10e3),
              grid_weights=None):
    """
    Get common transfer function (CTF) EQ for HRIRs.

    Often used to equalize the direction independent coloration of a
    measurement. Can be used to replace headphone EQ.

    Parameters
    ----------
    hrirs : sig.HRIRs
    MIN_PHASE : bool, optional
        Minimum phase EQ. The default is True.
    freq_lims : tuple, optional
        Frequency limits of inversion. The default is (125, 10e3).
    grid_weights : array_like, optional
        Grid weights of hrirs, `None` will calculate them. The default is
        None.

    Returns
    -------
    eq_taps : np.ndarray
        EQ filter taps, same length as HRIRs.

    """
    if grid_weights is None:
        grid_weights = grids.calculate_grid_weights(hrirs.azi, hrirs.zen, 5)
    num_taps = hrirs.num_samples
    num_grid_points = hrirs.num_grid_points
    assert len(grid_weights) == num_grid_points
    nfft = 16 * num_taps
    H_left = np.fft.rfft(hrirs.left, nfft, axis=-1)
    H_right = np.fft.rfft(hrirs.right, nfft, axis=-1)

    H_avg_left = 0.5*np.sqrt(np.sum(grid_weights[:, None] *
                                    np.abs(H_left)**2, axis=0)) / (4*np.pi)
    H_avg_right = 0.5*np.sqrt(np.sum(grid_weights[:, None] *
                                     np.abs(H_right)**2, axis=0)) / (4*np.pi)

    # Smoothing
    H_avg_smooth = 0.5*frac_octave_smoothing(np.squeeze(H_avg_left), 1,
                                             WEIGHTED=True) + \
        0.5*frac_octave_smoothing(np.squeeze(H_avg_right), 1, WEIGHTED=True)

    freq_vec = np.fft.rfftfreq(nfft, 1/hrirs.fs)

    freq_weights = np.ones(len(freq_vec))
    idx_lo = np.argmin(abs(freq_vec - freq_lims[0]))
    idx_hi = np.argmin(abs(freq_vec - freq_lims[1]))
    w_lo = np.hanning(2*idx_lo + 1)[:idx_lo]
    w_hi = np.hanning(2*(len(freq_vec)-idx_hi)+1)[(len(freq_vec)-idx_hi)+1:]
    freq_weights[:idx_lo] = w_lo
    freq_weights[idx_hi:] = w_hi

    # norm filters
    idx_1k = np.argmin(abs(freq_vec - 1000))
    H_target = H_avg_smooth / \
        np.mean(np.abs(H_avg_smooth[idx_1k-5:idx_1k+5]))
    H_weighted_inv = freq_weights * (1 / H_target) + \
        (1 - freq_weights) * np.ones(len(freq_weights))

    # get EQ
    if MIN_PHASE:
        eq_taps = signal.minimum_phase(
            np.fft.fftshift(np.fft.irfft(H_weighted_inv**2)))[:num_taps]
    else:
        eq_taps = np.roll(np.fft.irfft(H_weighted_inv),
                          num_taps//2+1)[:num_taps]

    eq_taps *= np.hanning(num_taps)
    return eq_taps

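# Usage sketch (illustrative only; `_demo_hrirs_ctf` is a hypothetical helper
# and it assumes `spaudiopy.io.load_hrirs` can provide an HRIR set): compute
# the CTF EQ and apply it to every HRIR by convolution.
def _demo_hrirs_ctf():
    from spaudiopy import io  # deferred import to avoid circular imports
    hrirs = io.load_hrirs(fs=44100)
    eq_taps = hrirs_ctf(hrirs)
    # equalize each left/right IR with the EQ taps
    hrirs_eq_l = np.array([np.convolve(h, eq_taps) for h in hrirs.left])
    hrirs_eq_r = np.array([np.convolve(h, eq_taps) for h in hrirs.right])
    return hrirs_eq_l, hrirs_eq_r
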
def ilds_from_hrirs(hrirs, f_cut=(1e3, 20e3), TODB=True):
    """Calculate ILDs from HRIRs by high/band-passed broad-band RMS.

    Parameters
    ----------
    hrirs : sig.HRIRs
    f_cut : float (2,), optional
        Band-pass cutoff frequencies. The default is (1000, 20000).
    TODB : bool, optional
        ILD in dB RMS ratio, otherwise as RMS difference. The default is True.

    Returns
    -------
    ild : array_like
        ILD per grid point, positive value indicates left ear louder.

    """
    assert isinstance(hrirs, sig.HRIRs)
    fs = hrirs.fs
    sos = signal.butter(2, f_cut, 'bandpass', fs=fs, output='sos')
    hrirs_l_f = signal.sosfiltfilt(sos, hrirs.left, axis=-1)
    hrirs_r_f = signal.sosfiltfilt(sos, hrirs.right, axis=-1)
    if TODB:
        rms_diff = utils.db(utils.rms(hrirs_l_f, axis=-1) /
                            utils.rms(hrirs_r_f, axis=-1))
    else:
        rms_diff = utils.rms(hrirs_l_f, axis=-1) - \
            utils.rms(hrirs_r_f, axis=-1)
    return rms_diff

def itds_from_hrirs(hrirs, f_cut=(100, 1.5e3), upsample=4):
    """Calculate ITDs from HRIRs by inter-aural cross-correlation (IACC).

    The method calculates the IACC on the energy of upsampled, filtered
    HRIRs.

    Parameters
    ----------
    hrirs : sig.HRIRs
    f_cut : float (2,), optional
        Band-pass cutoff frequencies. The default is (100, 1500).
    upsample : int, optional
        Upsampling factor. The default is 4.

    Returns
    -------
    itd : array_like
        ITD in seconds per grid point, positive value indicates left ear
        first.

    References
    ----------
    Andreopoulou, A., & Katz, B. F. G. (2017). Identification of perceptually
    relevant methods of inter-aural time difference estimation. JASA.

    """
    assert isinstance(hrirs, sig.HRIRs)
    fs_rs = upsample*hrirs.fs
    hrirs_l_us, hrirs_r_us, _ = resample_hrirs(hrirs.left, hrirs.right,
                                               hrirs.fs, fs_rs)
    sos = signal.butter(2, f_cut, 'bandpass', fs=fs_rs, output='sos')
    hrirs_l_us = signal.sosfiltfilt(sos, hrirs_l_us, axis=-1)
    hrirs_r_us = signal.sosfiltfilt(sos, hrirs_r_us, axis=-1)

    maxidx = np.zeros(hrirs.num_grid_points)
    for idx, hrirs_dir in enumerate(zip(hrirs_l_us, hrirs_r_us)):
        maxidx[idx] = np.argmax(np.correlate(hrirs_dir[0]**2,
                                             hrirs_dir[1]**2, mode='same'))
    maxidx -= hrirs_l_us.shape[1]//2
    # alternative
    # maxidx = np.argmax(hrirs_l_us, axis=-1) - np.argmax(hrirs_r_us, axis=-1)
    itd = -maxidx / fs_rs
    return itd

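# Usage sketch (illustrative only; `_demo_binaural_cues` is a hypothetical
# helper and it assumes `spaudiopy.io.load_hrirs` can provide an HRIR set):
# estimate ILDs and ITDs for every grid direction of an HRIR set.
def _demo_binaural_cues():
    from spaudiopy import io  # deferred import to avoid circular imports
    hrirs = io.load_hrirs(fs=44100)
    ild = ilds_from_hrirs(hrirs)  # in dB, positive: left ear louder
    itd = itds_from_hrirs(hrirs)  # in s, positive: left ear first
    return ild, itd
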
def match_loudness(sig_in, sig_target):
    """
    Match loudness of input to target, based on RMS, and avoid clipping.

    Parameters
    ----------
    sig_in : (n, c) array_like
        Input(t) samples n, channel c.
    sig_target : (n, c) array_like
        Target(t) samples n, channel c.

    Returns
    -------
    sig_out : (n, c) array_like
        Output(t) samples n, channel c.

    """
    L_in = np.max(np.sqrt(np.mean(np.square(sig_in), axis=0)))
    L_target = np.max(np.sqrt(np.mean(np.square(sig_target), axis=0)))
    sig_out = sig_in * L_target / L_in
    peak = np.max(np.abs(sig_out))
    if peak > 1:
        sig_out = sig_out / peak
        print('Audio normalized')
    return sig_out

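# Usage sketch (illustrative only; `_demo_match_loudness` is a hypothetical
# helper, not part of the module API): scale white noise to the RMS level of
# a quieter reference tone.
def _demo_match_loudness():
    fs = 48000
    reference = 0.25 * np.sin(2 * np.pi * 440 * np.arange(fs) / fs)
    noise = np.random.randn(fs)
    noise_matched = match_loudness(noise, reference)
    return noise_matched
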
def ambeo_a2b(Ambi_A, filter_coeffs=None):
    """Convert A 'MultiSignal' (type I: FLU, FRD, BLD, BRU) to B AmbiBSignal.

    Parameters
    ----------
    Ambi_A : sig.MultiSignal
        Input signal.
    filter_coeffs : string
        Picklable file that contains b0_d, a0_d, b1_d, a1_d.

    Returns
    -------
    Ambi_B : sig.AmbiBSignal
        B-format output signal.

    """
    _B = sph.soundfield_to_b(Ambi_A.get_signals())
    Ambi_B = sig.AmbiBSignal([_B[0, :], _B[1, :], _B[2, :], _B[3, :]],
                             fs=Ambi_A.fs)
    if filter_coeffs is not None:
        b0_d, a0_d, b1_d, a1_d = pickle.load(open(filter_coeffs, "rb"))
        Ambi_B.W = signal.lfilter(b0_d, a0_d, Ambi_B.W)
        Ambi_B.X = signal.lfilter(b1_d, a1_d, Ambi_B.X)
        Ambi_B.Y = signal.lfilter(b1_d, a1_d, Ambi_B.Y)
        Ambi_B.Z = signal.lfilter(b1_d, a1_d, Ambi_B.Z)
    return Ambi_B

def b_to_stereo(Ambi_B):
    """Downmix B format first order Ambisonics to Stereo.

    Parameters
    ----------
    Ambi_B : sig.AmbiBSignal
        B-format input signal.

    Returns
    -------
    L, R : array_like

    """
    L = Ambi_B.W + (Ambi_B.X + Ambi_B.Y) / (np.sqrt(2))
    R = Ambi_B.W + (Ambi_B.X - Ambi_B.Y) / (np.sqrt(2))
    return L, R

def lagrange_delay(N, delay):
    """Return fractional delay filter using Lagrange interpolation.

    For best results, delay should be near N/2 +/- 1.

    Parameters
    ----------
    N : int
        Filter order.
    delay : float
        Delay in samples.

    Returns
    -------
    h : (N+1,) array_like
        FIR filter.

    """
    n = np.arange(N + 1)
    h = np.ones(N + 1)
    for k in range(N + 1):
        index = np.where(n != k)
        h[index] = h[index] * (delay - k) / (n[index] - k)
    return h

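# Usage sketch (illustrative only; `_demo_lagrange_delay` is a hypothetical
# helper, not part of the module API): delay a unit impulse by 5.3 samples
# with an order-10 Lagrange fractional delay filter.
def _demo_lagrange_delay():
    h = lagrange_delay(N=10, delay=5.3)
    x = np.zeros(64)
    x[0] = 1.0  # unit impulse
    y = np.convolve(x, h)  # energy now centered around sample 5.3
    return y
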
def frac_octave_smoothing(a, smoothing_n, WEIGHTED=True):
    """Fractional octave (weighted) smoothing.

    Parameters
    ----------
    a : (n,) array_like
        Input spectrum.
    smoothing_n : int
        1 / smoothing_n octave band.
    WEIGHTED : bool, optional
        Use (hamming) weighting on mean around center. The default is True.

    Returns
    -------
    smoothed_a : (n,) np.array

    """
    a = utils.asarray_1d(a)
    smooth = np.zeros_like(a)
    num_smpls = len(a)
    for idx in range(num_smpls):
        k_lo = np.floor(idx / (2**(1/(2*smoothing_n))))
        k_hi = np.clip(np.ceil(idx * (2**(1/(2*smoothing_n)))), 1, num_smpls)
        if WEIGHTED:
            win = np.hamming(k_hi-k_lo)  # hamming is good because never 0
        else:
            win = np.ones(int(k_hi-k_lo))
        smooth[idx] = 1/np.sum(win) * \
            np.sum((win * a[k_lo.astype(int): k_hi.astype(int)]))
    return smooth

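# Usage sketch (illustrative only; `_demo_frac_octave_smoothing` is a
# hypothetical helper, not part of the module API): smooth a noisy magnitude
# spectrum in 1/3-octave bands.
def _demo_frac_octave_smoothing():
    mag = np.abs(np.fft.rfft(np.random.randn(4096)))
    mag_smooth = frac_octave_smoothing(mag, smoothing_n=3, WEIGHTED=True)
    return mag_smooth
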
def frac_octave_filterbank(n, N_out, fs, f_low, f_high=None, mode='energy',
                           overlap=0.5, slope_l=3):
    r"""Fractional octave band filterbank.

    Design of digital fractional-octave-band filters with energy conservation
    and perfect reconstruction.

    Parameters
    ----------
    n : int
        Octave fraction, e.g. n=3 third-octave bands.
    N_out : int
        Number of non-negative frequency bins [0, fs/2].
    fs : int
        Sampling frequency in Hz.
    f_low : int
        Center frequency of first full band in Hz.
    f_high : int
        Cutoff frequency in Hz, above which no further bands are generated.
    mode : 'energy' or 'amplitude'
        'energy' produces -3dB at crossover, 'amplitude' -6dB.
    overlap : float
        Band overlap, should be between [0, 0.5].
    slope_l : int
        Band transition slope, implemented as recursion order `l`.

    Returns
    -------
    g : (b, N) np.ndarray
        Band gains for non-negative frequency bins.
    ff : (b, 3) np.ndarray
        Filter frequencies as [f_lo, f_c, f_hi].

    Notes
    -----
    This filterbank is originally designed such that the sum of gains squared
    sums to unity. The alternative 'amplitude' mode ensures that the gains
    sum directly to unity.

    References
    ----------
    Antoni, J. (2010). Orthogonal-like fractional-octave-band filters.
    The Journal of the Acoustical Society of America, 127(2), 884-895.

    Examples
    --------
    .. plot::
        :context: close-figs

        fs = 44100
        N = 2**16
        gs, ff = spa.process.frac_octave_filterbank(n=1, N_out=N, fs=fs,
                                                    f_low=100, f_high=8000)
        f = np.linspace(0, fs//2, N)
        fig, ax = plt.subplots(2, 1, constrained_layout=True)
        ax[0].semilogx(f, gs.T)
        ax[0].set_title('Band gains')
        ax[1].semilogx(f, np.sum(np.abs(gs)**2, axis=0))
        ax[1].set_title(r"$\sum |g| ^ 2$")
        for a_idx in ax:
            a_idx.grid(True)
            a_idx.set_xlim([20, fs//2])
            a_idx.set_xlabel('f in Hz')
            a_idx.set_ylabel('Amplitude')

    """
    # fft bins
    N = (N_out - 1) * 2
    # frequency axis
    freq = np.fft.rfftfreq(N, d=1. / fs)
    f_alias = fs // 2
    if f_high is None:
        f_high = f_alias
    else:
        f_high = np.min([f_high, f_alias])
    assert (overlap <= 0.5)
    # center frequencies
    f_c = []
    # first is f_low
    f_c.append(f_low)
    # check next cutoff frequency
    while (f_c[-1] * (2 ** (1 / (2 * n)))) < f_high:
        f_c.append(2 ** (1 / n) * f_c[-1])
    f_c = np.array(f_c)

    # cut-off freqs
    f_lo = f_c / (2 ** (1 / (2 * n)))
    f_hi = f_c * (2 ** (1 / (2 * n)))

    # convert
    w_s = 2 * np.pi * fs
    # w_m
    w_c = 2 * np.pi * f_c
    # w_1
    w_lo = 2 * np.pi * f_lo
    # w_1+1
    w_hi = 2 * np.pi * f_hi

    # DFT line that corresponds to the lower bandedge frequency
    k_i = np.floor(N * w_lo / w_s).astype(int)
    # DFT bins in the frequency band
    N_i = np.diff(k_i)
    # band overlap (twice)
    P = np.round(overlap * (N * (w_c - w_lo) / w_s)).astype(int)

    g = np.ones([len(f_c) + 1, len(freq)])
    for b_idx in range(len(f_c)):
        p = np.arange(-P[b_idx], P[b_idx] + 1)

        # phi within [-1, 1]
        phi = (p / P[b_idx])
        phi[np.isnan(phi)] = 1.
        # recursion eq. 20
        for l_i in range(slope_l):
            phi = np.sin(np.pi / 2 * phi)

        # shift phi to [0, 1]
        phi = 0.5 * (phi + 1)
        a = np.sin(np.pi / 2 * phi)
        b = np.cos(np.pi / 2 * phi)

        # Hi
        g[b_idx, k_i[b_idx] - P[b_idx]: k_i[b_idx] + P[b_idx] + 1] = b
        g[b_idx, k_i[b_idx] + P[b_idx]:] = 0.
        # Lo
        g[b_idx + 1, k_i[b_idx] - P[b_idx]: k_i[b_idx] + P[b_idx] + 1] = a
        g[b_idx + 1, : k_i[b_idx] - P[b_idx]] = 0.
    if mode in ['energy']:
        g = g
    elif mode in ['amplitude', 'pressure']:
        # This is not part of Antoni (2010), see 'notes'
        g = g**2
    else:
        raise ValueError("Mode not implemented: " + mode)

    # Corresponding frequency limits
    ff = np.c_[f_lo, f_c, f_hi]
    # last band
    ff[-1, -1] = fs / 2
    ff[-1, 1] = np.sqrt(ff[-1, 0] * ff[-1, -1])
    # first band
    ff = np.vstack([np.array([0, np.sqrt(1 * ff[0, 0]), ff[0, 0]]), ff])
    return g, ff

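# Usage sketch (illustrative only; `_demo_apply_filterbank` is a hypothetical
# helper, not part of the module API): split a signal into third-octave
# subband signals by applying the band gains in the frequency domain, with
# N_out matching the rfft length. In 'amplitude' mode the gains sum to unity,
# so summing the subbands resynthesizes the input.
def _demo_apply_filterbank():
    fs = 44100
    x = np.random.randn(2**15)
    X = np.fft.rfft(x)
    g, ff = frac_octave_filterbank(n=3, N_out=len(X), fs=fs, f_low=62.5,
                                   mode='amplitude')
    x_bands = np.fft.irfft(g * X[None, :], n=len(x), axis=-1)  # (bands, n)
    x_rec = np.sum(x_bands, axis=0)  # approximately recovers x
    return x_bands, x_rec
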
def subband_levels(x, width, fs, power=False, axis=-1):
    """Compute the level/power in each subband of subband signals."""
    N = x.shape[1]

    if power is False:
        # normalization wrt bandwidth/sampling interval
        L = np.sqrt(1 / width * fs / 2 * np.sum(np.abs(x) ** 2, axis=axis))
    else:
        L = 1 / N * 1 / width * fs / 2 * np.sum(np.abs(x) ** 2, axis=axis)

    return L

def energy_decay(p):
    """Energy decay curve (EDC) in dB by Schroeder backwards integration.

    Parameters
    ----------
    p : array_like
        Impulse response.

    Returns
    -------
    rd : array_like
        Energy decay curve in dB.

    """
    a = np.trapz(p**2)
    b = np.cumsum(p[::-1]**2)[::-1]
    return 10 * np.log10(b / a)

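# Usage sketch (illustrative only; `_demo_energy_decay` is a hypothetical
# helper, not part of the module API): EDC of a synthetic exponentially
# decaying noise tail; the curve starts near 0 dB and falls monotonically.
def _demo_energy_decay():
    fs = 48000
    t = np.arange(fs) / fs
    rir = np.random.randn(fs) * np.exp(-t / 0.1)  # ~0.1 s decay constant
    edc = energy_decay(rir)
    return edc
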
def half_sided_Hann(N):
    """Design half-sided Hann tapering window of order N (>=3)."""
    assert (N >= 3)
    w_full = signal.windows.hann(2 * ((N + 1) // 2) + 1)
    # get half sided window
    w_taper = np.ones(N + 1)
    w_taper[-((N - 1) // 2):] = w_full[-((N + 1) // 2):-1]
    return w_taper

def gain_clipping(gain, threshold):
    """Limit gain factor by soft clipping function.

    Limits gain factor to +6dB beyond threshold point.
    (Pass values as factors/ratios, not dB!)

    Parameters
    ----------
    gain : array_like
    threshold : float

    Returns
    -------
    gain_clipped : array_like

    Examples
    --------
    .. plot::
        :context: close-figs

        x = np.linspace(-10, 10, 1000)
        lim_threshold = 2.5
        y = spa.process.gain_clipping(x, lim_threshold)
        plt.figure()
        plt.plot(x, x, '--', label='In')
        plt.plot(x, y, label='Out')
        plt.legend()
        plt.xlabel('In')
        plt.ylabel('Out')
        plt.grid(True)

    """
    gain = gain / threshold  # offset by threshold
    gain[gain > 1] = 1 + np.tanh(gain[gain > 1] - 1)  # soft clipping to 2
    return gain * threshold

def pulsed_noise(t_noise, t_pause, fs, reps=10, t_fade=0.02, pink_noise=True,
                 normalize=True):
    """Pulsed noise train, pink or white.

    Parameters
    ----------
    t_noise : float
        t in s for pulse.
    t_pause : float
        t in s between pulses.
    fs : int
        Sampling frequency.
    reps : int, optional
        Repetitions (independent). The default is 10.
    t_fade : float, optional
        t in s for fade in and out. The default is 0.02.
    pink_noise : bool, optional
        Use 'pink' (1/f) noise. The default is True.
    normalize : bool, optional
        Normalize output. The default is True.

    Returns
    -------
    s_out : array_like
        Output signal.

    """
    s_out = []

    for _ in range(reps):
        s_noise = np.random.randn(int(fs*t_noise))

        if pink_noise:
            X = np.fft.rfft(s_noise)
            nbins = len(X)
            # divide by sqrt(n), power spectrum
            X_pink = X / np.sqrt(np.arange(nbins)+1)
            s_noise = np.fft.irfft(X_pink)

        s_pause = np.zeros(int(fs*t_pause))

        # fades
        mask_n = int(fs*t_fade)
        mask_in = np.sin(np.linspace(0, np.pi/2, mask_n))**2
        mask_out = np.cos(np.linspace(0, np.pi/2, mask_n))**2
        # apply
        s_noise[:mask_n] *= mask_in
        s_noise[-mask_n:] *= mask_out

        s_out = np.r_[s_out, s_noise, s_pause]

    if normalize:
        s_out /= np.max(abs(s_out))

    return s_out

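# Usage sketch (illustrative only; `_demo_pulsed_noise` is a hypothetical
# helper, not part of the module API): generate a test stimulus of ten pink
# noise bursts of 0.5 s each, separated by 0.3 s of silence.
def _demo_pulsed_noise():
    fs = 48000
    stimulus = pulsed_noise(t_noise=0.5, t_pause=0.3, fs=fs, reps=10)
    return stimulus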