import bisect
import datetime
from enum import unique
import os
import lzma
from .beatmap import Circle, Slider, Spinner
from .bit_enum import BitEnum
from .game_mode import GameMode
from .mod import Mod, od_to_ms, circle_radius
from .position import Position
from .utils import accuracy, lazyval, orange
@unique
class ActionBitMask(BitEnum):
"""The bitmask values for the action type.
"""
m1 = 1
m2 = 2
k1 = 5
k2 = 10
[docs]class Action:
"""A user action.
Parameters
----------
offset : timedelta
The offset since the beginning of the song.
position : Position
The position of the cursor.
key1 : bool
Is the first keyboard key pressed?
key2 : bool
Is the second keyboard key pressed?
mouse1 : bool
Is the first mouse button pressed?
mouse2 : bool
is the second mouse button pressed?
"""
def __init__(self, offset, position, key1, key2, mouse1, mouse2):
self.offset = offset
self.position = position
self.key1 = key1
self.key2 = key2
self.mouse1 = mouse1
self.mouse2 = mouse2
@property
def action_bitmask(self):
"""Get the action bitmask from an action.
"""
return ActionBitMask.pack(
m1=self.mouse1,
m2=self.mouse2,
k1=self.key1,
k2=self.key2,
)
def _consume_byte(buffer):
result = buffer[0]
del buffer[0]
return result
def _consume_short(buffer):
result = int.from_bytes(buffer[:2], 'little')
del buffer[:2]
return result
def _consume_int(buffer):
result = int.from_bytes(buffer[:4], 'little')
del buffer[:4]
return result
def _consume_long(buffer):
result = int.from_bytes(buffer[:8], 'little')
del buffer[:8]
return result
def _consume_uleb128(buffer):
result = 0
shift = 0
while True:
byte = _consume_byte(buffer)
result |= (byte & 0x7f) << shift
if (byte & 0x80) == 0:
break
shift += 7
return result
def _consume_string(buffer):
mode = _consume_byte(buffer)
if mode == 0:
return None
if mode != 0x0b:
raise ValueError(
f'unknown string start byte: {hex(mode)}, expected 0 or 0x0b',
)
byte_length = _consume_uleb128(buffer)
data = buffer[:byte_length]
del buffer[:byte_length]
return data.decode('utf-8')
_windows_epoch = datetime.datetime(1, 1, 1)
def _consume_datetime(buffer):
windows_ticks = _consume_long(buffer)
return _windows_epoch + datetime.timedelta(microseconds=windows_ticks / 10)
def _consume_life_bar_graph(buffer):
life_bar_graph_raw = _consume_string(buffer)
return [
(datetime.timedelta(milliseconds=int(offset)), float(value))
for offset, value in (
pair.split('|') for pair in life_bar_graph_raw.split(',') if pair
)
]
def _consume_actions(buffer):
compressed_byte_count = _consume_int(buffer)
compressed_data = buffer[:compressed_byte_count]
del buffer[:compressed_byte_count]
decompressed_data = lzma.decompress(compressed_data)
out = []
offset = 0
for raw_action in decompressed_data.split(b','):
if not raw_action:
continue
raw_offset, x, y, raw_action_mask = raw_action.split(b'|')
action_mask = ActionBitMask.unpack(int(raw_action_mask))
offset += int(raw_offset)
out.append(Action(
datetime.timedelta(milliseconds=offset),
Position(float(x), float(y)),
action_mask['m1'],
action_mask['m2'],
action_mask['k1'],
action_mask['k2'],
))
return out
def _within(p1, p2, d):
"""Determines whether 2 points are within a distance of each other
Parameters
---------
p1 : Position
The first point
p2 : Position
The second point
d : int or float
The distance
Returns
----------
bool
Whether the distance between the points is less than d
"""
return (p1.x - p2.x) ** 2 + (p1.y - p2.y) ** 2 < d ** 2
def _pressed(datum):
return datum.key1 or datum.key2 or datum.mouse1 or datum.mouse2
def _process_circle(obj, rdatum, hw, scores):
out_by = abs(rdatum.offset - obj.time)
if out_by < datetime.timedelta(milliseconds=hw.hit_300):
scores["300s"].append(obj)
elif out_by < datetime.timedelta(milliseconds=hw.hit_100):
scores["100s"].append(obj)
else:
# must be within the 50 hit window or we wouldn't be here
scores["50s"].append(obj)
def _process_slider(obj, rdata, head_hit, rad, scores):
t_changes = []
t_changes_append = t_changes.append
duration = obj.end_time - obj.time
if head_hit:
t_changes_append((rdata[0].offset - obj.time) / duration)
on = True
else:
scores["slider_breaks"].append(obj)
on = False
for datum in rdata:
t = (datum.offset - obj.time) / duration
if 0 <= t <= 1:
nearest_pos = obj.curve(t)
if (on and
not (_pressed(datum)
and _within(nearest_pos, datum.position, rad * 3))):
t_changes_append(t)
on = False
elif (not on and
(_pressed(datum) and
_within(nearest_pos, datum.position, rad))):
t_changes_append(t)
on = True
tick_ts = list(orange(obj.tick_rate, obj.num_beats, obj.tick_rate))
missed_points = 0 if head_hit else 1
for tick in tick_ts:
bi = bisect.bisect_left(t_changes, tick)
if bi % 2 == 0:
# missed a tick
if tick is tick_ts[-1]:
if (len(t_changes) > 0 and
len(t_changes) == bi and
abs(tick_ts[-1] - t_changes[-1]) < 0.1):
# held close enough to last tick
continue
# end tick doesn't cause sliderbreak
elif obj not in scores["slider_breaks"]:
scores["slider_breaks"].append(obj)
missed_points += 1
if missed_points == obj.ticks:
# all ticks and head missed -> miss
scores["misses"].append(obj)
elif missed_points > obj.ticks / 2:
scores["50s"].append(obj)
elif missed_points > 0:
scores["100s"].append(obj)
else:
scores["300s"].append(obj)
[docs]class Replay:
"""An osu! replay.
Parameters
----------
mode : GameMode
The game mode.
version : int
The version of osu! used to create this replay.
beatmap_md5 : str
The md5 hash of the beatmap played in this replay.
player_name : str
The name of the player who recorded this replay.
replay_md5 : str
The md5 hash of part of the data in this replay.
count_300 : int
The number of 300's hit in the replay.
count_100 : int
The number of 100's hit in the replay.
count_50 : int
The number of 50's hit in the replay.
count_geki : int
The number of geki in the replay. A geki is when the user scores all
300's for a given color section.
count_katu : int
The number of katu in the replay. A katu is when the user completes
a given color section without any 50's or misses. All 300's would
result in a geki instead of a katu.
count_miss : int
The number of misses in the replay.
score : int
The score earned in this replay. This is the normal score, not
performance points.
max_combo : int
The largest combo achieved in this replay.
full_combo : bool
Did the player earn a max combo in this replay?
no_fail : bool
Was the no_fail mod used?
easy : bool
Was the easy mod used?
no_video : bool
Was the no_video mod used?
hidden : bool
Was the hidden mod used?
hard_rock : hard_rock
Was the hard_rock mod used?
sudden_death : bool
Was the sudden_death mod used?
double_time : bool
Was the double_time mod used?
relax : bool
Was the relax mod used?
half_time : bool
Was the half_time mod used?
nightcore : bool
Was the nightcore mod used?
flashlight : bool
Was the flashlight mod used?
autoplay : bool
Was the autoplay mod used?
spun_out : bool
Was the spun_out mod used?
auto_pilot : bool
Was the auto_pilot mod used?
perfect : bool
Was the perfect mod used?
key4 : bool
Was the key4 mod used?
key5 : bool
Was the key5 mod used?
key6 : bool
Was the key6 mod used?
key7 : bool
Was the key7 mod used?
key8 : bool
Was the key8 mod used?
fade_in : bool
Was the fade_in mod used?
random : bool
Was the random mod used?
cinema : bool
Was the cinema mod used?
target_practice : bool
Was the target_practice mod used?
key9 : bool
Was the key9 mod used?
coop : bool
Was the coop mod used?
key1 : bool
Was the key1 mod used?
key3 : bool
Was the key3 mod used?
key2 : bool
Was the key2 mod used?
scoreV2 : bool
Was the scoreV2 mod used?
life_bar_graph : list[timedelta, float]
A list of time points paired with the value of the life bar at that
time. These appear in sorted order. The values are in the range [0, 1].
timestamp : datetime
When this replay was created.
actions : list[Action]
A sorted list of all of the actions recorded from the player.
beatmap : Beatmap or None
The beatmap played in this replay if known, otherwise None.
"""
def __init__(self,
mode,
version,
beatmap_md5,
player_name,
replay_md5,
count_300,
count_100,
count_50,
count_geki,
count_katu,
count_miss,
score,
max_combo,
full_combo,
no_fail,
easy,
no_video,
hidden,
hard_rock,
sudden_death,
double_time,
relax,
half_time,
nightcore,
flashlight,
autoplay,
spun_out,
auto_pilot,
perfect,
key4,
key5,
key6,
key7,
key8,
fade_in,
random,
cinema,
target_practice,
key9,
coop,
key1,
key3,
key2,
scoreV2,
life_bar_graph,
timestamp,
actions,
beatmap):
self.mode = mode
self.version = version
self.beatmap_md5 = beatmap_md5
self.player_name = player_name
self.replay_md5 = replay_md5
self.count_300 = count_300
self.count_100 = count_100
self.count_50 = count_50
self.count_geki = count_geki
self.count_katu = count_katu
self.count_miss = count_miss
self.score = score
self.max_combo = max_combo
self.full_combo = full_combo
self.no_fail = no_fail
self.easy = easy
self.no_video = no_video
self.hidden = hidden
self.hard_rock = hard_rock
self.sudden_death = sudden_death
self.double_time = double_time
self.relax = relax
self.half_time = half_time
self.nightcore = nightcore
self.flashlight = flashlight
self.autoplay = autoplay
self.spun_out = spun_out
self.auto_pilot = auto_pilot
self.perfect = perfect
self.key4 = key4
self.key5 = key5
self.key6 = key6
self.key7 = key7
self.key8 = key8
self.fade_in = fade_in
self.random = random
self.cinema = cinema
self.target_practice = target_practice
self.key9 = key9
self.coop = coop
self.key1 = key1
self.key3 = key3
self.key2 = key2
self.scoreV2 = scoreV2
self.life_bar_graph = life_bar_graph
self.timestamp = timestamp
self.actions = actions
self.beatmap = beatmap
@lazyval
def accuracy(self):
"""The accuracy achieved in the replay in the range [0, 1].
"""
if self.mode != GameMode.standard:
raise NotImplementedError(
'accuracy for non osu!standard replays is not yet supported',
)
return accuracy(
self.count_300,
self.count_100,
self.count_50,
self.count_miss,
)
@lazyval
def performance_points(self):
return self.beatmap.performance_points(
count_300=self.count_300,
count_100=self.count_100,
count_50=self.count_50,
count_miss=self.count_miss,
easy=self.easy,
hard_rock=self.hard_rock,
half_time=self.half_time,
double_time=self.double_time,
hidden=self.hidden,
flashlight=self.flashlight,
spun_out=self.spun_out,
)
@lazyval
def failed(self):
"""Did the user fail this attempt?
"""
for _, value in self.life_bar_graph:
if not value:
return True
return False
@classmethod
[docs] def from_path(cls,
path,
*,
library=None,
client=None,
save=False,
retrieve_beatmap=True):
"""Read in a ``Replay`` object from a ``.osr`` file on disk.
Parameters
----------
path : str or pathlib.Path
The path to the file to read from.
library : Library, optional
The library of beatmaps.
client : Client, optional.
The client used to find the beatmap.
save : bool, optional
If the beatmap needs to be downloaded with the client, should it be
saved to disk?
retrieve_beatmap : bool, optional
Whether to retrieve the beatmap the replay is for.
Returns
-------
replay : Replay
The parsed replay object.
Raises
------
ValueError
Raised when the file cannot be parsed as an ``.osr`` file.
"""
with open(path, 'rb') as f:
return cls.from_file(
f,
library=library,
client=client,
save=save,
retrieve_beatmap=retrieve_beatmap,
)
@classmethod
[docs] def from_directory(cls,
path,
*,
library=None,
client=None,
save=False,
retrieve_beatmap=True):
"""Read in a list of ``Replay`` objects from a directory of ``.osr``
files.
Parameters
----------
path : str or pathlib.Path
The path to the directory to read from.
library : Library, optional
The library of beatmaps.
client : Client, optional.
The client used to find the beatmap.
save : bool, optional
If the beatmap needs to be downloaded with the client, should it be
saved to disk?
retrieve_beatmap : bool, optional
Whether to retrieve the beatmap the replay is for.
Returns
-------
replays : list[Replay]
The parsed replay objects.
Raises
------
ValueError
Raised when any file cannot be parsed as an ``.osr`` file.
"""
return [
cls.from_path(
p,
library=library,
client=client,
save=save,
retrieve_beatmap=retrieve_beatmap,
)
for p in os.scandir(path)
if p.name.endswith('.osr')
]
@classmethod
[docs] def from_file(cls,
file,
*,
library=None,
client=None,
save=False,
retrieve_beatmap=True):
"""Read in a ``Replay`` object from an open file object.
Parameters
----------
file : file-like
The file object to read from.
library : Library, optional
The library of beatmaps.
client : Client, optional.
The client used to find the beatmap.
save : bool, optional
If the beatmap needs to be downloaded with the client, should it be
saved to disk?
retrieve_beatmap : bool, optional
Whether to retrieve the beatmap the replay is for.
Returns
-------
replay : Replay
The parsed replay object.
Raises
------
ValueError
Raised when the file cannot be parsed as a ``.osr`` file.
"""
return cls.parse(
file.read(),
library=library,
client=client,
save=save,
retrieve_beatmap=retrieve_beatmap
)
@classmethod
[docs] def parse(cls,
data,
*,
library=None,
client=None,
save=False,
retrieve_beatmap=True):
"""Parse a replay from ``.osr`` file data.
Parameters
----------
data : bytes
The data from an ``.osr`` file.
library : Library, optional
The library of beatmaps.
client : Client, optional.
The client used to find the beatmap.
save : bool, optional
If the beatmap needs to be downloaded with the client, should it be
saved to disk?
retrieve_beatmap : bool, optional
Whether to retrieve the beatmap the replay is for.
Returns
-------
replay : Replay
The parsed replay.
Raises
------
ValueError
Raised when ``data`` is not in the ``.osr`` format.
"""
if retrieve_beatmap:
if library is None and client is None:
raise ValueError(
'one of library or client must be passed if you wish the'
' beatmap to be retrieved',
)
use_client = client is not None
if use_client:
if library is not None:
raise ValueError(
'only one of library or client can be passed'
)
library = client.library
buffer = bytearray(data)
mode = GameMode(_consume_byte(buffer))
version = _consume_int(buffer)
beatmap_md5 = _consume_string(buffer)
player_name = _consume_string(buffer)
replay_md5 = _consume_string(buffer)
count_300 = _consume_short(buffer)
count_100 = _consume_short(buffer)
count_50 = _consume_short(buffer)
count_geki = _consume_short(buffer)
count_katu = _consume_short(buffer)
count_miss = _consume_short(buffer)
score = _consume_int(buffer)
max_combo = _consume_short(buffer)
full_combo = bool(_consume_byte(buffer))
mod_mask = _consume_int(buffer)
life_bar_graph = _consume_life_bar_graph(buffer)
timestamp = _consume_datetime(buffer)
actions = _consume_actions(buffer)
mod_kwargs = Mod.unpack(mod_mask)
# delete the alias field names
del mod_kwargs['relax2']
del mod_kwargs['last_mod']
if retrieve_beatmap:
try:
beatmap = library.lookup_by_md5(beatmap_md5)
except KeyError:
if not use_client:
raise
beatmap = client.beatmap(
beatmap_md5=beatmap_md5,
).beatmap(save=save)
else:
beatmap = None
return cls(
mode=mode,
version=version,
beatmap_md5=beatmap_md5,
player_name=player_name,
replay_md5=replay_md5,
count_300=count_300,
count_100=count_100,
count_50=count_50,
count_geki=count_geki,
count_katu=count_katu,
count_miss=count_miss,
score=score,
max_combo=max_combo,
full_combo=full_combo,
life_bar_graph=life_bar_graph,
timestamp=timestamp,
actions=actions,
beatmap=beatmap,
**mod_kwargs,
)
@lazyval
def hits(self):
"""Dictionary containing beatmap's hit objects sorted into
300s, 100s, 50s, misses, slider_breaks as they were hit in the replay
Each hit object will be in exactly one category except sliders which
may be in slider_breaks in addition to another category
Slider calculations are unreliable so some objects may in the wrong
category.
Spinners are not yet calculated so are always in the 300s category.
"""
beatmap = self.beatmap
actions = self.actions
scores = {"300s": [],
"100s": [],
"50s": [],
"misses": [],
"slider_breaks": [],
}
hw = od_to_ms(beatmap.od(easy=self.easy, hard_rock=self.hard_rock))
rad = circle_radius(
beatmap.cs(easy=self.easy, hard_rock=self.hard_rock),
)
hit_50_threshold = datetime.timedelta(milliseconds=hw.hit_50)
i = 0
for obj in beatmap.hit_objects:
if self.hard_rock:
obj = obj.hard_rock
if isinstance(obj, Spinner):
# spinners are hard
scores['300s'].append(obj)
continue
# we can ignore events before the hit window so iterate
# until we get past the beginning of the hit window
while actions[i].offset < obj.time - hit_50_threshold:
i += 1
starti = i
while actions[i].offset < obj.time + hit_50_threshold:
if (((actions[i].key1 and not actions[i - 1].key1)
or (actions[i].key2 and not actions[i - 1].key2))
and _within(actions[i].position, obj.position, rad)):
# key pressed that wasn't before and
# event is in hit window and correct location
if isinstance(obj, Circle):
_process_circle(obj, actions[i], hw, scores)
elif isinstance(obj, Slider):
# Head was hit
starti = i
while actions[i].offset <= obj.end_time:
i += 1
_process_slider(
obj, actions[starti:i + 1], True, rad, scores
)
break
i += 1
else:
# no events in the hit window were in the correct location
if isinstance(obj, Slider):
# Slider ticks might still be hit
while actions[i].offset <= obj.end_time:
i += 1
_process_slider(
obj, actions[starti:i + 1], False, rad, scores
)
else:
scores["misses"].append(obj)
i += 1
return scores
def __repr__(self):
try:
accuracy = f'{self.accuracy * 100:.2f}'
except NotImplementedError:
accuracy = '<unknown>'
beatmap = self.beatmap
if beatmap is None:
beatmap = '<unknown>'
return (
f'<{type(self).__qualname__}: {accuracy}% ('
f'{self.count_300}/{self.count_100}/'
f'{self.count_50}/{self.count_miss}) on {beatmap}>'
)