"
]
},
- "metadata": {},
- "output_type": "display_data"
+ "metadata": {}
}
],
- "source": [
- "parameters = ['rank', 'reg']\n",
- "cols = len(parameters)\n",
- "f, axes = plt.subplots(nrows=1, ncols=cols, figsize=(15,5))\n",
- "cmap = plt.cm.jet\n",
- "for i, val in enumerate(parameters):\n",
- " xs = np.array([t['misc']['vals'][val] for t in trials.trials]).ravel()\n",
- " ys = [t['result']['loss'] for t in trials.trials]\n",
- " xs, ys = zip(*sorted(zip(xs, ys)))\n",
- " ys = np.array(ys)\n",
- " axes[i].scatter(xs, ys, s=20, linewidth=0.01, alpha=0.75, c=cmap(float(i)/len(parameters)))\n",
- " axes[i].set_title(val)"
- ]
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"It can be seen from the above plot that\n",
"* The actual impact of rank is in line with the intuition - the smaller the value the better the result.\n",
"* It is interesting to see that the optimal value of reg is around 0.1 to 0.15. "
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"Get the best model."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 26,
- "metadata": {},
- "outputs": [],
"source": [
"als = ALS(\n",
" rank=best[\"rank\"],\n",
@@ -1902,20 +1131,20 @@
")\n",
" \n",
"model_best_hyperopt = als.fit(train)"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"Tuning prameters against other metrics can be simply done by modifying the `objective` function. The following shows an objective function of how to tune \"precision@k\". Since `fmin` in `hyperopt` only supports minimization while the actual objective of the loss is to maximize \"precision@k\", `-precision` instead of `precision` is used in the returned value of the `objective` function."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 27,
- "metadata": {},
- "outputs": [],
"source": [
"# Customize an objective function\n",
"def objective_precision(params):\n",
@@ -1988,29 +1217,29 @@
" 'status': STATUS_OK,\n",
" 'eval_time': time_run_start.interval\n",
" }"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
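For orientation, the sketch below shows the general shape of such a precision@k objective. It is not the notebook's exact cell: the `train` and `validation` DataFrames, the column names, the value of `k`, and the use of `SparkRankingEvaluation` are assumptions carried over from the surrounding context, and the evaluation step is deliberately simplified.

```python
# Minimal sketch of an objective that maximizes precision@k by returning its negation.
# `train` and `validation` are assumed Spark DataFrames with userID/itemID/rating columns.
from hyperopt import STATUS_OK
from pyspark.ml.recommendation import ALS
from recommenders.evaluation.spark_evaluation import SparkRankingEvaluation

def objective_precision_sketch(params):
    als = ALS(
        rank=int(params["rank"]),
        regParam=params["reg"],
        userCol="userID",
        itemCol="itemID",
        ratingCol="rating",
        coldStartStrategy="drop",
    )
    model = als.fit(train)

    # Score the validation interactions and rank the predictions per user.
    prediction = model.transform(validation)
    ranking_eval = SparkRankingEvaluation(
        validation,
        prediction,
        k=10,
        col_user="userID",
        col_item="itemID",
        col_rating="rating",
        col_prediction="prediction",
    )
    precision = ranking_eval.precision_at_k()

    # `fmin` minimizes, so maximizing precision@k is expressed by negating it.
    return {"loss": -precision, "status": STATUS_OK}
```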
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"### 4.2 Hyperparameter tuning with `hyperopt` sampling methods"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"Though `hyperopt` works well in a single node machine, its features (e.g., `Trials` module) do not support Spark environment, which makes it hard to perform the tuning tasks in a distributed/parallel manner. It is useful to use `hyperopt` for sampling parameter values from the pre-defined sampling space, and then parallelize the model training onto Spark cluster with the sampled parameter combinations.\n",
"\n",
"The downside of this method is that the intelligent searching algorithm (i.e., TPE) of `hyperopt` cannot be used. The approach introduced here is therefore equivalent to random search."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 28,
- "metadata": {},
- "outputs": [],
"source": [
"with Timer() as time_sample:\n",
" # Sample the parameters used for model building from the pre-defined space. \n",
@@ -2018,14 +1247,19 @@
" \n",
" # The following runs model building on the sampled parameter values with the pre-defined objective function.\n",
" results_map = list(map(lambda x: objective(x), sample_params))\n"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
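The sampling step itself can be expressed with `hyperopt`'s stochastic sampling utility. The sketch below only illustrates the idea; the search-space definition and the sample count are assumptions, not the notebook's exact values.

```python
# Minimal sketch of drawing random parameter combinations from a hyperopt search space.
from hyperopt import hp
from hyperopt.pyll.stochastic import sample

# Illustrative space; the notebook defines its own `space` earlier.
space = {
    "rank": hp.choice("rank", [10, 15, 20, 25, 30]),
    "reg": hp.uniform("reg", 0.01, 0.5),
}

NUMBER_SAMPLES = 20  # hypothetical sample count

# Each call to `sample` draws one concrete parameter combination from the space.
sample_params = [sample(space) for _ in range(NUMBER_SAMPLES)]

# The sampled combinations are then evaluated (sequentially here, or in parallel
# on a Spark cluster) with the same `objective` function defined earlier.
results_map = [objective(params) for params in sample_params]
```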
{
"cell_type": "code",
"execution_count": 30,
- "metadata": {},
+ "source": [
+ "results_map"
+ ],
"outputs": [
{
+ "output_type": "execute_result",
"data": {
"text/plain": [
"[{'eval_time': 9.468051671981812, 'loss': 1.027085217204854, 'status': 'ok'},\n",
@@ -2055,46 +1289,41 @@
" {'eval_time': 9.08506464958191, 'loss': 1.254533287299843, 'status': 'ok'}]"
]
},
- "execution_count": 30,
"metadata": {},
- "output_type": "execute_result"
+ "execution_count": 30
}
],
- "source": [
- "results_map"
- ]
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"Get the best model."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 31,
- "metadata": {},
- "outputs": [],
"source": [
"loss_metrics = np.array([x['loss'] for x in results_map])\n",
"best_loss = np.where(loss_metrics == min(loss_metrics))"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 32,
- "metadata": {},
- "outputs": [],
"source": [
"best_param = sample_params[best_loss[0].item()]"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
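As an aside, the same selection of the best sampled parameters can be written more compactly with `numpy.argmin`; this is an equivalent sketch rather than the notebook's code.

```python
# Equivalent, more compact selection of the best sampled parameters
# (assumes `results_map` and `sample_params` from the cells above).
import numpy as np

best_idx = int(np.argmin([x["loss"] for x in results_map]))
best_param = sample_params[best_idx]
```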
{
"cell_type": "code",
"execution_count": 33,
- "metadata": {},
- "outputs": [],
"source": [
"als = ALS(\n",
" rank=best_param[\"rank\"],\n",
@@ -2109,29 +1338,29 @@
")\n",
" \n",
"model_best_sample = als.fit(train)"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"## 5 Evaluation on testing data"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"The optimal parameters can then be used for building a recommender, which is then evaluated on the testing data.\n",
"\n",
"The following codes generate the evaluation results by using the testing dataset with the optimal model selected against the pre-defined loss. Without loss of generity, in this case, the optimal model that performs the best w.r.t regression loss (i.e., the RMSE metric) is used. One can simply use other metrics like precision@k, as illustrated in the above sections, to evaluate the optimal model on the testing dataset."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 34,
- "metadata": {},
- "outputs": [],
"source": [
"# Get prediction results with the optimal modesl from different approaches.\n",
"prediction_spark = model_best_spark.transform(test)\n",
@@ -2160,14 +1389,19 @@
" }, index=[0])\n",
" \n",
" test_evaluations = test_evaluations.append(result)"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
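For reference, the rating metrics for one of the optimal models could be computed along the following lines. This is a sketch rather than the notebook's exact cell; the column names are assumptions consistent with the rest of the notebook.

```python
# Minimal sketch of computing rating metrics for one optimal model on the test set.
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation

rating_eval = SparkRatingEvaluation(
    test,
    prediction_spark,
    col_user="userID",
    col_item="itemID",
    col_rating="rating",
    col_prediction="prediction",
)

metrics = {
    "RMSE": rating_eval.rmse(),
    "MAE": rating_eval.mae(),
    "R2": rating_eval.rsquared(),
    "Explained variance": rating_eval.exp_var(),
}
print(metrics)
```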
{
"cell_type": "code",
"execution_count": 35,
- "metadata": {},
+ "source": [
+ "test_evaluations"
+ ],
"outputs": [
{
+ "output_type": "execute_result",
"data": {
"text/html": [
"\n",
@@ -2235,62 +1469,58 @@
"0 sample 230.902271 0.287638 0.791199 0.232688 0.988922"
]
},
- "execution_count": 35,
"metadata": {},
- "output_type": "execute_result"
+ "execution_count": 35
}
],
- "source": [
- "test_evaluations"
- ]
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"From the results, it can be seen that, *with the same number of iterations*, Spark native construct based approach takes the least amount of time, even if there is no parallel computing. This is simply because Spark native constructs leverage the underlying Java codes for running the actual analytics with high performance efficiency. Interestingly, the run time for `hyperopt` with TPE algorithm and random search methods are almost the same. Possible reasons for this are that, the TPE algorithm searches optimal parameters intelligently but runs the tuning iterations sequentially. Also, the advantage of TPE may become obvious when there is a higher dimensionality of hyperparameters. \n",
"\n",
"The three approaches use the same RMSE loss. In this measure, the native Spark construct performs the best. The `hyperopt` based approach performs the second best, but the advantage is very subtle. It should be noted that these differences may be owing to many factors like characteristics of datasets, dimensionality of hyperparameter space, sampling size in the searching, etc. Note the differences in the RMSE metrics may also come from the randomness of the intermediate steps in parameter tuning process. In practice, multiple runs are required for generating statistically robust comparison results. We have tried 5 times for running the same comparison codes above. The results aligned well with each other in terms of objective metric values and elapsed time. "
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"# Conclusions"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"In summary, there are mainly three different approaches for running hyperparameter tuning for Spark based recommendation algorithm. The three different approaches are compared as follows."
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"|Approach|Distributed (on Spark)|Param sampling|Advanced hyperparam searching algo|Custom evaluation metrics|Custom data split|\n",
"|---------|-------------|--------------|--------------------------|--------------|------------|\n",
"|AzureML Services|Parallelizing Spark sessions on multi-node cluster or single Spark session on one VM node.)|Random, Grid, Bayesian sampling for discrete and continuous variables.|Bandit policy, Median stopping policy, and truncation selection policy.|Yes|Yes|\n",
"|Spark native construct|Distributed in single-node standalone Spark environment or multi-node Spark cluster.|No|No|Need to re-engineer Spark modules|Need to re-engineer Spark modules.|\n",
"|`hyperopt`|No (only support parallelization on MongoDB)|Random sampling for discrete and continuous variables.|Tree Parzen Estimator|Yes|Yes|"
- ]
+ ],
+ "metadata": {}
},
{
"cell_type": "code",
"execution_count": 36,
- "metadata": {},
- "outputs": [],
"source": [
"# cleanup spark instance\n",
"spark.stop()"
- ]
+ ],
+ "outputs": [],
+ "metadata": {}
},
{
"cell_type": "markdown",
- "metadata": {},
"source": [
"# References\n",
"\n",
@@ -2300,7 +1530,8 @@
"* `hyperopt`, url: http://hyperopt.github.io/hyperopt/.\n",
"* Bergstra, J., Yamins, D., Cox, D. D. (2013) Making a Science of Model Search: Hyperparameter Optimization in Hundreds of Dimensions for Vision Architectures. Proc. of the 30th International Conference on Machine Learning (ICML 2013).\n",
"* Kris Wright, \"Hyper parameter tuning with hyperopt\", url:https://districtdatalabs.silvrback.com/parameter-tuning-with-hyperopt"
- ]
+ ],
+ "metadata": {}
}
],
"metadata": {
@@ -2325,4 +1556,4 @@
},
"nbformat": 4,
"nbformat_minor": 2
-}
+}
\ No newline at end of file
diff --git a/recommenders/datasets/movielens.py b/recommenders/datasets/movielens.py
index 73d7a58f1c..d054bc64fb 100644
--- a/recommenders/datasets/movielens.py
+++ b/recommenders/datasets/movielens.py
@@ -3,33 +3,44 @@
import os
import re
+import random
import shutil
import warnings
import pandas as pd
+from typing import Optional
from zipfile import ZipFile
from recommenders.datasets.download_utils import maybe_download, download_path
from recommenders.utils.notebook_utils import is_databricks
from recommenders.utils.constants import (
- DEFAULT_USER_COL,
+ DEFAULT_HEADER,
DEFAULT_ITEM_COL,
+ DEFAULT_USER_COL,
DEFAULT_RATING_COL,
DEFAULT_TIMESTAMP_COL,
+ DEFAULT_TITLE_COL,
+ DEFAULT_GENRE_COL,
)
try:
from pyspark.sql.types import (
StructType,
StructField,
+ StringType,
IntegerType,
FloatType,
- DoubleType,
- LongType,
- StringType,
+ LongType
)
from pyspark.sql.functions import concat_ws, col
except ImportError:
pass # so the environment without spark doesn't break
+try:
+ import pandera as pa
+ from pandera import Field
+ from pandera.typing import Series
+except ImportError:
+    pass  # so the environment without recommenders['dev'] doesn't break
+
class _DataFormat:
def __init__(
@@ -100,6 +111,11 @@ def item_has_header(self):
"20m": _DataFormat(",", "ml-20m/ratings.csv", True, ",", "ml-20m/movies.csv", True),
}
+# Fake data for testing only
+MOCK_DATA_FORMAT = {
+ "mock100": {"size": 100, "seed": 6},
+}
+
# 100K data genres index to string mapper. For 1m, 10m, and 20m, the genres labels are already in the dataset.
GENRES = (
"unknown",
@@ -123,12 +139,6 @@ def item_has_header(self):
"Western",
)
-DEFAULT_HEADER = (
- DEFAULT_USER_COL,
- DEFAULT_ITEM_COL,
- DEFAULT_RATING_COL,
- DEFAULT_TIMESTAMP_COL,
-)
# Warning and error messages
WARNING_MOVIE_LENS_HEADER = """MovieLens rating dataset has four columns
@@ -136,7 +146,7 @@ def item_has_header(self):
Will only use the first four column names."""
WARNING_HAVE_SCHEMA_AND_HEADER = """Both schema and header are provided.
The header argument will be ignored."""
-ERROR_MOVIE_LENS_SIZE = "Invalid data size. Should be one of {100k, 1m, 10m, or 20m}"
+ERROR_MOVIE_LENS_SIZE = "Invalid data size. Should be one of {100k, 1m, 10m, 20m, or mock100}"
ERROR_HEADER = "Header error. At least user and movie column names should be provided"
@@ -154,14 +164,17 @@ def load_pandas_df(
To load movie information only, you can use load_item_df function.
Args:
- size (str): Size of the data to load. One of ("100k", "1m", "10m", "20m").
+ size (str): Size of the data to load. One of ("100k", "1m", "10m", "20m", "mock100").
header (list or tuple or None): Rating dataset header.
+ If `size` is set to any of 'MOCK_DATA_FORMAT', this parameter is ignored and data is rendered using the 'DEFAULT_HEADER' instead.
local_cache_path (str): Path (directory or a zip file) to cache the downloaded zip file.
If None, all the intermediate files will be stored in a temporary directory and removed after use.
+ If `size` is set to any of 'MOCK_DATA_FORMAT', this parameter is ignored.
title_col (str): Movie title column name. If None, the column will not be loaded.
genres_col (str): Genres column name. Genres are '|' separated string.
If None, the column will not be loaded.
year_col (str): Movie release year column name. If None, the column will not be loaded.
+ If `size` is set to any of 'MOCK_DATA_FORMAT', this parameter is ignored.
Returns:
pandas.DataFrame: Movie rating dataset.
@@ -185,7 +198,7 @@ def load_pandas_df(
)
"""
size = size.lower()
- if size not in DATA_FORMAT:
+ if size not in DATA_FORMAT and size not in MOCK_DATA_FORMAT:
raise ValueError(ERROR_MOVIE_LENS_SIZE)
if header is None:
@@ -196,6 +209,15 @@ def load_pandas_df(
warnings.warn(WARNING_MOVIE_LENS_HEADER)
header = header[:4]
+ if size in MOCK_DATA_FORMAT:
+ # generate fake data
+ return MockMovielensSchema.get_df(
+ keep_first_n_cols=len(header),
+ keep_title_col=(title_col is not None),
+ keep_genre_col=(genres_col is not None),
+            **MOCK_DATA_FORMAT[size]  # supply the remaining kwargs from the dictionary
+ )
+
movie_col = header[1]
with download_path(local_cache_path) as path:
@@ -349,17 +371,20 @@ def load_spark_df(
Args:
spark (pyspark.SparkSession): Spark session.
- size (str): Size of the data to load. One of ("100k", "1m", "10m", "20m").
+ size (str): Size of the data to load. One of ("100k", "1m", "10m", "20m", "mock100").
header (list or tuple): Rating dataset header.
- If schema is provided, this argument is ignored.
+ If `schema` is provided or `size` is set to any of 'MOCK_DATA_FORMAT', this argument is ignored.
schema (pyspark.StructType): Dataset schema.
+            If `size` is set to any of 'MOCK_DATA_FORMAT', this argument is ignored and the data is generated by 'MockMovielensSchema' instead.
local_cache_path (str): Path (directory or a zip file) to cache the downloaded zip file.
If None, all the intermediate files will be stored in a temporary directory and removed after use.
dbutils (Databricks.dbutils): Databricks utility object
+ If `size` is set to any of 'MOCK_DATA_FORMAT', this parameter is ignored.
title_col (str): Title column name. If None, the column will not be loaded.
genres_col (str): Genres column name. Genres are '|' separated string.
If None, the column will not be loaded.
year_col (str): Movie release year column name. If None, the column will not be loaded.
+ If `size` is set to any of 'MOCK_DATA_FORMAT', this parameter is ignored.
Returns:
pyspark.sql.DataFrame: Movie rating dataset.
@@ -394,9 +419,18 @@ def load_spark_df(
spark_df = load_spark_df(spark, dbutils=dbutils)
"""
size = size.lower()
- if size not in DATA_FORMAT:
+ if size not in DATA_FORMAT and size not in MOCK_DATA_FORMAT:
raise ValueError(ERROR_MOVIE_LENS_SIZE)
+ if size in MOCK_DATA_FORMAT:
+ # generate fake data
+ return MockMovielensSchema.get_spark_df(
+ spark,
+ keep_title_col=(title_col is not None),
+ keep_genre_col=(genres_col is not None),
+            **MOCK_DATA_FORMAT[size]  # supply the remaining kwargs from the dictionary
+ )
+
schema = _get_schema(header, schema)
if len(schema) < 2:
raise ValueError(ERROR_HEADER)
@@ -537,3 +571,109 @@ def extract_movielens(size, rating_path, item_path, zip_path):
shutil.copyfileobj(zf, f)
with z.open(DATA_FORMAT[size].item_path) as zf, open(item_path, "wb") as f:
shutil.copyfileobj(zf, f)
+
+
+class MockMovielensSchema(pa.SchemaModel):
+ """
+    Mock dataset schema to generate fake data for testing purposes.
+    This schema is configured to mimic the Movielens dataset:
+
+ http://files.grouplens.org/datasets/movielens/ml-100k/
+
+    Dataset schema and generation are configured using pandera.
+ Please see https://pandera.readthedocs.io/en/latest/schema_models.html
+ for more information.
+ """
+ # Some notebooks will do a cross join with userID and itemID,
+ # a sparse range for these IDs can slow down the notebook tests
+ userID: Series[int] = Field(in_range={"min_value": 1, "max_value": 10})
+ itemID: Series[int] = Field(in_range={"min_value": 1, "max_value": 10})
+ rating: Series[float] = Field(in_range={"min_value": 1, "max_value": 5})
+ timestamp: Series[int]
+ title: Series[str] = Field(eq="foo")
+ genre: Series[str] = Field(eq="genreA|0")
+
+ @classmethod
+ def get_df(
+ cls,
+ size: int = 3, seed: int = 100,
+ keep_first_n_cols: Optional[int] = None,
+ keep_title_col: bool = False, keep_genre_col: bool = False,
+ ) -> pd.DataFrame:
+ """Return fake movielens dataset as a Pandas Dataframe with specified rows.
+
+ Args:
+ size (int): number of rows to generate
+            seed (int, optional): seed for the pseudo-random number generation. Defaults to 100.
+            keep_first_n_cols (int, optional): keep only the first n default movielens columns.
+            keep_title_col (bool): remove the title column if False. Defaults to False.
+            keep_genre_col (bool): remove the genre column if False. Defaults to False.
+
+ Returns:
+ pandas.DataFrame: a mock dataset
+ """
+ schema = cls.to_schema()
+ if keep_first_n_cols is not None:
+ if keep_first_n_cols < 1 or keep_first_n_cols > len(DEFAULT_HEADER):
+ raise ValueError(f"Invalid value for 'keep_first_n_cols': {keep_first_n_cols}. Valid range: [1-{len(DEFAULT_HEADER)}]")
+ schema = schema.remove_columns(DEFAULT_HEADER[keep_first_n_cols:])
+ if not keep_title_col:
+ schema = schema.remove_columns([DEFAULT_TITLE_COL])
+ if not keep_genre_col:
+ schema = schema.remove_columns([DEFAULT_GENRE_COL])
+
+ random.seed(seed)
+ # For more information on data synthesis, see https://pandera.readthedocs.io/en/latest/data_synthesis_strategies.html
+ return schema.example(size=size)
+
+ @classmethod
+ def get_spark_df(
+ cls,
+ spark,
+ size: int = 3, seed: int = 100,
+ keep_title_col: bool = False, keep_genre_col: bool = False,
+ tmp_path: Optional[str] = None,
+ ):
+ """Return fake movielens dataset as a Spark Dataframe with specified rows
+
+ Args:
+ spark (SparkSession): spark session to load the dataframe into
+ size (int): number of rows to generate
+            seed (int): seed for the pseudo-random number generation. Defaults to 100.
+ keep_title_col (bool): remove the title column if False. Defaults to False.
+ keep_genre_col (bool): remove the genre column if False. Defaults to False.
+            tmp_path (str, optional): path to store files for serialization purposes
+                when transferring data from Python to Java.
+                If None, a temporary path is used instead.
+
+ Returns:
+ pyspark.sql.DataFrame: a mock dataset
+ """
+ pandas_df = cls.get_df(size=size, seed=seed, keep_title_col=True, keep_genre_col=True)
+
+ # generate temp folder
+ with download_path(tmp_path) as tmp_folder:
+ filepath = os.path.join(tmp_folder, f"mock_movielens_{size}.csv")
+ # serialize the pandas.df as a csv to avoid the expensive java <-> python communication
+ pandas_df.to_csv(filepath, header=False, index=False)
+ spark_df = spark.read.csv(filepath, schema=cls._get_spark_deserialization_schema())
+            # Cache and trigger an action now, since the data file may be removed once the temporary folder is cleaned up.
+ spark_df.cache()
+ spark_df.count()
+
+ if not keep_title_col:
+ spark_df = spark_df.drop(DEFAULT_TITLE_COL)
+ if not keep_genre_col:
+ spark_df = spark_df.drop(DEFAULT_GENRE_COL)
+ return spark_df
+
+ @classmethod
+ def _get_spark_deserialization_schema(cls):
+ return StructType([
+ StructField(DEFAULT_USER_COL, IntegerType()),
+ StructField(DEFAULT_ITEM_COL, IntegerType()),
+ StructField(DEFAULT_RATING_COL, FloatType()),
+ StructField(DEFAULT_TIMESTAMP_COL, StringType()),
+ StructField(DEFAULT_TITLE_COL, StringType()),
+ StructField(DEFAULT_GENRE_COL, StringType()),
+ ])
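To show how the mock dataset added above is meant to be consumed (mirroring the new unit tests further down), here is a short usage sketch:

```python
# Usage sketch for the mock MovieLens data; calls mirror the new unit tests.
from recommenders.datasets.movielens import MockMovielensSchema, load_pandas_df

# Generate a small fake dataset directly from the schema.
df = MockMovielensSchema.get_df(size=5, seed=6, keep_title_col=True, keep_genre_col=True)

# Or request it through the regular loader by passing the registered mock size.
df_100 = load_pandas_df(size="mock100", title_col="title", genres_col="genre")
print(df_100.head())
```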
diff --git a/recommenders/evaluation/spark_evaluation.py b/recommenders/evaluation/spark_evaluation.py
index 37a73778ea..e5112965b2 100644
--- a/recommenders/evaluation/spark_evaluation.py
+++ b/recommenders/evaluation/spark_evaluation.py
@@ -1,9 +1,6 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
-
-import numpy as np
-
try:
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark.sql import Window, DataFrame
diff --git a/recommenders/utils/constants.py b/recommenders/utils/constants.py
index 0e7ed34a9e..e24a58d725 100644
--- a/recommenders/utils/constants.py
+++ b/recommenders/utils/constants.py
@@ -6,6 +6,8 @@
DEFAULT_ITEM_COL = "itemID"
DEFAULT_RATING_COL = "rating"
DEFAULT_LABEL_COL = "label"
+DEFAULT_TITLE_COL = "title"
+DEFAULT_GENRE_COL = "genre"
DEFAULT_RELEVANCE_COL = "relevance"
DEFAULT_TIMESTAMP_COL = "timestamp"
DEFAULT_PREDICTION_COL = "prediction"
@@ -13,6 +15,13 @@
DEFAULT_ITEM_FEATURES_COL = "features"
DEFAULT_ITEM_SIM_MEASURE = "item_cooccurrence_count"
+DEFAULT_HEADER = (
+ DEFAULT_USER_COL,
+ DEFAULT_ITEM_COL,
+ DEFAULT_RATING_COL,
+ DEFAULT_TIMESTAMP_COL,
+)
+
COL_DICT = {
"col_user": DEFAULT_USER_COL,
"col_item": DEFAULT_ITEM_COL,
diff --git a/setup.py b/setup.py
index 4e37f407bc..9cca36aa38 100644
--- a/setup.py
+++ b/setup.py
@@ -81,8 +81,10 @@
],
"dev": [
"black>=18.6b4,<21",
+ "pandera[strategies]>=0.6.5", # For generating fake datasets
"pytest>=3.6.4",
"pytest-cov>=2.12.1",
+ "pytest-mock>=3.6.1", # for access to mock fixtures in pytest
],
}
# for the brave of heart
diff --git a/tests/ci/azure_pipeline_test/dsvm_nightly_linux_cpu.yml b/tests/ci/azure_pipeline_test/dsvm_nightly_linux_cpu.yml
index 2c5a698243..15b237650a 100644
--- a/tests/ci/azure_pipeline_test/dsvm_nightly_linux_cpu.yml
+++ b/tests/ci/azure_pipeline_test/dsvm_nightly_linux_cpu.yml
@@ -33,6 +33,6 @@ extends:
timeout: 180
conda_env: "nightly_linux_cpu"
conda_opts: "python=3.6"
- pip_opts: "[examples]"
+ pip_opts: "[examples,dev]"
pytest_markers: "not spark and not gpu"
pytest_params: "-x"
diff --git a/tests/ci/azure_pipeline_test/dsvm_nightly_linux_gpu.yml b/tests/ci/azure_pipeline_test/dsvm_nightly_linux_gpu.yml
index b1182c34c9..c43e8ec981 100644
--- a/tests/ci/azure_pipeline_test/dsvm_nightly_linux_gpu.yml
+++ b/tests/ci/azure_pipeline_test/dsvm_nightly_linux_gpu.yml
@@ -32,6 +32,6 @@ extends:
timeout: 240
conda_env: "nightly_linux_gpu"
conda_opts: "python=3.6 cudatoolkit=10.0 \"cudnn>=7.6\""
- pip_opts: "[gpu,examples] -f https://download.pytorch.org/whl/cu100/torch_stable.html"
+ pip_opts: "[gpu,examples,dev] -f https://download.pytorch.org/whl/cu100/torch_stable.html"
pytest_markers: "not spark and gpu"
pytest_params: "-x"
diff --git a/tests/ci/azure_pipeline_test/dsvm_nightly_linux_pyspark.yml b/tests/ci/azure_pipeline_test/dsvm_nightly_linux_pyspark.yml
index 6fd4e526ea..f542f059ff 100644
--- a/tests/ci/azure_pipeline_test/dsvm_nightly_linux_pyspark.yml
+++ b/tests/ci/azure_pipeline_test/dsvm_nightly_linux_pyspark.yml
@@ -33,6 +33,6 @@ extends:
timeout: 180
conda_env: "nightly_linux_spark"
conda_opts: "python=3.6"
- pip_opts: "[spark,examples]"
+ pip_opts: "[spark,examples,dev]"
pytest_markers: "spark and not gpu"
pytest_params: "-x"
diff --git a/tests/ci/azure_pipeline_test/dsvm_notebook_linux_cpu.yml b/tests/ci/azure_pipeline_test/dsvm_notebook_linux_cpu.yml
index b75cc0c3f5..93eaeacc84 100644
--- a/tests/ci/azure_pipeline_test/dsvm_notebook_linux_cpu.yml
+++ b/tests/ci/azure_pipeline_test/dsvm_notebook_linux_cpu.yml
@@ -60,5 +60,5 @@ extends:
task_name: "Test - Unit Notebook Linux CPU"
conda_env: "unit_notebook_linux_cpu"
conda_opts: "python=3.6"
- pip_opts: "[examples]"
+ pip_opts: "[examples,dev]"
pytest_markers: "notebooks and not spark and not gpu"
diff --git a/tests/ci/azure_pipeline_test/dsvm_notebook_linux_gpu.yml b/tests/ci/azure_pipeline_test/dsvm_notebook_linux_gpu.yml
index 9cb44639e0..6d7594a143 100644
--- a/tests/ci/azure_pipeline_test/dsvm_notebook_linux_gpu.yml
+++ b/tests/ci/azure_pipeline_test/dsvm_notebook_linux_gpu.yml
@@ -60,5 +60,5 @@ extends:
task_name: "Test - Unit Notebook Linux GPU"
conda_env: "unit_notebook_linux_gpu"
conda_opts: "python=3.6 cudatoolkit=10.0 \"cudnn>=7.6\""
- pip_opts: "[gpu,examples] -f https://download.pytorch.org/whl/cu100/torch_stable.html"
+ pip_opts: "[gpu,examples,dev] -f https://download.pytorch.org/whl/cu100/torch_stable.html"
pytest_markers: "notebooks and not spark and gpu"
diff --git a/tests/ci/azure_pipeline_test/dsvm_notebook_linux_pyspark.yml b/tests/ci/azure_pipeline_test/dsvm_notebook_linux_pyspark.yml
index 535f6936a7..31d699588d 100644
--- a/tests/ci/azure_pipeline_test/dsvm_notebook_linux_pyspark.yml
+++ b/tests/ci/azure_pipeline_test/dsvm_notebook_linux_pyspark.yml
@@ -60,5 +60,5 @@ extends:
task_name: "Test - Unit Notebook Linux Spark"
conda_env: "unit_notebook_linux_spark"
conda_opts: "python=3.6"
- pip_opts: "[spark,examples]"
+ pip_opts: "[spark,examples,dev]"
pytest_markers: "notebooks and spark and not gpu"
diff --git a/tests/ci/azure_pipeline_test/dsvm_unit_linux_cpu.yml b/tests/ci/azure_pipeline_test/dsvm_unit_linux_cpu.yml
index be3b95c587..26ed5bdf2f 100644
--- a/tests/ci/azure_pipeline_test/dsvm_unit_linux_cpu.yml
+++ b/tests/ci/azure_pipeline_test/dsvm_unit_linux_cpu.yml
@@ -60,5 +60,5 @@ extends:
task_name: "Test - Unit Linux CPU"
conda_env: "unit_linux_cpu"
conda_opts: "python=3.6"
- pip_opts: ""
+ pip_opts: "[dev]"
pytest_markers: "not notebooks and not spark and not gpu"
diff --git a/tests/ci/azure_pipeline_test/dsvm_unit_linux_gpu.yml b/tests/ci/azure_pipeline_test/dsvm_unit_linux_gpu.yml
index b9a76211d9..9aa46047e6 100644
--- a/tests/ci/azure_pipeline_test/dsvm_unit_linux_gpu.yml
+++ b/tests/ci/azure_pipeline_test/dsvm_unit_linux_gpu.yml
@@ -60,5 +60,5 @@ extends:
task_name: "Test - Unit Linux GPU"
conda_env: "unit_linux_gpu"
conda_opts: "python=3.6 cudatoolkit=10.0 \"cudnn>=7.6\""
- pip_opts: "[gpu] -f https://download.pytorch.org/whl/cu100/torch_stable.html"
+ pip_opts: "[gpu,dev] -f https://download.pytorch.org/whl/cu100/torch_stable.html"
pytest_markers: "not notebooks and not spark and gpu"
diff --git a/tests/ci/azure_pipeline_test/dsvm_unit_linux_pyspark.yml b/tests/ci/azure_pipeline_test/dsvm_unit_linux_pyspark.yml
index f99b151cad..1f3006a05e 100644
--- a/tests/ci/azure_pipeline_test/dsvm_unit_linux_pyspark.yml
+++ b/tests/ci/azure_pipeline_test/dsvm_unit_linux_pyspark.yml
@@ -60,5 +60,5 @@ extends:
task_name: "Test - Unit Linux Spark"
conda_env: "unit_linux_spark"
conda_opts: "python=3.6"
- pip_opts: "[spark]"
+ pip_opts: "[spark,dev]"
pytest_markers: "not notebooks and spark and not gpu"
diff --git a/tests/unit/examples/test_notebooks_pyspark.py b/tests/unit/examples/test_notebooks_pyspark.py
index e4ae1d9464..6ccd970492 100644
--- a/tests/unit/examples/test_notebooks_pyspark.py
+++ b/tests/unit/examples/test_notebooks_pyspark.py
@@ -8,6 +8,8 @@
except ImportError:
pass # disable error while collecting tests for non-notebook environments
+from recommenders.utils.constants import DEFAULT_RATING_COL, DEFAULT_USER_COL, DEFAULT_ITEM_COL
+
@pytest.mark.notebooks
@pytest.mark.spark
@@ -16,7 +18,13 @@
)
def test_als_pyspark_runs(notebooks, output_notebook, kernel_name):
notebook_path = notebooks["als_pyspark"]
- pm.execute_notebook(notebook_path, output_notebook, kernel_name=kernel_name)
+ pm.execute_notebook(notebook_path, output_notebook, kernel_name=kernel_name,
+ parameters=dict(
+ MOVIELENS_DATA_SIZE="mock100",
+ COL_USER=DEFAULT_USER_COL,
+ COL_ITEM=DEFAULT_ITEM_COL,
+ COL_RATING=DEFAULT_RATING_COL,
+ ))
@pytest.mark.notebooks
@@ -33,7 +41,13 @@ def test_data_split_runs(notebooks, output_notebook, kernel_name):
)
def test_als_deep_dive_runs(notebooks, output_notebook, kernel_name):
notebook_path = notebooks["als_deep_dive"]
- pm.execute_notebook(notebook_path, output_notebook, kernel_name=kernel_name)
+ pm.execute_notebook(notebook_path, output_notebook, kernel_name=kernel_name,
+ parameters=dict(
+ MOVIELENS_DATA_SIZE="mock100",
+ COL_USER=DEFAULT_USER_COL,
+ COL_ITEM=DEFAULT_ITEM_COL,
+ COL_RATING=DEFAULT_RATING_COL,
+ ))
@pytest.mark.notebooks
@@ -50,7 +64,14 @@ def test_evaluation_runs(notebooks, output_notebook, kernel_name):
@pytest.mark.spark
def test_evaluation_diversity_runs(notebooks, output_notebook, kernel_name):
notebook_path = notebooks["evaluation_diversity"]
- pm.execute_notebook(notebook_path, output_notebook, kernel_name=kernel_name)
+ pm.execute_notebook(notebook_path, output_notebook, kernel_name=kernel_name,
+ parameters=dict(
+ TOP_K=10,
+ MOVIELENS_DATA_SIZE="mock100",
+ COL_USER=DEFAULT_USER_COL,
+ COL_ITEM=DEFAULT_ITEM_COL,
+ COL_RATING=DEFAULT_RATING_COL,
+ ))
@pytest.mark.notebooks
@@ -65,6 +86,7 @@ def test_spark_tuning(notebooks, output_notebook, kernel_name):
output_notebook,
kernel_name=kernel_name,
parameters=dict(
+ MOVIELENS_DATA_SIZE="mock100",
NUMBER_CORES="*",
NUMBER_ITERATIONS=3,
SUBSET_RATIO=0.5,
diff --git a/tests/unit/examples/test_notebooks_python.py b/tests/unit/examples/test_notebooks_python.py
index 76cd854d28..e9cda6810e 100644
--- a/tests/unit/examples/test_notebooks_python.py
+++ b/tests/unit/examples/test_notebooks_python.py
@@ -52,7 +52,8 @@ def test_baseline_deep_dive_runs(notebooks, output_notebook, kernel_name):
@pytest.mark.notebooks
def test_surprise_deep_dive_runs(notebooks, output_notebook, kernel_name):
notebook_path = notebooks["surprise_svd_deep_dive"]
- pm.execute_notebook(notebook_path, output_notebook, kernel_name=kernel_name)
+ pm.execute_notebook(notebook_path, output_notebook, kernel_name=kernel_name,
+ parameters=dict(MOVIELENS_DATA_SIZE="mock100"))
@pytest.mark.notebooks
@@ -100,7 +101,8 @@ def test_wikidata_runs(notebooks, output_notebook, kernel_name, tmp):
@pytest.mark.notebooks
def test_rlrmc_quickstart_runs(notebooks, output_notebook, kernel_name):
notebook_path = notebooks["rlrmc_quickstart"]
- pm.execute_notebook(notebook_path, output_notebook, kernel_name=kernel_name)
+ pm.execute_notebook(notebook_path, output_notebook, kernel_name=kernel_name,
+ parameters=dict(rank_parameter=2, MOVIELENS_DATA_SIZE="mock100"))
@pytest.mark.notebooks
diff --git a/tests/unit/recommenders/datasets/test_movielens.py b/tests/unit/recommenders/datasets/test_movielens.py
new file mode 100644
index 0000000000..d8f12771f9
--- /dev/null
+++ b/tests/unit/recommenders/datasets/test_movielens.py
@@ -0,0 +1,125 @@
+import os
+import pandas
+import pytest
+
+from recommenders.datasets.movielens import MockMovielensSchema
+from recommenders.datasets.movielens import load_pandas_df, load_spark_df
+from recommenders.datasets.movielens import DATA_FORMAT, MOCK_DATA_FORMAT, DEFAULT_HEADER
+from recommenders.utils.constants import DEFAULT_GENRE_COL, DEFAULT_TITLE_COL
+
+from pandas.core.series import Series
+from pytest_mock import MockerFixture
+
+
+@pytest.mark.parametrize("size", [10, 100])
+def test_mock_movielens_schema__has_default_col_names(size):
+ df = MockMovielensSchema.example(size=size)
+ for col_name in DEFAULT_HEADER:
+ assert col_name in df.columns
+
+
+@pytest.mark.parametrize("keep_first_n_cols", [1, 2, 3, 4])
+def test_mock_movielens_schema__get_df_remove_default_col__return_success(keep_first_n_cols):
+ df = MockMovielensSchema.get_df(size=3, keep_first_n_cols=keep_first_n_cols)
+ assert len(df) > 0
+ assert len(df.columns) == keep_first_n_cols
+
+
+@pytest.mark.parametrize("keep_first_n_cols", [-1, 0, 100])
+def test_mock_movielens_schema__get_df_invalid_param__return_failure(keep_first_n_cols):
+ with pytest.raises(ValueError, match=r"Invalid value.*"):
+ MockMovielensSchema.get_df(size=3, keep_first_n_cols=keep_first_n_cols)
+
+
+@pytest.mark.parametrize("keep_genre_col", [True, False])
+@pytest.mark.parametrize("keep_title_col", [True, False])
+@pytest.mark.parametrize("keep_first_n_cols", [None, 2])
+@pytest.mark.parametrize("seed", [-1]) # seed for pseudo-random # generation
+@pytest.mark.parametrize("size", [0, 3, 10])
+def test_mock_movielens_schema__get_df__return_success(size, seed, keep_first_n_cols, keep_title_col, keep_genre_col):
+ df = MockMovielensSchema.get_df(
+ size=size, seed=seed,
+ keep_first_n_cols=keep_first_n_cols,
+ keep_title_col=keep_title_col, keep_genre_col=keep_genre_col
+ )
+ assert type(df) == pandas.DataFrame
+ assert len(df) == size
+
+ if keep_title_col:
+ assert len(df[DEFAULT_TITLE_COL]) == size
+ if keep_genre_col:
+ assert len(df[DEFAULT_GENRE_COL]) == size
+
+
+@pytest.mark.spark
+@pytest.mark.parametrize("keep_genre_col", [True, False])
+@pytest.mark.parametrize("keep_title_col", [True, False])
+@pytest.mark.parametrize("seed", [101]) # seed for pseudo-random # generation
+@pytest.mark.parametrize("size", [0, 3, 10])
+def test_mock_movielens_schema__get_spark_df__return_success(spark, size, seed, keep_title_col, keep_genre_col):
+ df = MockMovielensSchema.get_spark_df(spark, size=size, seed=seed, keep_title_col=keep_title_col, keep_genre_col=keep_genre_col)
+ assert df.count() == size
+
+ if keep_title_col:
+ assert df.schema[DEFAULT_TITLE_COL]
+ if keep_genre_col:
+ assert df.schema[DEFAULT_GENRE_COL]
+
+
+@pytest.mark.spark
+def test_mock_movielens_schema__get_spark_df__store_tmp_file(spark, tmp_path):
+ data_size = 3
+ MockMovielensSchema.get_spark_df(spark, size=data_size, tmp_path=tmp_path)
+ assert os.path.exists(os.path.join(tmp_path, f"mock_movielens_{data_size}.csv"))
+
+
+@pytest.mark.spark
+def test_mock_movielens_schema__get_spark_df__data_serialization_default_param(spark, mocker: MockerFixture):
+ data_size = 3
+ to_csv_spy = mocker.spy(pandas.DataFrame, "to_csv")
+
+ df = MockMovielensSchema.get_spark_df(spark, size=data_size)
+ # assertions
+ to_csv_spy.assert_called_once()
+ assert df.count() == data_size
+
+
+def test_mock_movielens_data__no_name_collision():
+ """
+ Making sure that no common names are shared between the mock and real dataset sizes
+ """
+ dataset_name = set(DATA_FORMAT.keys())
+ dataset_name_mock = set(MOCK_DATA_FORMAT.keys())
+ collision = dataset_name.intersection(dataset_name_mock)
+ assert not collision
+
+
+@pytest.mark.spark
+def test_load_spark_df_mock_100__with_default_param__succeed(spark):
+ df = load_spark_df(spark, "mock100")
+ assert df.count() == 100
+
+
+def test_load_pandas_df_mock_100__with_default_param__succeed():
+ df = load_pandas_df("mock100")
+ assert type(df) == pandas.DataFrame
+ assert len(df) == 100
+
+
+@pytest.mark.spark
+def test_load_spark_df_mock_100__with_custom_param__succeed(spark):
+ df = load_spark_df(spark, "mock100", title_col=DEFAULT_TITLE_COL, genres_col=DEFAULT_GENRE_COL)
+ assert df.schema[DEFAULT_TITLE_COL]
+ assert df.schema[DEFAULT_GENRE_COL]
+ assert df.count() == 100
+ assert '|' in df.take(1)[0][DEFAULT_GENRE_COL]
+ assert df.take(1)[0][DEFAULT_TITLE_COL] == 'foo'
+
+
+def test_load_pandas_df_mock_100__with_custom_param__succeed():
+ df = load_pandas_df("mock100", title_col=DEFAULT_TITLE_COL, genres_col=DEFAULT_GENRE_COL)
+ assert type(df[DEFAULT_TITLE_COL]) == Series
+ assert type(df[DEFAULT_GENRE_COL]) == Series
+ assert len(df) == 100
+ assert '|' in df.loc[0, DEFAULT_GENRE_COL]
+ assert df.loc[0, DEFAULT_TITLE_COL] == 'foo'