import pathlib
import cytoolz as toolz
import keras
import numpy as np
import scipy.stats
import slider as sl
from .scaler import Scaler
from .utils import dichotomize, summary, rolling_window
class Prediction:
"""The model's predicted values.
Attributes
----------
predicted_aim_error : np.ndarray[float64]
The predicted aim errors for each object.
predicted_aim_distribution : scipy.stats.lognorm
The predicted distribution of aim errors.
predicted_accuracy_error : np.ndarray[float64]
The predicted accuracy errors for each object.
predicted_accuracy_distribution : scipy.stats.lognorm
The predicted distribution of accuracy errors.
accuracy_mean : float
The mean predicted accuracy.
accuracy_std : float
The standard deviation of the predicted accuracy.
pp_mean : float
The mean predicted performance points.
pp_std : float
The standard deviation of the predicted performance points.
"""
def __init__(self,
*,
predicted_aim_error,
predicted_aim_distribution,
predicted_accuracy_error,
predicted_accuracy_distribution,
accuracy_mean,
accuracy_std,
pp_mean,
pp_std):
self.predicted_aim_error = predicted_aim_error
self.predicted_aim_distribution = predicted_aim_distribution
self.predicted_accuracy_error = predicted_accuracy_error
self.predicted_accuracy_distribution = predicted_accuracy_distribution
self.accuracy_mean = accuracy_mean
self.accuracy_std = accuracy_std
self.pp_mean = pp_mean
self.pp_std = pp_std
class _InnerErrorModel:
"""A model for osu! which trains on windows of time events and attempts to
predict the user's aim and accuracy error for each circle.
Parameters
----------
hidden : bool
Is this the hidden model?
aim_pessimism_factor : float
An exponential increase in aim error to account for the fact that most
replays are heavily biased towards a user's best replays.
accuracy_pessimism_factor : float
An exponential increase in accuracy error to account for the fact that
most replays are heavily biased towards a user's best replays.
    trailing_context : int
        The number of trailing hit objects or slider ticks to look at.
forward_context : int
The number of forward hit objects or slider ticks to look at.
    lstm_hidden_layer_sizes : tuple[int, ...]
        The sizes of the hidden LSTM layers.
    dropout : float
        The dropout ratio.
    activation : str
        The activation function. Must be one of:
        'tanh', 'softplus', 'softsign', 'relu', 'sigmoid', 'hard_sigmoid', or
        'linear'.
    loss : {'mse', 'mae', 'mape', 'msle', 'cosine'}
        The loss function.
optimizer : str
The optimizer to use.
    batch_size : int
        The number of windows per training batch.
Notes
-----
    The inner model does not know about the difference between hidden and
    non-hidden plays. :class:`~lain.ErrorModel` internally holds two
    ``_InnerErrorModel``\s which are fitted to the hidden and non-hidden
    plays respectively.
"""
features = {
k: n for n, k in enumerate(
(
# absolute position
'absolute_x',
'absolute_y',
'absolute_time',
# relative position
'relative_x',
'relative_y',
'relative_time',
# misc.
'is_slider_tick', # 1 if slider tick else 0
'approach_rate', # the map's approach rate
# distances (magnitude of vector between two hit objects)
'distance_from_previous', # distance from the previous object
'distance_to_next', # distance to the next object
# angles
'pitch',
'roll',
'yaw',
),
)
}
    _absolute_features_columns = np.s_[0:3]
_relative_features_columns = np.s_[3:6]
_angle_features_columns = np.s_[10:13]
def __init__(self,
hidden,
*,
aim_pessimism_factor=1.1,
accuracy_pessimism_factor=1.1,
trailing_context=10,
forward_context=3,
batch_size=32,
lstm_hidden_layer_sizes=(256, 128, 64),
dropout=0.1,
activation='linear',
loss='mae',
optimizer='rmsprop'):
self.hidden = hidden
self._aim_pessimism_factor = aim_pessimism_factor
self._accuracy_pessimism_factor = accuracy_pessimism_factor
self._trailing_context = trailing_context
self._forward_context = forward_context
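        # the full window covers the trailing context, the object being
        # predicted, and the forward context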
self._window = window = trailing_context + forward_context + 1
self._batch_size = batch_size
if len(lstm_hidden_layer_sizes) == 0:
raise ValueError('there must be at least one lstm hidden layer')
        input_ = keras.layers.Input(shape=(window, len(self.features)))
        lstm = input_
        # every LSTM layer except the last returns the full sequence so that
        # its output can feed the next LSTM layer; this also handles the
        # single-layer case without reusing the first size twice
        for size in lstm_hidden_layer_sizes[:-1]:
            lstm = keras.layers.LSTM(
                size,
                dropout=dropout,
                return_sequences=True,
            )(lstm)
        lstm = keras.layers.LSTM(
            lstm_hidden_layer_sizes[-1],
            dropout=dropout,
        )(lstm)
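        # two single-unit regression heads share the LSTM trunk: one for aim
        # error and one for accuracy error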
aim_error = keras.layers.Dense(
1,
activation=activation,
name='aim_error',
)(lstm)
accuracy_error = keras.layers.Dense(
1,
activation=activation,
name='accuracy_error',
)(lstm)
self._model = model = keras.models.Model(
inputs=input_,
outputs=[aim_error, accuracy_error],
)
model.compile(
loss=loss,
optimizer=optimizer,
)
self._feature_scaler = Scaler(ndim=3)
_angle_axes_map = {
'yaw': (0, 1),
'roll': (0, 2),
'pitch': (1, 2),
}
_raw_features = [
features['absolute_x'],
features['absolute_y'],
features['absolute_time'],
features['is_slider_tick'],
features['approach_rate'],
]
def _extract_features(self,
beatmap,
*,
double_time=False,
half_time=False,
hard_rock=False):
"""Extract the feature array from a beatmap.
Parameters
----------
beatmap : sl.Beatmap
The beatmap to extract features for.
double_time : bool, optional
Extract features for double time?
half_time : bool, optional
Extract features for half time?
hard_rock : bool, optional
Extract features for hard rock?
Returns
-------
        windows : np.ndarray[float]
            An array of feature windows, one per hit object with complete
            context, of shape (num_valid, window, num_features).
        mask : np.ndarray[bool]
            A mask over the hit objects indicating which have complete
            windows.
"""
hit_objects = beatmap.hit_objects_no_spinners
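        # represent the approach rate as the length of the approach window
        # in milliseconds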
approach_rate = sl.mod.ar_to_ms(
beatmap.ar(
double_time=double_time,
half_time=half_time,
hard_rock=hard_rock,
),
)
# events holds an (x, y, time) tuple for every hit object and slider
# tick; slider ticks are in ms
events = []
append_event = events.append
extend_events = events.extend
# hit_object_ixs holds the indices into ``events`` where hit objects
# appear
hit_object_ixs = []
append_hit_object_ixs = hit_object_ixs.append
for hit_object in hit_objects:
# mark this index in events as the location of a hit object.
append_hit_object_ixs(len(events))
if double_time:
hit_object = hit_object.double_time
elif half_time:
hit_object = hit_object.half_time
if hard_rock:
hit_object = hit_object.hard_rock
position = hit_object.position
append_event((
position.x,
position.y,
hit_object.time.total_seconds() * 1000,
0, # is_slider_tick
approach_rate,
))
if isinstance(hit_object, sl.beatmap.Slider):
# add all the slider ticks
extend_events(
(
x,
y,
time.total_seconds() * 1000,
1, # is_slider_tick
approach_rate,
)
for x, y, time in hit_object.tick_points
)
# allocate the empty output array
out = np.empty((len(events), len(self.features)))
# fill the output with the directly extracted features
out[:, self._raw_features] = events
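        # seed the relative columns with the absolute values; they are made
        # relative to each window's center object further down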
        out[:, self._relative_features_columns] = (
            out[:, self._absolute_features_columns]
        )
# the baseline data to take windows out of
baseline = np.vstack([
np.full((self._trailing_context, len(self.features)), np.nan),
out,
np.full((self._forward_context, len(self.features)), np.nan),
])
# pull out the x, y, time coordinates
        coords = baseline[:, self._absolute_features_columns]
# draw triangles around each object with the leading and trailing hit
# object
triangles = rolling_window(coords, 3)
# the squared value of the length of each side in the triangles around
# each object across each axis
diff_a_b_sq = np.square(triangles[:, 0] - triangles[:, 1])
diff_a_c_sq = np.square(triangles[:, 0] - triangles[:, 2])
diff_b_c_sq = np.square(triangles[:, 1] - triangles[:, 2])
# for each hit object and axis (x, y, time), calculate the angle
# between the previous hit object and the next hit object
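        # law of cosines: for the triangle (A, B, C) around object B,
        # cos(angle at B) = (|AB|^2 + |BC|^2 - |AC|^2) / (2 |AB| |BC|)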
for angle_kind, (axis_0, axis_1) in self._angle_axes_map.items():
a_b_sq = diff_a_b_sq[:, axis_0] + diff_a_b_sq[:, axis_1]
b_c_sq = diff_b_c_sq[:, axis_0] + diff_b_c_sq[:, axis_1]
numerator = (
a_b_sq +
b_c_sq -
(diff_a_c_sq[:, axis_0] + diff_a_c_sq[:, axis_1])
)
denominator = (2 * np.sqrt(a_b_sq) * np.sqrt(b_c_sq))
mask = np.isclose(denominator, 0)
numerator[mask] = 1
denominator[mask] = 1
np.arccos(
# clip the values because we sometimes get things like
# 1.0000000000000002 which breaks ``np.arccos``
np.clip(numerator / denominator, -1, 1),
out=baseline[1:-1, self.features[angle_kind]],
)
# compute the distance from hit object to hit object in 3d space
# and store the distance from the previous and the distance to the
# next
        distances = np.sqrt(np.square(coords[1:] - coords[:-1]).sum(axis=1))
        baseline[1:, self.features['distance_from_previous']] = distances
        baseline[:-1, self.features['distance_to_next']] = distances
        # convert the hit object indices into a column vector; we add the
        # trailing context to account for the padding rows prepended to
        # ``baseline``
hit_object_ixs_array = (
np.array(hit_object_ixs) + self._trailing_context
)[:, np.newaxis]
# get an array of offsets from some base ``ix`` which correspond to a
# fully populated window; this is a row vector
context_ix_offsets = np.arange(
-self._trailing_context,
self._forward_context + 1,
)
# broadcast the hit_object_ixs together with the context_ix_offsets
# to get an indexer which produces a 3d array which is a sequence of
# windows of events
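        # shapes: (len(hit_objects), 1) + (window,) broadcasts to
        # (len(hit_objects), window)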
window_ixs = hit_object_ixs_array + context_ix_offsets
# a sequence of windows of events where the time is still absolute
windows = baseline[window_ixs]
# slice out the window center's 'relative' values which are currently
# absolute values
center_values = windows[
:,
self._trailing_context,
np.newaxis,
self._relative_features_columns,
]
# subtract the hit object features from the windows; this makes the
# window relative to the object being predicted.
windows[..., self._relative_features_columns] -= center_values
# only accept complete windows
mask = ~np.isnan(windows).any(axis=(1, 2))
# remove the partial windows
windows = windows[mask]
return windows, mask
def _extract_differences(self, replay):
"""Extract the time and position differences for each hit object.
Parameters
----------
replay : Replay
The replay to get differences for.
        Returns
        -------
        aim_error : np.ndarray
            The magnitude of the (x, y) error in osu! pixels for each hit
            object, clipped to twice the circle radius.
        accuracy_error : np.ndarray
            The absolute time error in milliseconds for each hit object,
            clipped to 1.5 times the 50 hit window.
        """
# get the x, y, and time of each click
if replay.double_time:
time_coefficient = 1000 * 2 / 3
elif replay.half_time:
time_coefficient = 1000 * 4 / 3
else:
time_coefficient = 1000
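        # a click is a rising edge: a key is pressed on this action but was
        # not pressed on the previous action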
clicks = np.array([
(
second.position.x,
second.position.y,
time_coefficient * second.offset.total_seconds(),
)
for first, second in toolz.sliding_window(2, replay.actions)
if ((second.key1 and not first.key1 or
second.key2 and not first.key2) and
# there are weird stray clicks at (0, 0)
second.position != (0, 0))
])
double_time = replay.double_time
half_time = replay.half_time
hard_rock = replay.hard_rock
# accumulate the (x, y, start time) of each hit object
hit_object_coords = []
append_coord = hit_object_coords.append
for hit_object in replay.beatmap.hit_objects_no_spinners:
if double_time:
hit_object = hit_object.double_time
elif half_time:
hit_object = hit_object.half_time
if hard_rock:
hit_object = hit_object.hard_rock
position = hit_object.position
append_coord((
position.x,
position.y,
hit_object.time.total_seconds() * 1000,
))
# convert the hit object coordinates into an array
hit_object_coords = np.array(hit_object_coords)
# get the time of each hit object as a row vector
hit_object_times = hit_object_coords[:, 2]
# get the time of each click as a column vector
click_times = clicks[:, [2]]
# find the nearest click by taking the absolute difference from
# every hit object to every click (whose shape is:
        # (len(clicks), len(hit_objects))) and reducing with argmin to
# get the index of the best match for each hit object
nearest_click_ix = np.abs(
hit_object_times - click_times,
).argmin(axis=0)
# get the x, y, time of the matched clicks
matched_clicks = clicks[nearest_click_ix]
# get the squared distance for the x and y axes
squared_distance = (
hit_object_coords[:, :2] - matched_clicks[:, :2]
) ** 2
aim_error = np.sqrt(squared_distance[:, 0] + squared_distance[:, 1])
# clip the aim error to within 2 * circle radius; things farther
# than this were probably just us skipping the circle entirely
np.clip(
aim_error,
0,
2 * sl.mod.circle_radius(replay.beatmap.cs(hard_rock=hard_rock)),
out=aim_error,
)
accuracy_error = np.abs(hit_object_times - matched_clicks[:, 2])
# clip the accuracy error to within 1.5 * 50 window; things farther
# than this were probably just us skipping the circle entirely
np.clip(
accuracy_error,
0,
1.5 * sl.mod.od_to_ms(replay.beatmap.od(
hard_rock=hard_rock,
double_time=double_time,
half_time=half_time,
)).hit_50,
out=accuracy_error,
)
return aim_error, accuracy_error
def _sample_weights(self, aim_error, accuracy_error):
"""Sample weights based on the error.
Parameters
----------
aim_error : np.ndarray
The aim errors for each sample.
        accuracy_error : np.ndarray
            The accuracy errors for each sample.
Returns
-------
weights : np.ndarray
The weights for each sample.
Notes
-----
This weighs samples based on their standard deviations above the mean
with some clipping.
"""
aim_zscore = (aim_error - aim_error.mean()) / aim_error.std()
aim_weight = np.clip(aim_zscore, 1, 4) / 4
accuracy_zscore = (
accuracy_error - accuracy_error.mean()
) / accuracy_error.std()
accuracy_weight = np.clip(accuracy_zscore, 1, 4) / 4
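        # clipping the z-score to [1, 4] and dividing by 4 bounds each weight
        # to [0.25, 1]: unusually bad hits dominate the loss without zeroing
        # out typical ones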
return {
'aim_error': aim_weight,
'accuracy_error': accuracy_weight,
}
def fit(self, replays, *, verbose=False, epochs=10):
extract_features = self._extract_features
extract_differences = self._extract_differences
model = self._model
features = []
append_features = features.append
aim_error = []
append_aim_error = aim_error.append
accuracy_error = []
append_accuracy_error = accuracy_error.append
for n, replay in enumerate(replays):
if verbose:
print(f'{n:4}: {replay!r}')
windows, mask = extract_features(
replay.beatmap,
double_time=replay.double_time,
half_time=replay.half_time,
hard_rock=replay.hard_rock,
)
append_features(windows)
aim, accuracy = extract_differences(replay)
append_aim_error(aim[mask])
append_accuracy_error(accuracy[mask])
pre_scaled_features = np.concatenate(features)
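        # note: this project's ``Scaler.fit`` also returns the scaled
        # features, which are what we train on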
features = self._feature_scaler.fit(pre_scaled_features)
aim_error = np.concatenate(aim_error)
accuracy_error = np.concatenate(accuracy_error)
if verbose:
# print some useful summary statistics; this helps quickly
# identify data errors.
print(summary(
self.features,
pre_scaled_features,
aim_error=aim_error,
accuracy_error=accuracy_error,
))
return model.fit(
features,
{'aim_error': aim_error, 'accuracy_error': accuracy_error},
verbose=int(bool(verbose)),
batch_size=self._batch_size,
epochs=epochs,
sample_weight=self._sample_weights(aim_error, accuracy_error),
)
@staticmethod
def _fit_lognorm_distribution(array):
"""Fit a probability distribution to the array.
Parameters
----------
array : np.ndarray
The data to fit.
Returns
-------
distribution : scipy.stats.lognorm
A frozen distribution instance.
"""
return scipy.stats.lognorm(*scipy.stats.lognorm.fit(array))
def _predict_raw_error(self,
beatmap,
*,
pessimistic=True,
double_time=False,
half_time=False,
hard_rock=False):
"""Predict the time and position differences for each circle.
Parameters
----------
beatmap : sl.Beatmap
The beatmap to predict.
pessimistic : bool, optional
Apply pessimistic error scaling?
double_time : bool, optional
Predict double time offsets.
half_time : bool, optional
Predict half time offsets.
        hard_rock : bool, optional
            Predict hard rock offsets.
Returns
-------
aim_error : np.ndarray
The predicted magnitude of (x, y) error in osu! pixels.
accuracy_error : np.ndarray
The predicted magnitude of time error in milliseconds.
"""
aim_error, accuracy_error = self._model.predict(
self._feature_scaler.transform(
self._extract_features(
beatmap,
double_time=double_time,
half_time=half_time,
hard_rock=hard_rock,
)[0],
),
)
if pessimistic:
aim_error **= self._aim_pessimism_factor
accuracy_error **= self._accuracy_pessimism_factor
return aim_error, accuracy_error
def predict(self,
beatmap,
*,
pessimistic=True,
double_time=False,
half_time=False,
hard_rock=False,
random_state=None,
samples=1000):
"""Predict the user's accuracy on the beatmap with the given mods.
Parameters
----------
beatmap : sl.Beatmap
The beatmap to predict.
pessimistic : bool, optional
Apply pessimistic error scaling?
double_time : bool, optional
Predict double time offsets.
half_time : bool, optional
Predict half time offsets.
        hard_rock : bool, optional
            Predict hard rock offsets.
random_state : np.random.RandomState, optional
The numpy random state used to draw samples.
samples : int, optional
The number of plays to simulate.
Returns
-------
prediction : Prediction
A collection of predicted values for this play.
"""
aim_error, accuracy_error = self._predict_raw_error(
beatmap,
double_time=double_time,
half_time=half_time,
hard_rock=hard_rock,
pessimistic=pessimistic,
)
# fit the distributions to the predicted data
aim_distribution = self._fit_lognorm_distribution(aim_error)
accuracy_distribution = self._fit_lognorm_distribution(accuracy_error)
predicted_object_count = len(beatmap.hit_objects_no_spinners)
spinner_count = len(beatmap.hit_objects) - predicted_object_count
aim_samples = aim_distribution.rvs(
predicted_object_count * samples,
random_state=random_state,
)
accuracy_samples = accuracy_distribution.rvs(
predicted_object_count * samples,
random_state=random_state,
)
hit_windows = np.array([
sl.mod.od_to_ms(
beatmap.od(
hard_rock=hard_rock,
double_time=double_time,
half_time=half_time,
),
),
]).T
circle_radius = sl.mod.circle_radius(beatmap.cs(hard_rock=hard_rock))
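        # ``hit_windows`` is a (3, 1) column of the (300, 100, 50) windows;
        # for each sampled hit we count how many windows the accuracy error
        # falls within while the aim lands on the circle, so ``3 - count`` is
        # the judgment index (0 = 300, 1 = 100, 2 = 50, 3 = miss); comparing
        # against [[0, 1, 2, 3]].T one-hot encodes that index, and reshaping
        # to (samples, objects, 4) then summing over objects tallies the
        # judgments for each simulated play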
simulated_300, simulated_100, simulated_50, simulated_miss = (
3 - (
(aim_samples <= circle_radius) &
(accuracy_samples <= hit_windows)
).sum(axis=0) ==
np.array([[0, 1, 2, 3]]).T
).T.reshape(samples, -1, 4).sum(axis=1).T
simulated_300 += spinner_count # assume perfect spinners
simulated_accuracies = sl.utils.accuracy(
simulated_300,
simulated_100,
simulated_50,
simulated_miss,
)
simulated_pp = beatmap.performance_points(
count_300=simulated_300,
count_100=simulated_100,
count_50=simulated_50,
count_miss=simulated_miss,
hidden=self.hidden,
hard_rock=hard_rock,
double_time=double_time,
half_time=half_time,
)
return Prediction(
predicted_aim_error=aim_error,
predicted_aim_distribution=aim_distribution,
predicted_accuracy_error=accuracy_error,
predicted_accuracy_distribution=accuracy_distribution,
accuracy_mean=simulated_accuracies.mean(),
accuracy_std=simulated_accuracies.std(),
pp_mean=simulated_pp.mean(),
pp_std=simulated_pp.std(),
)
def save_path(self, path):
self._model.save(path)
self._feature_scaler.save_path(path.with_suffix('.feature_scaler'))
with open(path.with_suffix('.pessimism'), 'wb') as f:
np.savez(
f,
aim_pessimism_factor=self._aim_pessimism_factor,
accuracy_pessimism_factor=self._accuracy_pessimism_factor,
)
@classmethod
def load_path(cls, path, *, hidden):
self = cls(hidden=hidden)
self._model = keras.models.load_model(path)
self._feature_scaler = Scaler.load_path(
path.with_suffix('.feature_scaler'),
)
with np.load(str(path.with_suffix('.pessimism'))) as f:
self._aim_pessimism_factor = f['aim_pessimism_factor']
self._accuracy_pessimism_factor = f['accuracy_pessimism_factor']
return self
class ErrorModel:
"""A model for osu! which trains on windows of time events and attempts to
predict the user's aim and accuracy error for each circle.
Parameters
----------
aim_pessimism_factor : float
An exponential increase in aim error to account for the fact that most
replays are heavily biased towards a user's best replays.
accuracy_pessimism_factor : float
An exponential increase in accuracy error to account for the fact that
most replays are heavily biased towards a user's best replays.
    trailing_context : int
        The number of trailing hit objects or slider ticks to look at.
forward_context : int
The number of forward hit objects or slider ticks to look at.
    lstm_hidden_layer_sizes : tuple[int, ...]
        The sizes of the hidden LSTM layers.
    dropout : float
        The dropout ratio.
    activation : str
        The activation function. Must be one of:
        'tanh', 'softplus', 'softsign', 'relu', 'sigmoid', 'hard_sigmoid', or
        'linear'.
    loss : {'mse', 'mae', 'mape', 'msle', 'cosine'}
        The loss function.
optimizer : str
The optimizer to use.
    batch_size : int
        The number of windows per training batch.
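
    Examples
    --------
    A minimal usage sketch; ``replays`` is assumed to be an iterable of
    ``sl.Replay`` objects and ``beatmap`` an ``sl.Beatmap``:

    >>> model = ErrorModel()                    # doctest: +SKIP
    >>> model.fit(replays)                      # doctest: +SKIP
    >>> prediction = model.predict(beatmap)     # doctest: +SKIP
    >>> prediction.accuracy_mean                # doctest: +SKIP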
"""
version = 0
def __init__(self, *args, **kwargs):
self._hidden_model = _InnerErrorModel(True, *args, **kwargs)
self._non_hidden_model = _InnerErrorModel(False, *args, **kwargs)
    def save_path(self, path):
"""Serialize the model as a directory.
Parameters
----------
path : path-like
The path to the directory to serialize the model to.
See Also
--------
lain.ErrorModel.load_path
"""
path = pathlib.Path(path)
path.mkdir(exist_ok=True)
(path / 'version').write_text(str(self.version))
self._hidden_model.save_path(path / 'hidden')
self._non_hidden_model.save_path(path / 'non-hidden')
@classmethod
    def load_path(cls, path):
"""Deserialize a model from a directory.
Parameters
----------
path : path-like
The path to the directory to load.
Returns
-------
self : lain.ErrorModel
The loaded model.
See Also
--------
lain.ErrorModel.save_path
"""
path = pathlib.Path(path)
version = int((path / 'version').read_text())
if version != cls.version:
raise ValueError(
f'saved model is of version {version} but the code is on'
f' version {cls.version}',
)
self = cls()
self._hidden_model = _InnerErrorModel.load_path(
path / 'hidden',
hidden=True,
)
self._non_hidden_model = _InnerErrorModel.load_path(
path / 'non-hidden',
hidden=False,
)
return self
    def fit(self, replays, *, verbose=False, epochs=10):
"""Fit the model to data.
Parameters
----------
replays : iterable[sl.Replay]
The replays to fit the model to.
verbose : bool, optional
Print verbose messages to stdout?
epochs : int, optional
The number of times to pass through the replays.
Returns
-------
        hidden_history : keras.callbacks.History
            The history of training the keras model on the replays with
            hidden.
        non_hidden_history : keras.callbacks.History
            The history of training the keras model on the replays without
            hidden.
"""
hidden, non_hidden = dichotomize(lambda replay: replay.hidden, replays)
if verbose:
print('fitting the hidden replays')
hidden_history = self._hidden_model.fit(
hidden,
verbose=verbose,
epochs=epochs,
)
if verbose:
print('fitting the non-hidden replays')
non_hidden_history = self._non_hidden_model.fit(
non_hidden,
verbose=verbose,
epochs=epochs,
)
return hidden_history, non_hidden_history
    def predict(self,
beatmap,
*,
pessimistic=True,
hidden=False,
double_time=False,
half_time=False,
hard_rock=False,
random_state=None,
samples=1000):
"""Predict the user's accuracy on the beatmap with the given mods.
Parameters
----------
beatmap : sl.Beatmap
The beatmap to predict.
pessimistic : bool, optional
Apply pessimistic error scaling?
hidden : bool, optional
Predict performance with hidden?
double_time : bool, optional
Predict performance with double time?
half_time : bool, optional
Predict performance with half time?
hard_rock : bool, optional
Predict performance with hard rock?
random_state : np.random.RandomState, optional
The numpy random state used to draw samples.
samples : int, optional
The number of plays to simulate.
Returns
-------
prediction : Prediction
A collection of predicted values for this play.
"""
if hidden:
predict = self._hidden_model.predict
else:
predict = self._non_hidden_model.predict
return predict(
beatmap,
pessimistic=pessimistic,
double_time=double_time,
half_time=half_time,
hard_rock=hard_rock,
random_state=random_state,
samples=samples,
)