diff --git a/matsim/scenariogen/data/__init__.py b/matsim/scenariogen/data/__init__.py index 892c322..40e1bb6 100644 --- a/matsim/scenariogen/data/__init__.py +++ b/matsim/scenariogen/data/__init__.py @@ -2,18 +2,17 @@ """ __all__ = ["read_all", "ParkingPosition", "HouseholdType", "EconomicStatus", "Gender", "Employment", "Availability", - "Purpose", + "Purpose", "_check_header", "TripMode", "DistanceGroup", "DurationGroup", "SourceDestinationGroup", "Household", "Person", "Trip", "Activity", "Visitations"] +import numpy as np import os +import pandas as pd from dataclasses import dataclass from enum import Enum, auto from typing import List, Union, Tuple, get_type_hints -import numpy as np -import pandas as pd - def _batch(iterable: list, max_batch_size: int): """ Batches an iterable into lists of given maximum size, yielding them one by one. """ @@ -27,10 +26,25 @@ def _batch(iterable: list, max_batch_size: int): yield batch +def _check_header(path: os.PathLike, keywords: List[str]) -> bool: + """ Check if the first line of a file contains certain keywords. + :param path: Path to file + :param keywords: List of keywords that should be present in the first line + """ + valid = True + with open(path, "r", errors='replace') as f: + line = f.readline() + for k in keywords: + if k not in line: + valid = False + + return valid + + def read_all(dirs: Union[str, List[str]], regio=None) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: """ Scan directories and read everything into one dataframe """ - from .formats import srv, mid + from .formats import srv, mid, jp_milt hh = [] pp = [] @@ -40,9 +54,10 @@ def read_all(dirs: Union[str, List[str]], regio=None) -> Tuple[pd.DataFrame, pd. if type(dirs) == str: dirs = [dirs] + found = False for d in dirs: - for format in (srv, mid): + for format in (srv, mid, jp_milt): files = [] @@ -69,6 +84,11 @@ def read_all(dirs: Union[str, List[str]], regio=None) -> Tuple[pd.DataFrame, pd. pp.append(df[1]) tt.append(df[2]) + found = True + + if not found: + raise ValueError("No valid survey data found in directories: " + str(dirs)) + hh = pd.concat(hh, axis=0) hh = hh[~hh.index.duplicated(keep='first')] hh = hh.dropna(axis=1, how='all') @@ -302,6 +322,14 @@ def source(self): return Purpose.OTHER + @staticmethod + def parse(source, destination): + for sd in SourceDestinationGroup: + if sd.name.startswith(source.upper()) and sd.name.endswith(destination.upper()): + return sd + + return SourceDestinationGroup.UNKNOWN + @dataclass class Household: @@ -317,10 +345,10 @@ class Household: type: HouseholdType region_type: int location: str - zone: str = None + zone: str = pd.NA """ A detailed zone, which can be more accurate than location. """ - income: float = None - geom: object = None + income: float = pd.NA + geom: object = pd.NA @dataclass @@ -339,7 +367,10 @@ class Person: pt_abo_avail: Availability mobile_on_day: bool present_on_day: bool + reporting_day: int + + """ Reporting day of the week between 1-7. 1 is Monday. """ n_trips: int @@ -351,7 +382,10 @@ class Trip: p_id: str hh_id: str n: int + day_of_week: int + """ Day of the week between 1-7. 1 is Monday. """ + departure: int duration: int gis_length: float diff --git a/matsim/scenariogen/data/formats/__init__.py b/matsim/scenariogen/data/formats/__init__.py index 9d826bc..3cd7c12 100644 --- a/matsim/scenariogen/data/formats/__init__.py +++ b/matsim/scenariogen/data/formats/__init__.py @@ -1,3 +1,3 @@ # -*- coding: utf-8 -*- -__all__ = ["srv", "mid"] \ No newline at end of file +__all__ = ["srv", "mid", "jp_milt"] \ No newline at end of file diff --git a/matsim/scenariogen/data/formats/jp_milt.py b/matsim/scenariogen/data/formats/jp_milt.py new file mode 100644 index 0000000..0803843 --- /dev/null +++ b/matsim/scenariogen/data/formats/jp_milt.py @@ -0,0 +1,264 @@ +# -*- coding: utf-8 -*- +import numpy as np +import os +from datetime import datetime + +import pandas as pd + +from .. import * + +# Persons and trips +INPUT_FILES = 2 + + +# This data format is for the person survey in Japan +# conducted by the Ministry of Land, Infrastructure, Transport and Tourism +# See https://www.mlit.go.jp/toshi/tosiko/toshi_tosiko_tk_000031.html + +def is_format(f: os.DirEntry): + fp = f.name + if not f.path.endswith(".csv"): + return False + + return _check_header(f.path, ["cities_code", "household_person_no"]) + + +def read_raw(person_file, trip_file): + """ Read the input files into format used by conversion """ + + p = pd.read_csv(person_file) + t = pd.read_csv(trip_file) + + return p, t + + +def convert(data: tuple, regio=None): + """ Convert data to standardized survey format """ + + (pp, tt) = data + + # Magnication factor is recaled to a mean of 1 + w_mean = pp["expansion factor2"].mean() + weights = {} + weekdays = {} + + ps = [] + hhs = {} + for p in pp.itertuples(): + + person_number = str(int(p.household_person_no)) + household_number = person_number[:-2] + + # _32 is expansion factor2, the name can not be used directly + weights[person_number] = round(p._32 / w_mean, 2) + weekdays[person_number] = Milt2010.weekday(p.survey_month, p.survey_day) + + n_trips = p.trip_end_no + + ps.append( + Person( + str(p.household_person_no), + weights[person_number], + household_number, + # Age is a group and given as lower bound + int(p.age5) + 2, + Milt2010.gender(p.sex), + pd.NA, + Milt2010.restricted_mobility(p.out_difficult_div), + Availability.YES if p.car_license == 1 else Availability.NO, + Availability.YES if p.car_cnt > 0 else Availability.NO, + Availability.YES if p.bicycle_cnt > 0 else Availability.NO, + Availability.UNKNOWN, + n_trips > 0, + True, + weekdays[person_number], + n_trips, + ) + ) + + if household_number not in hhs: + hhs[household_number] = Household( + household_number, + weights[person_number], + 1, + p.car_cnt, + p.bicycle_cnt, + 0, + ParkingPosition.NA, + pd.NA, + pd.NA, + 0, + str(p.cities_code) + ) + + + # Save the previous trip + prev_purpose = None + prev = None + + ts = [] + for t in tt.itertuples(): + # Trucate last two digits + person_number = str(int(t.household_person_no)) + household_number = person_number[:-2] + + purpose = Milt2010.trip_purpose(t.s_activity_type, prev_purpose) + + # Reset previous purpose if person changes + if prev != None and prev.household_person_no != t.household_person_no: + prev_purpose = None + + # Store context of person going to schhool or work + if purpose == Purpose.WORK: + prev_purpose = Purpose.WORK + elif purpose == Purpose.EDU: + prev_purpose = Purpose.EDU + + ts.append( + Trip( + person_number + "_" + str(int(t.trip_no)), + weights[person_number], + person_number, + household_number, + int(t.trip_no), + # Weekday is not present in the trip, only for person + weekdays.get(person_number, pd.NA), + t.dep_hour * 60 + t.dep_minute, + t.arr_hour * 60 + t.arr_minute - (t.dep_hour * 60 + t.dep_minute), + # Only approximate distance between zones, in km + # Original entry are in 100m + float(t.move_dist) * 0.1, + Milt2010.trip_mode(t.original_transport, t.driver), + purpose, + Milt2010.sd_group(purpose, prev_purpose, t.dep_pre_code, t.arr_pre_code), + # Trip is valid if length and duration are present + t.move_dist > 0 and not np.isnan(t.dep_hour) and not np.isnan(t.arr_hour), + from_location=str(t.dep_cities_code), + from_zone=str(t.dep_zone_code), + to_location=str(t.arr_cities_code), + to_zone=str(t.arr_zone_code), + ) + ) + + prev = t + + # Uses the dictionaries directly in order to avoid a copy of the dataclasses + hhs = pd.DataFrame(h.__dict__ for h in hhs.values()).set_index("hh_id") + ps = pd.DataFrame(p.__dict__ for p in ps).set_index("p_id") + + return hhs, ps, pd.DataFrame(t.__dict__ for t in ts).set_index("t_id") + + +class Milt2010: + """ Parsing functions for the MILT 2010 data format """ + + @staticmethod + def weekday(month, day): + if np.isnan(month) or np.isnan(day): + return pd.NA + + weekday = datetime(2010, int(month), int(day)).weekday() + return weekday + 1 + + @staticmethod + def gender(x): + if x == 1: + return Gender.M + elif x == 2: + return Gender.F + + return Gender.OTHER + + @staticmethod + def restricted_mobility(div): + if div == 2: + return True + + return False + + @staticmethod + def trip_mode(x, driver): + if x in (81, 82, 71, 72, 73): + return TripMode.PT + elif x in (61, 62, 63, 64, 65): + if driver == 1: + return TripMode.CAR + else: + return TripMode.RIDE + elif x in (41, 51, 55): + return TripMode.MOTORCYCLE + elif x in (31, 32): + return TripMode.BIKE + elif x in (10, 15, 21, 22): + return TripMode.WALK + + return TripMode.OTHER + + @staticmethod + def trip_purpose(x, prev_purpose): + if x == 1: + return Purpose.WORK + elif x == 2: + return Purpose.EDU + elif x == 3: + return Purpose.HOME + elif x == 4: + return Purpose.SHOPPING + elif x == 5: + return Purpose.LEISURE + elif x == 6: + return Purpose.PERSONAL_BUSINESS + elif x == 7 or x == 8 or x == 9 or x == 47 or x == 11: + return Purpose.WORK_BUSINESS + elif x == 12: + # return trip to work or school + # need to know which one it is + if prev_purpose == None: + return Purpose.WORK + + return prev_purpose + + return Purpose.OTHER + + @staticmethod + def sd_group(purpose, prev_purpose, dep_precode, arr_precode): + + # The precode does not allow a distinction between work and school + # There fore the purpose and prev purpose need to be considered + + # Precode: 1 home, 2 work or school, 3 other + source = "OTHER" + destination = "OTHER" + + if dep_precode == 1: + source = "HOME" + elif dep_precode == 2: + if prev_purpose == Purpose.EDU: + source = "EDU" + else: + # Assume here that work is the default if no previous edu trip is known + source = "WORK" + elif dep_precode == 3: + if prev_purpose == Purpose.LEISURE: + source = "LEISURE" + elif prev_purpose == Purpose.SHOPPING: + source = "SHOP" + else: + source = "OTHER" + + if arr_precode == 1: + destination = "HOME" + elif arr_precode == 2: + if purpose == Purpose.EDU: + destination = "EDU" + else: + destination = "WORK" + elif arr_precode == 3: + if purpose == Purpose.LEISURE: + destination = "LEISURE" + elif purpose == Purpose.SHOPPING: + destination = "SHOP" + else: + destination = "OTHER" + + return SourceDestinationGroup.parse(source, destination) diff --git a/matsim/scenariogen/data/preparation.py b/matsim/scenariogen/data/preparation.py index c134b05..bb147fe 100644 --- a/matsim/scenariogen/data/preparation.py +++ b/matsim/scenariogen/data/preparation.py @@ -10,9 +10,10 @@ from . import * - def prepare_persons(hh, pp, tt, augment=5, max_hh_size=5, core_weekday=False, remove_with_invalid_trips=False): - """ Cleans common data errors and fill missing values """ + """ Cleans common data errors and fill missing values. This method is very specific. + Functionality is now split up into smaller functions, which is more reusable. + """ df = pp.join(hh, on="hh_id", lsuffix="hh_") # Replace unknown income group @@ -70,6 +71,23 @@ def prepare_persons(hh, pp, tt, augment=5, max_hh_size=5, core_weekday=False, re return df +def join_person_with_household(persons, households): + """ Join persons withhold table so that household attributes will be available for each person """ + return persons.join(households, on="hh_id", lsuffix="hh_") + + +def remove_persons_with_invalid_trips(persons, trips): + """ Remove persons that have invalid trips """ + + df = persons.reset_index() + tt = trips.reset_index() + + invalid = set(tt[~tt.valid].p_id) + df = df[~df.p_id.isin(invalid)] + mobile = set(tt[tt.valid].p_id) + + # Filter persons that are supposed to be mobile but have no trips + return df[(df.p_id.isin(mobile) | (~df.mobile_on_day))] def bins_to_labels(bins): """ Convert bins to labels """ @@ -239,8 +257,11 @@ def fill(df, col, val=None, random_state=0): def create_activities(all_persons: pd.DataFrame, tt: pd.DataFrame, core_weekday=True, cut_groups=True, - include_person_context=True): - """ Create activity representation from trip table """ + include_person_context=True, use_index_as_pid=True, cores=1): + """ Create activity representation from trip table + + :param use_index_as_pid, if True, the p_id will be the index of the persons table. + """ tt = tt.reset_index().set_index("p_id") @@ -275,16 +296,18 @@ def convert(persons): if (trips.day_of_week > 4).any(): continue + use_id = p.Index if use_index_as_pid else p_id + # id generator def a_id(t_i): - return "%s_%d" % (p.Index, t_i) + return "%s_%d" % (use_id, t_i) if len(trips) == 0: - acts.append(Activity(a_id(0), p.p_weight, p.Index, 0, Purpose.HOME, 1440, 0, 0, TripMode.OTHER)) + acts.append(Activity(a_id(0), p.p_weight, use_id, 0, Purpose.HOME, 1440, 0, 0, TripMode.OTHER)) else: trip_0 = trips.iloc[0] - acts.append(Activity(a_id(0), p.p_weight, p.Index, 0, trip_0.sd_group.source(), trip_0.departure, 0, 0, - TripMode.OTHER, trip_0.from_location, trip_0.from_zone)) + acts.append(Activity(a_id(0), p.p_weight, use_id, 0, trip_0.sd_group.source(), trip_0.departure, 0, 0, + TripMode.OTHER, trip_0.from_location, trip_0.from_zone)) for i in range(len(trips) - 1): t0 = trips.iloc[i] @@ -295,7 +318,7 @@ def a_id(t_i): if duration < 0 or t0.gis_length < 0: valid = False - acts.append(Activity(a_id(i + 1), p.p_weight, p.Index, i + 1, t0.purpose,duration, t0.gis_length, + acts.append(Activity(a_id(i + 1), p.p_weight, use_id, i + 1, t0.purpose, duration, t0.gis_length, t0.duration, t0.main_mode, t0.to_location, t0.to_zone)) if len(trips) > 1: @@ -307,7 +330,7 @@ def a_id(t_i): valid = False # Duration is set to rest of day - acts.append(Activity(a_id(i + 1), p.p_weight, p.Index, i + 1, tl.purpose, 1440, tl.gis_length, tl.duration, tl.main_mode, + acts.append(Activity(a_id(i + 1), p.p_weight, use_id, i + 1, tl.purpose, 1440, tl.gis_length, tl.duration, tl.main_mode, tl.to_location, tl.to_zone)) if valid: @@ -315,9 +338,14 @@ def a_id(t_i): return res - with mp.Pool(8) as pool: - docs = pool.map(convert, np.array_split(all_persons, 16)) - result = functools.reduce(lambda a, b: a + b, docs) + all_persons = all_persons.head(n=1000) + + if cores == 1: + result = convert(all_persons) + else: + with mp.Pool(cores) as pool: + docs = pool.map(convert, np.array_split(all_persons, cores)) + result = functools.reduce(lambda a, b: a + b, docs) activities = pd.DataFrame(result).set_index("a_id") # Reverse columns because it will be reversed again at the end diff --git a/matsim/scenariogen/data/run_create_ref_data.py b/matsim/scenariogen/data/run_create_ref_data.py index fdc325c..058e836 100644 --- a/matsim/scenariogen/data/run_create_ref_data.py +++ b/matsim/scenariogen/data/run_create_ref_data.py @@ -33,6 +33,11 @@ class AggregationResult: trips: pd.DataFrame share: pd.DataFrame + # These are the original read dataframes + all_hh: pd.DataFrame + all_persons: pd.DataFrame + all_trips: pd.DataFrame + groups: pd.DataFrame = None @@ -240,7 +245,8 @@ def create(survey_dirs, transform_persons, transform_trips, dist = dist[ref_groups + ["dist_group", "main_mode", "share"]] dist.to_csv(output_prefix + "mode_share_per_group_dist_ref.csv", index=False) - return AggregationResult(persons, trips, share.groupby("main_mode").sum(), groups=groups) + return AggregationResult(persons, trips, share.groupby("main_mode").sum(), + all_hh, all_persons, all_trips, groups=groups) def main(args):