# Source code for maelzel.core.synthevent

from __future__ import annotations

import math

import pitchtools as pt
from dataclasses import dataclass

from ._common import logger
from maelzel.common import F
from maelzel._util import hasoverlap
from maelzel.core import automation as _automation
from maelzel.core import presetmanager
import functools

from typing import TYPE_CHECKING, cast as _cast
if TYPE_CHECKING:
    import csoundengine.instr
    from maelzel.common import time_t, location_t, num_t
    from maelzel.core import renderer
    from typing import Any, Callable, Iterable, Sequence, TypeAlias
    from .config import CoreConfig
    from matplotlib.axes import Axes
    from maelzel.scorestruct import ScoreStruct
    breakpoint_t: TypeAlias = tuple[float, ...]
    from maelzel.core import presetdef
    from typing import TypeVar, ClassVar
    import numpy as np
    _T = TypeVar('_T')


__all__ = (
    'PlayArgs',
    'SynthEvent',
)


@functools.cache
def _csoundengineUseDynamicPfields() -> bool:
    """
    Query csoundengine's ``'dynamic_pfields'`` config entry

    csoundengine is imported lazily inside the function and the result is
    memoized, so the configuration is read only once per process.

    Returns:
        the value stored under ``csoundengine.config['dynamic_pfields']``
    """
    import csoundengine
    engineConfig = csoundengine.config
    return engineConfig['dynamic_pfields']


def _normalizeSynthValue(val) -> float | str:
    if isinstance(val, (float, str)):
        return val
    else:
        try:
            return float(val)
        except ValueError as e:
            raise ValueError(f"Could not convert {val} to a float to be used as argument"
                             f" to a synth event ({e=})")


class PlayArgs:
    """
    Playback customizations for an event or a set of events

    Each :class:`~maelzel.core.mobj.MObj` has a :attr:`~maelzel.core.mobj.MObj.playargs`
    attribute, which is an instance of :class:`PlayArgs` and allows each object to
    set playback attributes like the instrument used, fade, pan position, etc.
    Each attribute set is added to the :attr:`PlayArgs.db` dict.

    PlayArgs cascade similarly to css. If a note sets a specific attribute, like
    'instr' (the instrument used), then this value is used since it is the most
    specific. If the note leaves that unset but the note is contained within a
    :class:`~maelzel.core.mobj.Chain` and this ``Chain`` sets the 'instr' key within
    its own :attr:`~maelzel.core.mobj.MObj.playargs`, then this value is used. If
    that chain is contained within a :class:`~maelzel.core.score.Score` and the
    score itself has the 'instr' key set, then that value is used, etc. Fallback
    defaults are often defined in the :ref:`configuration <config>`

    Keys
    ~~~~

    * delay: when to schedule this synth. This time is added to the absolute
      offset of an object
    * chan: the channel to output to. If the synth is multichannel this is the
      first of many adjacent channels (TODO: implement channel mappings or
      similar strategies for spatialization)
    * gain: an overall gain of the synth
    * fade: a fade value or a tuple (fadein, fadeout), in seconds
    * instr: the instrument preset to use
    * pitchinterpol: interpolation mode for pitch ('linear', 'cos',
      'freqlinear', 'freqcos')
    * fadeshape: shape of the fade curve (TODO)
    * args: any args passed to the instr preset (a dict {'pname': value})
    * priority: the priority of this synth. Priorities start with 1, low
      priorities are evaluated first. An instr with a higher priority is used
      to receive audio from an instr with a lower priority
    * position: the horizontal placement inplace. 0=left, 1=right. For
      multichannel (> 2) presets this value is interpreted freely by the
      instrument, which does its own spatialization
    * sustain: if positive the last breakpoint is extended by this duration.
      This is used mainly for sample based instruments (soundfont) to extend
      the playback. It can be used to implement one-shot sample playback
    * transpose: add an extra transposition to all breakpoints
    * glisstime: slide time to next event. This allows to add glissando lines
      for events even if their gliss attr is not set, or to generate legato
      lines
    """
    playkeys: ClassVar[set[str]] = {
        'delay', 'chan', 'gain', 'fade', 'instr', 'pitchinterpol',
        'fadeshape', 'args', 'priority', 'position', 'sustain',
        'transpose', 'glisstime', 'skip', 'end', 'linkednext'}
    """Available keys for playback customization"""

    __slots__ = ('db', 'automations')

    def __init__(self,
                 db: dict[str, Any] | None = None,
                 automations: list[_automation.Automation] | None = None):
        self.db: dict[str, Any] = db if db is not None else {}
        """A dictionary holding the arguments explicitely specified"""

        self.automations: list[_automation.Automation] | None = automations
        """A list of Automations"""

    def _check(self) -> None:
        """Raise ValueError if the db holds unknown keys or None values"""
        db = self.db
        if db.keys() - self.playkeys:
            raise ValueError(f"Invalid keys present: diff={db.keys() - self.playkeys}")
        if any(v is None for v in db.values()):
            raise ValueError(f"Values passed should not be None: "
                             f"{[k for k, v in db.items() if v is None]}")

    def setArgs(self, **kws: float | str) -> None:
        """
        Set one or multiple values for the parameters passed to a preset

        Args:
            **kws: any keyword parameter passed to an instrument preset. They
                are not checked
        """
        args: dict | None = self.db.get('args')
        if args is None:
            self.db['args'] = kws
        else:
            args.update(kws)

    def _checkArgs(self) -> None:
        """Debug check: no preset argument may be named like a playkey"""
        args = self.args
        if args:
            assert all(arg not in PlayArgs.playkeys for arg in args), f"{self=}"

    @property
    def args(self) -> dict | None:
        """The arguments passed to the instr preset, or None if unset"""
        return self.db.get('args')

    def __bool__(self):
        return bool(self.db) or bool(self.automations)

    def keys(self) -> set[str]:
        """All possible keys for a PlayArgs instance

        This is not the equivalent of the actual set keys
        (see ``playargs.db.keys()``)"""
        return self.playkeys

    def values(self) -> Iterable:
        """
        The values corresponding to all possible keys

        This might contain unset values (as None). For only the actually set
        values, use ``playargs.db.values()``"""
        db = self.db
        return (db.get(k) for k in self.playkeys)

    def items(self) -> dict[str, Any]:
        """
        A dict mapping each possible key to its value (None if unset)

        .. note::
            Contrary to ``dict.items()`` this returns a dict, not a view
            of pairs
        """
        db = self.db
        return {k: db.get(k) for k in self.playkeys}

    def get(self, key: str, default: _T) -> _T:
        """
        Like dict.get(), but requires a default value

        Raises:
            KeyError: if key is not one of the possible playkeys
        """
        if key not in self.playkeys:
            raise KeyError(f"Unknown key {key}. Possible keys are: {self.playkeys}")
        return self.db.get(key, default)

    def __getitem__(self, item: str):
        return self.db[item]

    def __setitem__(self, key: str, value) -> None:
        if key not in self.playkeys:
            raise KeyError(f'PlayArgs: unknown key "{key}", possible keys: {self.playkeys}')
        if value is None:
            # Setting a key to None unsets it. The key might not be set at all,
            # in which case this is a no-op instead of a KeyError
            self.db.pop(key, None)
        else:
            self.db[key] = value

    def linkedNext(self) -> bool:
        """True if a positive glisstime is set"""
        return self.db.get('glisstime', 0.) > 0.

    def addAutomation(self,
                      param: str,
                      breakpoints: list[tuple[time_t | location_t, float]] | list[tuple[time_t|location_t, float, str]] | list[num_t],
                      interpolation='linear',
                      relative=True) -> None:
        """
        Add an automation line for the given parameter

        Args:
            param: the parameter to automate
            breakpoints: the automation data, normalized via
                ``Automation.normalizeBreakpoints``
            interpolation: the interpolation passed to the normalization
            relative: passed to the created Automation
        """
        breakpoints = _automation.Automation.normalizeBreakpoints(breakpoints, interpolation=interpolation)  # type: ignore
        if self.automations is None:
            self.automations = []
        self.automations.append(_automation.Automation(param=param, breakpoints=breakpoints, relative=relative))  # type: ignore

    @staticmethod
    def _updatedb(db: dict, other: dict) -> None:
        """
        Update db with other, in place. The 'args' sub-dicts are merged

        Args:
            db: the dict to modify
            other: the dict with the overriding values
        """
        args = db.get('args')
        otherargs = other.get('args')
        db.update(other)
        if args:
            db['args'] = args if not otherargs else args | otherargs
        elif otherargs:
            # Copy, otherwise later changes to db['args'] (via setArgs, for
            # example) would leak into the dict owned by other
            db['args'] = otherargs.copy()

    def _updateAutomations(self, automations: list[_automation.Automation]) -> None:
        """Merge automations into self; one automation is kept per param"""
        if self.automations is None:
            self.automations = automations.copy()
        else:
            merged: dict[str, _automation.Automation] = {autom.param: autom
                                                         for autom in self.automations}
            for automation in automations:
                merged[automation.param] = automation
            self.automations = list(merged.values())

    def updated(self, other: PlayArgs, automations=True) -> PlayArgs:
        """
        A copy of self overwritten by other

        Args:
            other: the playargs to update self with
            automations: if True, include automations in the update

        Returns:
            a copy of self updated by other
        """
        out = self.copy()
        PlayArgs._updatedb(out.db, other.db)
        if automations and other.automations:
            out._updateAutomations(other.automations)
        return out

    def copy(self) -> PlayArgs:
        """
        Returns a copy of self

        The db, the 'args' dict and the list of automations are copied, so
        that modifying the copy does not modify self
        """
        if not self.db and not self.automations:
            return PlayArgs({})
        db = self.db.copy()
        if (args := self.db.get('args')) is not None:
            db['args'] = args.copy()
        # The list is copied: addAutomation appends in place and would
        # otherwise modify the original
        automations = None if self.automations is None else list(self.automations)
        return PlayArgs(db=db, automations=automations)

    def clone(self, **kws) -> PlayArgs:
        """
        Clone self with modifications

        Args:
            **kws: one of the possible playkeys

        Returns:
            the cloned PlayArgs
        """
        out = self.copy()
        PlayArgs._updatedb(out.db, kws)
        return out

    def __repr__(self):
        items = [f'{k}={v}' for k, v in self.db.items()]
        if self.automations:
            items.append(f'automations={self.automations}')
        args = ', '.join(items)
        return f"PlayArgs({args})"

    @staticmethod
    def makeDefault(conf: CoreConfig, copy=True) -> PlayArgs:
        """
        Create a PlayArgs with defaults from a CoreConfig

        Args:
            conf: a CoreConfig
            copy: forwarded to ``conf._makeDefaultPlayArgsDict``; presumably
                controls whether the returned dict is a copy (TODO confirm)

        Returns:
            the created PlayArgs
        """
        d = conf._makeDefaultPlayArgsDict(copy=copy)
        return PlayArgs(d)

    def fillWith(self, other: PlayArgs) -> None:
        """
        Fill any unset value in self with the value in other **inplace**

        Values already set in self are never overwritten. This applies also
        to the preset arguments within 'args': only arguments not present in
        self are taken from other.

        Args:
            other: another PlayArgs
        """
        otherdb = other.db
        db = self.db
        for k, v in otherdb.items():
            if v is not None and k != 'args':
                db.setdefault(k, v)
        otherargs = otherdb.get('args')
        if otherargs:
            if args := db.get('args'):
                # self has priority: only fill the args which are not set
                for argname, argvalue in otherargs.items():
                    args.setdefault(argname, argvalue)
            else:
                db['args'] = otherargs.copy()

    def update(self, d: dict[str, Any]) -> None:
        """Update self with the values in d, in place. 'args' are merged"""
        PlayArgs._updatedb(self.db, d)

    def makeSynthAutomations(self,
                             scorestruct: ScoreStruct,
                             parentOffset: F,
                             ) -> list[_automation.SynthAutomation]:
        """
        Create the SynthAutomations corresponding to the automations in self

        Args:
            scorestruct: passed to each ``Automation.makeSynthAutomation``
            parentOffset: passed to each ``Automation.makeSynthAutomation``

        Returns:
            a list of SynthAutomation, empty if self has no automations
        """
        if not self.automations:
            return []
        return [automation.makeSynthAutomation(scorestruct=scorestruct, parentOffset=parentOffset)
                for automation in self.automations]
def cropBreakpoints(bps: list[breakpoint_t], start: float, end: float ) -> list[breakpoint_t]: """ Crop the breakpoints at time t Args: bps: the breakpoints start: the time to start cropping end: the time to end cropping Returns: the cropped breakpoints """ if not 0 <= start <= end: raise ValueError(f"Invalid crop times: {start=}, {end=}") time0 = bps[0][0] assert time0 == 0 if start <= time0 and (end == 0 or end > bps[-1][0]): return bps if end < start: raise ValueError(f"Invalid crop range, {start=}, {end=} (end < start)") newbps = [] for i, bp in enumerate(bps): bptime = bp[0] if bptime < start: continue elif bptime > start and i > 0: # there is part of a breakpoint before, need to interpolate bp2 = _interpolateBreakpoints(start, bps[i-1], bp) newbps.append(bp2) if bptime <= end: newbps.append(bp) else: bp2 = _interpolateBreakpoints(end, bps[i - 1], bp) newbps.append(bp2) break return newbps def _interpolateBreakpoints(t: num_t, bp0: Sequence[num_t], bp1: Sequence[num_t] ) -> list[float]: t0, t1 = bp0[0], bp1[0] if not t0 <= t <= t1: raise ValueError(f"Invalid breakpoint: {t0=}, {t=}, {t1=}, {bp0=}, {bp1=}") delta = (t - t0) / (t1 - t0) bp = [float(t)] for v0, v1 in zip(bp0[1:], bp1[1:]): bp.append(float(v0 + (v1-v0)*delta)) return bp @dataclass class _AutomationSegment: """ Instances of this class are used to gather changes in dynamic parameters when merging multiple SynthEvents. Dynamic parameters are either builtin-in playback arguments, like position, or instrument defined parameters. A user never creates automation segments: these are created when multiple events are merged within a chain/voice. Each event within a chain/voice generates synth events; these are sorted into lines of subsequent synthevents, which are merged into one synthevent. 
Changes to pitch and amplitude are represented as breakpoints and modulations of any dynamic parameter are collected as automation segments """ param: str """The parameter to automate (either a builtin synth param or an instr param)""" time: float """The time (in seconds), relative to the beginning of the event""" value: float """The new value of the parameter""" pretime: float | None = None """The previous time, relative to the beginning of the event""" prevalue: float | None = None """The previous value, None for a """ kind: str = 'normal' """One of 'normal', 'arg' """
class SynthEvent:
    """
    Represents a standard event (a line of variable breakpoints)

    A User never creates a :class:`SynthEvent`: a :class:`SynthEvent` is
    created by a :class:`Note` or a :class:`Voice`. They are used internally
    to generate a set of events to be played/recorded by the playback engine.
    """
    __slots__ = ("bps", "delay", "chan", "fadein", "fadeout", "gain",
                 "instr", "pitchinterpol", "fadeshape", "args", "kws",
                 "priority", "position", "linkednext",
                 "numchans", "whenfinished", "properties", 'sustain',
                 'automationSegments', 'automations', 'initfunc', '_initdone')

    dynamicAttributes = (
        'position',
        'gain'
    )
    """Attributes which can change within merged events"""

    staticAttributes = (
        'chan',
        'priority',
        'numchans',
    )

    pitchinterpolToInt = {
        'linear': 0,
        'cos': 1,
        'freqlinear': 2,
        'freqcos': 3
    }
    """Map an interpolation shape to an identifier used inside csound"""

    fadeshapeToInt = {
        'linear': 0,
        'cos': 1,
        'scurve': 2,
    }
    """Map a fadeshape to an identifier used inside csound"""

    def __init__(self,
                 bps: list[breakpoint_t],
                 instr: str,
                 delay: float = 0.0,
                 chan: int = 1,
                 fade: float | tuple[float, float] = 0,
                 gain: float = 1.0,
                 pitchinterpol: str = 'linear',
                 fadeshape: str = 'cos',
                 args: dict[str, float | str] | None = None,
                 priority: int = 1,
                 position: float = -1,
                 numchans: int = 2,
                 linkednext=False,
                 whenfinished: Callable | None = None,
                 properties: dict[str, Any] | None = None,
                 sustain: float = 0.,
                 initfunc: Callable[[SynthEvent, renderer.Renderer], None] | None = None,
                 # **kws
                 ):
        """
        bps (breakpoints): a seq of (delay, midi, amp, ...) of len >= 1.

        Args:
            bps: breakpoints, where each breakpoint is a tuple of
                (timeoffset, midi, amp, [...]).
            instr: the instr preset
            delay: time delay. The effective time of bp[n] will be
                delay + bp[n][0]
            chan: output channel
            fade: fade time (either a single value or a tuple
                (fadein, fadeout))
            gain: a gain to be applied to this event
            pitchinterpol: which interpolation to use for pitch
                ('linear', 'cos', 'freqlinear', 'freqcos')
            fadeshape: shape of the fade ('linear', 'cos', 'scurve')
            args: named parameters
            priority: schedule the corresponding instr at this priority
            position: the panning position. A negative value selects a
                default: 0 for mono events, 0.5 otherwise
            numchans: the number of channels this event outputs
            linkednext: a hint to merge multiple events into longer lines.
            whenfinished: a function to call when this event has finished
            properties: user defined properties
            sustain: sustain time after the actual duration
            initfunc: a function (synthevent, renderer) -> None, called
                when the event is being scheduled

        Raises:
            ValueError: if the breakpoints are empty, too short or of uneven
                size, if pitchinterpol/fadeshape are unknown or if the
                resulting duration is not positive
        """
        if not bps:
            raise ValueError("A synth event needs at least one breakpoint")

        if len(bps[0]) < 2:
            raise ValueError(f"A breakpoint should have at least (delay, pitch), "
                             f"but got {bps}")

        bpslen = len(bps[0])
        if any(len(bp) != bpslen for bp in bps):
            raise ValueError("Not all breakpoints have the same length")

        if len(bps[0]) < 3:
            raise ValueError("A breakpoint needs to have at least (time, pitch, amp)")

        if pitchinterpol not in self.pitchinterpolToInt:
            raise ValueError(f"pitchinterpol should be one of {list(self.pitchinterpolToInt.keys())}, "
                             f"got {pitchinterpol}")

        if fadeshape not in self.fadeshapeToInt:
            raise ValueError(f"fadeshape should be one of {list(self.fadeshapeToInt.keys())}")

        if position < 0:
            position = 0 if numchans == 1 else 0.5

        delay = float(delay)

        if isinstance(fade, tuple):
            fadein, fadeout = fade
        else:
            fadein = fadeout = fade

        self.bps: list[breakpoint_t] = bps
        """breakpoints, where each breakpoint is a list of [timeoffset, midi, amp, [...]]"""

        dur = self.bps[-1][0] - self.bps[0][0]

        self.delay: float = delay
        """time delay - The effective time of bp[n] will be delay + bp[n][0]"""

        self.chan: int = chan
        """output channel"""

        self.gain: float = gain
        """a gain to be applied to this event"""

        self.fadein: float = fadein
        """fade in time"""

        self.fadeout: float = fadeout if dur < 0 else min(fadeout, dur)
        """fade out time"""

        self.instr: str = instr
        """Instrument preset used"""

        self.pitchinterpol: str = pitchinterpol
        """Pitch interpolation"""

        self.fadeshape: str = fadeshape
        """Shape of the fades"""

        self.priority: int = priority
        """Schedule priority (priorities start with 1)"""

        self.position: float = position
        """Panning position (between 0-1)"""

        self.args: dict[str, float | str] | None = args
        """Any parameters passed to the instrument. Can be None"""

        # self.kws: dict[str, float | str] | None = kws
        # """Ignored at the moment"""

        self.linkednext: bool = linkednext
        """Is this event linked to the next?
        A linked synthevent is a tied note or a note with a glissando followed by
        some continuation. In any case, the last breakpoint of this synthevent and the
        first breakpoint of the following event should be equal for a two events to
        be linked. NB: since we are dealing with floats, code should always check that
        the numbers are near instead of using ==
        """

        self.numchans: int = numchans
        """The number of signals produced by the event"""

        self.whenfinished: Callable | None = whenfinished
        """A function to call when this event has finished"""

        self.properties: dict[str, Any] | None = properties
        """User defined properties for an event"""

        self.sustain: float = sustain
        """Sustain time after the actual duration"""

        self.automations: list[_automation.SynthAutomation] | None = None
        """A list of SynthAutomation. This keeps track of any automation
        for this event, both automation lines and single set events.
        Add automation via .addAutomation"""

        self.automationSegments: list[_AutomationSegment] | None = None
        """List of automation points

        These are created when multiple events are merged into one. The dynamic
        parameters of the subsequent events are gathered as automation points."""

        self.initfunc = initfunc
        """A function called when the event is being scheduled.
        It has the form (synthevent, renderer) -> None, where synthevent is
        the event being rendered and renderer is the renderer performing the render
        (either a maelzel.core.playback.RealtimeRenderer or a
        maelzel.core.playback.OfflineRenderer). It can be used to initialize any
        resources needed by the event (load/make tables, add includes, global
        code, etc)"""

        self._initdone = False

        self._consolidateDelay()

        if self.dur <= 0:
            raise ValueError(f"Duration of a synth event must be possitive: {self}")

    def getPreset(self) -> presetdef.PresetDef:
        """The preset definition corresponding to the instr of this event"""
        return presetmanager.presetManager.getPreset(self.instr)

    def _ensureArgs(self) -> dict[str, float | str]:
        """Returns self.args, creating an empty dict if needed"""
        if self.args is None:
            self.args = {}
        return self.args

    def paramValue(self, param: str):
        """
        The value of the given parameter for this event

        Args:
            param: the parameter name. Aliases are resolved via the instr

        Returns:
            the value set in self.args or, if not set, the default value
            as defined by the instr

        Raises:
            KeyError: if the parameter has no known value
        """
        instr = self.getInstr()
        param2 = instr.unaliasParam(param, param)
        if self.args and param2 in self.args:
            return self.args[param2]
        defaults = instr.paramDefaultValues()
        value = defaults.get(param)
        if value is None:
            raise KeyError(f"Unknown parameter '{param}', "
                           f"possible parameters: {defaults.keys()}")
        return value

    def getInstr(self) -> csoundengine.instr.Instr:
        """The csoundengine Instr corresponding to the preset of this event"""
        return self.getPreset().getInstr()

    def initialize(self, renderer: renderer.Renderer) -> None:
        """
        Call the init function of this event, if set

        The init function is called only once: any subsequent call
        is a no-op

        Args:
            renderer: the renderer performing the render
        """
        if not self._initdone and self.initfunc:
            self.initfunc(self, renderer)
            self._initdone = True

    def _applySustain(self) -> None:
        """
        Apply the sustain to the breakpoints, in place

        A positive sustain extends the last breakpoint, a negative sustain
        crops the end of the event. A linked event cannot have sustain, in
        which case nothing is done. After this the sustain is reset to 0
        """
        if self.linkednext and self.sustain:
            logger.debug(f"A linked event cannot have sustain ({self=}")
            return
        if self.sustain > 0:
            last = self.bps[-1]
            bp = (last[0] + self.sustain, *last[1:])
            self.bps.append(bp)
        elif self.sustain < 0:
            # crop works with absolute times
            self.crop(self.delay, self.end + self.sustain)
        self.sustain = 0

    @property
    def start(self) -> float:
        """Absolute start time of this event, in seconds"""
        return self.delay

    @property
    def end(self) -> float:
        """Absolute end time of this event, in seconds"""
        return self.delay + self.bps[-1][0]

    @property
    def dur(self) -> float:
        """Duration of this event, in seconds"""
        if not self.bps:
            return 0
        return float(self.bps[-1][0] - self.bps[0][0])

    def resolvedPosition(self) -> float:
        """The panning position; a negative position resolves to a default"""
        if self.position >= 0:
            return self.position
        if self.numchans == 1:
            return 0.
        else:
            return 0.5

    def clone(self, **kws) -> SynthEvent:
        """
        Copy this event with modifications

        Args:
            **kws: any attribute of a SynthEvent (or 'fade')

        Returns:
            the modified copy
        """
        out = self.copy()
        for k, v in kws.items():
            setattr(out, k, v)
        if out.bps[0][0] != 0:
            out._consolidateDelay()
        return out

    def copy(self) -> SynthEvent:
        """
        Returns a copy of this event

        .. note::
            Automations and automation segments are not transferred
            to the copy
        """
        return SynthEvent(bps=self.bps.copy(),
                          delay=self.delay,
                          chan=self.chan,
                          fade=self.fade,
                          gain=self.gain,
                          instr=self.instr,
                          pitchinterpol=self.pitchinterpol,
                          fadeshape=self.fadeshape,
                          args=None if not self.args else self.args.copy(),
                          priority=self.priority,
                          position=self.position,
                          numchans=self.numchans,
                          linkednext=self.linkednext,
                          whenfinished=self.whenfinished,
                          properties=self.properties.copy() if self.properties else None,
                          sustain=self.sustain,
                          initfunc=self.initfunc)

    @property
    def fade(self) -> tuple[float, float]:
        """A tuple (fadein, fadeout)"""
        return self.fadein, self.fadeout

    @fade.setter
    def fade(self, value: tuple[float, float]):
        self.fadein, self.fadeout = value

    def addAutomation(self, automation: _automation.SynthAutomation):
        """Add an automation to this event"""
        if self.automations is None:
            self.automations = []
        self.automations.append(automation)

    def automate(self,
                 param: int | str,
                 pairs: Sequence[float] | np.ndarray,
                 interpolation="linear",
                 delay=0.,
                 overtake=False,
                 ) -> None:
        """
        Automate a parameter of this event

        All arguments are passed to the created SynthAutomation

        Args:
            param: the parameter to automate
            pairs: the automation data
            interpolation: the interpolation kind
            delay: delay of the automation
            overtake: passed to the SynthAutomation
        """
        automation = _automation.SynthAutomation(param=param, data=pairs, delay=delay,
                                                 interpolation=interpolation, overtake=overtake)
        self.addAutomation(automation)

    def set(self, param: str, value: float, delay=0.) -> None:
        """
        Schedule a parameter change for this event

        Args:
            param: the parameter to set
            value: the new value
            delay: delay of the change
        """
        automation = _automation.SynthAutomation(param=param, data=[0, value], delay=delay)
        self.addAutomation(automation)

    def addAutomationsFromPlayArgs(self, playargs: PlayArgs, scorestruct: ScoreStruct) -> None:
        """
        Transfer the automations defined in playargs to this event

        Args:
            playargs: the playargs holding the automations
            scorestruct: the score structure used to convert the start time
                of this event to a beat offset
        """
        if not playargs.automations:
            return
        offset = scorestruct.timeToBeat(self.delay + self.bps[0][0])
        automations = playargs.makeSynthAutomations(scorestruct=scorestruct, parentOffset=offset)
        for automation in automations:
            self.addAutomation(automation)

    @classmethod
    def fromPlayArgs(cls,
                     bps: list[breakpoint_t],
                     playargs: PlayArgs,
                     properties: dict[str, Any] | None = None,
                     **kws
                     ) -> SynthEvent:
        """
        Construct a SynthEvent from breakpoints and playargs

        .. note::

            This method does not transfer any automations from the playargs
            to the created SynthEvent. Automations can be transferred via
            :meth:`SynthEvent.addAutomationsFromPlayArgs`

        Args:
            bps: the breakpoints
            playargs: playargs
            properties: any properties passed to the constructor
            kws: any keyword accepted by SynthEvent

        Returns:
            a new SynthEvent
        """
        assert playargs.db is not None
        db = playargs.db | kws if kws else playargs.db.copy()
        linkednext = db.pop('linkednext', False) or db.get('glisstime') is not None
        # These playkeys are not part of the signature of SynthEvent
        for k in ('transpose', 'glisstime', 'end', 'skip'):
            db.pop(k, None)
        instr = db.pop('instr')
        return SynthEvent(bps=bps,
                          instr=instr,
                          properties=properties,
                          linkednext=linkednext,
                          **db)

    def _consolidateDelay(self) -> None:
        """Move the time of the first breakpoint into the delay, so that
        the breakpoints start at 0"""
        delay0 = self.bps[0][0]
        assert all(isinstance(bp, tuple) for bp in self.bps)
        if delay0 > 0:
            self.delay += delay0
            self.bps = [(bp[0] - delay0,) + bp[1:] for bp in self.bps]
        assert self.bps[0][0] == 0

    def _applyTimeFactor(self, timefactor: float) -> None:
        """Scale the delay and the breakpoint times by the given factor"""
        if timefactor == 1:
            return
        self.delay *= timefactor
        # breakpoints are tuples, so they need to be rebuilt
        self.bps = [(bp[0] * timefactor, *bp[1:]) for bp in self.bps]

    def shiftInPlace(self, offset: float, crop=True) -> None:
        """
        Shift the times of this event, in place

        Args:
            offset: the offset to add
            crop: allow cropping if the given offset results in negative delay
        """
        if offset == 0:
            return
        delay = self.delay + offset
        assert self.bps[0][0] == 0.
        if delay >= 0:
            self.delay = delay
        elif not crop:
            raise ValueError(f"Cannot shift to negative time without cropping ({self=})")
        else:
            # crop works with absolute times: the absolute time which
            # lands at 0 after the shift is -offset
            self.crop(-offset, math.inf)
            self.delay = 0
            self._consolidateDelay()

    def shifted(self, offset: float, crop=True) -> SynthEvent:
        """
        A clone of this event, shifted in time by the given offset

        Args:
            offset: the offset to add
            crop: allow cropping if the given offset results in negative delay

        Returns:
            the resulting event
        """
        out = self.copy()
        out.shiftInPlace(offset=offset, crop=crop)
        return out

    def crop(self, start: float, end: float) -> None:
        """
        Crop this event in place

        Args:
            start: absolute start time, in seconds
            end: absolute end time, in seconds
        """
        assert self.bps[0][0] == 0
        start = max(0., start - self.delay)
        end = max(start, end - self.delay)
        bps = cropBreakpoints(self.bps, start, end)
        self.bps = bps
        self._consolidateDelay()

    def cropped(self, start: float, end: float) -> SynthEvent:
        """
        Return a cropped version of this SynthEvent

        Args:
            start: absolute start time, in seconds
            end: absolute end time, in seconds

        Returns:
            the cropped copy
        """
        out = self.copy()
        out.crop(start, end)
        return out

    def breakpointSize(self) -> int:
        """
        Returns the size of one breakpoint in this SynthEvent

        This is the number of values within each breakpoint (time, pitch,
        amp, ...), not the number of breakpoints
        """
        return len(self.bps[0])

    @staticmethod
    def dumpEvents(events: Sequence[SynthEvent]):
        """Print a table with a summary of the given events"""
        rows = []
        for event in events:
            row = [f"{event.delay:.3f}", f"{event.dur:.3f}{'~' if event.linkednext else ''}", event.instr, event.chan]

            def bprepr(bp):
                parts = [f"{bp[0]:2.6}s"] + [f"{b:.6g}" for b in bp[1:]]
                return " ".join(parts)

            if len(event.bps) == 2 and event.bps[0][1:] == event.bps[1][1:]:
                # A breakpoint can hold more than (time, pitch, amp)
                pitch, amp = event.bps[0][1:3]
                row.append(f"{pitch:.4g} {pt.amp2db(amp):.1f}dB")
            elif len(event.bps) <= 3:
                bps = "; ".join(bprepr(bp) for bp in event.bps)
                row.append(bps)
            else:
                pre = "; ".join(bprepr(bp) for bp in event.bps[:2])
                post = "; ".join(bprepr(bp) for bp in event.bps[-2:])
                row.append(f"{pre}; ...; {post}")
            rows.append(row)
        import emlib.misc
        emlib.misc.print_table(rows, headers=("delay", "dur", "instr", "chan", "bps"))

    def _repr_html_(self) -> str:
        rows = [[f"{bp[0] + self.delay:.3f}", f"{bp[0]:.3f}"] + ["%.6g" % x for x in bp[1:]] for bp in self.bps]
        headers = ["Abs time", "0. Rel. time", "1. Pitch", "2. Amp"]
        bplen = len(self.bps[0])
        if bplen > 3:
            # Extra columns are labeled with their index within the breakpoint
            headers += [str(i) for i in range(3, bplen)]
        import emlib.misc
        htmltab = emlib.misc.html_table(rows, headers=headers)
        return f"SynthEvent({self._reprInfo()})<br>" + htmltab

    def _reprInfo(self) -> str:
        info = [f"delay={float(self.delay):.3g}, dur={self.dur:.3g}, "
                f"instr={self.instr}, "
                f"gain={self.gain:.4g}, chan={self.chan}"
                f", fade=({self.fadein:.5g}, {self.fadeout:.5g})"]
        if self.linkednext:
            info.append('linkednext=True')
        if self.args:
            info.append(f"args={self.args}")
        if self.sustain:
            info.append(f"sustain={self.sustain}")
        if self.position is not None and self.position >= 0:
            info.append(f"position={self.position}")
        if self.automationSegments:
            info.append(f'automationSegments={self.automationSegments}')
        if self.automations:
            info.append(f'automations={self.automations}')
        infostr = ", ".join(info)
        return infostr

    def __repr__(self) -> str:
        info = self._reprInfo()
        if len(self.bps) <= 3:
            def bprepr3(bp):
                parts = [f"{bp[0]:2.6}s"] + [f"{b:.6g}" for b in bp[1:]]
                return " ".join(parts)

            bps = "; ".join([bprepr3(bp) for bp in self.bps])
            return f"SynthEvent({info}, bps=‹{bps}›)"
        else:
            lines = [f"SynthEvent({info})"]

            def bpline(bp):
                rest = " ".join(("%.6g" % b).ljust(8) if isinstance(b, float) else str(b) for b in bp[1:])
                return f"{float(bp[0]):7.6g}s: {rest}"

            for i, bp in enumerate(self.bps):
                if i == 0:
                    lines.append(f"bps {bpline(bp)}")
                else:
                    lines.append(f" {bpline(bp)}")
            lines.append("")
            return "\n".join(lines)

    @staticmethod
    def cropEvents(events: list[SynthEvent], start=0., end=math.inf
                   ) -> list[SynthEvent]:
        """
        Crop the events at the given time slice (staticmethod)

        Removes any event or part of an event outside the time slice start:end

        Args:
            events: the events to crop
            start: absolute start of the time slice (0 to only crop at
                the end)
            end: absolute end of the time slice (math.inf to only crop at
                the beginning)

        Returns:
            the cropped events. Events entirely within the time slice are
            returned as is, partially overlapping events are returned as
            cropped copies
        """
        if start >= end:
            return []
        assert 0 <= start <= end, f"{start=}, {end=}"
        out = []
        for event in events:
            if start <= event.delay and end >= event.end:
                out.append(event)
            elif hasoverlap(start, end, event.delay, event.end):
                out.append(event.cropped(start, end))
        return out

    @staticmethod
    def plotEvents(events: list[SynthEvent],
                   axes: Axes | None = None,
                   notenames=False,
                   linewidth=1.
                   ) -> Axes:
        """
        Plot all given events within the same axes (static method)

        Args:
            events: the events to plot
            axes: the matplotlib Axes to use, if given
            notenames: if True, use notenames for the y axes
            linewidth: width to use between breakpoints

        Returns:
            the axes used

        Example
        ~~~~~~~

            >>> from maelzel.core import *
            >>> from maelzel.core.synthevent import SynthEvent
            >>> chord = Chord("4E 4G# 4B", 2, gliss="4Eb 4F 4G")
            >>> SynthEvent.plotEvents(chord.synthEvents(), notenames=True)
        """
        import matplotlib.pyplot as plt
        import matplotlib.ticker
        if axes is None:
            # f: plt.Figure = plt.figure(figsize=figsize)
            f = plt.figure()
            axes = f.add_subplot(1, 1, 1)
        for event in events:
            event.plot(axes=axes, notenames=False, linewidth=linewidth)
        axes.grid()
        if notenames:
            noteformatter = matplotlib.ticker.FuncFormatter(lambda s, y: f'{str(s).ljust(3)}: {pt.m2n(s)}')
            axes.yaxis.set_major_formatter(noteformatter)
            axes.tick_params(axis='y', labelsize=8)
        return axes

    @staticmethod
    def mergeEvents(events: Sequence[SynthEvent]) -> SynthEvent:
        """
        Static method to merge events which are linked (tied, gliss)

        Args:
            events: the events to merge

        Returns:
            the merged event
        """
        return mergeEvents(events)

    def _flatBreakpoints(self) -> list[float]:
        """The breakpoints of this event flattened into one list"""
        out = []
        for bp in self.bps:
            out.extend(bp)
        return out

    def _resolveParams(self: SynthEvent,
                       instr: csoundengine.instr.Instr
                       ) -> tuple[list[float | str], dict[str, float|str]]:
        """
        Resolves the values for pfields and dynamic params

        Does the same as _resolveParamsGeneric, but ~8 times faster,
        at the cost of having the arguments hard-coded

        .. note::
            A positive sustain is applied to the breakpoints as a side effect

        Args:
            instr: the actual Instr, corresponding to the name in self.instr

        Returns:
            a tuple (pfields5, dynargs), where pfields5 are the pfields
            starting at p5 (a list of float|str) and dynargs is a dict of
            dynamic parameters
        """
        numNamedPfields = instr.numPfields()
        # if not self.linkednext and self.sustain > 0:
        if self.sustain > 0:
            self._applySustain()

        dynargs: dict[str, float|str]

        # |kpos, kgain, idataidx_, inumbps, ibplen, ichan, ifadein, ifadeout, ipchintrp_, ifadekind|
        if _csoundengineUseDynamicPfields():
            # pfields are also used for dynamic (k) arguments
            pfields5: list[float | str] = [
                self.position,          # kpos
                self.gain,              # kgain
                numNamedPfields + 5,    # idataidx_
                len(self.bps),          # inumbps
                self.breakpointSize(),  # ibplen
                self.chan,              # ichan
                self.fadein,            # ifadein
                self.fadeout,           # ifadeout
                SynthEvent.pitchinterpolToInt[self.pitchinterpol],  # ipchintrp_
                SynthEvent.fadeshapeToInt[self.fadeshape]           # ifadekind
            ]
            if self.args:
                dynargs = {arg: _normalizeSynthValue(val) for arg, val in self.args.items()}
            else:
                dynargs = {}
        else:
            pfields5: list[float | str] = [
                numNamedPfields + 5,    # idataidx_
                len(self.bps),          # inumbps
                self.breakpointSize(),  # ibplen
                self.chan,              # ichan
                self.fadein,            # ifadein
                self.fadeout,           # ifadeout
                SynthEvent.pitchinterpolToInt[self.pitchinterpol],  # ipchintrp_
                SynthEvent.fadeshapeToInt[self.fadeshape]           # ifadekind
            ]
            dynargs = {'kpos': self.position, 'kgain': self.gain}
            if self.args:
                assert all(isinstance(value, float) for value in self.args.values())
                dynargs |= self.args

        # Fill any remaining named pfield with its default, the breakpoint
        # data is placed after all named pfields (see idataidx_)
        instrdefaults = instr.defaultPfieldValues()
        pfields5.extend(instrdefaults[len(pfields5):])
        pfields5.extend(self._flatBreakpoints())
        return pfields5, dynargs

    def _resolveParamsGeneric(self: SynthEvent,
                              instr: csoundengine.instr.Instr
                              ) -> tuple[list[float|str], dict[str, float | str]]:
        """
        Resolves the values for pfields and dynamic params

        This is not used. It is here as a reference, since _resolveParams is
        used instead, which is faster.

        Args:
            instr: the actual Instr, corresponding to the name in self.instr

        Returns:
            a tuple (pfields5, dynargs), where pfields5 are the pfields
            starting at p5 (a list of float|str) and dynargs is a dict of
            dynamic parameters
        """
        numNamedPfields = instr.numPfields()
        if not self.linkednext and self.sustain > 0:
            self._applySustain()

        # |kpos, idataidx_, inumbps, ibplen, igain, ichan, ifadein, ifadeout, ipchintrp_, ifadekind|
        pfields = {
            'idataidx_': numNamedPfields + 5,
            'inumbps': len(self.bps),
            'ibplen': self.breakpointSize(),
            'igain': self.gain,
            'ichan': self.chan,
            'ifadein': self.fadein,
            'ifadeout': self.fadeout,
            'ipchintrp_': SynthEvent.pitchinterpolToInt[self.pitchinterpol],
            'ifadekind': SynthEvent.fadeshapeToInt[self.fadeshape]
        }
        dynargs: dict[str, float|str] = {'kpos': self.position, 'kgain': self.gain}
        if self.args:
            dynargs |= self.args
        pfields5, kwargs = instr.parseSchedArgs(args=pfields, kws=dynargs)
        pfields5.extend(self._flatBreakpoints())
        assert isinstance(kwargs, dict)
        return pfields5, kwargs

    def plot(self, axes: Axes | None = None, notenames=False, linewidth=1.) -> Axes:
        """
        Plot the trajectory of this synthevent

        Args:
            axes: a matplotlib.pyplot.Axes, will be used if given
            notenames: if True, use notenames for the y axes
            linewidth: linewidth used for plotting

        Returns:
            the axes used
        """
        import matplotlib.pyplot as plt
        import matplotlib.ticker
        ownaxes = axes is None
        if axes is None:
            # f: plt.Figure = plt.figure(figsize=figsize)
            f = plt.figure()
            axes = f.add_subplot(1, 1, 1)
        t0 = self.delay
        times = [t0 + bp[0] for bp in self.bps]
        midis = [bp[1] for bp in self.bps]
        if notenames:
            noteformatter = matplotlib.ticker.FuncFormatter(lambda s, y: f'{str(s).ljust(3)}: {pt.m2n(s)}')
            axes.yaxis.set_major_formatter(noteformatter)
            axes.tick_params(axis='y', labelsize=8)
        if ownaxes:
            axes.grid()
        axes.plot(times, midis, linewidth=linewidth)
        return axes
def mergeEvents(events: Sequence[SynthEvent], checkStaticAttributes=True
                ) -> SynthEvent:
    """
    Merge linked events

    Two events are linked if the first event has its `.linkednext` attribute
    set as True and the last breakpoint of the first event is equal to the
    first breakpoint of the second. This is used within a Chain or Voice to
    join the playback events of multiple chords/notes to single synthevents.

    Since all breakpoints are merged into one event, any values regarding
    dynamic parameters (position, instr parameters, …) would be lost. These
    are kept as automation points.

    Args:
        events: the events to merge. At least two events are needed; all but
            the last must be marked as linked and the first breakpoint of
            each event must have a time of 0
        checkStaticAttributes: check if linked events modify attributes which
            are static (like gain or chan) and show a warning if this is
            the case

    Returns:
        the merged event

    Raises:
        ValueError: if the events cannot be merged (less than two events,
            events not marked as linked, an event without breakpoints or
            not starting at 0, or an event which does not start where the
            previous one ends)
    """
    # Input validation is done with explicit exceptions instead of asserts:
    # asserts are removed when running with -O and the documented error for
    # unmergeable events is ValueError
    if len(events) < 2:
        raise ValueError(f"At least two events are needed to merge, got {len(events)}")
    if not all(ev.linkednext for ev in events[:-1]):
        raise ValueError(f"Cannot merge events not marked as linked: {events}")
    if not all(ev.bps and ev.bps[0][0] == 0 for ev in events):
        raise ValueError(f"Each event needs at least one breakpoint and its first "
                         f"breakpoint must have a time of 0: {events}")

    firstevent = events[0]
    lastevent = events[-1]
    firstdelay = firstevent.delay
    eps = 1.e-10
    bps = []
    now = firstdelay
    for event in events:
        gap = abs(event.delay - now)
        if gap > eps:
            logger.error(f"Misaligned events:\n{event=}\nend of last event: {now}\n{events=}")
            raise ValueError(f"Trying to merge events. Event starting at {event.delay} is not "
                             f"aligned with the end of last event "
                             f"({now=}, {gap=}, {event=})")
        now = event.bps[-1][0] + event.delay
        # The last breakpoint of an event coincides with the first breakpoint
        # of the next one, so it is skipped here. Times are made relative to
        # the start of the first event
        timeshift = event.delay - firstdelay
        for bp in event.bps[:-1]:
            bps.append((bp[0] + timeshift, *bp[1:]))

    # Add the last breakpoint of the last event
    lastbp = lastevent.bps[-1]
    bps.append((lastbp[0] + lastevent.delay - firstdelay, *lastbp[1:]))

    # Fades are only relevant for the first and the last event
    fade = (firstevent.fadein, lastevent.fadeout)

    mergedevent = firstevent.clone(bps=bps,
                                   linkednext=lastevent.linkednext,
                                   sustain=lastevent.sustain,
                                   fade=fade)

    restevents = events[1:]
    if mergedevent.args is None and any(ev.args for ev in restevents):
        mergedevent.args = {}

    # argstate aliases the args of the merged event (when it has any), so
    # the args of the following events are accumulated into it
    argstate = mergedevent.args if mergedevent.args is not None else {}
    lastoffset = 0.
    automationPoints = []
    state = {attr: getattr(firstevent, attr) for attr in SynthEvent.dynamicAttributes}

    for event in restevents:
        offset = event.delay - mergedevent.delay

        if event.args:
            assert event.instr
            instr = presetmanager.presetManager.getInstr(event.instr)
            dynparams = instr.dynamicParams()
            for k, v in event.args.items():
                if k in dynparams:
                    assert isinstance(v, (float, int)), f"Expected a float/int, got {v}"
                    # The previous value is the last value set by a prior
                    # event or, if never set, the default of the instr
                    automation = _AutomationSegment(param=k,
                                                    time=offset,
                                                    value=float(v),
                                                    prevalue=_cast(float, argstate.get(k, dynparams[k])),
                                                    pretime=lastoffset,
                                                    kind='arg')
                    automationPoints.append(automation)
            # Update the state only after all segments of this event have
            # read their previous values
            argstate |= event.args

        for attr in SynthEvent.dynamicAttributes:
            value = getattr(event, attr, None)
            prevalue = state.get(attr)
            if value is not None and value != prevalue:
                automation = _AutomationSegment(param=attr,
                                                time=offset,
                                                value=value,
                                                prevalue=prevalue,
                                                pretime=lastoffset,
                                                kind='normal')
                automationPoints.append(automation)
                state[attr] = value

        if checkStaticAttributes:
            for attr in SynthEvent.staticAttributes:
                value = getattr(event, attr)
                prevalue = getattr(firstevent, attr)
                if value is not None and value != prevalue:
                    logger.warning(f"Linked event sets playback attribute {attr}={value}, "
                                   f"which is different from the previous value of {prevalue}, "
                                   f"but attribute '{attr}' is static and cannot change within "
                                   f"a linked event. Event: {event}")
        lastoffset = offset

    mergedevent.automationSegments = automationPoints
    return mergedevent