- Wednesday, 22 February, 2023
- First Draft
- Wednesday, 15 May, 2024
- Add about section
In this article I described algorithm which I used for many years for different applications. Probably since 2011. The algorithm aggregates small partitions(chunks) into bigger partitions(chunks). On first glance it looks trivial, however it has subtle details which described in the article. Also I gave and proved estimate for bigger partition(chunk) boundary for case when small partition(chunk) has size less or equal to desired big partition size.
Consider a list of non negative real numbers.
data = [14, 1, 14, 9, 7, 6, 10, 6, 0, 0, 5, 10, 10, 11, 6, 10, 9]We are going to develop efficient algorithm to split given list of non negative
real numbers to list of sublists, where sum of each sublist close to some
desired value. By close we mean a number called distance, which equals to
absolute value of difference between sum of sublist values and desired value.
For instance, for list data and desired_value = 40 we may have next result.
desired_value = 40
partitions = [
[14, 1, 14, 9],
[7, 6, 10, 6, 0, 0, 5, 10],
[10, 11, 6, 10],
[9],
]We will call each sublist -- partition, list of sublist -- partitions,
sum(partition) -- partition value. Terminology depends from application
domain. Another popular option is call each sublist -- chunk, list of sublist
-- chunks, sum(chunk) -- chunk value.
Let's print sum and distance for every partition from above example.
for partition in partitions:
print(
f"{partition=},{sum(partition)=}, distance={abs(sum(partition) - desired_value)}"
)partition=[14, 1, 14, 9],sum(partition)=38, distance=2
partition=[7, 6, 10, 6, 0, 0, 5, 10],sum(partition)=44, distance=4
partition=[10, 11, 6, 10],sum(partition)=37, distance=3
partition=[9],sum(partition)=9, distance=31
As we see all partitions except last are close to 40, but last partition is
not. There is no more numbers in data to add to last partition to make last
partition more close to 40.
First we explain the algorithm with words and then will provide code. Let's
iterate over data and build first partition. We will join an element from
data into first partition until it is better to add the element into
partition than stop build first partition. Better means close to 40. We
definitely must put 14 into 1-st partition [14]. Then we have a choice ,
add next element into current partition or say that first partition is ready
and start build second partition. To make the decision we compare distance of
current partition [14] with candidate partition [14,1], where we join next
element from data to current partition. List [14,1] is more close to 40
i.e. abs(sum([14,1]) - 40) <= abs(sum([14]) - 40). Then we accept that
partition [14,1] is better then [14] and consider next candidate
[14,1,14]. It better then [14,1]. Then we consider [14,1,14,9], it is
better than [14,1,14]. But [14,1,14,9,7] is not better than [14,1,14,9]
because partition [14,1,14,9] more close to 40 than candidate partition
[14,1,14,9,7]. Similarly we build second partition as [7, 6, 10, 6, 0, 0, 5, 10], 3-rd partition as [10,11,6,10], 4-rd partition as [9]. Now we write
algorithm with python code.
from numbers import Real
def distance(v1: Real, v2: Real) -> Real:
"Returns distance between two values."
return abs(v1 - v2)
def aggregate(items: list[Real], desired_value) -> list[list[Real]]:
"""Split list into list of sublists, where sum of each sublist close to `desired_volume`"""
partitions = []
current = None
for item in items:
if current is None:
current = [item]
continue
candidate = current + [item]
if distance(sum(candidate), desired_value) <= distance(
sum(current), desired_value
):
current = candidate
else:
partitions.append(current)
current = [item]
if current != None:
partitions.append(current)
return partitionsLet's test it with our data
partitions = aggregate(data, desired_value)
for partition in partitions:
print( f"{partition=},{sum(partition)=}, distance={abs(sum(partition) - desired_value)}")partition=[14, 1, 14, 9],sum(partition)=38, distance=2
partition=[7, 6, 10, 6, 0, 0, 5, 10],sum(partition)=44, distance=4
partition=[10, 11, 6, 10],sum(partition)=37, distance=3
partition=[9],sum(partition)=9, distance=31
In words, we iterate over items, every time we have current partition and
candidate partition , which is current + [item]. We accept candidate
partition as current until distance between candidate and desired_value
less or equal with distance between current and desired_value. It is
important accept candidate as current until it is less or equal. It allow
always join items with value 0 to candidate. Other words it is greedy
algorithms. This property is very important to many real applications. In
general, we like to avoid partition [0] if it possible. However current
version of aggregate function still allow get partition [0] in case where
possible to avoid this.
partitions = aggregate([0, 120], desired_value)
for partition in partitions:
print(
f"{partition=},{sum(partition)=}, distance={abs(sum(partition) - desired_value)}"
)partition=[0],sum(partition)=0, distance=40
partition=[120],sum(partition)=120, distance=80
This happened because distance between 0 and 40 less than distance between
120 and 40. We will a bit improve our algorithm on that way, if
sum(current) == 0 we accept candidate.
def aggregate(items: list[Real], desired_value) -> list[list[Real]]:
"Split list into sublists, where sum of each sublist close to `desired_volume`"
partitions = []
current = None
for item in items:
if current is None:
current = [item]
continue
candidate = current + [item]
if sum(current) == 0 or distance(
sum(candidate), desired_value
) <= distance(sum(current), desired_value):
current = candidate
else:
partitions.append(current)
current = [item]
if current != None:
partitions.append(current)
return partitionspartitions = aggregate([0, 120], desired_value)
for partition in partitions:
print(
f"{partition=},{sum(partition)=}, distance={abs(sum(partition) - desired_value)}"
)partition=[0, 120],sum(partition)=120, distance=80
Now we will analyze range of a distance between a partition and desired value.
But before let make some assumptions about input list of values. Let denote
desired_value as current
partition value as
In this new notation we can describe algorithm as
- A = next(data), a = next(data)
- If
$| A + a - d | \le | A - d |$ - A = A + a, a = next(data)
- Else
$| A + a - d | \gt | A - d |$ - Accept A
- A, a = next(data),next(data)
- Go to step 1.
Depends of value
- a)
$A + a \le d$ ,$A \le d$ - b)
$A + a \le d$ ,$A \gt d$ , impossible,$A \gt d => A + a \gt d$ - c)
$A + a \gt d$ ,$A \le d$ - d)
$A + a \gt d$ ,$A \gt d$
And we need also initial state which we have after execute step 0.
- o)
A = next(data),a = next(data)
Now we can draw possible transitions between cases a),b),c),d); our start state is o).
stateDiagram-v2
%% Initialize
[*] --> a: A,a
%% Self transitions
a --> a: +a
c --> c: +0
d --> d: +0
%% Iterations
a --> c: +a
c --> d: +a
%% Stop
c --> [*]: A
d --> [*]: A
Where notation means:
- +a:
A = A + a,a = next(data) - +0:
A = A + 0,a = next(data) - A, a: Pass
A,ato next state - A: accept
A
Now let's analyze cases a),b),c),d) for step 1. or 2.. It is convenient denote a.1) the case a) on step 1. , similarly a.2) means case a) on step 2. etc.
Out goal estimate distance x -> y.
a.1) Case a) is
It is identity. Other words we join items to current partition while a) satisfied.
a.2) Inequality 2. is negation of identity a.1) which is
c.1) Case c) is
Let's estimate distance.
therefore
c.2) We accept partition A. Inequality 2. is negation of 1., which is
Let's estimate distance
thus
d.1) Case d) is
It means join only items with value 0.
d.2) We accept partition A.Inequality 2. is negation of inequality 1., which is
It is identity. Thus we have to use estimate
Estimate of c.1 after replace
We accept partition value A on c.2 and d.2. On c.2
We also note that with our assumptions it is the best estimate. It means
equality holds in the right part
For analysis we have used list of real numbers as input. For real applications
each element of input list has a value and may have some info. In next example
info is batch_num and we renamed value to volume. In this example
batch_num is a number of loading data, and volume is number of loaded rows
for the batch. We aggregate batch numbers into partition as a list, like we did
for numbers.
from dataclasses import dataclass
@dataclass
class Batch:
batch_num: int
volume: int
@dataclass
class Partition:
batch_nums: list[int]
volume: int = 0
def __add__(self, other: "Partition") -> "Partition":
return Partition(
self.batch_nums + other.batch_nums,
self.volume + other.volume,
)
def create_partition(batch: Batch) -> Partition:
return Partition([batch.batch_num], batch.volume)
def aggregate_partition(
batches: list[Batch], desired_volume
) -> list[Partition]:
"Split list into sublists, where sum of each sublist close to `desired_volume`"
partitions = []
current = None
for batch in batches:
if current is None:
current = create_partition(batch)
continue
candidate = current + create_partition(batch)
if current.volume == 0 or distance(
candidate.volume, desired_volume
) <= distance(current.volume, desired_volume):
current = candidate
else:
partitions.append(current)
current = create_partition(batch)
if current != None:
partitions.append(current)
return partitionsdesired_volume = 40
batches = [Batch(batch_num=i, volume=x) for i, x in enumerate(data)]
partitions = aggregate_partition(batches, desired_volume)
for partition in partitions:
print(
f"{partition=},{partition.volume=}, distance={abs(partition.volume - desired_value)}"
)partition=Partition(batch_nums=[0, 1, 2, 3], volume=38),partition.volume=38, distance=2
partition=Partition(batch_nums=[4, 5, 6, 7, 8, 9, 10, 11], volume=44),partition.volume=44, distance=4
partition=Partition(batch_nums=[12, 13, 14, 15], volume=37),partition.volume=37, distance=3
partition=Partition(batch_nums=[16], volume=9),partition.volume=9, distance=31
We can call list of Batch as list of small partitions, and output list of
Partition as list of big partitions. Or in chunk terminology list of small
chunks and list of big chunks. Other words we aggregate list of something which
has a value into list of bigger something which has a value.
Suppose batches are already ordered, and for an application it is enough to remember only start batch number and stop batch number instead to remember list of all batches in a partition. Here is an example.
@dataclass
class Partition:
batch_num_from: int
batch_num_to: int
volume: int = 0
def __add__(self, other: "Partition") -> "Partition":
return Partition(
self.batch_num_from,
other.batch_num_to,
self.volume + other.volume,
)
def create_partition(batch: Batch) -> Partition:
return Partition(batch.batch_num, batch.batch_num, batch.volume)partitions = aggregate_partition(batches, desired_volume)
for partition in partitions:
print( f"{partition=},distance = {abs(partition.volume - desired_value)}")partition=Partition(batch_num_from=0, batch_num_to=3, volume=38),distance = 2
partition=Partition(batch_num_from=4, batch_num_to=11, volume=44),distance = 4
partition=Partition(batch_num_from=12, batch_num_to=15, volume=37),distance = 3
partition=Partition(batch_num_from=16, batch_num_to=16, volume=9),distance = 31
In general, info usually integer or string or date or range of integers or range of strings or range of dates. We can aggregate info of input items into a partition as a list as we did in Example 1. Or when items is ordered we can aggregate info of input items into a range as we did in Exmple 2. Often for an application value is an integer and desired value is an integer. But it is also acceptable to have value is an integer and desired value is a real.
We have represented algorithm for aggregate list of small partitions into list
of big partitions with desired value
The algorithm can be called as a half -- one and a half.