Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Add declarations and examples for Python HDK API. #170

Merged
merged 3 commits into from
Feb 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
trip_id,vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,pickup,dropoff,cab_type,precipitation,snow_depth,snowfall,max_temperature,min_temperature,average_wind_speed,pickup_nyct2010_gid,pickup_ctlabel,pickup_borocode,pickup_boroname,pickup_ct2010,pickup_boroct2010,pickup_cdeligibil,pickup_ntacode,pickup_ntaname,pickup_puma,dropoff_nyct2010_gid,dropoff_ctlabel,dropoff_borocode,dropoff_boroname,dropoff_ct2010,dropoff_boroct2010,dropoff_cdeligibil,dropoff_ntacode,dropoff_ntaname,dropoff_puma
1,2,2013-08-01 08:14:37,2013-08-01 09:09:06,N,1,0,0,0,0,1,0.00,21.25,0,0,0,0,0,0,21.25,2,0,0,0,green,0.65,0,0.0,76,66,2.91,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0
2,2,2013-08-01 09:13:00,2013-08-01 11:38:00,N,1,,,,,2,0.00,74.5,0,0.5,0,0,,,75,2,,,,green,0.65,0,0.0,76,66,2.91,,,,,,,,,,,,,,,,,,,,
3,2,2013-08-01 09:48:00,2013-08-01 09:49:00,N,5,,,,,1,0.00,1,0.1,0,0,1,,,2.1,2,,,,green,0.65,0,0.0,76,66,2.91,,,,,,,,,,,,,,,,,,,,
4,2,2013-08-01 10:38:35,2013-08-01 10:38:51,N,1,,,,,1,0.00,3.25,0,0,0,0,,,3.25,2,,,,green,0.65,0,0.0,76,66,2.91,,,,,,,,,,,,,,,,,,,,
5,2,2013-08-01 11:51:45,2013-08-01 12:03:52,N,1,,,,,1,0.00,8.5,0,0.5,0,0,,,9,2,,,,green,0.65,0,0.0,76,66,2.91,,,,,,,,,,,,,,,,,,,,
6,2,2013-08-01 14:33:39,2013-08-01 15:49:00,N,1,,,,,1,0.00,9,0,0.5,0,0,,,9.5,2,,,,green,0.65,0,0.0,76,66,2.91,,,,,,,,,,,,,,,,,,,,
7,2,2013-08-01 17:19:00,2013-08-01 17:19:00,N,1,,,,,1,0.00,2.5,1,0.5,0,0,,,4,2,,,,green,0.65,0,0.0,76,66,2.91,,,,,,,,,,,,,,,,,,,,
8,2,2013-08-01 17:22:00,2013-08-01 17:22:00,N,1,-73.937767028808594,40.758480072021484,-73.937767028808594,40.758480072021484,1,0.00,2.5,1,0.5,0,5.33,,,9.33,2,,0101000020E610000000000060047C52C0000000E015614440,0101000020E610000000000060047C52C0000000E015614440,green,0.65,0,0.0,76,66,2.91,1658,33,4,Queens,003300,4003300,E,QN68,Queensbridge-Ravenswood-Long Island City,4101,1658,33,4,Queens,003300,4003300,E,QN68,Queensbridge-Ravenswood-Long Island City,4101
9,2,2013-08-01 17:24:00,2013-08-01 17:25:00,N,1,-73.93792724609375,40.757843017578125,-73.93792724609375,40.757843017578125,1,0.00,2.5,1,0.5,0,1.11,,,5.11,2,,0101000020E610000000000000077C52C00000000001614440,0101000020E610000000000000077C52C00000000001614440,green,0.65,0,0.0,76,66,2.91,1658,33,4,Queens,003300,4003300,E,QN68,Queensbridge-Ravenswood-Long Island City,4101,1658,33,4,Queens,003300,4003300,E,QN68,Queensbridge-Ravenswood-Long Island City,4101
10,2,2013-08-01 19:21:09,2013-08-01 19:22:30,N,1,,,,,5,0.00,3,1,0.5,0,0,,,4.5,1,,,,green,0.65,0,0.0,76,66,2.91,,,,,,,,,,,,,,,,,,,,
11,2,2013-08-01 19:29:27,2013-08-01 19:32:38,N,4,,,,,5,0.00,3,1,0.5,0,0,,,4.5,2,,,,green,0.65,0,0.0,76,66,2.91,,,,,,,,,,,,,,,,,,,,
12,2,2013-08-01 19:33:28,2013-08-01 19:35:21,N,1,,,,,5,0.00,3,1,0.5,0.08,0,,,4.58,1,,,,green,0.65,0,0.0,76,66,2.91,,,,,,,,,,,,,,,,,,,,
13,2,2013-08-02 09:37:44,2013-08-02 09:38:08,N,1,,,,,1,0.00,1.5,0,0.5,0,0,,,2,1,,,,green,0.00,0,0.0,83,67,4.47,,,,,,,,,,,,,,,,,,,,
14,2,2013-08-02 09:43:58,2013-08-02 09:44:13,N,1,,,,,1,0.00,1.5,0,0.5,0,0,,,2,1,,,,green,0.00,0,0.0,83,67,4.47,,,,,,,,,,,,,,,,,,,,
15,2,2013-08-02 09:44:43,2013-08-02 09:45:15,N,1,,,,,1,0.00,1.5,0,0.5,0,0,,,2,1,,,,green,0.00,0,0.0,83,67,4.47,,,,,,,,,,,,,,,,,,,,
16,2,2013-08-02 11:26:32,2013-08-02 11:26:54,N,1,,,,,1,0.00,1.5,0,0.5,0,0,,,11,2,,,,green,0.00,0,0.0,83,67,4.47,,,,,,,,,,,,,,,,,,,,
17,2,2013-08-02 11:32:36,2013-08-02 11:45:45,N,1,,,,,1,0.00,7.45,0,0,0,0,,,7.45,2,,,,green,0.00,0,0.0,83,67,4.47,,,,,,,,,,,,,,,,,,,,
18,2,2013-08-02 11:46:27,2013-08-02 11:47:32,N,1,,,,,1,0.00,3,0,0.5,0,0,,,3.5,2,,,,green,0.00,0,0.0,83,67,4.47,,,,,,,,,,,,,,,,,,,,
19,2,2013-08-02 12:10:46,2013-08-02 12:11:45,N,1,,,,,1,0.00,3.45,0,0,0,0,,,3.45,2,,,,green,0.00,0,0.0,83,67,4.47,,,,,,,,,,,,,,,,,,,,
20,2,2013-08-02 12:19:55,2013-08-02 12:20:11,N,1,,,,,1,0.00,3.25,0,0,0,0,,,3.25,2,,,,green,0.00,0,0.0,83,67,4.47,,,,,,,,,,,,,,,,,,,,
2,064 changes: 2,064 additions & 0 deletions python/pyhdk/hdk.py

Large diffs are not rendered by default.

447 changes: 447 additions & 0 deletions python/tests/test_pyhdk_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,447 @@
#!/usr/bin/env python3

#
# Copyright 2022 Intel Corporation.
#
# SPDX-License-Identifier: Apache-2.0


import json
import pandas
import pyarrow
import pytest
import pyhdk


class BaseTest:
@staticmethod
def check_taxi_q1_res(res):
df = res.to_arrow().to_pandas()
assert df["cab_type"].tolist() == ["green"]
assert df["cnt"].tolist() == [20]

@staticmethod
def check_taxi_q2_res(res):
df = res.to_arrow().to_pandas()
assert df["passenger_count"].tolist() == [1, 2, 5]
assert df["total_amount_avg"].tolist() == [98.19 / 16, 75.0, 13.58 / 3]

@staticmethod
def check_taxi_q3_res(res):
df = res.to_arrow().to_pandas()
assert df["passenger_count"].tolist() == [1, 2, 5]
assert df["pickup_year"].tolist() == [2013, 2013, 2013]
assert df["cnt"].tolist() == [16, 1, 3]

@staticmethod
def check_taxi_q4_res(res):
df = res.to_arrow().to_pandas()
assert df["passenger_count"].tolist() == [1, 5, 2]
assert df["pickup_year"].tolist() == [2013, 2013, 2013]
assert df["distance"].tolist() == [0, 0, 0]
assert df["cnt"].tolist() == [16, 3, 1]


class TestTaxiSql(BaseTest):
def test_taxi_over_csv_modular(self):
# Initialize HDK components
config = pyhdk.buildConfig()
storage = pyhdk.storage.ArrowStorage(1)
data_mgr = pyhdk.storage.DataMgr(config)
data_mgr.registerDataProvider(storage)
calcite = pyhdk.sql.Calcite(storage, config)
executor = pyhdk.Executor(data_mgr, config)

# Import data
storage.importCsvFile(
"omniscidb/Tests/ArrowStorageDataFiles/taxi_sample_header_no_null.csv",
"trips",
)

# Run Taxi Q1 SQL query
ra = calcite.process(
"SELECT cab_type, count(*) as cnt FROM trips GROUP BY cab_type;"
)
rel_alg_executor = pyhdk.sql.RelAlgExecutor(executor, storage, data_mgr, ra)
res = rel_alg_executor.execute()
self.check_taxi_q1_res(res)

# Run Taxi Q2 SQL query
ra = calcite.process(
"""SELECT passenger_count, AVG(total_amount) as total_amount_avg
FROM trips
GROUP BY passenger_count
ORDER BY passenger_count;"""
)
rel_alg_executor = pyhdk.sql.RelAlgExecutor(executor, storage, data_mgr, ra)
res = rel_alg_executor.execute()
self.check_taxi_q2_res(res)

# Run Taxi Q3 SQL query
ra = calcite.process(
"""SELECT passenger_count, extract(year from pickup_datetime) AS pickup_year, count(*) as cnt
FROM trips
GROUP BY passenger_count, pickup_year
ORDER BY passenger_count;"""
)
rel_alg_executor = pyhdk.sql.RelAlgExecutor(executor, storage, data_mgr, ra)
res = rel_alg_executor.execute()
self.check_taxi_q3_res(res)

# Run Taxi Q4 SQL query
ra = calcite.process(
"""SELECT
passenger_count,
extract(year from pickup_datetime) AS pickup_year,
cast(trip_distance as int) AS distance,
count(*) AS cnt
FROM trips
GROUP BY passenger_count, pickup_year, distance
ORDER BY pickup_year, cnt desc;"""
)
rel_alg_executor = pyhdk.sql.RelAlgExecutor(executor, storage, data_mgr, ra)
res = rel_alg_executor.execute()
self.check_taxi_q4_res(res)

@pytest.mark.skip(reason="unimplemented concept")
def test_taxi_over_csv_explicit_instance(self):
# Initialize HDK components wrapped into a single object
hdk = pyhdk.init()

# Import data
# import for all cases?
# globs?
hdk.importCsvFile(
"omniscidb/Tests/ArrowStorageDataFiles/taxi_sample_header_no_null.csv",
"trips",
)

# Run Taxi Q1 SQL query
res = hdk.sql("SELECT cab_type, count(*) as cnt FROM trips GROUP BY cab_type;")
self.check_taxi_q1_res(res)

# Run Taxi Q2 SQL query
res = hdk.sql(
"""SELECT passenger_count, AVG(total_amount) as total_amount_avg
FROM trips
GROUP BY passenger_count
ORDER BY passenger_count;"""
)
self.check_taxi_q2_res(res)

# Run Taxi Q3 SQL query
res = hdk.sql(
"""SELECT passenger_count, extract(year from pickup_datetime) AS pickup_year, count(*) as cnt
FROM trips
GROUP BY passenger_count, pickup_year
ORDER BY passenger_count;"""
)
self.check_taxi_q3_res(res)

# Run Taxi Q4 SQL query
res = hdk.sql(
"""SELECT
passenger_count,
extract(year from pickup_datetime) AS pickup_year,
cast(trip_distance as int) AS distance,
count(*) AS cnt
FROM trips
GROUP BY passenger_count, pickup_year, distance
ORDER BY pickup_year, cnt desc;"""
)
self.check_taxi_q4_res(res)

@pytest.mark.skip(reason="unimplemented concept")
def test_taxi_over_csv_explicit_aliases(self):
# Initialize HDK components hidden from users
hdk = pyhdk.init()

# Import data
# Tables are referenced through the resulting object
# Might allow to unify work with imported tables and temporary
# tables (results of other queries)
trips = hdk.importCsvFile(
"omniscidb/Tests/ArrowStorageDataFiles/taxi_sample_header_no_null.csv"
)

# Run Taxi Q1 SQL query
res = hdk.sql(
"SELECT cab_type, count(*) as cnt FROM trips GROUP BY cab_type;",
trips=trips,
)
self.check_taxi_q1_res(res)

# Run Taxi Q2 SQL query
res = hdk.sql(
"""SELECT passenger_count, AVG(total_amount) as total_amount_avg
FROM trips
GROUP BY passenger_count
ORDER BY passenger_count;""",
trips=trips,
)
self.check_taxi_q2_res(res)

# Run Taxi Q3 SQL query
res = hdk.sql(
"""SELECT passenger_count, extract(year from pickup_datetime) AS pickup_year, count(*) as cnt
FROM trips
GROUP BY passenger_count, pickup_year
ORDER BY passenger_count;""",
trips=trips,
)
self.check_taxi_q3_res(res)

# Run Taxi Q4 SQL query
res = hdk.sql(
"""SELECT
passenger_count,
extract(year from pickup_datetime) AS pickup_year,
cast(trip_distance as int) AS distance,
count(*) AS cnt
FROM trips
GROUP BY passenger_count, pickup_year, distance
ORDER BY pickup_year, cnt desc;""",
trips=trips,
)
self.check_taxi_q4_res(res)

@pytest.mark.skip(reason="unimplemented concept")
def test_taxi_over_csv_multistep(self):
# Initialize HDK components hidden from users
hdk = pyhdk.init()

# Import data
trips = hdk.importCsvFile(
"omniscidb/Tests/ArrowStorageDataFiles/taxi_sample_header_no_null.csv"
)

# Run Taxi Q3 SQL query in 2 steps
tmp = hdk.sql(
"""SELECT passenger_count, extract(year from pickup_datetime) AS pickup_year
FROM trips""",
trips=trips,
)
res = hdk.sql(
"""SELECT passenger_count, pickup_year, count(*) as cnt
FROM trips
GROUP BY passenger_count, pickup_year
ORDER BY passenger_count;""",
trips=tmp,
)
self.check_taxi_q3_res(res)

# Run Taxi Q4 SQL query in 3 steps
tmp = hdk.sql(
"""SELECT
passenger_count,
extract(year from pickup_datetime) AS pickup_year,
cast(trip_distance as int) AS distance
FROM trips;""",
trips=trips,
)
tmp = hdk.sql(
"""SELECT passenger_count, pickup_year, distance, count(*) AS cnt
FROM trips
GROUP BY passenger_count, pickup_year, distance;""",
trips=tmp,
)
res = hdk.sql("SELET * FROM trips ORDER BY pickup_year, cnt desc;", trips=tmp)
self.check_taxi_q4_res(res)


class TestTaxiIR(BaseTest):
@pytest.mark.skip(reason="QueryBuilder API is not yet available in PyHDK")
def test_taxi_over_csv_modular(self):
# Initialize HDK components
config = pyhdk.buildConfig()
storage = pyhdk.storage.ArrowStorage(1)
data_mgr = pyhdk.storage.DataMgr(config)
data_mgr.registerDataProvider(storage)
executor = pyhdk.Executor(data_mgr, config)

# Import data
storage.importCsvFile(
"omniscidb/Tests/ArrowStorageDataFiles/taxi_sample_header_no_null.csv",
"trips",
)

# Run Taxi Q1 IR query
builder = pyhdk.QueryBuilder(config, storage)
dag = builder.scan("trips").agg("cab_type", "count").finalize()
rel_alg_executor = pyhdk.sql.RelAlgExecutor(executor, storage, data_mgr, dag)
res = rel_alg_executor.execute()
self.check_taxi_q1_res(res)

# Run Taxi Q2 IR query
builder = pyhdk.QueryBuilder(config, storage)
dag = (
builder.scan("trips")
.agg("passenger_count", "avg(total_amount)")
.sort("passenger_count")
.finalize()
)
rel_alg_executor = pyhdk.sql.RelAlgExecutor(executor, storage, data_mgr, dag)
res = rel_alg_executor.execute()
self.check_taxi_q2_res(res)

# Run Taxi Q3 IR query
builder = pyhdk.QueryBuilder(config, storage)
trips = builder.scan("trips")
dag = (
trips.proj(
[
trips["passenger_count"],
trips["pickup_datetime"].extract("year").name("pickup_year"),
]
)
.agg(["passenger_count", "pickup_year"], "count")
.sort("passenger_count")
.finalize()
)
rel_alg_executor = pyhdk.sql.RelAlgExecutor(executor, storage, data_mgr, dag)
res = rel_alg_executor.execute()
self.check_taxi_q3_res(res)

# Run Taxi Q4 IR query
builder = pyhdk.QueryBuilder(config, storage)
trips = builder.scan("trips")
dag = (
trips.proj(
[
trips["passenger_count"],
trips["pickup_datetime"].extract("year").name("pickup_year"),
trips["trip_distance"].cast("int32").name("distance"),
]
)
.agg(["passenger_count", "pickup_year", "distance"], "count")
.sort(("pickup_year", "asc"), ("count", "desc"))
.finalize()
)
rel_alg_executor = pyhdk.sql.RelAlgExecutor(executor, storage, data_mgr, dag)
res = rel_alg_executor.execute()
self.check_taxi_q4_res(res)

@pytest.mark.skip(reason="unimplemented concept")
def test_taxi_over_csv_explicit_instance(self):
# Initialize HDK components wrapped into a single object
hdk = pyhdk.init()

# Import data
hdk.importCsvFile(
"omniscidb/Tests/ArrowStorageDataFiles/taxi_sample_header_no_null.csv",
"trips",
)

# Run Taxi Q1 IR query
res = hdk.scan("trips").agg("cab_type", "count").run()
self.check_taxi_q1_res(res)

# Run Taxi Q2 IR query
res = (
hdk.scan("trips")
.agg("passenger_count", "avg(total_amount)")
.sort("passenger_count")
.run()
)
self.check_taxi_q2_res(res)

# Run Taxi Q3 IR query
trips = hdk.scan("trips")
res = (
trips.proj(
[
trips["passenger_count"],
trips["pickup_datetime"].extract("year").name("pickup_year"),
]
)
.agg(["passenger_count", "pickup_year"], "count")
.sort("passenger_count")
.run()
)
self.check_taxi_q3_res(res)

# Run Taxi Q4 IR query
trips = hdk.scan("trips")
res = (
trips.proj(
[
trips["passenger_count"],
trips["pickup_datetime"].extract("year").name("pickup_year"),
trips["trip_distance"].cast("int32").name("distance"),
]
)
.agg(["passenger_count", "pickup_year", "distance"], "count")
.sort(("pickup_year", "asc"), ("count", "desc"))
.run()
)
self.check_taxi_q4_res(res)

@pytest.mark.skip(reason="unimplemented concept")
def test_taxi_over_csv_implicit_scan(self):
# Initialize HDK components hidden from users
hdk = pyhdk.init()

# Import data
# How to reference it in SQL? Use file name as a table name? Use the same approach as in Modin?
# When is it deleted from the storage? Only exlicit tables drop for simplicity?
trips = hdk.importCsvFile(
"omniscidb/Tests/ArrowStorageDataFiles/taxi_sample_header_no_null.csv"
)

# Run Taxi Q1 IR query
res = trips.agg("cab_type", "count").run()
self.check_taxi_q1_res(res)

# Run Taxi Q2 IR query
res = (
trips.agg("passenger_count", "avg(total_amount)")
.sort("passenger_count")
.run()
)
self.check_taxi_q2_res(res)

# Run Taxi Q3 IR query
res = (
trips.proj(
[
"passenger_count",
trips["pickup_datetime"].extract("year").name("pickup_year"),
]
)
.agg(["passenger_count", "pickup_year"], "count")
.sort("passenger_count")
.run()
)
self.check_taxi_q3_res(res)

# Run Taxi Q4 IR query
res = (
trips.proj(
[
"passenger_count",
trips["pickup_datetime"].extract("year").name("pickup_year"),
trips["trip_distance"].cast("int32").name("distance"),
]
)
.agg([0, 1, 2], "count")
.sort(("pickup_year", "asc"), ("count", "desc"))
.run()
)
self.check_taxi_q4_res(res)

@pytest.mark.skip(reason="unimplemented concept")
def test_run_query_on_results(self):
# Initialize HDK components hidden from users
hdk = pyhdk.init()

# Import data
trips = hdk.importCsvFile(
"omniscidb/Tests/ArrowStorageDataFiles/taxi_sample_header_no_null.csv"
)

# Run a part of Taxi Q2 IR query
res = trips.agg("passenger_count", "avg(total_amount)").run()
# Now sort it to get the final result
# Can we make it without transforming to Arrow with the following import to ArrowStorage?
res = res.sort("passenger_count").run()
self.check_taxi_q2_res(res)