import numpy as np
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
from shapely.wkt import loads
import itertools
import os
from collections import Counter
import pickle
from sklearn.preprocessing import LabelEncoder, MinMaxScaler
from sklearn.feature_selection import SelectPercentile, chi2
import adjacency_features as af
import textual_features as tf
# import geometric_features as gf
# import matching as m
import osm_utilities as osm_ut
import writers as wrtrs
from config import config
# Maps each supported feature name to the module (adjacency_features as `af`,
# textual_features as `tf`) that implements its getter; used by
# create_single_feature to resolve the implementation.
feature_module_map = {
'classes_in_radius_bln': af,
'classes_in_radius_cnt': af,
'classes_in_street_and_radius_bln': af,
'classes_in_street_and_radius_cnt': af,
'classes_in_neighbors_bln': af,
'classes_in_neighbors_cnt': af,
'classes_in_street_radius_bln': af,
'classes_in_street_radius_cnt': af,
'similarity_per_class': tf,
'top_k_terms': tf,
'top_k_trigrams': tf,
'top_k_fourgrams': tf
}
# Maps each feature name to the name of the getter function inside its module
# (looked up with getattr in create_single_feature).
features_getter_map = {
'classes_in_radius_bln': 'get_classes_in_radius_bln',
'classes_in_radius_cnt': 'get_classes_in_radius_cnt',
'classes_in_street_and_radius_bln': 'get_classes_in_street_and_radius_bln',
'classes_in_street_and_radius_cnt': 'get_classes_in_street_and_radius_cnt',
'classes_in_neighbors_bln': 'get_classes_in_neighbors_bln',
'classes_in_neighbors_cnt': 'get_classes_in_neighbors_cnt',
'classes_in_street_radius_bln': 'get_classes_in_street_radius_bln',
'classes_in_street_radius_cnt': 'get_classes_in_street_radius_cnt',
'similarity_per_class': 'get_similarity_per_class',
'top_k_terms': 'get_top_k_terms',
'top_k_trigrams': 'get_top_k_trigrams',
'top_k_fourgrams': 'get_top_k_fourgrams'
}
# Maps each parameterized feature to the config attribute holding the list of
# parameter values to sweep. Features absent from this map take no parameter
# (e.g. similarity_per_class).
features_params_map = {
'classes_in_radius_bln': 'classes_in_radius_thr',
'classes_in_radius_cnt': 'classes_in_radius_thr',
'classes_in_street_and_radius_bln': 'classes_in_street_and_radius_thr',
'classes_in_street_and_radius_cnt': 'classes_in_street_and_radius_thr',
'classes_in_neighbors_bln': 'classes_in_neighbors_thr',
'classes_in_neighbors_cnt': 'classes_in_neighbors_thr',
'classes_in_street_radius_bln': 'classes_in_street_radius_thr',
'classes_in_street_radius_cnt': 'classes_in_street_radius_thr',
'top_k_terms': 'top_k_terms_pct',
'top_k_trigrams': 'top_k_trigrams_pct',
'top_k_fourgrams': 'top_k_fourgrams_pct'
}
# Maps each feature to the ordered tuple of argument names its getter expects;
# create_single_feature pulls these keys out of the args dict built by
# create_args_dict / create_test_args_dict, in this order.
features_getter_args_map = {
'classes_in_radius_bln': ('poi_gdf', 'poi_index_path', 'nlabels', 'label_map', 'param'),
'classes_in_radius_cnt': ('poi_gdf', 'poi_index_path', 'nlabels', 'label_map', 'param'),
'classes_in_street_and_radius_bln': ('poi_gdf', 'street_gdf', 'pois_by_street', 'nlabels', 'label_map', 'geometry_map', 'param'),
'classes_in_street_and_radius_cnt': ('poi_gdf', 'street_gdf', 'pois_by_street', 'nlabels', 'label_map', 'geometry_map', 'param'),
'classes_in_neighbors_bln': ('poi_gdf', 'poi_index_path', 'nlabels', 'label_map', 'param'),
'classes_in_neighbors_cnt': ('poi_gdf', 'poi_index_path', 'nlabels', 'label_map', 'param'),
'classes_in_street_radius_bln': ('poi_gdf', 'street_gdf', 'nlabels', 'label_map', 'geometry_map', 'param'),
'classes_in_street_radius_cnt': ('poi_gdf', 'street_gdf', 'nlabels', 'label_map', 'geometry_map', 'param'),
'similarity_per_class': ('poi_gdf', 'textual_index_path', 'nlabels'),
'top_k_terms': ('poi_gdf', 'names', 'param'),
'top_k_trigrams': ('poi_gdf', 'names', 'param'),
'top_k_fourgrams': ('poi_gdf', 'names', 'param')
}
def load_poi_gdf(poi_fpath):
    """
    Loads pois in *poi_fpath* into a geopandas.GeoDataFrame and project their \
    geometries.
    Args:
        poi_fpath (str): Path to file containing the pois
    Returns:
        geopandas.GeoDataFrame
    """
    poi_df = pd.read_csv(poi_fpath)
    # Build point geometries from the configured lon/lat columns.
    poi_df['geometry'] = [
        Point(lon, lat)
        for lon, lat in zip(poi_df[config.lon_col], poi_df[config.lat_col])
    ]
    poi_gdf = gpd.GeoDataFrame(poi_df, geometry='geometry')
    # Legacy pyproj CRS dict syntax; kept to match the rest of the file.
    poi_gdf.crs = {'init': f'epsg:{config.poi_crs}'}
    poi_gdf = poi_gdf.to_crs({'init': 'epsg:3857'})
    # Vectorized coordinate extraction instead of a per-row apply.
    poi_gdf['lon'] = poi_gdf.geometry.x
    poi_gdf['lat'] = poi_gdf.geometry.y
    return poi_gdf
def encode_labels(poi_gdf, encoder=None):
    """
    Encodes target column to with integer values.
    Args:
        poi_gdf (geopandas.GeoDataFrame): The GeoDataFrame containing the \
column to be encoded
        encoder (sklearn.preprocessing.LabelEncoder, optional): The label \
encoder to be utilized
    Returns:
        tuple:
            geopandas.GeoDataFrame: The GeoDataFrame with the encoded column
            sklearn.preprocessing.LabelEncoder: The label encoder utilized
    """
    if encoder is not None:
        # Drop pois whose label was never seen during fit, then re-encode.
        known = poi_gdf[config.label_col].isin(encoder.classes_)
        poi_gdf = poi_gdf[known].reset_index(drop=True)
        poi_gdf['label'] = encoder.transform(poi_gdf[config.label_col])
    else:
        encoder = LabelEncoder()
        poi_gdf['label'] = encoder.fit_transform(poi_gdf[config.label_col])
    return poi_gdf, encoder
def load_street_gdf(street_fpath):
    """
    Loads streets in *street_fpath* into a geopandas.GeoDataFrame and project \
    their geometries.
    Args:
        street_fpath (str): Path to file containing the streets
    Returns:
        geopandas.GeoDataFrame
    """
    street_df = pd.read_csv(street_fpath)
    # Parse WKT strings into shapely geometries.
    street_df['geometry'] = street_df['geometry'].apply(loads)
    street_gdf = gpd.GeoDataFrame(street_df, geometry='geometry')
    street_gdf.crs = {'init': f'epsg:{config.osm_crs}'}
    return street_gdf.to_crs({'init': 'epsg:3857'})
# def load_poly_gdf(poly_fpath):
# poly_df = pd.read_csv(poly_fpath)
# poly_df['geometry'] = poly_df['geometry'].apply(lambda x: loads(x))
# poly_gdf = gpd.GeoDataFrame(poly_df, geometry='geometry')
# poly_gdf.crs = {'init': f'epsg:{config.osm_crs}'}
# poly_gdf = poly_gdf.to_crs({'init': 'epsg:3857'})
# return poly_gdf
def get_bbox_coords(poi_gdf):
    """
    Returns a bounding box containing all *poi_gdf*'s pois.
    Args:
        poi_gdf (geopandas.GeoDataFrame): Contains the pois
    Returns:
        tuple: The bounding box coords as (south, west, north, east)
    """
    # Work in the OSM CRS so the bounds are in lon/lat degrees.
    reprojected = poi_gdf.to_crs({'init': f'epsg:{config.osm_crs}'})
    west, south, east, north = reprojected.geometry.total_bounds
    return (south, west, north, east)
def get_required_external_files(poi_gdf, feature_sets_path):
    """
    Checks if external files are required and if so, downloads them using the \
    Overpass API.
    Args:
        poi_gdf (geopandas.GeoDataFrame): Contains pois in order to define \
the area to query with Overpass API
        feature_sets_path (str): Path to store the downloaded elements
    Returns:
        None
    """
    # Street geometries are only needed by the street-based adjacency features.
    street_features = (
        'classes_in_street_and_radius_bln',
        'classes_in_street_and_radius_cnt',
        'classes_in_street_radius_bln',
        'classes_in_street_radius_cnt',
    )
    if any(f in config.included_adjacency_features for f in street_features):
        osm_ut.download_osm_streets(get_bbox_coords(poi_gdf), feature_sets_path)
    return
def ngrams(n, word):
    """
    Generator of all *n*-grams of *word*.
    Args:
        n (int): The length of character ngrams to be extracted
        word (str): The word of which the ngrams are to be extracted
    Yields:
        str: ngram
    """
    # A word of length L has L-n+1 ngrams; the previous bound (L-n-1)
    # silently dropped the last two ngrams of every word.
    for i in range(len(word) - n + 1):
        yield word[i:i+n]
def get_top_k(names, k, mode='term'):
    """
    Extracts the top *k* % terms or ngrams of *names*, based on *mode*.
    Args:
        names (list): Contains the names to be considered
        k (float): Percentage of top terms or ngrams to be considered
        mode (str, optional): May be 'term', 'trigram' or 'fourgram'
    Returns:
        list: Contains the top k terms or ngrams
    """
    if mode == 'trigram':
        counts = Counter(g for w in names for g in ngrams(3, w))
    elif mode == 'fourgram':
        counts = Counter(g for w in names for g in ngrams(4, w))
    else:
        counts = Counter(names)
    # Keep the top k-fraction of the distinct terms/ngrams.
    n_top = int(len(counts) * k)
    return [term for term, _ in counts.most_common(n_top)]
def normalize_features(X, train_idxs, scaler=None):
    """
    Normalize features to [0, 1].
    Args:
        X (numpy.ndarray): Features array to be normalized
        train_idxs (numpy.ndarray): Contains the train indexes
        scaler (sklearn.preprocessing.MinMaxScaler, optional): Scaler to be \
utilized
    Returns:
        tuple:
            numpy.ndarray: The normalized features array
            sklearn.preprocessing.MinMaxScaler: The scaler utilized
    """
    if scaler is None:
        scaler = MinMaxScaler()
        # Fit on the train rows only, then write the scaled rows back in place.
        X[train_idxs] = scaler.fit_transform(X[train_idxs])
        # Set membership is O(1); `r not in ndarray` was O(n) per row (O(n^2) total).
        train_set = set(map(int, train_idxs))
        test_idxs = [r for r in range(len(X)) if r not in train_set]
        if test_idxs:
            # Transform the remaining (test) rows with the train-fitted scaler.
            X[test_idxs] = scaler.transform(X[test_idxs])
    else:
        # A pre-fitted scaler was supplied: transform the whole array.
        X = scaler.transform(X)
    return X, scaler
def get_pois_by_street(poi_gdf, street_gdf):
    """
    Matches each poi in *poi_gdf* to its nearest street.
    Args:
        poi_gdf (geopandas.GeoDataFrame): Contains pois to be matched to \
a street
        street_gdf (geopandas.GeoDataFrame): Contains streets to search among \
them for the nearest to each poi
    Returns:
        dict: Has streets ids as keys and a list containing the pois which \
belong to each street as values
    """
    sindex = street_gdf.sindex
    pois_by_street = {sid: [] for sid in range(len(street_gdf))}
    for poi in poi_gdf.itertuples():
        coords = (poi.lon, poi.lat)
        point = Point(coords)
        # Spatial-index candidates, resolved by exact geometric distance.
        candidates = list(sindex.nearest(coords))
        nearest = min(
            candidates,
            key=lambda c: point.distance(street_gdf.iloc[c]['geometry'])
        )
        pois_by_street[nearest].append(poi.Index)
    return pois_by_street
def create_args_dict(poi_gdf, train_idxs, required_args, read_path, write_path):
    """
    Initializes and prepares structures required during features extraction.
    Args:
        poi_gdf (geopandas.GeoDataFrame): Contains the pois for which \
features will be created
        train_idxs (numpy.ndarray): Contains the train indexes
        required_args (set): Contains the names of the required args
        read_path (str): Path to read from
        write_path (str): Path to write to
    Returns:
        dict: Containing arguments names as keys and their corresponding \
structures as values
    """
    # Train-only view of the pois; indexes/maps must never see test rows.
    train_gdf = poi_gdf.iloc[train_idxs]
    args = {'poi_gdf': poi_gdf, 'nlabels': poi_gdf['label'].nunique()}
    if 'label_map' in required_args:
        args['label_map'] = train_gdf['label'].values.tolist()
    if 'geometry_map' in required_args:
        args['geometry_map'] = train_gdf['geometry'].values.tolist()
    if 'poi_index_path' in required_args:
        args['poi_index_path'] = write_path + '/poi_index.pkl'
        af.create_poi_index(train_gdf.reset_index(), args['poi_index_path'])
    if 'street_gdf' in required_args:
        args['street_gdf'] = load_street_gdf(read_path + '/osm_streets.csv')
        args['pois_by_street'] = get_pois_by_street(train_gdf.reset_index(), args['street_gdf'])
    if 'textual_index_path' in required_args:
        args['textual_index_path'] = write_path + '/textual_index'
        tf.create_textual_index(train_gdf.reset_index(), args['textual_index_path'])
    if 'names' in required_args:
        args['names'] = ' '.join(list(train_gdf[config.name_col])).split()
    return args
def create_single_feature(f, args, train_idxs, norm, scaler):
    """
    Creates the features array given a feature's name *f*.
    Args:
        f (str): Feature name to be created
        args (dict): Containing the required arguments for feature *f*
        train_idxs (numpy.ndarray): Contains the train indexes
        norm (boolean): Indicating whether the feature should be normalized \
or not
        scaler (sklearn.preprocessing.MinMaxScaler): The scaler to be utilized
    Returns:
        tuple:
            numpy.ndarray: The features array of feature *f*
            sklearn.preprocessing.MinMaxScaler: The scaler utilized
    """
    # Resolve the getter from the module/getter maps and call it with the
    # argument values named in features_getter_args_map, in order.
    getter = getattr(feature_module_map[f], features_getter_map[f])
    getter_args = [args[arg] for arg in features_getter_args_map[f]]
    X = getter(*getter_args)
    if scaler is not None:
        # Pre-fitted scaler: normalize the whole array with it.
        return normalize_features(X, None, scaler)
    if norm is True:
        # Fit a new scaler on the train rows.
        return normalize_features(X, train_idxs)
    return X, None
def create_single_features(poi_gdf, train_idxs, fold_path):
    """
    Creates all the included features arrays and saves them in *fold_path*.
    Args:
        poi_gdf (geopandas.GeoDataFrame): Contains the pois for which the \
features will be created
        train_idxs (numpy.ndarray): Contains the train indexes
        fold_path (str): Path to save features arrays
    Returns:
        None
    """
    os.makedirs(fold_path + '/tmp')
    included_features = config.included_adjacency_features + config.included_textual_features
    required_args = {arg for f in included_features for arg in features_getter_args_map[f]}
    args = create_args_dict(poi_gdf, train_idxs, required_args, os.path.dirname(fold_path), fold_path)
    for f in included_features:
        norm = f in config.normalized_features
        if f in features_params_map:
            # Parameterized feature: one array per configured parameter value.
            for p in getattr(config, features_params_map[f]):
                args['param'] = p
                X, _ = create_single_feature(f, args, train_idxs, norm, None)
                np.save(fold_path + f'/tmp/{f}_{p}.npy', X)
        else:
            X, _ = create_single_feature(f, args, train_idxs, norm, None)
            np.save(fold_path + f'/tmp/{f}.npy', X)
    return
def create_concatenated_features(poi_gdf, train_idxs, test_idxs, fold_path):
    """
    Loads a list of included features arrays in order to concatenate them \
    into the final X_train and X_test arrays. Then saves these arrays as well \
    as the corresponding y_train and y_test arrays. Finally, writes the \
    included features configuration into a file.
    Args:
        poi_gdf (geopandas.GeoDataFrame): Contains the pois for which the \
features will be created
        train_idxs (numpy.ndarray): Contains the train indexes
        test_idxs (numpy.ndarray): Contains the test indexes
        fold_path (str): Path to save features arrays
    Returns:
        None
    """
    included_features = config.included_adjacency_features + config.included_textual_features
    params_names = list({features_params_map[f] for f in included_features if f in features_params_map})
    params_vals = [getattr(config, param) for param in params_names]
    y = poi_gdf['label']
    # Labels don't depend on the feature-parameter combination: save them
    # once instead of rewriting the same files on every loop iteration.
    np.save(fold_path + '/y_train.npy', y[train_idxs])
    np.save(fold_path + '/y_test.npy', y[test_idxs])
    # One X_train/X_test pair per combination of feature-parameter values.
    for idx, params in enumerate(itertools.product(*params_vals)):
        features_params = dict(zip(params_names, params))
        Xs = []
        for f in included_features:
            if f in features_params_map:
                p = features_params[features_params_map[f]]
                Xs.append(np.load(fold_path + f'/tmp/{f}_{p}.npy'))
            else:
                Xs.append(np.load(fold_path + f'/tmp/{f}.npy'))
        X = np.hstack(Xs)
        # X = SelectPercentile(chi2, percentile=75).fit_transform(X, y)
        np.save(fold_path + f'/X_train_{idx}.npy', X[train_idxs])
        np.save(fold_path + f'/X_test_{idx}.npy', X[test_idxs])
    path = os.path.dirname(fold_path)
    wrtrs.write_feature_params_info(path + '/params_per_feature_set.csv', params_names, params_vals)
    return
def create_finetuned_features(poi_gdf, features_info, best_feature_params, features_path, results_path):
    """
    Creates and saves the X_train features array for the model_training step.
    Args:
        poi_gdf (geopandas.GeoDataFrame): Contains the pois for which the \
features will be created
        features_info (list): Containing the features (and whether they \
should be normalized or not) to be extracted
        best_feature_params (dict): Containing the best found features \
parameters values
        features_path (str): Path in order to read required external files \
(like osm streets file)
        results_path (str): Path to write to
    Returns:
        numpy.ndarray: The features array for model_training step
    """
    included_features = [f[0] for f in features_info]
    required_args = {arg for f in included_features for arg in features_getter_args_map[f]}
    # All pois are "train" at this step.
    all_idxs = np.arange(len(poi_gdf))
    args = create_args_dict(poi_gdf, all_idxs, required_args, features_path, results_path + '/pickled_objects')
    Xs = []
    for f in features_info:
        feat, norm = f[0], f[1]
        if feat in features_params_map:
            args['param'] = best_feature_params[features_params_map[feat]]
        X, scaler = create_single_feature(feat, args, all_idxs, norm, None)
        if norm is True:
            # Persist the fitted scaler so model_deployment can reuse it.
            # `with` closes the handle (the previous open() leaked it).
            with open(results_path + '/pickled_objects' + f'/{feat}_scaler.pkl', 'wb') as fout:
                pickle.dump(scaler, fout)
        Xs.append(X)
    X = np.hstack(Xs)
    np.save(results_path + '/X_train.npy', X)
    return X
def create_test_args_dict(test_poi_gdf, required_args, read_path1, read_path2):
    """
    Instantiate and prepare structures required during features extraction in \
    model_deployment step.
    Args:
        test_poi_gdf (geopandas.GeoDataFrame): Contains the pois for which \
features will be created
        required_args (set): Contains the names of the required args
        read_path1 (str): Path to features_extraction step results
        read_path2 (str): Path to model_training step results
    Returns:
        dict: Containing arguments names as keys and their corresponding \
structures as values
    """
    train_poi_gdf = load_poi_gdf(read_path1 + '/train_poi_gdf.csv')
    # `with` closes the handle (the previous open() leaked it).
    with open(read_path1 + '/encoder.pkl', 'rb') as fin:
        encoder = pickle.load(fin)
    train_poi_gdf, _ = encode_labels(train_poi_gdf, encoder)
    # Maps/indexes are built from the TRAIN pois; features are extracted for
    # the test pois against them.
    args = {'poi_gdf': test_poi_gdf, 'nlabels': train_poi_gdf['label'].nunique()}
    if 'label_map' in required_args:
        args['label_map'] = train_poi_gdf['label'].values.tolist()
    if 'geometry_map' in required_args:
        args['geometry_map'] = train_poi_gdf['geometry'].values.tolist()
    if 'poi_index_path' in required_args:
        # Index was already built during training; just point at it.
        args['poi_index_path'] = read_path2 + '/poi_index.pkl'
    if 'street_gdf' in required_args:
        street_csv_path = read_path1 + '/osm_streets.csv'
        args['street_gdf'] = load_street_gdf(street_csv_path)
        args['pois_by_street'] = get_pois_by_street(train_poi_gdf, args['street_gdf'])
    if 'textual_index_path' in required_args:
        args['textual_index_path'] = read_path2 + '/textual_index'
    if 'names' in required_args:
        args['names'] = ' '.join(list(train_poi_gdf[config.name_col])).split()
    return args
def create_test_features(poi_gdf, features, features_path, model_training_path, results_path):
    """
    Creates and saves the X_test features array for the model_deployment step.
    Args:
        poi_gdf (geopandas.GeoDataFrame): Contains the pois for which the \
features will be created
        features (list): Containing the features (as well as their best found \
configuration) to be extracted
        features_path (str): Path to features_extraction step results
        model_training_path (str): Path to model_training step results
        results_path (str): Path to write to
    Returns:
        numpy.ndarray: The features array for model_deployment step
    """
    included_features = [f[0] for f in features]
    required_args = {arg for f in included_features for arg in features_getter_args_map[f]}
    args = create_test_args_dict(poi_gdf, required_args, features_path, model_training_path + '/pickled_objects')
    Xs = []
    for f in features:
        feat, _, param_value, norm = f[0], f[1], f[2], f[3]
        if feat in features_params_map:
            # Adjacency params are ints (thresholds); textual params are floats (pcts).
            args['param'] = int(param_value) if feature_module_map[feat] == af else float(param_value)
        if norm is True:
            # Reuse the scaler fitted during model_training.
            # `with` closes the handle (the previous open() leaked it).
            with open(model_training_path + '/pickled_objects' + f'/{feat}_scaler.pkl', 'rb') as fin:
                scaler = pickle.load(fin)
            X, _ = create_single_feature(feat, args, None, norm, scaler)
        else:
            X, _ = create_single_feature(feat, args, None, norm, None)
        Xs.append(X)
    X = np.hstack(Xs)
    np.save(results_path + '/X_test.npy', X)
    return X