Source code for spaudiopy.io

# -*- coding: utf-8 -*-
"""Input Output (IO) helpers.

.. plot::
    :context: reset

    import numpy as np
    import matplotlib.pyplot as plt
    plt.rcParams['axes.grid'] = True

    import spaudiopy as spa

"""

import os
from warnings import warn
import multiprocessing
import json
from datetime import datetime

import numpy as np
from scipy.io import loadmat, savemat
import h5py

import soundfile as sf

from . import utils, sig, decoder, grids, sph, process, parsa, __version__


[docs]def load_audio(filenames, fs=None): """Load mono and multichannel audio from files. Parameters ---------- filenames : string or list of strings Audio files. Returns ------- sig : sig.MonoSignal or sig.MultiSignal Audio signal. """ loaded_data = [] loaded_fs = [] # pack in list if only a single string if not isinstance(filenames, (list, tuple)): filenames = [filenames] for file in filenames: data, fs_file = sf.read(os.path.expanduser(file)) if data.ndim != 1: # detect and split interleaved wav for c in data.T: loaded_data.append(c) else: loaded_data.append(data) loaded_fs.append(fs_file) # Assert same sample rate for all channels assert all(x == loaded_fs[0] for x in loaded_fs) # Check against provided samplerate if fs is not None: if fs != loaded_fs[0]: raise ValueError("File: Found different fs:" + str(loaded_fs[0])) else: fs = loaded_fs[0] # MonoSignal or MultiSignal if len(loaded_data) == 1: return sig.MonoSignal(loaded_data, fs=fs) else: return sig.MultiSignal([*loaded_data], fs=fs)
[docs]def save_audio(signal, filename, fs=None, subtype='FLOAT'): """Save signal to audio file. Parameters ---------- signal : sig. MonoSignal, sig.MultiSignal or np.ndarray Audio Signal, forwarded to sf.write(); (frames x channels). filename : string Audio file name. fs : int fs(t). subtype : optional """ if isinstance(sig, sig.MonoSignal): if fs is not None: assert (signal.fs == fs) if type(signal) == sig.MonoSignal: data = signal.signal data_fs = signal.fs elif type(signal) in (sig.MultiSignal, sig.AmbiBSignal): data = signal.get_signals().T data_fs = signal.fs elif isinstance(signal, (np.ndarray, np.generic)): if fs is None: raise ValueError("Needs fs for generic audio data!") data = signal data_fs = fs else: raise NotImplementedError('Data type not supported.') data = np.asarray(data) if (data.ndim > 1): assert (data.ndim == 2) if (data.shape[1] > data.shape[0]): warn(f"Writing file with {data.shape[1]} channels") if (np.max(np.abs(data)) > 1.0): warn(f"Audio clipped! ({np.max(np.abs(data)):.2f})") sf.write(os.path.expanduser(filename), data, data_fs, subtype=subtype)
[docs]def load_hrirs(fs, filename=None, jobs_count=None): """Convenience function to load 'HRIRs.mat'. The file contains ['hrir_l', 'hrir_r', 'fs', 'azi', 'zen']. Parameters ---------- fs : int fs(t). filename : string, optional HRTF.mat file or default set, or 'dummy' for debugging. jobs_count : int or None, optional Number of parallel jobs for resample_hrirs() in get_default_hrirs(), 'None' employs 'cpu_count'. Returns ------- HRIRs : sig.HRIRs instance left : (g, h) numpy.ndarray h(t) for grid position g. right : (g, h) numpy.ndarray h(t) for grid position g. azi : (g,) array_like grid azimuth. zen : (g,) array_like grid zenith / colatitude. fs : int fs(t). """ if filename == 'dummy': azi, zen, _ = grids.gauss(15) # Create diracs as dummy hrir_l = np.zeros([len(azi), 256]) hrir_r = np.zeros_like(hrir_l) hrir_fs = fs # apply ILD / ITD a_l = 0.5*(1 + np.cos(azi - np.pi/2)) * np.sin(zen) a_r = 0.5*(1 + np.cos(azi + np.pi/2)) * np.sin(zen) hrir_l[np.arange(len(azi)), (a_r * 0.75e-3 * fs + 10).astype(int)] = 1. hrir_r[np.arange(len(azi)), (a_l * 0.75e-3 * fs + 10).astype(int)] = 1. hrir_l *= a_l[:, np.newaxis] + 0.1 hrir_r *= a_r[:, np.newaxis] + 0.1 hrir_l[np.arange(len(azi)), (a_l * 5).astype(int)] = hrir_l[:, 0] hrir_r[np.arange(len(azi)), (a_r * 5).astype(int)] = hrir_r[:, 0] elif filename is None: # default if fs not in [44100, 48000, 96000]: raise NotImplementedError('44100, 48000, 96000' ' default available.') default_file = '../data/HRIRs/' + 'HRIRs_default_' + str(fs) + '.mat' current_file_dir = os.path.dirname(__file__) filename = os.path.join(current_file_dir, default_file) try: mat = loadmat(filename) except FileNotFoundError: warn("No default hrirs. Generating them...") get_default_hrirs(jobs_count=jobs_count) mat = loadmat(filename) else: mat = loadmat(os.path.expanduser(filename)) if not filename == 'dummy': hrir_l = np.array(np.squeeze(mat['hrir_l']), dtype=float) hrir_r = np.array(np.squeeze(mat['hrir_r']), dtype=float) try: hrir_fs = int(mat['fs']) except KeyError: hrir_fs = int(mat['SamplingRate']) azi = np.array(np.squeeze(mat['azi']), dtype=float) zen = np.array(np.squeeze(mat['zen']), dtype=float) HRIRs = sig.HRIRs(hrir_l, hrir_r, azi, zen, hrir_fs) assert HRIRs.fs == fs return HRIRs
[docs]def get_default_hrirs(grid_azi=None, grid_zen=None, jobs_count=None): """Creates the default HRIRs loaded by load_hrirs() by inverse SHT. By default it renders onto a gauss grid of order N=35, and additionally resamples fs to 48kHz. Parameters ---------- grid_azi : array_like, optional grid_zen : array_like, optional jobs_count : int or None, optional Number of parallel jobs for resample_hrirs(), 'None' employs 'cpu_count'. Notes ----- HRTFs in SH domain obtained from http://dx.doi.org/10.14279/depositonce-5718.5 """ default_file = '../data/HRIRs/FABIAN/' \ 'SphericalHarmonics/FABIAN_DIR_measured_HATO_0.mat' current_file_dir = os.path.dirname(__file__) filename = os.path.join(current_file_dir, default_file) # Load HRTF try: file = loadmat(filename) except FileNotFoundError: import requests, zipfile, io print("Downloading from https://depositonce.tu-berlin.de/handle/" "11303/6153.5 ...") r = requests.get('https://depositonce.tu-berlin.de/bitstream/11303/' '6153.5/9/HRIRs_neutral_head_orientation_v4.zip') with zipfile.ZipFile(io.BytesIO(r.content)) as zip_ref: zip_ref.extractall(os.path.join(current_file_dir, '../data/HRIRs/FABIAN/')) file = loadmat(filename) # CTF already compensated in DIR # Extracting the data is a bit ugly here... SamplingRate = int(file['SamplingRate']) SH_l = file['SH'][0][0][0] SH_r = file['SH'][0][0][1] f = np.squeeze(file['SH'][0][0][5]) # default grid: if (grid_azi is None) and (grid_zen is None): grid_azi, grid_zen, _ = grids.gauss(35) # grid positions # Inverse SHT HRTF_l = sph.inverse_sht(SH_l, grid_azi, grid_zen, 'complex') HRTF_r = sph.inverse_sht(SH_r, grid_azi, grid_zen, 'complex') assert HRTF_l.shape == HRTF_r.shape # ifft hrir_l = np.fft.irfft(HRTF_l) # creates 256 samples(t) hrir_r = np.fft.irfft(HRTF_r) # creates 256 samples(t) assert hrir_l.shape == hrir_r.shape # Resample fs_target = 48000 hrir_l_48k, hrir_r_48k, _ = process.resample_hrirs(hrir_l, hrir_r, SamplingRate, fs_target, jobs_count=jobs_count) fs_target = 96000 hrir_l_96k, hrir_r_96k, _ = process.resample_hrirs(hrir_l, hrir_r, SamplingRate, fs_target, jobs_count=jobs_count) savemat(os.path.join(current_file_dir, '../data/HRIRs/' 'HRIRs_default_44100.mat'), {'hrir_l': hrir_l, 'hrir_r': hrir_r, 'azi': grid_azi, 'zen': grid_zen, 'fs': 44100}) savemat(os.path.join(current_file_dir, '../data/HRIRs/' 'HRIRs_default_48000.mat'), {'hrir_l': hrir_l_48k, 'hrir_r': hrir_r_48k, 'azi': grid_azi, 'zen': grid_zen, 'fs': 48000}) savemat(os.path.join(current_file_dir, '../data/HRIRs/' 'HRIRs_default_96000.mat'), {'hrir_l': hrir_l_96k, 'hrir_r': hrir_r_96k, 'azi': grid_azi, 'zen': grid_zen, 'fs': 96000}) print("Saved new default HRIRs.")
[docs]def load_sofa_data(filename): """Load .sofa file into python dictionary that contains the data in numpy arrays.""" with h5py.File(os.path.expanduser(filename), 'r') as f: out_dict = {} for key, value in f.items(): out_dict[key] = np.squeeze(value) return out_dict
[docs]def load_sofa_hrirs(filename): """ Load SOFA file containing HRIRs. Parameters ---------- filename : string SOFA filepath. Returns ------- HRIRs : sig.HRIRs instance left : (g, h) numpy.ndarray h(t) for grid position g. right : (g, h) numpy.ndarray h(t) for grid position g. azi : (g,) array_like grid azimuth. zen : (g,) array_like grid zenith / colatitude. fs : int fs(t). """ sdata = load_sofa_data(os.path.expanduser(filename)) fs = int(sdata['Data.SamplingRate']) irs = np.asarray(sdata['Data.IR']) grid = np.asarray(sdata['SourcePosition']) assert (abs((grid[:, 2]-grid[:, 2].mean())).mean() < 0.1) # Otherwise not r grid_azi, grid_zen = np.deg2rad(grid[:, 0]), np.pi/2 - np.deg2rad( grid[:, 1]) assert (all(grid_zen > -10e-6)) # Otherwise not zen irs_left = np.squeeze(irs[:, 0, :]) irs_right = np.squeeze(irs[:, 1, :]) HRIRs = sig.HRIRs(irs_left, irs_right, grid_azi, grid_zen, fs) return HRIRs
[docs]def sofa_to_sh(filename, N_sph, sh_type='real'): """Load and transform SOFA IRs to the Spherical Harmonic Domain. Parameters ---------- filename : string SOFA file name. N_sph : int Spherical Harmonic Transform order. sh_type : 'real' (default) or 'complex' spherical harmonics. Returns ------- IRs_nm : (2, (N_sph+1)**2, S) numpy.ndarray Left and right (stacked) SH coefficients. fs : int """ hrirs = load_sofa_hrirs(filename) fs = hrirs.fs grid_azi, grid_zen = hrirs.azi, hrirs.zen # Pinv / lstsq since we can't be sure about the grid Y_pinv = np.linalg.pinv(sph.sh_matrix(N_sph, grid_azi, grid_zen, sh_type)) irs = np.stack((hrirs.left, hrirs.right), axis=0) IRs_nm = Y_pinv @ irs return IRs_nm, fs
[docs]def load_sdm(filename, init_nan=True): """Convenience function to load 'SDM.mat'. The file contains ['h_ref' or 'p', 'sdm_azi' or 'sdm_phi', 'sdm_zen' or 'sdm_theta', 'fs']. Parameters ---------- filename : string SDM.mat file init_nan : bool, optional Initialize nan to [0, pi/2]. Returns ------- h : (n,) array_like p(t). sdm_azi : (n,) array_like Azimuth angle. sdm_zen : (n,) array_like Colatitude angle. fs : int fs(t). """ mat = loadmat(os.path.expanduser(filename)) try: h = np.array(np.squeeze(mat['h_ref']), dtype=float) except KeyError: h = np.array(np.squeeze(mat['p']), dtype=float) try: sdm_azi = np.array(np.squeeze(mat['sdm_azi']), dtype=float) except KeyError: sdm_azi = np.array(np.squeeze(mat['sdm_phi']), dtype=float) try: sdm_zen = np.array(np.squeeze(mat['sdm_zen']), dtype=float) except KeyError: sdm_zen = np.array(np.squeeze(mat['sdm_theta']), dtype=float) if init_nan: sdm_azi[np.isnan(sdm_azi)] = 0. sdm_zen[np.isnan(sdm_zen)] = np.pi / 2 fs = int(mat['fs']) return h, sdm_azi, sdm_zen, fs
[docs]def write_ssr_brirs_loudspeaker(filename, ls_irs, hull, fs, subtype='FLOAT', hrirs=None, jobs_count=1): """Write binaural room impulse responses (BRIRs) and save as wav file. The azimuth resolution is one degree. The channels are interleaved and directly compatible to the SoundScape Renderer (SSR) ssr-brs. Parameters ---------- filename : string ls_irs : (L, S) np.ndarray Impulse responses of L loudspeakers, e.g. by hull.loudspeaker_signals(). hull : decoder.LoudspeakerSetup fs : int subtype : forwarded to sf.write(), optional hrirs : sig.HRIRs, optional jobs_count : int, optional [CPU Cores], Number of Processes, switches implementation for n > 1. """ if hrirs is None: hrirs = load_hrirs(fs=fs) assert (hrirs.fs == fs) if jobs_count is None: jobs_count = multiprocessing.cpu_count() if not filename[-4:] == '.wav': filename = filename + '.wav' ssr_brirs = np.zeros((720, ls_irs.shape[1] + len(hrirs) - 1)) if jobs_count == 1: for angle in range(0, 360): ir_l, ir_r = hull.binauralize(ls_irs, fs, orientation=(np.deg2rad(angle), 0), hrirs=hrirs) # left ssr_brirs[2 * angle, :] = ir_l # right ssr_brirs[2 * angle + 1, :] = ir_r elif jobs_count > 1: with multiprocessing.Pool(processes=jobs_count) as pool: results = pool.starmap(hull.binauralize, map(lambda a: (ls_irs, fs, (np.deg2rad(a), 0), hrirs), range(0, 360))) # extract ir_l = [ir[0] for ir in results] ir_r = [ir[1] for ir in results] for angle in range(0, 360): # left ssr_brirs[2 * angle, :] = ir_l[angle] # right ssr_brirs[2 * angle + 1, :] = ir_r[angle] # normalize if np.max(np.abs(ssr_brirs)) > 1: warn('Normalizing BRIRs') ssr_brirs = ssr_brirs / np.max(np.abs(ssr_brirs)) # write to file save_audio(ssr_brirs.T, filename, fs, subtype=subtype)
[docs]def write_ssr_brirs_sdm(filename, sdm_p, sdm_phi, sdm_theta, fs, subtype='FLOAT', hrirs=None): """Write binaural room impulse responses (BRIRs) and save as wav file. The azimuth resolution is one degree. The channels are interleaved and directly compatible to the SoundScape Renderer (SSR) ssr-brs. Parameters ---------- filename : string sdm_p : (n,) array_like Pressure p(t). sdm_phi : (n,) array_like Azimuth phi(t). sdm_theta : (n,) array_like Colatitude theta(t). fs : int subtype : forwarded to sf.write(), optional hrirs : sig.HRIRs, optional """ if hrirs is None: hrirs = load_hrirs(fs=fs) assert (hrirs.fs == fs) if not filename[-4:] == '.wav': filename = filename + '.wav' ssr_brirs = np.zeros((720, len(sdm_p) + len(hrirs) - 1)) for angle in range(0, 360): sdm_phi_rot = sdm_phi - np.deg2rad(angle) ir_l, ir_r = parsa.render_bsdm(sdm_p, sdm_phi_rot, sdm_theta, hrirs=hrirs) # left ssr_brirs[2 * angle, :] = ir_l # right ssr_brirs[2 * angle + 1, :] = ir_r # normalize if np.max(np.abs(ssr_brirs)) > 1: warn('Normalizing BRIRs') ssr_brirs = ssr_brirs / np.max(np.abs(ssr_brirs)) # write to file save_audio(ssr_brirs.T, filename, fs, subtype=subtype)
[docs]def load_layout(filename, listener_position=None, N_kernel=50): """Load loudspeaker layout from json configuration file.""" with open(os.path.expanduser(filename), 'r') as f: in_data = json.load(f) layout = in_data['LoudspeakerLayout'] ls_data = layout['Loudspeakers'] azi = np.array([ls['Azimuth'] for ls in ls_data]) ele = np.array([ls['Elevation'] for ls in ls_data]) if np.any(ele < -90) or np.any(ele > +90): warn("Elevation out of bounds! (+-90)") r = np.array([ls['Radius'] for ls in ls_data]) try: # not actually used, yet ls_gains = np.array([ls['Gain'] for ls in ls_data]) except KeyError as e: warn('KeyError : {}, will return unit gain!'.format(e)) ls_gains = np.ones_like(azi) try: isImaginary = np.array([ls['IsImaginary'] for ls in ls_data]) except KeyError as e: warn('KeyError : {}, will return all False!'.format(e)) isImaginary = np.full_like(azi, False, dtype=bool) # first extract real loudspeakers ls_x, ls_y, ls_z = utils.sph2cart(utils.deg2rad(azi[~isImaginary]), utils.deg2rad(90-ele[~isImaginary]), r[~isImaginary]) ls_layout = decoder.LoudspeakerSetup(ls_x, ls_y, ls_z, listener_position=listener_position) ls_layout.ls_gains = ls_gains # then add imaginary loudspeakers to ambisonics setup imag_x, imag_y, imag_z = utils.sph2cart(utils.deg2rad(azi[isImaginary]), utils.deg2rad(90-ele[isImaginary]), r[isImaginary]) imag_pos = np.c_[imag_x, imag_y, imag_z] ls_layout.ambisonics_setup(N_kernel=N_kernel, update_hull=True, imaginary_ls=imag_pos) return ls_layout
[docs]def save_layout(filename, ls_layout, name='unknown', description='unknown'): """Save loudspeaker layout to json configuration file.""" if not ls_layout.ambisonics_hull: raise ValueError("No ambisonics_hull.") out_data = {} out_data['Name'] = name out_data['Description'] = 'This configuration file was created with ' +\ 'spaudiopy (v-' + str(__version__) + '), ' + \ str(datetime.now()) out_data['LoudspeakerLayout'] = {} out_data['LoudspeakerLayout']['Name'] = name out_data['LoudspeakerLayout']['Description'] = description out_data['LoudspeakerLayout']['Loudspeakers'] = [] for ls_idx in range(ls_layout.ambisonics_hull.npoints): ls_dirs = utils.cart2sph(ls_layout.ambisonics_hull.x[ls_idx], ls_layout.ambisonics_hull.y[ls_idx], ls_layout.ambisonics_hull.z[ls_idx]) ls_dict = {} ls_dict['Azimuth'] = round(float(utils.rad2deg(ls_dirs[0])), 2) ls_dict['Elevation'] = round(float(90 - utils.rad2deg(ls_dirs[1])), 2) ls_dict['Radius'] = round(float(ls_dirs[2]), 2) ls_dict['IsImaginary'] = ls_idx in np.asarray( ls_layout.ambisonics_hull.imaginary_ls_idx) ls_dict['Channel'] = ls_idx + 1 ls_dict['Gain'] = 0. if ls_idx in np.asarray( ls_layout.ambisonics_hull.imaginary_ls_idx) else \ ls_layout.ls_gains[ls_idx] out_data['LoudspeakerLayout']['Loudspeakers'].append(ls_dict) with open(os.path.expanduser(filename), 'w') as outfile: json.dump(out_data, outfile, indent=4)