Source code for spaudiopy.sig

# -*- coding: utf-8 -*-
""" Signal class.
Avoid code duplications (and errors) by defining a few custom classes here.

.. plot::
    :context: reset

    import numpy as np
    import matplotlib.pyplot as plt
    plt.rcParams['axes.grid'] = True

    import spaudiopy as spa


import os
import copy
from warnings import warn

import numpy as np
from scipy import signal as scysig
import soundfile as sf
    import sounddevice as sd
except (ImportError, OSError) as e:
    warn("Sounddevice not available.")

from . import io, utils, sph
from . import process as pcs

[docs]class MonoSignal: """Signal class for a MONO channel audio signal."""
[docs] def __init__(self, signal, fs): """Constructor. Parameters ---------- signal : array_like fs : int """ self.signal = utils.asarray_1d(signal) self.fs = fs
def __len__(self): """Override len().""" return len(self.signal) def __getitem__(self, key): """Enable [] operator, returns signal data.""" return self.signal[key]
[docs] @classmethod def from_file(cls, filename, fs=None): """Alternative constructor, load signal from filename.""" sig, fs_file = if fs is not None: if fs != fs_file: raise ValueError("File: Found different fs:" + str(fs_file)) else: fs = fs_file if sig.ndim != 1: raise ValueError("Signal must be mono. Try MultiSignal.") return cls(sig, fs)
[docs] def copy(self): """Return an independent (deep) copy of the instance.""" return copy.deepcopy(self)
[docs] def save(self, filename, subtype='FLOAT'): """Save to file.""" io.save_audio(self, os.path.expanduser(filename), subtype=subtype)
[docs] def trim(self, start, stop): """Trim audio to start and stop in seconds.""" assert start < len(self) / self.fs, "Trim start exceeds signal." self.signal = self.signal[int(start * self.fs): int(stop * self.fs)]
[docs] def apply(self, func, *args, **kwargs): """Apply function 'func' to signal, arguments are forwarded.""" self.signal = func(*args, **kwargs)
[docs] def conv(self, h, **kwargs): """Convolve signal, kwargs are forwarded to signal.convolve.""" h = utils.asarray_1d(h) self.signal = scysig.convolve(self.signal, h, **kwargs) return self
[docs] def resample(self, fs_new): """Resample signal to new sampling rate fs_new.""" if fs_new == self.fs: warn("Same sampling rate requested, no resampling.") else: sig_resamp = pcs.resample_signal(self.signal, self.fs, fs_new) self.__init__(sig_resamp, fs_new)
[docs] def play(self, gain=1, wait=True): """Play sound signal. Adjust gain and wait until finished.""" * self.signal, int(self.fs)) if wait: sd.wait()
[docs]class MultiSignal(MonoSignal): """Signal class for a MULTI channel audio signal."""
[docs] def __init__(self, signals, fs=None): """Constructor. Parameters ---------- signals : list of array_like fs : int """ assert isinstance(signals, (list, tuple)) = [] if fs is None: raise ValueError("Provide fs (as kwarg).") else: self.fs = fs for s in signals:, fs)) self.channel_count = len(
def __len__(self): """Override len().""" return len([0]) def __getitem__(self, key): """Override [] operator, returns signal channel.""" return[key]
[docs] @classmethod def from_file(cls, filename, fs=None): """Alternative constructor, load signal from filename.""" sig, fs_file = if fs is not None: if fs != fs_file: raise ValueError("File: Found different fs:" + str(fs_file)) else: fs = fs_file if np.ndim(sig) == 1: raise ValueError("Only one channel. Try MonoSignal.") return cls([*sig.T], fs=fs)
[docs] def get_signals(self): """Return ndarray of signals, stacked along rows (nCH, nSmps).""" return np.asarray([x.signal for x in])
[docs] def trim(self, start, stop): """Trim all channels to start and stop in seconds.""" assert start < len(self) / self.fs, "Trim start exceeds signal." for c in c.signal = c.signal[int(start * c.fs): int(stop * c.fs)]
[docs] def apply(self, func, *args, **kwargs): """Apply function 'func' to all signals, arguments are forwarded.""" for c in c.signal = func(*args, **kwargs)
[docs] def conv(self, irs, **kwargs): for c, h in zip(, irs): h = utils.asarray_1d(h) c.signal = scysig.convolve(c, h, **kwargs) return self
[docs] def resample(self, fs_new): """Resample signal to new sampling rate fs_new.""" if fs_new == self.fs: warn("Same sampling rate requested, no resampling.") else: sig_resamp = pcs.resample_signal(self.get_signals(), self.fs, fs_new, axis=-1) self.__init__([*sig_resamp], fs_new)
[docs] def play(self, gain=1, wait=True): """Play sound signal. Adjust gain and wait until finished.""" * self.get_signals().T, int(self.fs)) if wait: sd.wait()
[docs]class AmbiBSignal(MultiSignal): """Signal class for first order Ambisonics B-format signals."""
[docs] def __init__(self, signals, fs=None): MultiSignal.__init__(self, signals, fs=fs) assert self.channel_count == 4, "Provide four channels!" self.W = utils.asarray_1d([0].signal) self.X = utils.asarray_1d([1].signal) self.Y = utils.asarray_1d([2].signal) self.Z = utils.asarray_1d([3].signal)
[docs] @classmethod def from_file(cls, filename, fs=None): """Alternative constructor, load signal from filename.""" return super().from_file(filename, fs=fs)
[docs] @classmethod def sh_to_b(cls, multisig): """Alternative constructor, convert from sig.Multisignal. Assumes ACN channel order. """ assert isinstance(multisig, MultiSignal) _B = sph.sh_to_b(multisig.copy().get_signals()) return cls([*_B], fs=multisig.fs)
[docs]class HRIRs: """Signal class for head-related impulse responses."""
[docs] def __init__(self, left, right, azi, zen, fs): """Constructor. Parameters ---------- left : (numDirs, numTaps) ndarray Left ear HRIRs. right : (numDirs, numTaps) ndarray Right ear HRIRs. azi : (numDirs,) array_like, in rad fs : int """ left = np.asarray(left) right = np.asarray(right) assert len(left) == len(right), "Signals must be of same length." assert left.ndim == 2 assert right.ndim == 2 azi = utils.asarray_1d(azi) zen = utils.asarray_1d(zen) assert len(azi) == len(zen) assert len(azi) == left.shape[0] self.left = left self.right = right self.azi = azi self.zen = zen self.fs = fs self.num_grid_points = len(azi) self.num_samples = left.shape[1]
def __len__(self): """Override len() to count of samples per hrir.""" return self.left.shape[1] def __getitem__(self, key): """Enable [] operator, returns hrirs.""" return self.left[key, :], self.right[key, :]
[docs] def copy(self): """Return an independent (deep) copy of the instance.""" return copy.deepcopy(self)
[docs] def update_hrirs(self, left, right): """Update and replace HRIRs in place. Parameters ---------- left : (numDirs, numTaps) ndarray Left ear HRIRs. right : numDirs, numTaps ndarray Right ear HRIRs. Returns ------- None. """ # reinitialize self.__init__(left, right, self.azi, self.zen, self.fs)
[docs] def nearest_hrirs(self, azi, zen): """For a point on the sphere, select closest HRIR defined on grid. Based on the haversine distance. Parameters ---------- azi : float Azimuth. zen : float Zenith / Colatitude. Returns ------- h_l : (n,) array_like h(t) closest to [phi, theta]. h_r : (n,) array_like h(t) closest to [phi, theta]. """ # search closest gridpoint d_idx = self.nearest_idx(azi, zen) # get hrirs to that angle return self[d_idx]
[docs] def nearest_idx(self, azi, zen): """ Index of nearest HRIR grid point based on dot product. Parameters ---------- azi : float, array_like Azimuth. zen : float, array_like Zenith / Colatitude. Returns ------- idx : int, np.ndarray Index. """ azi = utils.asarray_1d(azi) zen = utils.asarray_1d(zen) vec = np.stack(utils.sph2cart(azi, zen), axis=1) vec_g = np.stack(utils.sph2cart(self.azi, self.zen), axis=1) return np.argmax(vec@vec_g.T, axis=1).squeeze()
[docs] def apply_ctf_eq(self, eq_taps=None, mode='full'): """ Equalize common transfer function (CTF) of HRIRs. Parameters ---------- eq_taps : array_like, optional FIR filter, `None` will calculate. The default is None. mode : string, optional Forwarded to scipy.signal.convolve(). The default is 'full'. Returns ------- None. """ if eq_taps is None: eq_taps = pcs.hrirs_ctf(self) self.left = scysig.convolve(self.left, eq_taps[None, :], mode) self.right = scysig.convolve(self.right, eq_taps[None, :], mode) self.num_samples = self.left.shape[1]
[docs]def trim_audio(A, start, stop): """Trim copy of MultiSignal audio to start and stop in seconds.""" B = copy.deepcopy(A) assert start < len(B) / B.fs, 'Trim start exceeds signal.' for c in range(B.channel_count):[c].signal =[c].signal[ int(start * B.fs): int(stop * B.fs)] return B