"""
audiosample
~~~~~~~~~~~
This module is based on the :class:`~maelzel.snd.audiosample.Sample` class,
which contains the audio of a soundfile as a numpy array and it aware of its sr,
original format and encoding, etc. It can also perform simple actions
(fade-in/out, cut, insert, reverse, normalize, etc) on its own audio
destructively or return a new Sample. It implements most math operations
valid for audio data (``+``, ``-``, ``*``, ``/``)
.. note::
All operations are samplerate-aware: any operation involving
multiple :class:`Sample` instances will broadcast these to the highest samplerate used
Examples
~~~~~~~~
.. code-block:: python
# load a Sample, fade it, play and write
from maelzel.snd.audiosample import *
s = Sample("snd/Numbers_EnglishFemale.flac")
s.fade(0.5)
s.play(speed=0.5, block=True)
# Plot a 6 second fragment startine at time=1
s[1:7].plotSpectrogram(fftsize=4096, overlap=8, mindb=-100, maxfreq=8000)
.. image:: assets/audiosample-plot-spectrogram.png
.. code-block:: python
samples = [Sample("soundA.wav"),
Sample("soundB.aif"),
Sample("soundC.flac")]
a, b, c = broadcastSamplerate(samples)
# mix them down
out = a.prependSilence(2) + b + c
out.write("mixed.wav")
"""
from __future__ import annotations
import abc
import numpy as np
import os
import pitchtools as pt
from maelzel import _util
from maelzel.snd import numpysnd as _npsnd
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import bpf4
import sounddevice
import csoundengine
import csoundengine.synth
from typing import Iterator, Sequence
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from maelzel.partialtracking import spectrum as _spectrum
from maelzel.transcribe import mono
from typing_extensions import Self
__all__ = (
'Sample',
)
config = {
'reprhtml_include_audiotag': True,
'reprhtml_audiotag_maxduration_seconds': 600,
'reprhtml_audiotag_width': '100%',
'reprhtml_audiotag_maxwidth': '1200px',
'reprhtml_audiotag_embed_maxduration_seconds': 8,
'reprhtml_audio_format': 'mp3',
'csoundengine': 'maelzel.snd',
}
class PlaybackStream(abc.ABC):
"""
A class to abstract the playback engine
"""
@abc.abstractmethod
def stop(self) -> None:
"""
Stop this stream
"""
raise NotImplementedError
def active(self) -> bool:
"""
True if playback is active
"""
raise NotImplementedError
def _repr_html_(self) -> str:
from maelzel.core import jupytertools
jupytertools.displayButton("Stop", self.stop)
return repr(self)
class _PortaudioPlayback(PlaybackStream):
"""
Portaudio based playback for audiosamples (based on sounddevice)
"""
def __init__(self, stream: sounddevice.OutputStream):
self.stream = stream
def active(self) -> bool:
return self.stream.active
def stop(self):
self.stream.stop()
class _CsoundenginePlayback(PlaybackStream):
"""
Csoundengine playback for audiosamples
"""
def __init__(self, synth: csoundengine.synth.Synth):
self.synth = synth
def stop(self):
self.synth.stop()
def active(self) -> bool:
return self.synth.playStatus() != 'playing'
def _normalizePath(path: str) -> str:
path = os.path.expanduser(path)
return os.path.abspath(path)
def _openInEditor(soundfile: str, wait=False, app=None) -> None:
"""
Open soundfile in an external app
Args:
soundfile: the file to open
wait: if True, wait until editing is finished
app: the app to use. If None is given, a default app is used
"""
soundfile = _normalizePath(soundfile)
import emlib.misc
emlib.misc.open_with_app(soundfile, app, wait=wait, min_wait=5)
def readSoundfile(sndfile: str, start: float = 0., end: float = 0.
) -> tuple[np.ndarray, int]:
"""
Read a soundfile, returns a tuple ``(samples:np.ndarray, sr:int)``
Args:
sndfile: The path of the soundfile
start: The time to start reading. A negative value will seek from the end.
end: The time to stop reading (0=end of file). A negative value will
seek from the end
Returns:
a tuple (samples:np.ndarray, sr:int)
Example::
# Read the first two seconds
>>> samples, sr = readSoundfile("sound.flac", end=2)
# Read the last two seconds
>>> samples, sr = readSoundfile("sound.aif", start=-2)
"""
if sndfile == "?":
import emlib.dialogs
sndfile = emlib.dialogs.selectFile(directory=os.getcwd(),
filter=emlib.dialogs.filters['Sound'],
title='Select soundfile')
if not sndfile:
raise RuntimeError("No soundfile selected")
sndfilestr = _normalizePath(str(sndfile))
import sndfileio
return sndfileio.sndread(sndfilestr, start=start, end=end)
def _vampPyinAvailable() -> bool:
try:
import vamp
except ImportError:
return False
return "pyin:pyin" in vamp.list_plugins()
def _playSamples(samples: np.ndarray, sr: int, mapping: list[int], gain=1., speed=1., loop=False, block=False
) -> PlaybackStream:
import sounddevice
sr = int(sr * speed)
ctx = sounddevice._CallbackContext(loop=loop)
ctx.frames = ctx.check_data(data=samples, mapping=mapping, device=None) # type: ignore
def callback(outdata, numframes, time, status, gain=gain):
assert len(outdata) == numframes
ctx.callback_enter(status, outdata)
if gain != 1:
outdata *= gain
ctx.write_outdata(outdata)
ctx.callback_exit()
ctx.start_stream(
sounddevice.OutputStream,
samplerate=sr,
channels=ctx.output_channels,
dtype=ctx.output_dtype,
callback=callback,
blocking=block,
prime_output_buffers_using_stream_callback=False)
return _PortaudioPlayback(stream=ctx.stream)
[docs]
class Sample:
"""
A class representing audio data
Args:
sound: either the path to a soundfile or a numpy array with the samples
sr: only needed if passed an array
start: the start time (only valid when reading from a soundfile). Can be
negative, in which case the frame is sought from the end.
end: the end time (only valid when reading from a soundfile). Can be
negative, in which case the frame is sought from the end
readonly: is this Sample readonly?
engine: the sound engine (`csoundengine.Engine`) used for playback
"""
_csoundEngine: csoundengine.Engine | None = None
def __init__(self,
sound: str | np.ndarray,
sr: int = 0,
start=0.,
end=0.,
readonly=False,
engine: csoundengine.Engine | None = None):
self._csoundTable: tuple[str, int] | None = None
"""Keeps track of any table created in csound for playback"""
self._reprHtml: str = ''
"""Caches html representation"""
self._asbpf: bpf4.BpfInterface | None = None
"""Caches bpf representation"""
self._f0: bpf4.BpfInterface | None = None
self.path = ''
"""If non-empty, the audio was loaded from this path and has not changed"""
self.originalpath = ''
"""The original path from which the sample data was loaded, if applicable"""
self.readonly = readonly
if isinstance(sound, str):
samples, sr = readSoundfile(sound, start=start, end=end)
self.path = str(sound)
self.originalpath = self.path
elif isinstance(sound, np.ndarray):
assert sr
samples = sound
else:
raise TypeError(f"sound should be a path or an array of samples, got {type(sound)}")
self.samples: np.ndarray = samples
"""The actual audio samples as a numpy array. Can be multidimensional"""
self.sr: int = sr
"""The sr"""
self.numchannels = 1 if len(self.samples.shape) == 1 else self.samples.shape[1]
"""The number of channels of each frame"""
self.engine: csoundengine.Engine | None = engine
"""The sound engine used for playback"""
def __del__(self):
if not self._csoundTable:
return
enginename, tabnum = self._csoundTable
if enginename == Sample.getEngine().name:
Sample.getEngine().freeTable(tabnum)
@property
def numframes(self) -> int:
"""The number of frames"""
return len(self.samples)
@property
def duration(self) -> float:
"""The duration in seconds"""
return len(self.samples)/self.sr
def __repr__(self):
return (f"Sample(dur={self.duration}, sr={self.sr:d}, "
f"ch={self.numchannels})")
[docs]
@staticmethod
def getEngine(**kws) -> csoundengine.Engine:
"""
Returns the csound Engine used for playback, starts the Engine if needed
If no playback has been performed up to this point, a new Engine
is created. Keywords are passed directly to :class:`csoundengine.Engine`
(https://csoundengine.readthedocs.io/en/latest/api/csoundengine.engine.Engine.html#csoundengine.engine.Engine)
and will only take effect if this function is called before any
playback has been performed.
An already existing Engine can be set as the playback engine via
:meth:`Sample.setEngine`
See Also
~~~~~~~~
* :meth:`Sample.setEngine`
"""
if Sample._csoundEngine:
return Sample._csoundEngine
import csoundengine as ce
name = config['csoundengine']
engine = ce.Engine.activeEngines.get(name) or ce.Engine(name=name, **kws)
Sample._csoundEngine = engine
return engine
[docs]
@classmethod
def createSilent(cls, dur: float, channels: int, sr: int) -> Sample:
"""
Generate a silent Sample with the given characteristics
Args:
dur: the duration of the new Sample
channels: the number of channels
sr: the sample rate
Returns:
a new Sample with all samples set to 0
"""
numframes = int(dur * sr)
return cls(_silentFrames(numframes, channels), sr)
def _makeCsoundTable(self, engine: csoundengine.Engine) -> int:
if self._csoundTable:
usedengine, table = self._csoundTable
if usedengine == engine.name:
return table
else:
import maelzel.common
maelzel.common.getLogger(__file__).warning(f"Engine changed, was {usedengine}, now {engine.name}")
tabproxy = engine.session().makeTable(self.samples, sr=self.sr, block=True)
tabnum = tabproxy.tabnum
self._csoundTable = (engine.name, tabnum)
return tabnum
[docs]
def preparePlay(self, engine=None):
"""Send audio data to the audio engine (blocking)"""
if engine is None:
engine = Sample.getEngine()
self._makeCsoundTable(engine)
engine.session().prepareSched('.playSample')
def _playPortaudio(self,
loop=False,
chan=1,
gain=1.,
speed=1.,
skip=0.,
dur=0.,
block=False) -> PlaybackStream:
mapping = list(range(chan, self.numchannels + chan))
samples = self.samples
if skip:
samples = samples[int(self.sr * skip):]
if dur:
samples = samples[:int(self.sr*dur)]
return _playSamples(samples=samples, sr=self.sr, mapping=mapping, loop=loop,
block=block, gain=gain, speed=speed)
[docs]
def play(self,
loop=False,
chan: int = 1,
gain=1.,
delay=0.,
pan: float | None = None,
speed=1.0,
skip=0.,
dur=0,
engine: csoundengine.Engine | None = None,
block=False,
backend=''
) -> PlaybackStream:
"""
Play the given sample
At the moment two playback backends are available, portaudio and csound.
If no engine is given and playback is immediate (no delay), playback is
performed directly via portaudio. This has the advantage that no data
must be copied to the playback engine (which is the case when using csound)
If backend is 'csound' or a csoundengine's Engine is passed, csound
is used as playback backend. The csound backend is recommended if sync
is needed between this playback and other events.
Args:
loop: should playback be looped?
chan: first channel to play to. For stereo samples, output
is routed to consecutive channels starting with this channel
gain: a gain modifier
delay: start delay in seconds
pan: a value between 0 (left) and 1 (right). Use -1
for a default value, which is 0 for mono samples and 0.5
for stereo. For 3 or more channels pan is currently ignored
speed: the playback speed. A variation in speed will change
the pitch accordingly.
skip: start playback at a given point in time
dur: duration of playback. 0 indicates to play until the end of the sample
engine: the Engine instance to use for playback. If not given, playback
is performed via portaudio
block: if True, block execution until playback is finished
backend: one of 'portaudio', 'csound'
Returns:
a :class:`PlaybackStream`. This can be used to stop playback
See Also
~~~~~~~~
* :meth:`Sample.getEngine`
* :meth:`Sample.setEngine`
"""
if not backend:
if engine is None and delay == 0 and speed <= 8:
backend = 'portaudio'
else:
backend = 'csound'
if backend == 'portaudio':
return self._playPortaudio(loop=loop, chan=chan, gain=gain,
speed=speed, skip=skip, dur=dur, block=block)
elif backend == 'csound':
# Use csoundengine
if not engine:
engine = Sample.getEngine()
if self.path:
source = self.path
else:
source = self._makeCsoundTable(engine)
if pan is None:
pan = 0 if self.numchannels == 1 else 0.5
if dur == 0:
dur = -1
synth = engine.session().playSample(source, chan=chan, gain=gain, loop=loop,
skip=skip, dur=dur, delay=delay, pan=pan,
speed=speed)
if block:
import time
while synth.playing():
time.sleep(0.02)
return _CsoundenginePlayback(synth=synth)
else:
raise ValueError(f"backend should be one of 'csound', 'portaudio', got {backend}")
[docs]
def asbpf(self) -> bpf4.BpfInterface:
"""
Convert this sample to a ``bpf4.core.Sampled`` bpf
.. seealso:: `bpf <https://bpf4.readthedocs.io>`_
"""
if self._asbpf not in (None, False):
return self._asbpf
import bpf4
bpf = bpf4.Sampled(self.samples, 1/self.sr)
self._asbpf = bpf
return bpf
[docs]
def plot(self, profile='auto') -> Figure:
"""
plot the sample data
Args:
profile: one of 'low', 'medium', 'high' or 'auto'
Returns:
the Figure used
"""
from . import plotting
return plotting.plotWaveform(self.samples, self.sr, profile=profile)
def _repr_html_(self) -> str:
return self.reprHtml()
[docs]
def show(self, withAudiotag=True, figsize=(24, 4), external=False, profile=''):
if external:
raise ValueError("External editor not supported")
if _util.pythonSessionType() == 'jupyter':
from IPython.display import display_html
display_html(self.reprHtml(withAudiotag=withAudiotag, figsize=figsize, profile=profile), raw=True)
else:
self.plot()
[docs]
def reprHtml(self,
withHeader=True,
withAudiotag=True,
figsize=(24, 4),
profile=''
) -> str:
"""
Returns an HTML representation of this Sample
This can be used within a Jupyter notebook to force the
html display. It is useful inside a block were it would
not be possible to put this Sample as the last element
of the cell to force the html representation
Args:
withHeader: include a header line with repr text ('Sample(...)')
withAudiotag: include html for audio playback. If None, this
defaults to config['reprhtml_include_audiotag']
Returns:
the HTML repr as str
Example
-------
>>> from maelzel.snd.audiosample import Sample
>>> sample = Sample("snd/Numbers_EnglishFemale.flac")
>>> sample.reprHtml()
.. image:: ../assets/audiosample-reprhtml.png
"""
if self._reprHtml:
return self._reprHtml
from csoundengine.internal import plotSamplesAsHtml
if withAudiotag is None:
withAudiotag = config['reprhtml_include_audiotag']
if withHeader:
from emlib.misc import sec2str
dur = self.duration
durstr = sec2str(dur) if dur > 60 else f"{dur:.3g}"
header = (f"<b>Sample</b>(duration=<code>{durstr}</code>, "
f"sr=<code>{self.sr}</code>, "
f"numchannels=<code>{self.numchannels}</code>)<br>")
else:
header = ''
audiotagMaxDur = config['reprhtml_audiotag_embed_maxduration_seconds']
embed = self.duration <= audiotagMaxDur
html = plotSamplesAsHtml(samples=self.samples, sr=self.sr,
customHeader=header,
withAudiotag=withAudiotag,
profile=profile, path=self.path, figsize=figsize,
embedAudiotag=embed,
audiotagMaxDuration=audiotagMaxDur if embed else 9999999)
self._reprHtml = html
return html
[docs]
def plotSpetrograph(self, framesize=2048, window='hamming', start=0., dur=0.,
axes: Axes | None = None
) -> Axes:
"""
Plot the spectrograph of this sample or a fragment thereof
Args:
framesize: the size of each analysis, in samples
window: As passed to scipy.signal.get_window
`blackman`, `hamming`, `hann`, `bartlett`, `flattop`, `parzen`,
`bohman`, `blackmanharris`, `nuttall`, `barthann`, `kaiser` (needs beta),
`gaussian` (needs standard deviation)
start: if given, plot the spectrograph at this time
dur: if given, use this fragment of the sample (0=from start to end of
sample)
axes: the axes to plot to. A new axes will be created if not given
Returns:
the used axes
Plots the spectrograph of the entire sample (slice before to use only
a fraction)
"""
from . import plotting
samples = self.samples if self.numchannels == 1 else self.samples[:, 0]
s0 = 0 if start == 0 else int(start*self.sr)
s1 = self.numframes if dur == 0 else min(self.numframes,
int(dur*self.sr)-s0)
if s0 > 0 or s1 != self.numframes:
samples = samples[s0:s1]
return plotting.plotPowerSpectrum(samples, self.sr, framesize=framesize,
window=window, axes=axes)
[docs]
def plotSpectrogram(self,
fftsize=2048,
window='hamming',
winsize: int = 0,
overlap=4,
mindb=-120,
minfreq: int = 40,
maxfreq: int = 12000,
yaxis='linear',
figsize=(24, 10),
axes: Axes | None = None
) -> Axes:
"""
Plot the spectrogram of this sound using matplotlib
Args:
fftsize: the size of the fft.
window: window type. One of 'hamming', 'hanning', 'blackman', ...
(see scipy.signal.get_window)
winsize: window size in samples, defaults to fftsize
mindb: the min. amplitude to plot
overlap: determines the hop size (hop size in samples = fftsize/overlap).
None to infer a sensible default from the other parameters
minfreq: the min. freq to plot
maxfreq: the highes freq. to plot. If None, a default is estimated
(check maelzel.snd.plotting.config)
yaxis: one of 'linear' or 'log'
figsize: the figure size, a tuple (width: int, height: int)
axes: a matplotlib Axes object. If passed, plotting is done using this
Axes; otherwise a new Axes object is created and returned
Returns:
the matplotlib Axes
"""
from . import plotting
if self.numchannels > 1:
samples = self.samples[:, 0]
else:
samples = self.samples
return plotting.plotSpectrogram(samples, self.sr, window=window, fftsize=fftsize,
overlap=overlap, mindb=mindb, minfreq=minfreq,
maxfreq=maxfreq, axes=axes, yaxis=yaxis,
winsize=winsize, figsize=figsize)
[docs]
def plotMelSpectrogram(self,
fftsize=2048,
overlap=4,
winsize: int = 0,
nmels=128,
axes: Axes | None = None,
axislabels=False,
cmap='magma',
) -> Axes:
"""
Plot a mel-scale spectrogram
Args:
fftsize: the fftsize in samples
overlap: the amount of overlap. An overlap of 4 will result in a hop-size of
winlength samples // overlap
winsize: the window size in samples. If None, fftsize is used. If given,
winlength <= fftsize
nmels: number of mel bins
axes: if given, plot on these Axes
axislabels: if True, include labels on the axes
cmap: a color map byname
Returns:
the Axes used
"""
from . import plotting
return plotting.plotMelSpectrogram(self.samples, sr=self.sr, fftsize=fftsize,
overlap=overlap, winsize=winsize, axes=axes,
setlabel=axislabels, nmels=nmels, cmap=cmap)
[docs]
def openInEditor(self, wait=True, app=None, fmt='wav'
) -> Self | None:
"""
Open the sample in an external editor.
The original is not changed.
Args:
wait: if True, the editor is opened in blocking mode,
the results of the edit are returned as a new Sample
app: if given, this application is used to open the sample.
Otherwise, the application configured via the key 'editor'
is used
fmt: the format to write the samples to
Returns:
if wait is True, returns the sample after closing editor
"""
assert fmt in {'wav', 'aiff', 'aif', 'flac', 'mp3', 'ogg'}
import tempfile
tmpfile = tempfile.NamedTemporaryFile(suffix="."+fmt, delete=False)
sndfile = tmpfile.name
self.write(sndfile)
# sndfile = tempfile.mktemp(suffix="." + fmt)
_openInEditor(sndfile, wait=wait, app=app)
if wait:
return self.__class__(sndfile)
return None
[docs]
def write(self,
outfile: str,
encoding='',
overflow='fail',
fmt='',
bitrate=224,
**metadata
) -> None:
"""
Write the samples to outfile
Args:
outfile: the name of the soundfile. The extension determines the
file format
encoding: the encoding to use. One of pcm16, pcm24, pcm32, float32,
float64 or, in the case of mp3 or ogg, the frame rate as integer
(160 = 160Kb)
fmt: if not given, it is inferred from the extension. One of 'wav',
'aiff', 'flac'.
overflow: one of 'fail', 'normalize', 'nothing'. This applies only to
pcm formats (wav, aif, mp3)
bitrate: bitrate used when writing to mp3
metadata: XXX
"""
if outfile == "?":
import emlib.dialogs
outfile = emlib.dialogs.saveDialog(filter=emlib.dialogs.filters['Sound'],
title="Save soundfile",
directory=os.getcwd())
if not outfile:
return
outfile = _normalizePath(outfile)
samples = self.samples
if not fmt:
fmt = os.path.splitext(outfile)[1][1:].lower()
assert fmt in {'wav', 'aif', 'aiff', 'flac', 'mp3', 'ogg'}
import sndfileio
if not encoding:
encoding = sndfileio.util.default_encoding(fmt)
if not encoding:
raise ValueError(f"Format {fmt} is not supported")
if overflow != 'nothing' and encoding.startswith('pcm'):
import numpyx
minval, maxval = numpyx.minmax1d(self.getChannel(0).samples)
if minval < -1 or maxval > 1:
if overflow == 'fail':
raise ValueError("Samples would overflow when written")
elif overflow == 'normalize':
maxpeak = max(maxval, abs(minval))
samples = samples / maxpeak
sndfileio.sndwrite(outfile, samples=samples, sr=self.sr,
encoding=encoding, fileformat=fmt,
bitrate=bitrate,
metadata=metadata)
[docs]
def copy(self) -> Self:
"""
Return a copy of this Sample
.. note::
if self is readonly, the copied Sample will not be readonly.
"""
return self.__class__(self.samples.copy(), self.sr)
def _changed(self) -> None:
# clear cached values, invalidate path
self._csoundTable = None
self._reprHtml = ''
self._asbpf = None
self._f0 = None
self.path = ''
def __add__(self, other: float | Sample) -> Self:
if isinstance(other, (int, float)):
return self.__class__(self.samples+other, self.sr)
elif isinstance(other, Sample):
assert self.numchannels == other.numchannels and self.sr == other.sr
if len(self) == len(other):
return self.__class__(self.samples+other.samples, self.sr)
elif len(self) > len(other):
return self.__class__(self.samples[:len(other)]+other.samples, self.sr)
else:
return self.__class__(self.samples + other.samples[:len(self)], self.sr)
else:
raise TypeError(f"Expected a scalar or a sample, got {other}")
def __iadd__(self, other: float | Sample) -> None:
if isinstance(other, (int, float)):
self.samples += other
elif isinstance(other, Sample):
assert self.numchannels == other.numchannels and self.sr == other.sr
if len(self) == len(other):
self.samples += other.samples
elif len(other) < len(self):
self.samples[:len(other)] += other.samples
else:
self.samples += other.samples[:len(self)]
self._changed()
else:
raise TypeError(f"Expected a scalar or a sample, got {other}")
def __sub__(self, other: float | Self) -> Self:
if isinstance(other, (int, float)):
return self.__class__(self.samples-other, self.sr)
elif isinstance(other, Sample):
assert self.numchannels == other.numchannels and self.sr == other.sr
if len(self) == len(other):
return self.__class__(self.samples-other.samples, self.sr)
elif len(self) > len(other):
return self.__class__(self.samples[:len(other)]-other.samples, self.sr)
else:
return self.__class__(self.samples - other.samples[:len(self)], self.sr)
else:
raise TypeError(f"Expected a scalar or a sample, got {other}")
def __isub__(self, other: float | Self) -> None:
if isinstance(other, (int, float)):
self.samples -= other
elif isinstance(other, Sample):
assert self.numchannels == other.numchannels and self.sr == other.sr
if len(self) == len(other):
self.samples -= other.samples
elif len(self) > len(other):
self.samples[:len(other)] -= other.samples
else:
self.samples -= other.samples[:len(self)]
else:
raise TypeError(f"Expected a scalar or a sample, got {other}")
self._changed()
def __mul__(self, other: float | Self) -> Self:
if isinstance(other, (int, float)):
return self.__class__(self.samples*other, self.sr)
elif isinstance(other, Sample):
assert self.numchannels == other.numchannels and self.sr == other.sr
if len(self) == len(other):
return self.__class__(self.samples*other.samples, self.sr)
elif len(self) > len(other):
return self.__class__(self.samples[:len(other)]*other.samples, self.sr)
else:
return self.__class__(self.samples * other.samples[:len(self)], self.sr)
else:
raise TypeError(f"Expected a scalar or a sample, got {other}")
def __imul__(self, other: float | Sample) -> Self:
if isinstance(other, (int, float)):
self.samples *= other
elif isinstance(other, Sample):
assert self.numchannels == other.numchannels and self.sr == other.sr
if len(self) == len(other):
self.samples *= other.samples
elif len(self) > len(other):
self.samples[:len(other)] *= other.samples
else:
self.samples *= other.samples[:len(self)]
else:
raise TypeError(f"Expected a scalar or a sample, got {other}")
self._changed()
return self
def __pow__(self, other: float) -> Self:
return self.__class__(self.samples**other, self.sr)
def __len__(self) -> int:
return len(self.samples)
def __getitem__(self, item: slice) -> Self:
"""
Samples support slicing
``sample[start:stop]`` will return a new Sample consisting of a slice
of this sample between the times start and stop. As it is a slice
of this Sample, any changes inplace will be reflected in the original
samples. To avoid this, use :meth:`Sample.copy`.
Example::
# Get a slice between seconds 1.5 and 3. Any change to view will be
# reflected in original
>>> source = Sample("sound.wav")
>>> view = source[1.5:3.0]
# To slice at the sample level, access .samples directly
# NB: this will be a 'view' over the existing samples, any modification
# will be reflected in the source array. Use .copy to produce an independent
# sample
>>> newsample = Sample(source.samples[1024:2048], source.sr)
"""
if not isinstance(item, slice):
raise ValueError("Samples only support the form sample[start:end]. "
"To access individual samples, use sample.samples[index]")
start, stop, step = item.start, item.stop, item.step
if stop is None:
stop = self.duration
if start is None:
start = 0.
if step is not None:
raise ValueError("Samples do not support a step for slicing. NB: "
"To resample a Sample, use the .resample method")
stop = min(stop, self.duration)
start = min(start, self.duration)
assert 0 <= start <= stop
frame0 = int(start*self.sr)
frame1 = int(stop*self.sr)
return self.__class__(self.samples[frame0:frame1], self.sr)
[docs]
def splice(self, start: int = 0, end: int = 0) -> Self:
"""
Splice this Sample between the given frames
Args:
start: start frame (in samples)
end: end frame (in samples, 0=end)
Returns:
a copy of self spliced between start and end frame
"""
if start == end == 0:
return self
if end == 0:
end = len(self.samples)
return self.__class__(self.samples[start:end], sr=self.sr,
readonly=self.readonly, engine=self.engine)
[docs]
def fade(self, fadetime: float | tuple[float, float], shape='linear'
) -> Self:
"""
Fade this Sample **inplace**, returns self.
If only value is given as fadetime a fade-in and fade-out is performed with
this fadetime. A tuple can be used to apply a different fadetime for in and out.
Args:
fadetime: the duration of the fade.
shape: the shape of the fade. One of 'linear', 'expon(x)', 'halfcos'
Returns:
self
.. note::
To generate a faded sample without modifying the original sample,
use ``sample = sample.copy().fade(...)``
Example::
>>> sample1= Sample("sound.wav")
# Fade-in and out
>>> sample1.fade(0.2)
>>> sample2 = Sample("another.wav")
# Create a copy with a fade-out of 200 ms
>>> sample3 = sample2.copy().fade((0, 0.2))
"""
self._checkWrite()
if isinstance(fadetime, tuple):
fadein, fadeout = fadetime
if fadein:
_npsnd.applyFade(self.samples, self.sr, fadetime=fadein,
mode='in', shape=shape)
if fadeout:
_npsnd.applyFade(self.samples, self.sr, fadetime=fadeout,
mode='out', shape=shape)
else:
assert isinstance(fadetime, (int, float))
_npsnd.applyFade(self.samples, self.sr, fadetime=fadetime,
mode='inout', shape=shape)
self._changed()
return self
[docs]
def prependSilence(self, dur: float) -> Self:
"""
Return a new Sample with silence of given dur at the beginning
Args:
dur: duration of the silence to add at the beginning
Returns:
new Sample
"""
silence = _silentFrames(numframes=int(self.sr*dur), channels=self.numchannels)
samples = np.concatenate([silence, self.samples])
return self.__class__(samples, sr=self.sr)
[docs]
def appendSilence(self, dur: float) -> Self:
"""
Return a new Sample with added silence at the end
Args:
dur: the duration of the added silence
Returns:
a new Sample
.. seealso:: :meth:`Sample.prependSilence`, :meth:`Sample.join`, :meth:`Sample.append`
"""
silence = _silentFrames(numframes=int(self.sr*dur), channels=self.numchannels)
samples = np.concatenate([self.samples, silence])
return self.__class__(samples, sr=self.sr)
[docs]
def concat(self, *other: Self) -> Self:
"""
Join (concatenate) this Sample with other(s)
Args:
*other: one or more Samples to join together
Returns:
the resulting Sample
.. seealso:: :meth:`Sample.join`
"""
samples = [self, *other]
samp = concatenate(samples)
return self.__class__(samp.samples, sr=samp.sr)
def _checkWrite(self) -> None:
if self.readonly:
raise RuntimeError("This Sample is readonly. Create a copy (which will"
" be writable) and operate on that copy")
[docs]
def panned(self, pan: float) -> Self:
"""Return a new Sample with panning applied
Args:
pan: panning value between 0 (left) and 1 (right)
Returns:
the new Sample, always a stereo sample
"""
if self.numchannels > 2:
raise ValueError(f"Panning can only be applied to mono or stereo samples, "
f"this sample has {self.numchannels} channels")
samples = _npsnd.panStereo(self.samples, pan)
return self.__class__(samples, sr=self.sr)
[docs]
def applyPanning(self, pan: float) -> Self:
"""Apply panning to the sample in place
.. note:: This method is only available for stereo samples.
Args:
pan: panning value between 0 (left) and 1 (right)
Returns:
self
"""
if self.numchannels != 2:
raise ValueError(f"Panning can only be applied to stereo samples, "
f"this sample has {self.numchannels} channels")
self._checkWrite()
_npsnd.applyPanning(self.samples, pan)
self._changed()
return self
[docs]
def normalize(self, headroom=0.) -> Self:
"""Normalize inplace, returns self
Args:
headroom: maximum peak in dB
Returns:
self
"""
self._checkWrite()
ratio = _npsnd.normalizationRatio(self.samples, headroom)
self.samples *= ratio
self._changed()
return self
[docs]
def peak(self) -> float:
"""Highest sample value in dB"""
return pt.amp2db(np.abs(self.samples).max())
[docs]
def peaksBpf(self, framedur=0.01, overlap=2) -> bpf4.Sampled:
"""
Create a bpf representing the peaks envelope of the source
Args:
framedur: the duration of an analysis frame (in seconds)
overlap: determines the hop time between analysis frames.
``hoptime = framedur / overlap``
Returns:
A bpf representing the peaks envelope of the source
A peak is the absolute maximum value of a sample over a window
of time (the *framedur* in this case). To use another metric
for tracking amplitude see :meth:`Sample.rmsBpf` which uses
rms, or :meth:`Sample.amplitudeBpf` which uses an envelope
follower
The resolution of the returned bpf will be ``framedur/overlap``
.. seealso::
https://bpf4.readthedocs.io/en/latest/
"""
return _npsnd.peaksBpf(self.samples, self.sr, dt=framedur, overlap=overlap)
[docs]
def reverse(self) -> Self:
""" reverse the sample **in-place**, returns self """
self._checkWrite()
self.samples[:] = self.samples[-1::-1]
self._changed()
return self
[docs]
def rmsBpf(self, dt=0.01, overlap=1) -> bpf4.Sampled:
"""
Creates a BPF representing the rms of this sample over time
Args:
dt (float): The duration of each frame in seconds.
overlap (int): The number of frames to overlap.
Returns:
bpf4.Sampled: A BPF representing the rms of this sample over time.
Raises:
ValueError: If dt is not positive.
ValueError: If overlap is not positive.
.. seealso:: https://bpf4.readthedocs.io/en/latest/
"""
return _npsnd.rmsBpf(self.samples, self.sr, dt=dt, overlap=overlap)
[docs]
def rms(self) -> float:
"""
RMS of the samples
This method returns the rms for **all** the frames at once. As such
it is only of use for short samples. The use case is as follows:
>>> from maelzel.snd.audiosample import Sample
>>> from pitchtools import amp2db
>>> s = Sample("/path/to/sample.flac")
>>> amp2db(s[0.5:0.7].rms())
-12.05
.. seealso:: :meth:`Sample.rmsbpf`
"""
return _npsnd.rms(self.samples)
[docs]
def amplitudeBpf(self, attack=0.01, release=0.01, chunktime=0.05, overlap=2) -> bpf4.Sampled:
"""
Creates a bpf representing the average amplitude over time
Args:
attack: attack time in seconds for the envelope follower
release: decay time in seconds for the envelope follower
chunktime: chunk time in seconds, averages envelope over this time
overlap: overlap factor for averaging the envelope
Returns:
a bpf representing the average amplitude over time
"""
return _npsnd.ampBpf(self.samples, self.sr, attack=attack, release=release, chunktime=chunktime, overlap=overlap)
[docs]
def mixdown(self, enforceCopy=False) -> Sample:
"""
Return a new Sample with this sample downmixed to mono
Args:
enforceCopy: always return a copy, even if self is already mono
Returns:
a mono version of self.
"""
if self.numchannels == 1:
return self if not enforceCopy else self.copy()
return Sample(_npsnd.asmono(self.samples), sr=self.sr)
[docs]
def stripLeft(self, threshold=-120.0, margin=0.01, window=0.02) -> Self:
"""
Remove silence from the left. Returns a new Sample
Args:
threshold: dynamic of silence, in dB
margin: leave at list this amount of time between the first sample
and the beginning of silence
window: the duration of the analysis window, in seconds
Returns:
a new Sample with silence removed
"""
period = int(window*self.sr)
first_sound_sample = _npsnd.firstSound(self.samples, threshold, period)
if first_sound_sample is not None and first_sound_sample >= 0:
time = max(first_sound_sample/self.sr-margin, 0)
return self[time:]
return self
[docs]
def stripRight(self, threshold=-120.0, margin=0.01, window=0.02) -> Self:
"""
Remove silence from the right. Returns a new Sample
Args:
threshold: dynamic of silence, in dB
margin: leave at list this amount of time between the first/last sample
and the beginning of silence or
window: the duration of the analysis window, in seconds
Returns:
a new Sample with silence removed
"""
period = int(window*self.sr)
lastsample = _npsnd.lastSound(self.samples, threshold, period)
if lastsample is not None and lastsample >= 0:
time = min(lastsample/self.sr+margin, self.duration)
return self[:time]
return self
[docs]
def strip(self, threshold=-120.0, margin=0.01, window=0.02) -> Self:
"""
Remove silence from the sides. Returns a new Sample
Args:
threshold: dynamic of silence, in dB
margin: leave at list this amount of time between the first/last sample
and the beginning of silence or
window: the duration of the analysis window, in seconds
Returns:
a new Sample with silence at the sides removed
"""
out = self.stripLeft(threshold, margin, window)
out = out.stripRight(threshold, margin, window)
return out
[docs]
def resample(self, sr: int) -> Sample:
"""
Return a new Sample with the given sr
"""
if sr == self.sr:
return self
from maelzel.snd.resample import resample
samples = resample(self.samples, self.sr, sr)
return Sample(samples, sr=sr)
[docs]
def scrub(self, bpf: bpf4.BpfInterface) -> Sample:
"""
Scrub the samples with the given curve
Args:
bpf: a bpf mapping time -> time (see `bpf <https://bpf4.readthedocs.io>`)
Example::
Read sample at half speed
>>> import bpf4
>>> sample = Sample("path.wav")
>>> dur = sample.duration
>>> sample2 = sample.scrub(bpf4.linear([(0, 0), (dur*2, dur)]))
"""
from maelzel.snd import sndfiletools
samples, sr = sndfiletools.scrub((self.samples, self.sr), bpf,
rewind=False)
return Sample(samples, self.sr)
[docs]
def getChannel(self, n: int, contiguous=False) -> Sample:
"""
return a new mono Sample with the given channel
Args:
n: the channel index (starting with 0)
contiguous: if True, ensure that the samples are represented as
contiguous in memory
"""
if self.numchannels == 1 and n == 0:
return self
if n > (self.numchannels-1):
raise ValueError(f"this sample has only {self.numchannels} channel(s)!")
newsamples = self.samples[:, n]
if contiguous and not newsamples.flags.c_contiguous:
newsamples = np.ascontiguousarray(newsamples)
return Sample(newsamples, self.sr)
[docs]
def contiguous(self) -> Sample:
"""
Return a Sample ensuring that the samples are contiguous in memory
If self is already contiguous, self is returned
"""
if self.samples.flags.c_contiguous:
return self
return Sample(np.ascontiguousarray(self.samples), self.sr)
[docs]
def fundamentalAnalysis(self,
semitoneQuantization=0,
fftsize: int = 0,
simplify=0.08,
overlap=8,
minFrequency=50,
minSilence=0.08,
onsetThreshold=0.05,
onsetOverlap=8,
) -> mono.FundamentalAnalysisMonophonic:
"""
Analyze the fundamental of this sound, assuming it is a monophonic sound
This is a wrapper around ``maelzel.transcribe.mono.FundamentalAnalysisMono`` and
is placed here for visibility and easy of use. To access all parameters,
use that directly
Args:
semitoneQuantization (float): Semitone quantization, 0 to disable quantization
fftsize (int): FFT size
simplify (float): Simplification threshold
overlap (int): Overlap factor
minFrequency (float): Minimum frequency
minSilence (float): Minimum silence duration
onsetThreshold (float): Onset threshold
onsetOverlap (int): overlap factor for onset analysis
Returns:
a :class:`maelzel.transcribe.mono.FundamentalAnalysisMono`
Example
~~~~~~~
>>> from maelzel.snd import audiosample
>>> samp = audiosample.Sample("sndfile.wav")
>>> f0analysis = samp.fundamentalAnalysis()
>>> notes = [(group.start(), group.duration(), group.meanfreq())
... for group in f0analysis.groups]
"""
from maelzel.transcribe import mono
analysis = mono.FundamentalAnalysisMonophonic(samples=self.samples,
sr=self.sr,
semitoneQuantization=semitoneQuantization,
fftSize=fftsize,
overlap=overlap,
simplify=simplify,
minFrequency=minFrequency,
minSilence=minSilence,
onsetThreshold=onsetThreshold,
onsetOverlap=onsetOverlap)
return analysis
[docs]
def onsets(self, fftsize=2048, overlap=4, method='rosita',
threshold: float | None = None, mingap=0.03) -> np.ndarray:
"""
Detect onsets
Depending on the implementation, onsets can be "possitive"
onsets, similar to an attack, or just sudden changes in the spectrum; this
includes "negative" onsets, which would be detected at the sudden end
of a note. To accurately track onsets it might be useful to use other
features, like peak amplitude, rms, or voicedness to check the kind
of onset.
For an in-depth demonstration of these concepts see
https://github.com/gesellkammer/maelzel/blob/master/notebooks/onsets.ipynb
Args:
fftsize: the size of the window
overlap: a hop size as a fraction of the fftsize
method: one of 'rosita' (using a lightweight version of librosa's onset
detection algorithm) or 'aubio' (needs aubio to be installed)
threshold: the onset sensitivity. This is a value specific for a given
method (rosita has a default of 0.07, while aubio has a default of 0.03)
mingap: the min. time between two onsets
Returns:
a list of onset times, as a numpy array
Example
~~~~~~~
.. code-block:: python
from maelzel.snd import audiosample
from maelzel.core import *
from pitchtools import *
samp = audiosample.Sample("snd/finneganswake-fragm01.flac").getChannel(0, contiguous=True)[0:10]
onsets = samp.onsets(threshold=0.1, mingap=0.05)
ax = samp.plotSpectrogram()
# Plot each onset as a vertical line
ax.vlines(onsets, ymin=0, ymax=10000, color='white', alpha=0.4, linewidth=2)
.. image:: ../assets/audiosample-onsets.png
See Also
~~~~~~~~
* maelzel.snd.features.onsetsAubio
* maelzel.snd.features.onsets
"""
if method == 'rosita':
if threshold is None:
threshold = 0.07
from maelzel.snd import features
onsets, onsetstrength = features.onsets(self.samples, sr=self.sr,
winsize=fftsize,
hopsize=fftsize // overlap,
threshold=threshold,
mingap=mingap)
return onsets
else:
raise ValueError(f"method {method} not known. Possible methods: 'rosita'")
[docs]
def partialTrackingAnalysis(self,
resolution: float = 50.,
channel=0,
windowsize=0.,
freqdrift=0.,
hoptime=0.,
mindb=-90,
) -> _spectrum.Spectrum:
"""
Analyze this audiosample using partial tracking
Args:
resolution: the resolution of the analysis, in Hz
channel: which channel to analyze
windowsize: The window size in hz. This value needs to be higher than the
resolution since the window in samples needs to be smaller than the fft analysis
mindb: the amplitude floor.
hoptime: the time to move the window after each analysis. For overlap==1, this is 1/windowsize.
For overlap==2, 1/(windowsize*2)
freqdrift: the max. variation in frequency between two breakpoints (by default, 1/2 resolution)
Returns:
a :class:`maelzel.partialtracking.spectrum.Spectrum`
.. seealso::
:meth:`~Sample.spectrumAt`, :meth:`maelzel.partialtracking.spectrum.Spectrum.analyze`
"""
from maelzel.partialtracking.spectrum import Spectrum
samples = self.getChannel(channel).samples
return Spectrum.analyze(samples=samples,
sr=self.sr,
resolution=resolution,
windowsize=windowsize,
hoptime=hoptime,
freqdrift=freqdrift,
mindb=mindb)
[docs]
def spectrumAt(self,
time: float,
resolution: float = 50.,
channel=0,
windowsize: float = -1,
mindb=-90,
minfreq=0,
maxfreq=12000,
maxcount=0
) -> list[tuple[float, float]]:
"""
Analyze sinusoidal components of this Sample at the given time
Args:
time: the time to analyze
resolution: the resolution of the analysis, in hz
channel: if this sample has multiple channels, which channel to analyze
windowsize: the window size in hz
mindb: the min. amplitude in dB for a component to be included
minfreq: the min. frequency of a component to be included
maxfreq: the max. frequency of a component to be included
maxcount: the max. number of components to include (0 to include all)
Returns:
a list of pairs (frequency, amplitude) where each pair represents a sinusoidal
component of this sample at the given time. Amplitudes are in the range 0-1
"""
return spectrumAt(self.samples, sr=self.sr, time=time, resolution=resolution,
channel=channel, windowsize=windowsize, mindb=mindb,
minfreq=minfreq, maxfreq=maxfreq, maxcount=maxcount)
[docs]
def fundamentalFreq(self, time: float | None = None, dur=0.2, fftsize=2048, overlap=4,
fallbackfreq=0
) -> float | None:
"""
Calculate the fundamental freq. at a given time
The returned frequency is averaged over the given duration period
At the moment the smooth pyin method is used
Args:
time: the time to start sampling the fundamental frequency. If None is given,
the first actual sound within this Sample is used
dur: the duration of the estimation period. The returned frequency will be the
average frequency over this period of time.
fftsize: the fftsize used
fallbackfreq: frequency to use when no fundamental frequency was detected
overlap: amount of overlaps per fftsize, determines the hop time
Returns:
the average frequency within the given period of time, or None if no fundamental
was found
"""
if time is None:
time, freq = self.firstPitch()
return freq if freq else None
from maelzel.snd import vamptools
import scipy.stats
samples = self.samples
if len(samples.shape) > 1:
samples = samples[:, 0]
startsamp = int(time * self.sr)
endsamp = min(int((time+dur)*self.sr), len(samples))
samples = samples[startsamp:endsamp]
dt, freqs = vamptools.pyinSmoothPitch(samples, self.sr, fftSize=fftsize,
stepSize=fftsize//overlap)
freqs = freqs[~np.isnan(freqs)]
if len(freqs) == 0:
avgfreq = fallbackfreq
else:
minfreq = self.sr / fftsize * 2
avgfreq = float(scipy.stats.trim_mean(freqs[freqs > minfreq], proportiontocut=0.1))
return avgfreq
[docs]
def fundamental(self, fftsize=2048, overlap=4, unvoiced='negative', minAmpDb=-60, sensitivity=0.7
) -> tuple[np.ndarray, np.ndarray]:
"""
Track the fundamental frequency of this sample
Args:
fftsize: the fft size to use
overlap: number of overlaps
unvoiced: one of 'negative' or 'nan'
minAmpDb: the minimum amplitude in dB. Any sound softer than this
will be supressed
sensitivity: onset sensitivity, a value between 0 and 1
Returns:
a tuple (times, freqs), both numpy arrays. The frequency array will
contain a negative frequency whenever the sound is unvoiced (inharmonic,
no fundamental can be predicted)
.. seealso:: :func:`maelzel.snd.vamptools.pyinSmoothPitch`, :func:`maelzel.snd.freqestimate.f0curvePyinVamp`
"""
from maelzel.snd import vamptools
samples = _npsnd.getChannel(self.samples, 0, ensureContiguous=True)
_util.checkChoice("unvoiced", unvoiced, choices=('negative', 'nan'))
dt, freqs = vamptools.pyinSmoothPitch(samples, self.sr,
fftSize=fftsize,
stepSize=fftsize // overlap,
outputUnvoiced='negative',
lowAmpSuppression=pt.db2amp(minAmpDb),
onsetSensitivity=sensitivity
)
times = np.arange(0, dt * len(freqs) - dt*0.5, dt)
assert len(times) == len(freqs), f"{len(times)=}, {len(freqs)=}"
return times, freqs
[docs]
def fundamentalBpf(self,
fftsize=2048,
overlap=4,
unvoiced='negative',
lowAmpSuppression=pt.db2amp(-60),
onsetSensitivity=0.7,
method='pyin-pitchtrack'
) -> tuple[bpf4.BpfInterface, bpf4.BpfInterface]:
"""
Construct a bpf which follows the fundamental of this sample in time
Args:
fftsize: the size of the fft, in samples
overlap: determines the hop size
unvoiced: method to handle unvoiced sections. One of 'nan', 'negative', 'keep'
method: one of 'pyin-pitchtrack' or 'pyin-smoothpitch'
lowAmpSuppression: only analyzes audio louder than this threshold
onsetSensitivity: onset sensitivity of the pyin algorithm
Returns:
a tuple (f0bpf, voicednessbpf), each is a `bpf <https://bpf4.readthedocs.io>`_.
``f0bpf`` represents the fundamental freq. over time, ``voicednessbpf``
represents the voicedness (how "Pitched" the signal is) at a given time
"""
from maelzel.snd import vamptools
import bpf4
samples = self.getChannel(0).samples
if method == 'pyin-pitchtrack':
data = vamptools.pyinPitchTrack(
samples=samples,
sr=self.sr,
fftSize=fftsize,
overlap=overlap,
lowAmpSuppression=lowAmpSuppression,
onsetSensitivity=onsetSensitivity,
outputUnvoiced=unvoiced)
times = data[:,0]
freqs = data[:,1]
voicedness = data[:,2]
return bpf4.Linear(times, freqs), bpf4.Linear(times, voicedness)
elif method == 'pyin-smoothpitch':
dt, freqs = vamptools.pyinSmoothPitch(samples, self.sr,
fftSize=fftsize,
stepSize=fftsize // overlap,
lowAmpSuppression=lowAmpSuppression,
onsetSensitivity=onsetSensitivity,
outputUnvoiced=unvoiced)
return bpf4.Sampled(freqs, dt), bpf4.Const(1.0)
else:
raise ValueError(f"Unknown method {method}")
[docs]
def chunks(self, chunksize: int, hop: int = 0, pad=False) -> Iterator[np.ndarray]:
"""
Iterate over the samples in chunks of chunksize.
If pad is True, the last chunk will be zeropadded, if necessary
Args:
chunksize: the size of each chunk
hop: the number of samples to skip
pad: if True, pad the last chunk with 0 to fill chunksize
Returns:
an iterator over the chunks
"""
import emlib.numpytools as nptools
return nptools.chunks(self.samples,
chunksize=chunksize,
hop=hop or None,
padwith=(0 if pad else None))
[docs]
def firstPitch(self, threshold=-120, minfreq=60, overlap=4, channel=0, chunkdur=0.25
) -> tuple[float, float]:
"""
Returns the first (monophonic) pitch found
Args:
threshold: the silence threhsold
minfreq: the min. frequency to considere valid
overlap: pitch analysis overlap
channel: for multichannel audio, which channel to use
chunkdur: chunk duration to analyze, in seconds
Returns:
a tuple (time, freq) of the first pitched sound found.
If no pitched sound found, returns (0, 0)
"""
samples = self.samples if self.numchannels == 1 else self.samples[:,channel]
firstidx = _npsnd.firstSound(samples, threshold=threshold)
lastidx = _npsnd.lastSound(samples, threshold=threshold)
if firstidx is None or lastidx is None:
return (0., 0.)
from maelzel.snd import freqestimate
chunksize = int(chunkdur * self.sr)
for idx in range(firstidx, lastidx, chunksize):
fragm = samples[idx:idx+chunksize]
f0, prob = freqestimate.f0curve(fragm, sr=self.sr, minfreq=minfreq,
overlap=overlap, unvoicedFreqs='nan')
times, freqs = f0.points()
mask = ~np.isnan(freqs)
if not mask.any():
continue
selfreqs = freqs[mask]
seltimes = times[mask]
idx = min(len(selfreqs)-1, 3)
return float(seltimes[idx] + idx / self.sr), float(selfreqs[idx])
return 0., 0.
[docs]
def firstSound(self, threshold=-120.0, period=0.04, overlap=2, start=0.,
) -> float | None:
"""
Find the time of the first sound within this sample
This does not make any difference between background noise or pitched/voiced sound
Args:
threshold: the sound threshold in dB.
period: the time period to calculate the rms
overlap: determines the step size between rms calculations
start: start time (0=start of sample)
Returns:
the time of the first sound, or None if no sound found
.. seealso:: :meth:`Sample.firstPitch`
"""
idx = _npsnd.firstSound(self.samples,
threshold=threshold,
periodsamps=int(period * self.sr),
overlap=overlap,
skip=int(start * self.sr))
if idx is None:
return None
return idx / self.sr if idx >= 0 else None
[docs]
def firstSilence(self, threshold=-80, period=0.04, overlap=2,
soundthreshold=-50, start=0.) -> float | None:
"""
Find the first silence in this sample
Args:
threshold: rms value which counts as silence, in dB
period: the time period to calculate the rms
overlap: determines the step size between rms calculations
soundthreshold: rms value which counts as sound, in dB
start: start time (0=start of sample)
Returns:
the time of the first silence, or None if no silence found
"""
idx = _npsnd.firstSilence(samples=self.samples,
threshold=threshold,
period=int(period*self.sr),
overlap=overlap,
soundthreshold=soundthreshold,
startidx=int(start*self.sr))
return idx/self.sr if idx is not None else None
[docs]
def addChannels(self, channels: np.ndarray | int) -> Self:
"""
Create a new Sample with added channels
Args:
channels: the audiodata of the new channels or the number
of empty channels to add (as integer). In the case of
passing audio data this new samples should have the
exact same number of frames as self
Returns:
a new Sample with the added channels. The returned Sample
will have the same duration as self
"""
if isinstance(channels, int):
channels = _silentFrames(self.numframes, channels)
else:
assert len(channels) == len(self)
frames = np.column_stack((self.samples, channels))
return self.__class__(frames, sr=self.sr)
[docs]
@staticmethod
def mix(samples: list[Sample],
offsets: list[float] | None = None,
gains: list[float] | None = None,
positions: list[float] | None = None
) -> Sample:
"""
Static method: mix the given samples down, optionally with a time offset
This is a static method. All samples should share the same
number of channels and sr
Args:
samples: the Samples to mix
offsets: if given, an offset in seconds for each sample
gains: if given, a gain for each sample
positions: if given, panning positions for each sample.
Returns:
the resulting Sample
Example::
>>> from maelzel.snd.audiosample import Sample
>>> a = Sample("stereo-2seconds.wav")
>>> b = Sample("stereo-3seconds.wav")
>>> m = Sample.mix([a, b], offsets=[2, 0])
>>> m.duration
4.0
"""
return mixSamples(samples, offsets=offsets, gains=gains, positions=positions)
[docs]
@staticmethod
def join(samples: Sequence[Sample]) -> Sample:
"""
Concatenate a sequence of Samples
Samples should share numchannels. If mismatching samplerates are found,
all samples are upsampled to the highest sr
Args:
samples: a seq. of Samples
Returns:
the concatenated samples as one Sample
"""
return concatenate(samples)
def broadcastSamplerate(samples: list[Sample]) -> list[Sample]:
"""
Match the samplerates audio samples to the highest one.
The audio sample with the lowest sr is resampled to the
higher one.
"""
assert all(isinstance(s, Sample) for s in samples)
sr = max(s.sr for s in samples)
return [s.resample(sr) for s in samples]
def _asNumpySamples(samples: Sample | np.ndarray) -> np.ndarray:
if isinstance(samples, Sample):
return samples.samples
elif isinstance(samples, np.ndarray):
return samples
else:
return np.asarray(samples, dtype=float)
def asSample(source: str | Sample | tuple[np.ndarray, int]) -> Sample:
"""
Return a Sample instance
Args:
source: a filename, a Sample or a tuple (samples, sr)
Returns:
a Sample. If already a Sample, it just returns it
"""
if isinstance(source, Sample):
return source
if isinstance(source, str):
return Sample(source)
if isinstance(source, tuple) and isinstance(source[0], np.ndarray):
samples, sr = source
return Sample(samples, sr)
else:
raise TypeError("can't convert source to Sample")
def matchSamplerates(sampleseq: Sequence[Sample], sr: int = 0, forcecopy=False) -> list[Sample]:
"""
Match the samplerates of the given Samples
Args:
sampleseq: a sequence of Sample instances
sr: the sr to use or None to use the highest samplerate of all samples
forcecopy: if True, a copy of the Sample is returned even if no resampling
is needed
Returns:
a list of Samples, where all Samples share the same samplerate.
Only samples which need to be resampled will be resampled. Sample
instances matching the used samplerate will be returned as is
"""
numchannels = sampleseq[0].numchannels
if any(s.numchannels != numchannels for s in sampleseq):
s = next(s for s in sampleseq if s.numchannels != numchannels)
raise ValueError(f"All samples should have {numchannels} channels, "
f"but one Sample has {s.numchannels} channels")
if not sr:
sr = max(s.sr for s in sampleseq)
if any(s.sr != sr for s in sampleseq):
sampleseq = [s.resample(sr) if s.sr != sr else s.copy() if forcecopy else s for s in sampleseq]
else:
sampleseq = list(sampleseq)
return sampleseq
def concatenate(sampleseq: Sequence[Sample]) -> Sample:
"""
Concatenate a sequence of Samples
Samples should share numchannels. If mismatching samplerates are found,
all samples are upsampled to the highest sr
Args:
sampleseq: a seq. of Samples
Returns:
the concatenated samples as one Sample
"""
s = np.concatenate([s.samples for s in matchSamplerates(sampleseq)])
return Sample(s, sampleseq[0].sr)
def _mapn_between(func, n: int, t0: float, t1: float) -> np.ndarray:
"""
Returns a numpy array of n-size, mapping func between t0-t1 at a rate of n/(t1-t0)
Args:
func: a callable of the form func(float) -> float, can be a bpf
(see https://bpf4.readthedocs.io)
"""
if hasattr(func, 'mapn_between'):
ys = func.mapn_between(n, t0, t1) # is it a Bpf?
else:
X = np.linspace(t0, t1, n)
ufunc = np.vectorize(func)
Y = ufunc(X)
return Y
return ys
def _silentFrames(numframes: int, channels: int) -> np.ndarray:
"""
Generate silent frames
Args:
numframes: the number of frames
channels: the number of channels
Returns:
a new numpy array with zeroed frames
"""
if channels == 1:
samples = np.zeros((numframes,), dtype=float)
else:
samples = np.zeros((numframes, channels), dtype=float)
return samples
def mixSamples(samples: list[Sample],
offsets: list[float] | None = None,
gains: list[float] | None = None,
positions: list[float] | None = None
) -> Sample:
"""
Mix the given samples down, optionally with a time offset
All samples should share the same number of channels and sr
Args:
samples: the Samples to mix
offsets: if given, an offset in seconds for each sample
gains: if given, a gain for each sample
positions: if given, panning positions for each sample (between 0 and 1)
This will force the output sample to be stereo. Multichannel audio
does not support panning
Returns:
the resulting Sample
Example::
>>> from maelzel.snd.audiosample import Sample
>>> a = Sample("stereo-2seconds.wav")
>>> b = Sample("stereo-3seconds.wav")
>>> m = Sample.mix([a, b], offsets=[2, 0])
>>> m.duration
4.0
"""
nchannels = max(s.numchannels for s in samples)
sr = samples[0].sr
if not all(s.sr == sr for s in samples):
raise ValueError(f"All samples should have the same samplerate, got {[s.sr for s in samples]}")
if offsets is None:
offsets = [0.] * len(samples)
else:
assert len(offsets) == len(samples)
if gains is None:
gains = [1.] * len(samples)
else:
assert len(gains) == len(samples)
if positions:
assert len(positions) == len(samples)
if nchannels > 2:
raise ValueError("Multichannel (> 2) samples are not supported with panning")
nchannels = 2
dur = max(s.duration + offset for s, offset in zip(samples, offsets))
numframes = int(dur * sr)
if nchannels == 1:
buf = np.zeros((numframes,), dtype=float)
else:
buf = np.zeros((numframes, nchannels), dtype=float)
for i in range(len(samples)):
s, gain, offset = samples[i], gains[i], offsets[i]
startframe = int(offset * sr)
endframe = startframe + len(s)
data = s.samples
if positions and nchannels == 2:
position = positions[i]
data = _npsnd.panStereo(data, position)
buf[startframe:endframe] += data
if gain != 1.0:
buf[startframe:endframe] *= gain
return Sample(buf, sr=sr)
def spectrumAt(samples: np.ndarray,
sr: int,
time: float,
resolution: float,
channel=0,
windowsize: float = -1,
mindb=-90,
minfreq=0,
maxfreq=12000,
maxcount=0
) -> list[tuple[float, float]]:
"""
Analyze sinusoidal components of these samples at the given time
Args:
samples: the samples, a 1D numpy array. If it is not contiguous it will
be made contiguous.
sr: the sample rate
time: the time to analyze
resolution: the resolution of the analysis, in hz
channel: if this sample has multiple channels, which channel to analyze
windowsize: the window size in hz
mindb: the min. amplitude in dB for a component to be included
minfreq: the min. frequency of a component to be included
maxfreq: the max. frequency of a component to be included
maxcount: the max. number of components to include (0 to include all)
Returns:
a list of pairs (frequency, amplitude) where each pair represents a sinusoidal
component of this sample at the given time. Amplitudes are in the range 0-1
"""
if _npsnd.numChannels(samples) > 1:
samples = _npsnd.getChannel(samples, channel)
resolutionperiod = 1 / resolution
margin = resolutionperiod * 4
starttime = max(0., time - margin)
duration = len(samples) / sr
endtime = min(time + margin, duration)
startsample = int(starttime * sr)
endsample = int(endtime * sr)
samples = samples[startsample:endsample]
samples = np.ascontiguousarray(samples)
try:
import loristrck.util
except ImportError:
raise ImportError("loristrck is needed to perform this operation. Install it via "
"'pip install loristrck'")
partials = loristrck.analyze(samples, sr=sr, resolution=resolution, windowsize=windowsize)
if minfreq is None:
minfreq = int(resolution * 1.3)
validpartials, rest = loristrck.util.select(partials, mindur=margin, minamp=mindb,
maxfreq=maxfreq, minfreq=minfreq)
breakpoints = loristrck.util.partials_at(validpartials, t=margin, maxcount=maxcount)
pairs = [(float(bp[0]), float(bp[1])) for bp in breakpoints]
pairs.sort(key=lambda pair: pair[0])
return pairs
def playSamples(samples: np.ndarray,
sr: int,
loop=False,
chan=1,
gain=1.0,
speed=1.0,
skip=0.0,
dur=0.0,
block=False
) -> PlaybackStream:
"""
Simple playback for samples
If more complex playback is needed, use ``Sample(samples, sr).play()``
Args:
samples: the samples to play
sr: sample rate
loop: should playback be looped?
chan: first channel to play to. For stereo samples, output
is routed to consecutive channels starting with this channel
gain: a gain modifier
speed: the playback speed. A variation in speed will change
the pitch accordingly.
skip: start playback at a given point in time
dur: duration of playback. 0 indicates to play until the end of the sample
block: if True, block execution until playback is finished
Returns:
a :class:`PlaybackStream`. This can be used to stop playback
See Also
~~~~~~~~
* :meth:`Sample.getEngine`
* :meth:`Sample.setEngine`
"""
numchannels = _npsnd.numChannels(samples)
mapping = list(range(chan, numchannels + chan))
if skip:
samples = samples[int(sr * skip):]
if dur:
samples = samples[:int(sr*dur)]
return _playSamples(samples=samples, mapping=mapping, sr=sr, loop=loop,
speed=speed, block=block, gain=gain)