"""
distribute
==========
This module provides functionality to partition a number in multiple ways,
(partitionFib, partitionExpon) or to subdivide it following a given
distribution or curve.
Since many functions make use of curves, we rely heavily on the package
bpf4_, which allows to define and compute break-point-functions
.. _bpf4: https://bpf4.readthedocs.io/en/latest/
"""
from __future__ import annotations
import numpy as np
import bpf4
import warnings
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import TypeVar, Callable, Sequence
import matplotlib.pyplot
T = TypeVar('T')
PHI = 1.61803398874989484820458683436563811772030917
logger = logging.getLogger(__file__)
# ------------------------------------------------------------
#
# Utilities
#
# ------------------------------------------------------------
[docs]
def roundSeqPreservingSum(seq: list[float], maxdelta=1, maxsolutions=1,
ensureDirection=True
) -> list[int]:
"""
Round sequence preserving its integer sum
Find for each element in seq. an integer within the range x-maxdelta : x+maxdelta
so that the sum of these integers is the same as the rounded sum of seq.
.. note::
This function is implemented using contraint programming. The search
space grows very quickly as the seq grows so it can take a long time
for large sequences.
Args:
seq: a list of numbers
maxdelta: the max. deviation the integer can have from the original item
maxsolutions: the max. number of solutions to generate before finding the
best from those solutions. All solutions generated comply with the
constraints given; the returned solutions as the one which follows the
original seq. as closest
ensureDirection: if True, ensure that for any two numbers a and b, if
a < b then a_rounded <= b_rounded (and similarly if a > b)
Returns:
a list of integers representing the original seq. If there are no
solutions an empty list is returned
Example
~~~~~~~
>>> from maelzel import distribute
>>> from emlib.iterlib import pairwise
>>> import matplotlib.pyplot as plt
>>> parts = distribute.partitionExpon(40, 5, exp=3)
>>> parts
array([ 6.0952381 , 6.19047619, 6.85714286, 8.66666667, 12.19047619])
>>> round(sum(parts))
40
>>> intparts = distribute.roundSeqPreservingSum(parts, maxsolutions=10)
>>> intparts, sum(intparts)
[6, 6, 7, 8, 13], 40
"""
from math import ceil, floor
import constraint
p = constraint.Problem()
numvars = len(seq)
seqsum = round(sum(seq))
variables = list(range(numvars))
for i, item in enumerate(seq):
minval = ceil(item) - maxdelta
maxval = floor(item) + maxdelta
domain = list(range(minval, maxval + 1))
p.addVariable(i, domain)
p.addConstraint(constraint.ExactSumConstraint(seqsum), variables)
if ensureDirection:
for idx in range(numvars-1):
if seq[idx] < seq[idx+1]:
p.addConstraint(constraint.FunctionConstraint(lambda a, b: a <= b), (idx, idx+1))
elif seq[idx] > seq[idx+1]:
p.addConstraint(constraint.FunctionConstraint(lambda a, b: a >= b), (idx, idx + 1))
if maxsolutions > 0:
from itertools import islice
solutions = list(islice(p.getSolutionIter(), maxsolutions))
else:
solutions = p.getSolutions()
if not solutions:
return []
solutions.sort(key=lambda sol: sum(abs(v - x) for v, x in zip(list(sol.values()), seq)))
solution = list(solutions[0].items())
solution.sort()
varnames, values = list(zip(*solution))
return list(values)
# ------------------------------------------------------------
#
# PARTITIONS
#
# ------------------------------------------------------------
[docs]
def partitionFib(n: int, numpart: int) -> list[float]:
"""
Partition *n* into *numpart* partitions with fibonacci proportions
Args:
n: the number to partition
numpart: the number of partitions
Returns:
a list of partitions which add up to n
Example
~~~~~~~
>>> from maelzel import distribute
>>> from emlib.iterlib import pairwise
>>> parts = distribute.partitionFib(40, 5)
>>> parts
[2.4500439227299493,
3.964254340907179,
6.414298263637129,
10.378552604544307,
16.792850868181436]
>>> intparts = distribute.roundSeqPreservingSum(parts)
>>> intparts
[3, 4, 7, 10, 16]
>>> for p1, p2 in pairwise(intparts):
... print(f"{p1 = }\t{p2 = }\t\t{p2/p1 = :.3f}")
p1 = 3 p2 = 4 p2/p1 = 1.333
p1 = 4 p2 = 7 p2/p1 = 1.750
p1 = 7 p2 = 10 p2/p1 = 1.429
p1 = 10 p2 = 16 p2/p1 = 1.600
.. note::
In order to partition into integer values, use :func:`roundSeqPreservingSum`
"""
from emlib import mathlib
platonic = [mathlib.fib(i) for i in range(50, 50+numpart)]
ratio = n / float(sum(platonic))
seq = [x * ratio for x in platonic]
return seq
[docs]
def partitionExpon(n: float, numpart: int, exp=2.0) -> list[float]:
"""
Partition *n* into numpart following an exponential curve
Args:
n: the number to partition
numpart: the number of items to partition *n* into
exp: the exponential of the curve
Returns:
a seq. of values which sum up to *n* following an exponential curve
.. note::
In order to partition into integer values, use :func:`roundSeqPreservingSum`
"""
c = bpf4.expon(0, 1, 1, 2, exp=exp)
y0 = c.map(numpart)
r = n / sum(y0)
return y0 * r
[docs]
def chooseBestDistribution(values: Sequence[T], possibleValues: Sequence[T]) -> list[T]:
"""
Reconstruct the given sequence with items from *possibleValues*
Try to follow the distribution of values as close as possible
by drawing elements from *possibleValues*, so that
``sum(chosen)`` is as close as possible to ``sum(values)` at any
moment of the operation.
Args:
values: a seq. of values
possibleValues: a seq. of values to draw from
Returns:
a "reconstruction" of the sequenve *values* with items drawn from
*possibleValues*
"""
values = sorted(values)
possibleValues = sorted(possibleValues)
out = []
status = 0
def dist(a, b):
return abs(a - b)
for value in values:
bestfit = sorted((dist(elem, value + status), elem) for elem in possibleValues)[0][1]
dif = value - bestfit
status += dif
out.append(bestfit)
return out
[docs]
def partitionCurvedSpace(x: float,
numpart: int,
curve: bpf4.BpfInterface,
minval=1,
maxdev=1,
accuracy=1):
"""
Partition a number *x* into *numpart* partitions following a curve
Args:
x: the number to partition
numpart: number of partitions
curve: a bpf curve where y goes from 0 to some integer (this values is not important)
It must grow monotonically from 0, meaning that for any x_n f(x_n) >= f(x_(n-1))
minval: min. value of a partition
maxdev: max. deviation
accuracy: a value between 0 and 1
Reeturns:
a list of partitions, None if not solution is possible
.. note::
curve must be a monotonically growing curve starting at 0.
See the example using .integrated() to learn how to make a monotonically growing
curve out of any distribution
NB: returns None if no solution is possible
Examples
~~~~~~~~
Divide 23 into 6 partitions following an exponential curve
>>> import bpf4
>>> curve = bpf4.expon(0, 0, 1, 1, exp=3)
>>> partitionCurvedSpace(23, 6, curve)
[1, 1, 2, 4, 6, 9]
Partition a distance following, an arbitraty curve, where the y defines the
relative duration of the partitions. In this case, the curve defined the
derivative of our space.
>>> import bpf4
>>> curve = bpf4.linear(
... 0, 1,
... 0.5, 0,
... 1, 1)
>>> partitionCurvedSpace(21, 7, curve.integrated())
[5, 3, 2, 1, 2, 3, 5]
"""
assert curve(curve.x0) == 0 and curve(curve.x1) > 0, \
"The curve should be a monotonically growing curve, starting at 0"
if accuracy > 1 or accuracy < 0:
raise ValueError("0 < accuracy <= 1")
elif 0 < accuracy < 1:
scale = int(1.0/accuracy)
parts = partitionCurvedSpace(x*scale, numpart, curve, minval=minval*scale,
maxdev=maxdev, accuracy=1)
if not parts:
scale += 1
parts = partitionCurvedSpace(x*scale, numpart, curve, minval=minval*scale,
maxdev=maxdev, accuracy=1)
if not parts:
warnings.warn("No parts with this accuracy")
return None
fscale = float(scale)
def roundgrid(x, grid):
numdig = len(str(grid).split(".")[1])
return round(round(x*(1.0/grid))*grid, numdig)
return [roundgrid(part/fscale, accuracy) for part in parts]
else: # accuracy == 0
import constraint
normcurve = curve.fit_between(0, 1)
normcurve = (normcurve / normcurve(1)) * x
optimal_results = np.diff(normcurve.map(numpart+1))
maxval = int(min(int(max(optimal_results) + 1), x-(numpart-1)*minval))
# maxval = x - minval * (numpart - 1)
V = list(range(numpart))
p = constraint.Problem()
p.addVariables(V, list(range(minval, maxval)))
def objective(solution):
values = list(solution.values())
return sum(abs(val-res) for val, res in zip(values, optimal_results))
for var, res in zip(V, optimal_results):
p.addConstraint((lambda x, res=res: abs(x-res) <= maxdev), [var])
p.addConstraint(constraint.ExactSumConstraint(x), V)
solutions = p.getSolutions()
if not solutions:
logger.warning("No solutions")
return None
solutions.sort(key=objective)
# each solution is a dict with integers as keys
best = [value for name, value in sorted(solutions[0].items())]
return best
def _solution_getvalues(solution):
return [val for name, val in sorted(solution.items())]
[docs]
def partitionWithCurve(x: float,
numpart: int,
curve: bpf4.BpfInterface,
method='brentq',
) -> list[float]:
"""
Partition *x* in *numparts* parts following *curve*
Args:
x: the value to partition
numpart: the number of partitions
curve: the curve to follow. It is not important over which interval x
it is defined. The y coord defines the width of the partition (see example)
excluded: any value
Returns:
the list of the partitions
Example
~~~~~~~
Partition the value 45 into 7 partitions following the given curve
>>> import bpf4
>>> curve = bpf4.halfcos(0, 11, 1, 0.5, exp=0.5)
>>> distr = partitionWithCurve(45, 7, curve)
>>> distr
array([ 11. , 10.98316635, 10.4796218 , 7.89530421,
3.37336152, 0.76854613, 0.5 ])
>>> abs(sum(distr) - 45) < 0.001
True
"""
x0, x1 = curve.bounds()
n = x
def func(r):
return sum((bpf4.expon(x0, x0, x1, x1, exp=r)|curve).map(numpart)) - n
try:
if method == 'brentq':
from scipy.optimize.zeros import brentq
r = brentq(func, x0, x1)
curve = bpf4.expon(x0, x0, x1, x1, exp=r)|curve
parts = curve.map(numpart)
elif method == 'fsolve':
from scipy.optimize import fsolve
xs = np.linspace(x0, x1, 100)
rs = [round(float(fsolve(func, x)), 10) for x in xs]
rs = set(r for r in rs if x0 <= r <= x1)
parts = []
for r in rs:
curve = bpf4.expon(x0, x0, x1, x1, exp=r)|curve
parts0 = curve.map(numpart)
parts.extend(parts0)
else:
raise ValueError(f"Method {method} unknown. Possible methods: brentq, fsolve")
if abs(sum(parts) - n) / n > 0.001:
logger.error(f"Error exceeds threshold: {parts=}, {sum(parts)=}")
return parts
except ValueError:
minvalue = curve(bpf4.util.minimum(curve))
maxvalue = curve(bpf4.util.maximum(curve))
if n/numpart < minvalue:
logger.error("no solution can be found for the given parameters. x is too small "
"for the possible values given in the bpf, for this amount of "
" partition try either giving a bigger x, lowering the number of "
"partitions or allowing smaller possible values in the bpf")
raise ValueError("No solution found (x is too small)")
elif n/numpart > maxvalue:
logger.error("No solution can be found for the given parameters. x is too big "
"for the possible values given in the bpf. try either giving a "
"smaller x, increasing the number of partitions or allowing bigger "
"possible values in the bpf")
raise ValueError("No solutions found (x is too big)")
else:
raise ValueError("???")
[docs]
def partitionFollowingCurve(n: int, curve: bpf4.BpfInterface, ratio=0.5, margin=0.1,
method='brentq') -> list[float]:
"""
Partition *n* following curve
The difference with :func:`partitionWithCurve` is that in that function
you determine the number of partitions manually whereas here the number
of partitions is determined as a ratio of the possible number of partitions.
This ensures that you always get a valid result (albeit a trivial one if, for
example, *n* can only be split in 1 partition of *n*)
Args:
n: the number to partition
curve: a bpf
ratio: ratio indicates the number of partitions, where 0 means the least
number of partitions possible and 1 means the most number of partitions possible
margin: sets the max and min possible partitions as a ratio between the max and min
value of the given curve.
Returns:
the list of the partitions
.. seealso:: :func:`partitionWithCurve`, :func:`roundSeroundSeqPreservingSum`
"""
maxval = curve(bpf4.util.maximum(curve))
minval = curve(bpf4.util.minimum(curve))
minval2 = minval + (maxval-minval)*margin
maxval2 = minval + (maxval-minval)*(1-margin)
minpartitions = n / maxval2
maxpartitions = n / minval2
npart = round(minpartitions + (maxpartitions-minpartitions)*ratio)
return partitionWithCurve(n, int(npart), curve, method)
# ------------------------------------------------------------
#
# SUBSAMPLED CURVES
#
# ------------------------------------------------------------
[docs]
def onepulse(x: float, resolution: int, entropy=0.) -> list[int]:
"""
Represents *x* as a seq. of 0 and 1
Args:
x: a float number between 0 and 1
resolution: int. The number of pulses. NB: these are not binary bits,
all bits have the same significance
entropy: (float, 0-1). 0: no entropy (the same output for a given
input), > 0: output is shuffled, entropy represents the number of
shuffles
Returns:
a list of 0s and 1s representing x
Example
~~~~~~~
>>> from maelzel import distribute
>>> distribute.onepulse(0.5, 5)
[1, 0, 1, 1, 0]
"""
x = x % 1
ones = int(x * resolution + 0.5)
zeros = resolution - ones
o2 = [1] * ones
z2 = [0] * zeros
bins = interleave(o2, z2)
if entropy > 0:
from emlib import combinatorics
bins = combinatorics.unsort(bins, entropy)
return bins
[docs]
def ditherCurve(curve: bpf4.BpfInterface, numsamples: int, resolution=2) -> list[int]:
"""
Sample *curve* applying dithering to smooth transitions
Args:
curve: a curve defined between 0-numstates.
numsamples: the number of samples, determines also the size of the output
resolution: the number of values to average together to calculate
the resulting curve. There is a tradeoff between x and y resolution
Returns:
a list of ints representing the state at each time
Example
~~~~~~~
>>> import bpf4
>>> b = bpf4.linear(0, 0, 1, 2)
>>> ditherCurve(b, 20)
[0, 0, 1, 0, 1, 0, 1, 1, 1, 1, 1, 2, 1, 1, 2, 2, 2, 1, 2, 2]
"""
origs = curve.map(numsamples)
resolution = max(1, resolution-1)
out = [0]*int(resolution/2)
def avgnow(win, x):
return (sum(win) + x) / (len(win)+1)
for i, orig in enumerate(origs):
win = out[-resolution:]
minvalue = int(orig)
maxvalue = int(orig+0.99999999999)
states = list(range(minvalue, maxvalue+1))
diffs = [abs(orig-avgnow(win, x)) for x in states]
best = sorted(zip(diffs, states))[0][1]
out.append(best)
out = out[-numsamples:]
assert len(out) == numsamples
return out
[docs]
def pulseCurve(curve: bpf4.BpfInterface,
n: int,
resolution=5,
entropy=0.,
x0: float | None = None,
x1: float | None = None
) -> list[int]:
"""
Generates a list of 0s/1s of length n, following the curve
For a resolution of 4, this is how some values are represented::
0 --> 0000
1 --> 1111
0.5 --> 0011
0.25 --> 0001
0.33 --> 0001
Args:
curve: a bpf or a function defined between 0 and 1
n: the number of values to generate
resolution: how many rendered values represent a value of the curve.
In fact, the bit-rate values will be rounded to the nearest lower
representable value
entropy: a value between 0 and 1. Nearer to 0, the pulse representation is
unmodified, nearer to 1, the numbers representing the pulse are shuffled.
Imagine how to represent 0.5 with a resolution of 4, 0.5 --> 0011
With increasing entropy, this representation yields other shuffles:
0.5 --> 0101 or 1001 or 1010 ...
x0, x1: the range to evaluate the curve. If a bpf is passed, its bounds are used
as defaults. If a normal function is passed, these values default to 0, 1
Returns:
a seq. of 0s and 1s of length n
"""
if x0 is None:
try:
x0 = curve.x0
except AttributeError:
x0 = 0
if x1 is None:
try:
x1 = curve.x1
except AttributeError:
x1 = 1
resolutions = _dither_resolutions(n, resolution)
nums = len(resolutions)
xs = np.linspace(x0, x1, nums)
ys = [curve(x) for x in xs]
out = []
assert sum(resolutions) == n
for x, y, resolution in zip(xs, ys, resolutions):
bins = onepulse(y, resolution, entropy)
out.extend(bins)
assert len(out) == n
return out
def _dither_resolutions(numsamples, resolution):
nums = int(numsamples / resolution + 0.5)
intvalue = int(numsamples / nums)
rest = numsamples - (intvalue * nums)
resolutions = [intvalue + int(index < rest) for index in range(nums)]
return resolutions
[docs]
def interleave(A: list[T], B: list[T], weight=0.5) -> list[T]:
"""
interleave the elements of A and B
Args:
A: a list of elements
B: another list of elements
weight: Between 0-1. 0: first the elements of xs, then B, not interleaved;
0.5: interleave A and B regularly; 1: first the elements of B, then A
Returns:
a list with items of A and B interleaved
Example
~~~~~~~
>>> from maelzel import distribute
>>> A = ["A", "B", "C"]
>>> B = ["a", "b", "c", "d", "e"]
>>> "".join(distribute.interleave(A, B))
'aAbcBdCe'
"""
if not B:
return A
elif not A:
return B
c = bpf4.linear(0, len(A), 0.5, len(A)/len(B), 1, 1/len(A))
r = xr = c(weight)
out = []
L = len(A)+len(B)
A_index = 0
B_index = 0
while True:
if r >= 1:
if A_index < len(A):
out.append(A[A_index])
A_index += 1
r -= 1
else:
if B_index < len(B):
out.append(B[B_index])
B_index += 1
r += xr
if len(out) == L:
break
return out
# ------------------------------------------------------------
#
# OTHER
#
# ------------------------------------------------------------
[docs]
def interleaveWithDynamicWeights(streamSizes: list[int],
weightBpfs: list[Callable[[float], float]]
) -> list[tuple[int, int]]:
"""
Interleave items of multiple streams based on dynamic weights
Args:
streamSizes: the sizes of each stream
weightBpfs: for each stream, a bpf indicating the weighting. Each bpf is defined
between 0-1 (for both x and y coords)
Returns:
a list of tuples of the form (streamIndex: int, indexInStream: int)
Example
~~~~~~~
>>> import bpf4
>>> A = "AAAAAAAAAAAAAAAAAAA"
>>> B = "BBBBBBBBBBBBB"
>>> C = "CCCCC"
>>> D = "DDD"
>>> streams = (A, B, C, D)
>>> streamSizes = (len(A), len(B), len(C), len(D))
>>> bpfs = (bpf4.linear(0, 1, 1, 1), # bpfs must be defined within the unity
... bpf4.halfcos(0, 0, 0.5, 1, 1, 0),
... bpf4.linear(0, 1, 1, 0),
... bpf4.expon(0, 0, 1, 1, exp=3)
... )
>>> distributedItems = interleaveWithDynamicWeights(streamSizes, bpfs)
>>> for stream, idx in distributedItems:
... print(streams[stream][idx], end=' ')
ACAACABABCABABABCABBABABABDBACABABADAADA
"""
weights = list(map(bpf4.util.asbpf, weightBpfs))
weight_total = sum(weights)
normalized_weights = [w / weight_total for w in weights]
xss = [np.linspace(0, 1, stream_quant * 2 + 1)[1::2] for stream_quant in streamSizes]
warped_xss = [bpf4.util.warped(w[::w.ntodx(1000)]).map(np.ascontiguousarray(xs))
for w, xs in zip(normalized_weights, xss)]
# before we flatted all the frames to sort them, we need to attach
# the stream id to each frame so that we know were it belongs
annotated_xss = []
for stream, xs in enumerate(warped_xss):
annotated_xs = [(x, (stream, index_in_stream)) for index_in_stream, x in enumerate(xs)]
annotated_xss.extend(annotated_xs)
annotated_xss.sort()
frames = [frame for x, frame in annotated_xss]
return frames
_defaultPalette = [
'#6acc64',
'#d65f5f',
'#ee854a',
'#956cb4',
'#82c6e2',
'#4878d0',
'#8c613c'
]
[docs]
def plotFrames(xs, ids: list[str] | None = None, top=1., bottom=0., durs: list[float] | None = None,
palette: list[str] | None = None, edgecolor='black', ax=None, show=True
) -> matplotlib.pyplot.Axes:
"""
Plot a seq. of stacked frames
Imagine a section divided in measures. Each of these measures can be thought
of as a frame. This function permits to visualize their relative duration
by plotting these frames stacked to the left.
Args:
xs: (seq) The start time of each frame
ids: (seq, optional) The id of each frame. Each id is plotted differently
top, bottom : (number) The frames are defined between these y values.
it only makes sense if plotting against something else,
which shares the x coord
durs: (seq, optional) The duration of each section.
If not given, it is assumed that the frames are non-overlapping,
each frame ending where the next one begins. In this case, there are
len(xs) - 1 number of frames (the last x value is used to determine the
width of the last frame)
palette: (seq) A list of colours, will be applied to the IDs as they are found
edgecolor: the color of the edges
ax: if given (a matplotlib.pyplot.Axes) this axes will be used to plot to
Returns:
the pyplot.Axes used
Example
-------
>>> from maelzel.distribute import *
>>> sections = [0, 10, 11, 13, 18]
>>> ids = [0, 0, 1, 0, 1]
>>> plotFrames(sections, ids)
.. image:: ../assets/distribute-plotFrames2.png
>>> sections = [0, 10, 11, 13, 18]
>>> ids = [0, 2, 3, 5, 4]
>>> plotFrames(sections, ids)
.. image:: ../assets/distribute-plotFrames1.png
"""
import matplotlib.pyplot as plt
if durs is None:
durs = list(np.diff(xs))
durs.append(durs[-1])
if ids is None:
ids = np.ones((len(durs),))
if not ax:
fig = plt.figure()
ax = fig.add_subplot(111)
ys = np.ones_like(ids) * top
bottom = np.ones_like(xs) * bottom
C = palette if palette is not None else _defaultPalette
colors = [C[id] for id in ids]
xs2 = [x + w / 2 for x, w in zip(xs, durs)]
container = ax.bar(xs2, height=ys, width=durs, bottom=bottom, color=colors,
edgecolor=edgecolor)
ax.bar_label(container, labels=ids, label_type='center')
if show:
plt.show()
return ax
[docs]
def dohndt(numseats: int, votesPerParty: list[int | float]) -> list[int]:
"""
Perform a D'Ohndt distribution
Args:
numseats: the number of seats to distribute across the parties
votesPerParty: the votes (can be interpreted as the weight of each party)
of each party.
Returns:
the list of assigned seats per party
Example
~~~~~~~
Distribute a number of items across streams according to
a set of weights.
>>> from maelzel.distribute import dohndt
>>> levels = 10
>>> numstreams = 4
>>> weights = [10, 6, 5, 3]
>>> assigned = dohndt(levels, weights)
>>> assigned
[3, 2, 2, 1]
"""
numparties = len(votesPerParty)
assignedSeats = [0] * numparties
indices = list(range(numparties))
for seat in range(numseats):
costs = [(votes/(assigned+1), index)
for votes, assigned, index in zip(votesPerParty, assignedSeats, indices)]
costs.sort(key=lambda cost:cost[0])
winnerindex = costs[-1][1]
assignedSeats[winnerindex] += 1
return assignedSeats
if __name__ == '__main__':
import doctest
doctest.testmod() # optionflags=doctest.NORMALIZE_WHITESPACE)