Move cooldown classes into their own module

This commit is contained in:
Ben 2019-04-11 00:51:44 +01:00
parent e3f0b04ca0
commit 7d048372fb
2 changed files with 94 additions and 83 deletions

87
Houdini/Cooldown.py Normal file
View File

@ -0,0 +1,87 @@
import enum
import time
class CooldownError(Exception):
"""Raised when packets are sent whilst a cooldown is active"""
pass
class BucketType(enum.Enum):
Default = 1
Penguin = 1
Server = 2
class _Cooldown:
__slots__ = ['rate', 'per', 'bucket_type', 'last',
'_window', '_tokens']
def __init__(self, per, rate, bucket_type):
self.per = float(per)
self.rate = int(rate)
self.bucket_type = bucket_type
self.last = 0.0
self._window = 0.0
self._tokens = self.rate
@property
def is_cooling(self):
current = time.time()
self.last = current
if self._tokens == self.rate:
self._window = current
if current > self._window + self.per:
self._tokens = self.rate
self._window = current
if self._tokens == 0:
return self.per - (current - self._window)
self._tokens -= 1
if self._tokens == 0:
self._window = current
def reset(self):
self._tokens = self.rate
self.last = 0.0
def copy(self):
return _Cooldown(self.per, self.rate, self.bucket_type)
class _CooldownMapping:
__slots__ = ['_cooldown', '_cache', 'callback']
def __init__(self, callback, cooldown_object):
self._cooldown = cooldown_object
self.callback = callback
self._cache = {}
def _get_bucket_key(self, p):
if self._cooldown.bucket_type == BucketType.Default:
return p
return p.server
def _verify_cache_integrity(self):
current = time.time()
self._cache = {cache_key: bucket for cache_key, bucket in
self._cache.items() if current < bucket.last + bucket.per}
def get_bucket(self, p):
self._verify_cache_integrity()
cache_key = self._get_bucket_key(p)
if cache_key not in self._cache:
bucket = self._cooldown.copy()
self._cache[cache_key] = bucket
else:
bucket = self._cache[cache_key]
return bucket

View File

@ -7,6 +7,7 @@ from types import FunctionType
from Houdini.Converters import IConverter from Houdini.Converters import IConverter
from Houdini.Cooldown import _Cooldown, _CooldownMapping, BucketType, CooldownError
def get_relative_function_path(function_obj): def get_relative_function_path(function_obj):
abs_function_file = inspect.getfile(function_obj) abs_function_file = inspect.getfile(function_obj)
@ -64,83 +65,6 @@ class Priority(enum.Enum):
Low = 1 Low = 1
class BucketType(enum.Enum):
Default = 1
Penguin = 1
Server = 2
class _Cooldown:
__slots__ = ['rate', 'per', 'bucket_type', 'last',
'_window', '_tokens']
def __init__(self, per, rate, bucket_type):
self.per = float(per)
self.rate = int(rate)
self.bucket_type = bucket_type
self.last = 0.0
self._window = 0.0
self._tokens = self.rate
@property
def is_cooling(self):
current = time.time()
self.last = current
if self._tokens == self.rate:
self._window = current
if current > self._window + self.per:
self._tokens = self.rate
self._window = current
if self._tokens == 0:
return self.per - (current - self._window)
self._tokens -= 1
if self._tokens == 0:
self._window = current
def reset(self):
self._tokens = self.rate
self.last = 0.0
def copy(self):
return _Cooldown(self.per, self.rate, self.bucket_type)
class _CooldownMapping:
__slots__ = ['_cooldown', '_cache']
def __init__(self, cooldown_object):
self._cooldown = cooldown_object
self._cache = {}
def _get_bucket_key(self, p):
if self._cooldown.bucket_type == BucketType.Default:
return p
return p.server
def _verify_cache_integrity(self):
current = time.time()
self._cache = {cache_key: bucket for cache_key, bucket in
self._cache.items() if current < bucket.last + bucket.per}
def get_bucket(self, p):
self._verify_cache_integrity()
cache_key = self._get_bucket_key(p)
if cache_key not in self._cache:
bucket = self._cooldown.copy()
self._cache[cache_key] = bucket
else:
bucket = self._cache[cache_key]
return bucket
class _Listener: class _Listener:
__slots__ = ['packet', 'components', 'handler', 'priority', __slots__ = ['packet', 'components', 'handler', 'priority',
@ -255,14 +179,14 @@ def handler(packet, **kwargs):
listener_class = _XTListener if isinstance(packet, XTPacket) else _XMLListener listener_class = _XTListener if isinstance(packet, XTPacket) else _XMLListener
try: try:
cooldown_object = handler_function.cooldown cooldown_object = handler_function.__cooldown
del handler_function.cooldown del handler_function.__cooldown
except AttributeError: except AttributeError:
cooldown_object = None cooldown_object = None
try: try:
checklist = handler_function.checks checklist = handler_function.__checks
del handler_function.checks del handler_function.__checks
except AttributeError: except AttributeError:
checklist = [] checklist = []
@ -313,12 +237,12 @@ def cooldown(per=1.0, rate=1, bucket_type=BucketType.Default):
def check(predicate): def check(predicate):
def decorator(handler_function): def decorator(handler_function):
if not hasattr(handler_function, 'checks'): if not hasattr(handler_function, 'checks'):
handler_function.checks = [] handler_function.__checks = []
if not type(predicate) == FunctionType: if not type(predicate) == FunctionType:
raise TypeError('All handler checks must be a function') raise TypeError('All handler checks must be a function')
handler_function.checks.append(predicate) handler_function.__checks.append(predicate)
return handler_function return handler_function
return decorator return decorator