Ingesting 2 Billion New York City Taxi Rides into Kusto

Last modified: 04/24/2023

The NYC Taxi & Limousine Commission makes historical data about taxi trips and for-hire-vehicle trips (such as Uber, Lyft, Juno, Via, etc.) available for anyone to download and analyze. These records capture pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts.

Todd W. Schneider took this data to the next level: his nyc-taxi-data GitHub repo makes it easy to import the data into PostgreSQL and then use PostGIS for spatial calculations over it.

At the time of writing this post (February 2019), the available data set included:

  • 2.06 billion total trips
    • ~ 1.5 billion taxi, from 2009-01-01 to 2018-06-30.
    • ~ 0.5 billion for-hire vehicle, from 2015-01-01 to 2018-06-30.

This is a fair number of records, and to get them ingested and analyzed quickly, Kusto was the natural choice. After preparing the data set in PostgreSQL, I exported it to CSV blobs and made them available for Kusto to consume.

This post covers ingesting the data into Kusto; another post covers analyzing it after ingestion.

Ingesting 1.55 Billion Taxi Trips

Preparing the data set

To prepare this data set, I mostly followed Todd W. Schneider’s instructions in his nyc-taxi-data GitHub repo. The process is a little tricky and time-consuming (using PostgreSQL on a single virtual machine), so I’ve included a few tips at the bottom of this post: Appendix: Tips for preparing the Yellow/Green Taxi trips data set.

Update: the enriched data set I used is now available in a public Azure blob storage container: https://kustosamplefiles.blob.core.windows.net/taxirides

Ingesting the files from Azure blob storage

Once the data set was prepared in Azure blob storage, the easy part was getting it into Kusto. First, I created the table with a schema that matches the data I exported from PostgreSQL:

.create table Trips (
    trip_id:long,
    vendor_id:string,
    pickup_datetime:datetime,
    dropoff_datetime:datetime,
    store_and_fwd_flag:string,
    rate_code_id:int,
    pickup_longitude:real,
    pickup_latitude:real,
    dropoff_longitude:real,
    dropoff_latitude:real,
    passenger_count:int,
    trip_distance:real,
    fare_amount:real,
    extra:real,
    mta_tax:real,
    tip_amount:real,
    tolls_amount:real,
    ehail_fee:real,
    improvement_surcharge:real,
    total_amount:real,
    payment_type:string,
    trip_type:int,
    pickup:string,
    dropoff:string,
    cab_type:string,
    precipitation:int,
    snow_depth:int,
    snowfall:int,
    max_temperature:int,
    min_temperature:int,
    average_wind_speed:int,
    pickup_nyct2010_gid:int,
    pickup_ctlabel:string,
    pickup_borocode:int,
    pickup_boroname:string,
    pickup_ct2010:string,
    pickup_boroct2010:string,
    pickup_cdeligibil:string,
    pickup_ntacode:string,
    pickup_ntaname:string,
    pickup_puma:string,
    dropoff_nyct2010_gid:int,
    dropoff_ctlabel:string,
    dropoff_borocode:int,
    dropoff_boroname:string,
    dropoff_ct2010:string,
    dropoff_boroct2010:string,
    dropoff_cdeligibil:string,
    dropoff_ntacode:string,
    dropoff_ntaname:string,
    dropoff_puma:string
)

For ingestion, I chose LightIngest, a simple command-line utility I find very useful for ad-hoc ingestion.

All I need to know is:

  • The name of my database (TaxiRides)
  • The name of my table (Trips)
  • The name and region for my data management cluster (ingest-myclustername.region)
  • The path to my Azure blob container (https://kustosamplefiles.blob.core.windows.net/taxirides)
  • The format my files were created with (CSV, Gzip-compressed).

And then I run the command:

LightIngest.exe
   "https://ingest-<myclustername>.<region>.kusto.windows.net;fed=true"
   -database:TaxiRides
   -table:Trips
   -source:https://kustosamplefiles.blob.core.windows.net/taxirides
   -pattern:"*.csv.gz"
   -format:csv
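If you’d rather script this step, here is a minimal Python sketch that assembles the same LightIngest invocation as an argument list. It passes that list to `subprocess` without a shell, so the `;` in the connection string and the `*` in the pattern need no quoting. It assumes LightIngest.exe is on your PATH. The helper names and the placeholder cluster and region values are hypothetical, and the sketch only uses the arguments shown above.

```python
import subprocess
from typing import List


def build_lightingest_args(cluster: str, region: str, database: str, table: str,
                           source: str, pattern: str, fmt: str) -> List[str]:
    """Hypothetical helper: assemble the LightIngest arguments used in this post."""
    # Data management ("ingest-") endpoint, with AAD federated authentication
    connection_string = f"https://ingest-{cluster}.{region}.kusto.windows.net;fed=true"
    return [
        "LightIngest.exe",
        connection_string,
        f"-database:{database}",
        f"-table:{table}",
        f"-source:{source}",
        f"-pattern:{pattern}",
        f"-format:{fmt}",
    ]


def run_lightingest(args: List[str]) -> int:
    # A list with no shell: ';' and '*' reach LightIngest unmodified
    return subprocess.run(args, check=False).returncode


if __name__ == "__main__":
    args = build_lightingest_args(
        cluster="myclustername",  # placeholder
        region="region",          # placeholder
        database="TaxiRides",
        table="Trips",
        source="https://kustosamplefiles.blob.core.windows.net/taxirides",
        pattern="*.csv.gz",
        fmt="csv",
    )
    print(" ".join(args))
    # run_lightingest(args)  # uncomment on a machine where LightIngest.exe is installed
```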

Measuring ingestion duration

On the client side, this runs in a matter of seconds, as it only queues the files for asynchronous ingestion (read more here).

How long did it take the service to ingest these 1548 files with 1.55 billion records?

To demonstrate Kusto’s ability to scale its ingestion capacity with the number and kind of nodes in the cluster, I ingested the same set of blobs twice, changing the number of instances in my cluster in between:

  • 2 x D14_v2, with a table named Trips
  • 6 x D14_v2, with a table named Trips2

Each D14_v2 VM has 16 vCPUs and 112GB of RAM.

Then I can simply ask the service how long each case took, using either of the following options:

Using the .show commands command:

.show commands 
| where StartedOn > datetime(2019-02-04 06:00)
| where CommandType == "DataIngestPull"
| where Text contains 'into Trips'
| parse Text with * "into " TableName " (" *
| extend ClusterSize = case(TableName == "Trips", "2xD14_v2",
                            TableName == "Trips2", "6xD14_v2",
                            "N/A")
| summarize ['# Commands'] = count(), 
            StartTime = min(StartedOn), 
            EndTime = max(LastUpdatedOn)
            by ClusterSize,
            CommandType, 
            State
| extend Duration = EndTime - StartTime

| ClusterSize | Duration | CommandType | State | # Commands | StartTime | EndTime |
|---|---|---|---|---|---|---|
| 2xD14_v2 | 00:47:33.2867817 | DataIngestPull | Completed | 1417 | 2019-02-04 06:00:39.4265609 | 2019-02-04 06:48:12.7133426 |
| 6xD14_v2 | 00:20:25.5162013 | DataIngestPull | Completed | 1415 | 2019-02-08 03:34:09.6342569 | 2019-02-08 03:54:35.1504582 |

Or, using the ingestion_time() function:

union withsource=TableName Trips, Trips2
| where pickup_datetime between(datetime(2009-01-01) .. datetime(2018-07-01))
| summarize 
    TotalTrips = count(),
    EarliestTrip = min(pickup_datetime),
    LatestTrip = max(pickup_datetime),
    IngestionDuration = max(ingestion_time()) - min(ingestion_time())
by TableName 
| extend ClusterSize = case(TableName == "Trips", "2xD14_v2",
                            TableName == "Trips2", "6xD14_v2",
                            "N/A")
| project-away TableName

| ClusterSize | IngestionDuration | TotalTrips | EarliestTrip | LatestTrip |
|---|---|---|---|---|
| 2xD14_v2 | 00:46:57.8493213 | 1547471140 | 2009-01-01 00:00:00.0000000 | 2018-07-01 00:00:00.0000000 |
| 6xD14_v2 | 00:19:54.1510651 | 1547471140 | 2009-01-01 00:00:00.0000000 | 2018-07-01 00:00:00.0000000 |

As you can see, it took only 20 minutes on the 6-node cluster (and about 47 minutes on the 2-node cluster) to ingest these 1,547,471,140 records from 1,548 source files, covering 9.5 years’ worth of data. And they’re now fully indexed and ready to query.
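To put those durations in perspective, here is a small Python sketch that converts the IngestionDuration values from the ingestion_time() query into records per second and compares the two cluster sizes. It only parses the hh:mm:ss.fffffff form shown in these results, not Kusto timespans with a day component. The helper names are made up for illustration.

```python
from datetime import timedelta


def parse_timespan(value: str) -> timedelta:
    """Parse an 'hh:mm:ss.fffffff' timespan string (no day component)."""
    hours, minutes, seconds = value.split(":")
    # timedelta keeps microseconds, so the 7th fractional digit is rounded away
    return timedelta(hours=int(hours), minutes=int(minutes), seconds=float(seconds))


def records_per_second(records: int, duration: str) -> float:
    return records / parse_timespan(duration).total_seconds()


TOTAL_TRIPS = 1_547_471_140

# IngestionDuration values reported by the ingestion_time() query
small = records_per_second(TOTAL_TRIPS, "00:46:57.8493213")  # 2 x D14_v2
large = records_per_second(TOTAL_TRIPS, "00:19:54.1510651")  # 6 x D14_v2

print(f"2 nodes: {small:,.0f} records/s ({small / 2:,.0f} per node)")
print(f"6 nodes: {large:,.0f} records/s ({large / 6:,.0f} per node)")
print(f"speedup with 3x the nodes: {large / small:.2f}x")
```

With these numbers, the 6-node cluster ingests roughly 1.3 million records per second, compared with roughly 550,000 on the 2-node cluster. That is about 2.36x faster with 3x the nodes, so in this run throughput scaled well, though not perfectly linearly.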

Ingesting 0.5 Billion For-Hire-Vehicle Trips

To demonstrate how easy it is to use Kusto’s client libraries to ingest data in supported formats, I chose to take this data set in Parquet format (with this schema) and ingest it using a queued ingestion client, which is available in Kusto’s .NET client library. Needless to say, C# is just one of the languages for which the client libraries are available.

Looking at the NYC Taxi & Limousine Commission’s site, it’s easy to dynamically build a list of URLs for these Parquet files and have them ingested directly from their source.
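The loop itself is simple. Here is a minimal Python sketch that builds the same monthly list, using the CloudFront URL pattern from the C# application in this post. The TLC has changed its download URLs over time, so treat the pattern as an assumption. The sketch also confirms that January 2015 through June 2018 yields 42 monthly files.

```python
from datetime import date
from typing import List

# URL pattern taken from the C# application in this post; it may change over time
URL_TEMPLATE = "https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_{month}.parquet"


def monthly_urls(start: date, end: date) -> List[str]:
    """One URL per month in [start, end), both given as the first day of a month."""
    urls = []
    year, month = start.year, start.month
    while date(year, month, 1) < end:
        urls.append(URL_TEMPLATE.format(month=f"{year:04d}-{month:02d}"))
        # advance to the first day of the next month
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return urls


urls = monthly_urls(date(2015, 1, 1), date(2018, 7, 1))
print(len(urls))  # 42
print(urls[0])
print(urls[-1])
```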

For the purpose of this ingestion I used:

  • A Kusto cluster with 6 D14_v2 nodes (I had scaled it out for the previous section, and scaled it down later).
  • The Microsoft.Azure.Kusto.Ingest NuGet package (https://www.nuget.org/packages/Microsoft.Azure.Kusto.Ingest).

Based on the schema provided on the site, I created the following table and ingestion mapping in my database:

.create table FHV_Trips (
    dispatching_base_num:string,
    pickup_datetime:datetime,
    dropoff_datetime:datetime,
    pickup_location_id:int,
    dropoff_location_id:int,
    shared_ride_flag:bool
)

.create-or-alter table FHV_Trips ingestion parquet mapping "FHV_Trips_mapping" '['
  '{"Column": "dispatching_base_num", "Properties": {"Path": "$.dispatching_base_num"}},'
  '{"Column": "pickup_datetime",      "Properties": {"Path": "$.pickup_datetime"}},'
  '{"Column": "dropoff_datetime",     "Properties": {"Path": "$.dropOff_datetime"}},'
  '{"Column": "pickup_location_id",   "Properties": {"Path": "$.PUlocationID"}},'
  '{"Column": "dropoff_location_id",  "Properties": {"Path": "$.DOlocationID"}},'
  '{"Column": "shared_ride_flag",     "Properties": {"Path": "$.SR_Flag"}}'
']'
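Typing mapping JSON by hand is error-prone when column names differ from the Parquet field names, as with `dropOff_datetime`, `PUlocationID`, `DOlocationID`, and `SR_Flag` here. Here is a minimal Python sketch that renders an equivalent command from a column-to-field dictionary. The helper name is hypothetical. It emits the JSON array as a single string literal instead of the multi-line concatenated form above. It also assumes field names contain no single quotes or backslashes, since those would need escaping inside a Kusto string literal.

```python
import json
from typing import Dict

# Table column -> Parquet field, as in FHV_Trips_mapping above
COLUMN_TO_FIELD: Dict[str, str] = {
    "dispatching_base_num": "dispatching_base_num",
    "pickup_datetime": "pickup_datetime",
    "dropoff_datetime": "dropOff_datetime",
    "pickup_location_id": "PUlocationID",
    "dropoff_location_id": "DOlocationID",
    "shared_ride_flag": "SR_Flag",
}


def build_mapping_command(table: str, mapping_name: str, columns: Dict[str, str]) -> str:
    """Hypothetical helper: render a '.create-or-alter ... ingestion parquet mapping' command."""
    entries = [{"Column": col, "Properties": {"Path": f"$.{field}"}}
               for col, field in columns.items()]
    # The mapping is a JSON array passed as one single-quoted string literal
    body = json.dumps(entries)
    return f".create-or-alter table {table} ingestion parquet mapping \"{mapping_name}\" '{body}'"


print(build_mapping_command("FHV_Trips", "FHV_Trips_mapping", COLUMN_TO_FIELD))
```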

And here’s the simple application I wrote and ran:

using System;
using Kusto.Data;
using Kusto.Data.Common;
using Kusto.Data.Ingestion;
using Kusto.Ingest;

public static class Program
{
    public static void Main()
    {
        // Connect to the cluster's data management ("ingest-") endpoint with an AAD application
        var kustoConnectionStringBuilder =
                new KustoConnectionStringBuilder(@"https://ingest-mycluster.region.kusto.windows.net")
                .WithAadApplicationKeyAuthentication(
                    "<application id>",
                    "<application key>",
                    "<authority id>");

        // Monthly files from January 2015 up to (not including) July 2018
        var startTime = new DateTime(2015, 01, 01);
        var endTime = new DateTime(2018, 07, 01);

        // Reference the Parquet mapping created above by name
        var ingestionMapping = new IngestionMapping
        {
            IngestionMappingKind = IngestionMappingKind.Parquet,
            IngestionMappingReference = "FHV_Trips_mapping"
        };

        var ingestionProperties = new KustoIngestionProperties(databaseName: "TaxiRides", tableName: "FHV_Trips")
        {
            // The source files are Parquet, so the format must match the mapping kind
            Format = DataSourceFormat.parquet,
            IngestionMapping = ingestionMapping
        };

        // A queued client only enqueues each file; the service ingests it asynchronously
        using (var ingestClient = KustoIngestFactory.CreateQueuedIngestClient(kustoConnectionStringBuilder))
        {
            for (var dt = startTime; dt < endTime; dt = dt.AddMonths(1))
            {
                var uri = $"https://d37ci6vzurychx.cloudfront.net/trip-data/fhv_tripdata_{dt.ToString("yyyy-MM")}.parquet";
                Console.WriteLine("Queueing file '{0}' for ingestion", uri);
                ingestClient.IngestFromStorage(uri, ingestionProperties);
            }
        }
    }
}

Measuring ingestion duration

On the client side, this runs in a matter of seconds, as it only queues the files for asynchronous ingestion (read more here).

How long did it take the service to ingest these 42 files with 0.5 billion records?

I can simply ask the service, using either of the following options:

Using the .show commands command:

.show commands 
| where StartedOn > datetime(2019-02-04 06:00)
| where CommandType == "DataIngestPull"
| where Text has '.ingest-from-storage async into FHV_Trips'
| summarize ['# Commands'] = count(), 
            StartTime = min(StartedOn), 
            EndTime = max(LastUpdatedOn)
| extend Duration = EndTime - StartTime

| Duration | # Commands | StartTime | EndTime |
|---|---|---|---|
| 00:02:35.0245767 | 21 | 2019-02-08 04:10:40.9281504 | 2019-02-08 04:13:15.9527271 |

Or, using the ingestion_time() function:

FHV_Trips
| where pickup_datetime between(datetime(2009-01-01) .. datetime(2018-07-01))
| summarize 
    TotalTrips = count(),
    EarliestTrip = min(pickup_datetime),
    LatestTrip = max(pickup_datetime),
    IngestionDuration = max(ingestion_time()) - min(ingestion_time())

| IngestionDuration | TotalTrips | EarliestTrip | LatestTrip |
|---|---|---|---|
| 00:02:25.3214546 | 514304551 | 2015-01-01 00:00:00.0000000 | 2018-06-30 23:59:59.0000000 |

As you can see, it took only 2.5 minutes to ingest these 514,304,551 records, covering 3.5 years’ worth of data. And they’re now fully indexed and ready to query.

Now that I have all of this data ingested, it’s time to start analyzing it.


Appendix: Tips for preparing the Yellow/Green Taxi trips data set

If you’re going to do this on an Azure VM, you may find the following tips useful.

Note: These are not related to Kusto, but they may help you get the data prepared, before ingesting it.

  • I used an Ubuntu Server 16.04 LTS D5_v2 virtual machine.

  • You should have at least 700GB of storage available for the raw data and the PostgreSQL database, ideally backed by an SSD.

  • I made sure my PostgreSQL instance’s data location was on the larger SSD and not where it is by default, which is the OS drive. I did so by following the useful instructions here.

  • Once the database was ready, I exported it to CSV files with GZip compression, using the COPY command.
    • I chose to generate files with 1,000,000 records each, so I ended up with 1,548 *.csv.gz files.
    • I made sure I deleted the source CSV files from the SSD before starting the COPY, to have enough available disk space.
  • Once the export was complete, I used the Azure CLI to upload the files to an Azure blob storage container.
  • The process is long: it took approximately 2 days to run on my VM.
    • If you’re not interested in enriching the original data set, you might as well ingest it directly from the source (like I did for the FHV trips).
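The export above used PostgreSQL’s COPY command. If you instead have the rows available in Python (for example, streamed from a single large CSV), here is a minimal sketch of the same 1,000,000-records-per-file layout, using only the standard library’s csv and gzip modules. The function name and the file-naming scheme are hypothetical, and it writes no header row. As a sanity check on the numbers in this post, ceil(1,547,471,140 / 1,000,000) = 1,548 files.

```python
import csv
import gzip
import math
import os
from typing import Iterable, List, Sequence


def write_gzip_chunks(rows: Iterable[Sequence[str]], out_dir: str,
                      rows_per_file: int = 1_000_000, prefix: str = "trips") -> List[str]:
    """Hypothetical helper: split rows into gzip-compressed CSV files of at most rows_per_file rows."""
    os.makedirs(out_dir, exist_ok=True)
    paths: List[str] = []
    handle = None
    writer = None
    count = 0
    for row in rows:
        # Start a new file for the first row, and whenever the current file is full
        if handle is None or count == rows_per_file:
            if handle is not None:
                handle.close()
            path = os.path.join(out_dir, f"{prefix}_{len(paths):05d}.csv.gz")
            # Text-mode gzip stream; newline="" lets the csv module control line endings
            handle = gzip.open(path, "wt", newline="", encoding="utf-8")
            writer = csv.writer(handle)
            paths.append(path)
            count = 0
        writer.writerow(row)
        count += 1
    if handle is not None:
        handle.close()
    return paths


# Expected file count for the taxi data set at 1,000,000 records per file
print(math.ceil(1_547_471_140 / 1_000_000))  # 1548
```

For example, `write_gzip_chunks(csv.reader(open("trips.csv", newline="")), "export")` would produce export/trips_00000.csv.gz, export/trips_00001.csv.gz, and so on.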
