from __future__ import annotations
import os
import itertools
import pitchtools as pt
import numpy as np
from maelzel.snd import audiosample
from maelzel.common import F, asF, F0, F1, asmidi
from maelzel.core import event
from maelzel.core.synthevent import SynthEvent
from maelzel.core.workspace import Workspace
from maelzel import _util
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from maelzel.common import time_t, pitch_t
from typing_extensions import Self
from maelzel.core.synthevent import PlayArgs
from maelzel import scoring
import csoundengine
from maelzel.scorestruct import ScoreStruct
from maelzel.core.config import CoreConfig
from maelzel.core.renderer import Renderer
__all__ = (
'Clip',
)
[docs]
class Clip(event.MEvent):
"""
A Clip represent an audio sample in time
It is possible to select a fragment from the source
Args:
source: the source of the clip (a filename, audiosample, samples as numpy array)
dur: the duration of the clip, in quarternotes. If not given, the duration will
be the duration of the source. If loop==True, then the duration **needs** to
be given
pitch: the pitch representation of this clip. It has no influence in the playback
itself, it is only for notation purposes
amp: the playback gain
offset: the time offset of this clip. Like in a Note, if not given,
the start time depends on the context (previous events) where this
clip is evaluated
startsecs: selection start time (in seconds)
endsecs: selection end time (in seconds). If 0., play until the end of the source
loop: if True, playback of this Clip should be looped
label: a label str to identify this clip
speed: playback speed of the clip
dynamic: allows to attach a dynamic expression to this Clip. This is
only for notation purposes, it does not modify playback
tied: this clip should be tied to the next one. This is only valid if the clips
share the same source (same soundfile or samples) and allows to automate
parameters such as playback speed or amplitude.
noteheadShape: the notehead shape to use for notation, one of 'normal', 'cross',
'harmonic', 'triangle', 'xcircle', 'rhombus', 'square', 'rectangle', 'slash', 'cluster'.
(see :ref:`config['show.clipNoteheadShape'] <config_show_clipnoteheadshape>`)
"""
_excludedPlayKeys = ('instr', 'args')
__slots__ = ('amp',
'selectionStartSecs',
'selectionEndSecs',
'source',
'soundfile',
'numChannels',
'channel',
'dynamic',
'pitch',
'sourceDurSecs',
'loop',
'noteheadShape',
'sr',
'_speed',
'_playbackMethod',
'_engine',
'_csoundTable',
'_sample',
'_durContext',
'_explicitDur',
'_cache'
)
def __init__(self,
source: str | audiosample.Sample | tuple[np.ndarray, int],
dur: time_t = None,
pitch: pitch_t = None,
amp: float = 1.,
offset: time_t = None,
startsecs: float | F = 0.,
endsecs: float | F = 0.,
channel: int | None = None,
loop=False,
speed: F | float = F1,
label='',
dynamic='',
tied=False,
noteheadShape=''
):
if source == '?':
from maelzel.core import _dialogs
selection = _dialogs.selectSndfileForOpen()
if selection is None or not selection:
raise ValueError("No source selected")
source = selection
if loop and dur is None:
raise ValueError(f"The duration of a looping Clip needs to be given (source: {source})")
self.soundfile = ''
"""The soundfile holding the audio data (if any)"""
self.numChannels = 0
"""The number of channels of this Clip"""
self.sr = 0
"""The samplerate of this Clip"""
self.sourceDurSecs: F = F0
"""The duration in seconds of the source of this Clip"""
self.tied = tied
"""Is this clip tied to the next?"""
self.loop = loop
"""Should this clip loop?"""
self.channel = channel
"""Which channel to load from soundfile. If None, all channels are loaded"""
self._playbackMethod = 'diskin'
"""One of 'diskin' | 'table' """
self._engine: csoundengine.Engine | None = None
"""Will be set to the playback engine when realtime has been initialized"""
self._csoundTable: int = 0
"""Will be filled during event initialization"""
self.noteheadShape = noteheadShape
"""The shape to use as notehead"""
self._explicitDur: F | None = None if dur is None else asF(dur)
self._cache = {}
if isinstance(source, audiosample.Sample):
source = (source.samples, source.sr)
if isinstance(source, str):
if not os.path.exists(source):
raise FileNotFoundError(f"Soundfile not found: '{source}'")
self.soundfile = source
import sndfileio
info = sndfileio.sndinfo(source)
self.sr = info.samplerate
self.sourceDurSecs = F(info.duration)
self.numChannels = info.channels if self.channel is None else 1
self._playbackMethod = 'diskin'
elif isinstance(source, tuple):
assert len(source) == 2 and isinstance(source[0], np.ndarray)
samples, sr = source
source = samples
self.sr = sr
self.sourceDurSecs = F(len(samples) / self.sr)
self.numChannels = 1 if len(samples.shape) == 1 else samples.shape[-1]
self._playbackMethod = 'table'
else:
raise TypeError(f"source should be either a str, a tuple (samples: np.ndarray, sr: int) "
f"or an audiosample.Sample, got {type(source)}")
self.source: str | np.ndarray = source
"""The source of this clip"""
self.amp: float = amp
"""An amplitude (gain) applied to this clip"""
self.dynamic: str = dynamic
"""A dynamic expression attached to the score representation of this clip"""
self.selectionStartSecs: F = asF(startsecs)
"""The start time of the selected fragment of the source media (in seconds)"""
self.selectionEndSecs: F = asF(endsecs if endsecs > 0 else self.sourceDurSecs)
"""The end time of the selected fragment of the source media (in seconds)"""
self._speed: F = asF(speed)
"""Playback speed"""
self._sample: audiosample.Sample | None = None
self._durContext: tuple[ScoreStruct, F] | None = None
if pitch:
pitchrepr = asmidi(pitch)
else:
pitchrepr = 0.
self.pitch: float = pitchrepr
"""The pitch representation of this clip.
This is used for notation purposes, it has no influence on the playback
of this clip"""
if offset is not None:
offset = asF(offset)
super().__init__(offset=offset, dur=F0, label=label)
self._calculateDuration()
def _readSamples(self) -> None:
if isinstance(self.source, np.ndarray):
pass
assert isinstance(self.source, str)
import sndfileio
samples, sr = sndfileio.sndread(self.source)
self.source = samples
[docs]
def samplesBetween(self, start: float, end: float) -> np.ndarray:
"""
Return the samples between start and end time
Args:
start: start time in seconds
end: end time in seconds
Returns:
the samples as a numpy array
"""
if isinstance(self.source, str):
import sndfileio
return sndfileio.sndread(self.source, start=start, end=end)[0]
else:
assert isinstance(self.source, np.ndarray)
return self.source[int(start * self.sr):int(end * self.sr)]
[docs]
def firstSound(self, threshold=-120) -> float | None:
if isinstance(self.source, str):
from maelzel.snd import sndfiletools
return sndfiletools.firstSound(self.source, threshold=threshold)
else:
from maelzel.snd import numpysnd
assert isinstance(self.source, np.ndarray)
samplenum = numpysnd.firstSound(self.source, threshold=threshold)
return None if samplenum is None else samplenum * self.sr
[docs]
def fundamentalPitch(self, dur=0.2, start=0., default=60.) -> float:
if start == 0:
start = self.firstSound()
samples = self.samplesBetween(start, start+dur*2)
f0 = audiosample.Sample(samples, sr=self.sr).fundamentalFreq(dur=dur)
return pt.f2m(f0) if f0 is not None else default
@property
def speed(self) -> F:
"""The playback speed"""
return self._speed
@speed.setter
def speed(self, speed: time_t):
self._speed = asF(speed)
@property
def name(self) -> str:
return f"Clip(source={self.source})"
[docs]
def copy(self) -> Self:
# We do not copy the parent attr
source = self.source if isinstance(self.source, str) else (self.source, self._sr)
out = self.__class__(source=source,
pitch=self.pitch,
amp=self.amp,
offset=self.offset,
label=self.label,
dynamic=self.dynamic,
startsecs=self.selectionStartSecs,
endsecs=self.selectionEndSecs,
channel=self.channel,
speed=self.speed,
loop=self.loop,
tied=self.tied)
self._copyAttributesTo(out)
return out
def __hash__(self):
source = self.source if isinstance(self.source, str) else id(self.source)
parts = (source, self.selectionStartSecs, self.selectionEndSecs, self.speed, self.sr,
self.numChannels, self.sourceDurSecs, self.amp,
self.dynamic, self.noteheadShape, self.pitch)
return hash(parts)
def __getitem__(self, item):
if isinstance(item, slice):
start = slice.start if slice.start is None else self.selectionStartSecs
end = slice.stop if slice.stop is None else self.selectionEndSecs
assert slice.step is None
source = self.source if isinstance(self.source, str) else (self.source, self._sr)
return Clip(source, startsecs=start, endsecs=end,
speed=self.speed, amp=self.amp, dynamic=self.dynamic)
else:
raise ValueError("Only a slice of the form clip[start:end] is supported")
[docs]
def asSample(self) -> audiosample.Sample:
"""
Return a :class:`maelzel.snd.audiosample.Sample` with the audio data of this Clip
Returns:
a Sample with the audio data of this Clip. The returned Sample is read-only.
Example
~~~~~~~
TODO
"""
if self._sample is not None:
return self._sample
if isinstance(self.source, np.ndarray):
return audiosample.Sample(self.source, sr=int(self.sr),
start=float(self.selectionStartSecs),
end=float(self.selectionEndSecs),
readonly=True)
elif isinstance(self.source, str):
sample = audiosample.Sample(self.source,
readonly=True,
engine=self._engine,
start=float(self.selectionStartSecs),
end=float(self.selectionEndSecs))
self._sample = sample
return sample
else:
raise TypeError(f"Wrong type for source!, got {type(self.source)}")
[docs]
def isRest(self) -> bool:
return False
[docs]
def durSecs(self) -> F:
return (self.selectionEndSecs - self.selectionStartSecs) / self.speed
[docs]
def pitchRange(self) -> tuple[float, float]:
if not self.pitch:
self.pitch = self.fundamentalPitch(default=60)
return (self.pitch, self.pitch)
@property
def dur(self) -> F:
"The duration of this Clip, in quarter notes"
if self._explicitDur:
return self._explicitDur
absoffset = self.absOffset()
struct = self.activeScorestruct()
if self._dur and self._durContext is not None:
cachedstruct, cachedbeat = self._durContext
if struct is cachedstruct and cachedbeat == absoffset:
return self._dur
self._calculateDuration(absoffset=absoffset, struct=struct)
return self._dur
def _calculateDuration(self, absoffset: F|None = None, struct: ScoreStruct|None = None
) -> None:
if absoffset is None:
absoffset = self.absOffset()
if struct is None:
struct = self.activeScorestruct()
dur = struct.beatDelta(absoffset, absoffset + self.durSecs())
self._dur = dur
self._durContext = (struct, absoffset)
def __repr__(self):
return (f"Clip(source={self.source}, "
f"numChannels={self.numChannels}, "
f"sr={self.sr}, "
f"dur={_util.showT(self.dur)}, "
f"sourcedur={_util.showT(self.sourceDurSecs)}s)")
def _synthEvents(self,
playargs: PlayArgs,
parentOffset: F,
workspace: Workspace,
) -> list[SynthEvent]:
skip = float(self.selectionStartSecs / self.speed)
scorestruct = workspace.scorestruct
reloffset = self.relOffset()
offset = reloffset + parentOffset
starttime = float(scorestruct.beatToTime(offset))
endtime = float(scorestruct.beatToTime(offset + self.dur))
amp = self.amp if self.amp is not None else 1.0
bps = [(starttime, self.pitch, amp),
(endtime, self.pitch, amp)]
if self.playargs:
playargs = playargs.updated(self.playargs)
if self._playbackMethod == 'diskin':
assert isinstance(self.source, str), f"The diskin playback method needs a path " \
f"as source, got {self.source}"
assert os.path.exists(self.soundfile)
args = {'ipath': self.soundfile,
'isndfilechan': -1 if self.channel is None else self.channel,
'kspeed': float(self.speed),
'iskip': skip,
'iwrap': 1 if self.loop else 0}
playargs = playargs.clone(instr='_clip_diskin', args=args)
elif self._playbackMethod == 'table':
args = {'isndtab': 0, # The table number will be filled later
'kspeed': float(self.speed),
'istart': skip,
'ixfade': -1 if not self.loop else 0.1}
playargs = playargs.clone(instr='_playtable', args=args)
else:
raise RuntimeError(f"Playback method {self._playbackMethod} not supported")
event = SynthEvent.fromPlayArgs(bps=bps,
playargs=playargs,
numchans=self.numChannels,
initfunc=self._initEvent)
if playargs.automations:
event.addAutomationsFromPlayArgs(playargs, scorestruct=scorestruct)
return [event]
def _initEvent(self, event: SynthEvent, renderer: Renderer) -> None:
if self._playbackMethod == 'table':
if not self._csoundTable:
if isinstance(self.source, np.ndarray):
self._csoundTable = renderer.makeTable(self.source, sr=int(self.sr)).tabnum
else:
assert os.path.exists(self.soundfile)
self._csoundTable = renderer.readSoundfile(self.soundfile)
event._ensureArgs()['isndtab'] = self._csoundTable
[docs]
def spectrum(self, resolution=50., windowsize=0., mindb=-90., hoptime=0., minfreq=0, maxfreq=0, start=0., end=0.):
"""
Analyze this sample to construct a partial tracking spectrum
Args:
resolution: analysis resolution in Hz
windowsize: the window size, in Hz. Normally higher than resolution, defaults
to the same as resolution
mindb: min. amplitude for any bin to be used for partial tracking
hoptime: time offset between analysis windows. For an overlap of n, use
1/(windowsize*n)
minfreq: partials with an average freq. lower than this will be discarded
maxfreq: partials with an average freq. higher than this will be discarded
start: start time of the analysis, in seconds
end: end time of the analysis, in seconds
Returns:
a :class:`maelzel.partialtracking.spectrum.Spectrum`
"""
if not windowsize:
windowsize = resolution
if hoptime == 0.:
hoptime = 1/(windowsize * 4)
if maxfreq == 0:
maxfreq = self.sr / 2
if end == 0:
end = float(self.sourceDurSecs)
cachekey = (resolution, windowsize, hoptime, minfreq, maxfreq, start, end)
if (subcache := self._cache.get('.spectrum')) is None:
self._cache['.spectrum'] = subcache = {}
elif spec := subcache.get(cachekey):
return spec
sample = self.asSample()
if end == 0:
end = sample.duration
elif end < 0:
end = sample.duration + end
else:
end = min(end, sample.duration)
assert end > start
if start > 0 or end < sample.duration:
sample = sample[start:end]
spec = sample.partialTrackingAnalysis(resolution=resolution, windowsize=windowsize, mindb=mindb, hoptime=hoptime)
if maxfreq:
partials = [p for p in spec.partials if minfreq <= p.meanfreq() < maxfreq]
spec = spec.__class__(partials)
if len(subcache) == 10: # Evict oldest cached spectrum
del subcache[next(iter(subcache.keys()))]
subcache[cachekey] = spec
return spec
[docs]
def chordsAt(self,
times: list[float],
resolution: float = 50,
windowsize=0.,
mindb=-90,
durs: list[time_t] | time_t | None = None,
maxcount=0,
ampfactor=1.,
maxfreq=2000,
minfreq=0.,
averageAmplitude=False
) -> list[event.Chord | event.Note]:
margin = 1/resolution * 8
start = max(0., times[0] - margin)
end = times[-1] + margin
if durs is None:
durations = [t1 - t0 for t0, t1 in itertools.pairwise(times)]
durations.append(durations[-1])
elif isinstance(durs, (list, tuple)):
durations = durs
else:
durations = [durs] * len(times)
spectrum = self.spectrum(resolution=resolution, mindb=mindb, windowsize=windowsize, start=start, end=end)
chords = []
minamp = pt.db2amp(mindb)
minfreq = max(minfreq, resolution * 0.8)
for i, time in enumerate(times):
eventdur = durations[i]
partials = spectrum.partialsBetween(start=time, end=time)
# eventdur = dur or (times[i+1] - time if i < len(times) - 1 else F(1))
if not partials:
chords.append(event.Rest(dur=eventdur))
else:
bps = [partial.at(time) for partial in partials]
bps = [bp for bp in bps if minfreq <= bp[0] < maxfreq and bp[1] > minamp]
if maxcount > 0:
bps.sort(key=lambda bp: bp[1], reverse=True)
bps = bps[:maxcount]
if not bps:
chords.append(event.Rest(dur=eventdur))
else:
components = [event.Note(pt.f2m(bp[0]), amp=bp[1] * ampfactor, properties={'bandwidth': bp[3]})
for bp in bps]
if averageAmplitude:
for note in components:
note.amp = 1.
chordamp = sum(bp[1] for bp in bps) / len(bps)
else:
chordamp = 1.
chord = event.Chord(components, dur=eventdur, amp=chordamp, properties={'time': time})
chords.append(chord)
return chords
[docs]
def chordAt(self,
time: float,
resolution: float = 50,
channel=0,
mindb=-90,
dur: time_t = None,
maxcount=0,
ampfactor=1.0,
maxfreq=20000,
minfreq=0
) -> event.Chord | None:
"""
Analyze the spectrum at the given time and extract the most relevant partials
A small fragment of the clip is analyzed and the most relevant sinusoidal
components at the given time are extracted and returned in the form of a
chord
Args:
time: the time to analyze. This is a time in seconds within the clip
resolution: the resolution of the analysis, in Hz
channel: which channel to analyze, for multichannel clips
mindb: the min. amplitude (in dB) for a component to be included
dur: the duration of the returned chord
maxcount: the max. number of components in the chord
ampfactor: an amplitude factor to apply to each component.
maxfreq: the max. frequency for a component to be included
minfreq: the min. frequency for a component to be included
Returns:
a Chord representing the spectrum of the clip at the given time, or None
if no components were found following the given restraints. None if no
components found which would satisfy the given conditions at the given time
"""
sample = self.asSample()
pairs = sample.spectrumAt(time, resolution=resolution, channel=channel,
mindb=mindb, maxcount=maxcount, maxfreq=maxfreq,
minfreq=minfreq)
if not pairs:
return None
components = [event.Note(pt.f2m(freq), amp=amp * ampfactor)
for freq, amp in pairs]
return event.Chord(components, dur=dur)
[docs]
def scoringEvents(self,
groupid='',
config: CoreConfig|None = None,
parentOffset: F|None = None
) -> list[scoring.Notation]:
if not config:
config = Workspace.active.config
offset = self.absOffset()
dur = self.dur
from maelzel import scoring
if not self.pitch:
self.pitch = self.fundamentalPitch(default=60)
notation = scoring.Notation.makeNote(pitch=self.pitch,
duration=dur,
offset=offset,
dynamic=self.dynamic,
gliss=bool(self.gliss))
if self.tied:
notation.tiedNext = True
if self.label:
labelsymbol = self._labelSymbol(self.label, config=config)
labelsymbol.applyToNotation(notation)
shape = self.noteheadShape if self.noteheadShape else config['show.clipNoteheadShape']
if shape:
notation.setNotehead(shape)
if self.symbols:
for symbol in self.symbols:
symbol.applyToNotation(notation, parent=self)
return [notation]