Data partitioning in Kusto
Last modified: 07/27/2021
By default, tables in Kusto are partitioned according to the time at which data is ingested. In the majority of use cases, there is no need to change that, unlike in other technologies, in which data partitioning is necessary in many cases, to reach better performance.
In specific cases, it is possible and recommended to define a Data partitioning policy, to reach improved performance at query time.
When to use data partitioning
There are very specific scenarios in which the benefits of partitioning a table outweigh the overhead of resources being continuously invested in the process. In such cases, the upside is expected to be significant, and improve query performance several times. In other cases, however, it can do more damage than good.
Filtering
Homogeneous extents include metadata on their partition values, that allows Kusto’s query planner to filter out data shards without having to perform an index lookup or scanning the data. As result, a significant reduction in resources required to serve queries is expected, when a query includes an equality filter on the partition key.
Example
- Table
T
has astring
-typed column namedTenantId
, which represents a unique identifier for a tenant that the data in the table belongs to. T
includes data for multiple tenants, say10,000
or more.- The majority of queries running against
T
use equality filters (==
orin()
) on theTenantId
column.- For example:
T | where Timestamp > ago(30d) | where TenantId == 'f83a65e0-da2b-42be-b59b-a8e25ea3954c' // <-- | where Severity == "Error" | summarize count() by bin(Timestamp, 1d) | render timechart
- The value
f83a65e0-da2b-42be-b59b-a8e25ea3954c
belongs to a single partition, out of the maximum number of partitions defined in the policy (for example: partition number10
out of a total of128
).
- For example:
- The filter on
TenantId
is highly efficient, as it allows Kusto’s query planner to filter out any extents that belongs to partitions that aren’t partition number10
.- Assuming equal-distribution of data across tenants in
T
, that means we’re left with 1/128 (~0.78%) of the extents, even before evaluating the datetime pre-filter and leveraging the default index onTenantId
.
- Assuming equal-distribution of data across tenants in
- When the amount of concurrent queries is higher (dozens or more), the benefit increases significantly - as each such query consumes less resources.
- In this sample use case, it’s appropriate to set the partition assignment mode for the hash partition key to
Uniform
.
Joins / aggregations
When a table has a hash partition key defined in its partitioning policy, there are two different assignment modes:
ByPartition
: All homogeneous extents that belong to the same partition are assigned to the same node in the cluster.Uniform
: Extents’ partition values are not taken into account when assigning extents uniformly to the cluster’s nodes.
Choosing the ByPartition
mode makes sense when the cardinality of the hash partition key is very high, partitions are expected to be ~equally-sized, and the query pattern uses the shuffle strategy often.
- Queries that use the
shuffle
strategy, and in which the shuffle key used injoin
,summarize
ormake-series
is the table’s hash partition key - are expected to perform better. This is because the amount of data required to move across the cluster’s nodes is significantly reduced.
Example
- Table
T
has astring
-typed column namedDeviceId
, which represents a unique identifier for a device that the data in the table was sent from. - 10s of millions of such devices are spread across the world, and they all emit a similar amount of metrics per unit of time.
- Queries running against
T
aggregate overDeviceId
using theshuffle
strategy, withDeviceId
as the shuffle key.- For example:
T | where Timestamp > ago(1d) | summarize hint.shufflekey = DeviceId arg_max(Temperature, RelativeHumidity) by DeviceId // <-- | where RelativeHumidity > 75 | count
- For example:
Backfill or unordered ingestion
In many systems, data is ingested close to when it was generated at its source. More than that - data is ingested in-order, e.g. data from 10 minutes ago is ingested after data from 15 minutes ago. There are some cases, however, in which the patterns are different - these are the ones referred to in this section.
Example
- Data can be spread across source files according to a different partition key, and not according to time.
- For example: A single file may include all records for a single division, however those records span a period of 3 years.
- This data includes a datetime column named
record_timestamp
and is ingested into tableT
. - When this data is ingested into
T
, the creation time of the result extents is set to the time of ingestion, and has no relationship with the datetime values in the dataset.- As the source files are mixed, the ingestor can’t solve this by specifying the CreationTime property at ingestion time.
- Data retention and caching policies act according to the extents’ creation times. A different way is required for overriding it, based on the values of
records_timestamp
. - In this case, setting
record_timestamp
as the uniform range datetime partition key ofT
, while specifying theOverrideCreationTime
property astrue
will have the cluster re-partition the data and organize the records - And, if the table has an effective caching policy of 30 days, any records older than that period will not be cached.
Data compression
Data partitioning results with similar values of the partition key ending up in the same data shards. In some cases, data in other columns behaves similarly, as values sent from the same device or tenant have similar characteristics, compared to those sent from others.
As a result, in many cases the compression ratio of such columns, and not only the partition key, improves. Meaning - less storage is required to store the data, and a potential cost reduction is achievable.
As different data sets may have different characteristics - YMMV.
When not to use data partitioning
Any other case which doesn’t meet the criteria mentioned above will not benefit, and may be harmed by enabling data partitioning.
Specifically, don’t use data partitioning when:
- You don’t frequently use equality filters or join/aggregate on the chosen hash partition key.
- Your data is unevenly distributed across the partitions defined by your policy, and you want each partition to be assigned to a single node.
- If data is already ingested into a non-partitioned table in Kusto, an easy way to check this is by running a query such as the following, and making sure the values under the
count_
column are ~even:T datascope=hotcache | summarize count() by p = hash(my_column, 128) | order by count_ desc
- If data is already ingested into a non-partitioned table in Kusto, an easy way to check this is by running a query such as the following, and making sure the values under the
- The cardinality of the chosen hash partition key is low.
- For example: Table
T
has a columnSeverity
with only 5 unique values (“Verbose”, “Info”, “Warning”, “Error”, “Critical”), even if you frequently filter on this column.
- For example: Table
- You have many outliers in your data, that will result with a large amount of small extents.
- For example: you defined a uniform range datetime partition key, with a range of 1 day, but there are many records with ‘bad’ datetime values in the column from the distant past/future (e.g.
0003-12-31
and3020-07-11
).
- For example: you defined a uniform range datetime partition key, with a range of 1 day, but there are many records with ‘bad’ datetime values in the column from the distant past/future (e.g.
More importantly - make sure you:
- Follow the guidelines in the documentation
- Monitor & measure the impact of the policies you set on the cluster.
How data partitioning works
- When data is ingested into a table in a Kusto database, data shards (extents) are created.
- To maintain query efficiency, smaller extents are merged into larger ones.
- That is done automatically, as a background process.
- It reduces the management overhead of having a large number of extents to track.
- It also allows Kusto to optimize its indexes and improve compression.
- When a partitioning policy is defined on a table, extents participate in another background process after they’re created (ingested), and before they are merged.
- This process re-ingests the data from the source extents and creates homogeneous extents.
- In these, all values in the column that is the table’s partition key belong to the same partition.
- Extents that belong to different partitions do not get merged together.
- Because extents are not merged before they are homogeneous, it is important to make sure the cluster’s maximum capacity for partitioning and for merges are balanced, so that:
- The rate of creating new extents isn’t significantly higher than the rate of merging them.
- The overall number of extent in the database/cluster doesn’t keep growing significantly over time.
- Because the partitioning process requires reading and re-ingesting the data again, the process is expected to have a continuous overhead on the cluster’s resources utilization.
- In certain cases, this means that increasing the size of the cluster would be required.
- The partitioning process prioritizes the table with a data partitioning policy that has the largest amount of non-partitioned records.
- It is possible that partitioning data in one very large table may delay partitioning data in other, smaller tables.
Advanced scenarios
Multiple high cardinality keys
At times, you may find that queries may frequently filter on more than a single high cardinality string column, which makes the choice of a hash partition key more challenging. The choice in these cases may depend on the relationship between these columns, and, if such exists - the ability to create a lookup table from one column to another.
Example
- You have a table named
telemetry
, with 2 columns:device_id
: cardinality = 100M.manufacturer_id
: cardinality = 10M.
- Queries always filter on
device_id
or onmanufacturer_id
, or on both. - Each device is manufactured by a single manufacturer, meaning that given a certain value of
device_id
, there’s a single matching value ofmanufacturer_id
.
In this case, it would be recommended to:
- Define
manufacturer_id
as the hash partition key of the table. - Create a lookup table of
device_id
tomanufacturer_id
, so that given the former you can find the value of the latter.- This table can be created using a materialized-view.
- This table (or materialized view) will have
device_id
as its hash partition key, as lookups over it will always filter bydevice_id
.
.create async materialized-view with (backfill=true) device_to_manufacturer_lookup on table telemetry { telemetry | summarize take_any(manufacturer_id) by device_id }
.alter materialized-view device_to_manufacturer_lookup policy partitioning ```{ "PartitionKeys": [ { "ColumnName": "device_id", "Kind":"Hash", "Properties": { "Function": "XxHash64", "MaxPartitionCount":128, "PartitionAssignmentMode":"Uniform" } } ] }```
-
Create a function to get the manufacturer ID by a device ID:
.create function get_manufacturer_by_device (_device_id:string) { toscalar( device_to_manufacturer_lookup // <-- the lookup table / materialized-view | where device_id == _device_id // <-- the filter on the hash partition key | project manufacturer_id ) }
- Queries or functions that would have filtered only by
manufacturer_id
remain the same. - Queries or functions that would have filtered by both
device_id
andmanufacturer_id
remain the same. - Queries or functions that would have filtered only by
device_id
will change as follows:-
Instead of running this:
telemetry | where device_id == 'input device ID'
Run this:
telemetry | where manufacturer_id == get_manufacturer_by_device('input device ID') // <-- filter on the hash partition key | where device_id == 'input device ID'
-
- With that, you will guarantee to all queries filter on the table’s hash partition key, and benefit from pre-filtering done during query planning.
Frequently asked questions
1. The vast majority of my queries look at data that was ingested in the last 30 minutes or less. Should I define a partitioning policy on my table?
No. The partitioning process runs after ingestion, and may not be able to partition all of the recently ingested data within a short period (e.g. 30 minutes or less).
2. The cardinality of my candidate column for a hash partition key is low (e.g 10 values). Should I define a partitioning policy on my table?
No. With lower cardinalities, the benefits of re-partitioning the data decrease significantly, and aren’t likely to overweigh the overhead of the background process.
3. I have a large backlog (e.g. multi-terabytes) of data that is already ingested from a while back. Should I set the EffectiveDateTime
property in the partitioning policy to include it?
It is recommended to focus on newly ingested data, that hasn’t yet been merged into larger data shards - those are likely to take much longer to re-partition, and that may impact partitioning of newly-ingested data in other tables in the database.
It is possible to re-partition old data, however if you do choose to partition historical data, consider doing so gradually - i.e. set the EffectiveDateTime
to a previous datetime in steps of up to a few days each time you alter the policy, while monitoring the health of the process before moving on to the next step.
4. I’m importing a large data set stored in a different database technology, where it had a partitioning policy defined. Should I use the same partition keys in Kusto?
Not necessarily. By default, tables in Kusto are partitioned according to the time at which data is ingested. In the majority of use cases, there is no need to change that, unlike in other technologies, in which data partitioning is necessary in many cases, to reach better performance.
The specific scenarios in which data partitioning in Kusto should be considered are detailed above.
5. Is there a way for me to see the percentage of partitioned records across all tables in my database?
Yes. You may use the following command:
.show database DATABASE_NAME extents partitioning statistics
6. Using .show commands
, I’m seeing that commands of kind ExtentsPartition
are taking long to complete and are consuming a very high amount of memory and CPU. Is there something I can do to improve the runtime of each command?
This could be a result of too many records / too much data in the source extents being processed as part of a single command. The default targets (5M records / 5GB of original size) may be too high in certain cases, e.g. very wide tables (100s of columns).
In such cases, you can set lower limits for the following properties in the data partitioning policy: MaxRowCountPerOperation
, MaxOriginalSizePerOperation
(documentation).
Furthermore - for a hash partition key, decreasing the value set for MaxPartitionCount
could help reduce the overhead of the partitioning process, though at some cost to the benefits at query runtime. The same goes for the Range
of a uniform range datetime partition key.