Overhaul converter system

- Keyword argument support
- New Greedy, Union and Optional converters
- Optionally passing extra data as raw list
This commit is contained in:
Ben 2019-04-11 01:00:14 +01:00
parent d292407dc7
commit 5a01be8c68
2 changed files with 185 additions and 105 deletions

View File

@ -1,155 +1,221 @@
import zope.interface
from zope.interface import implementer
from abc import ABC
from abc import abstractmethod
import asyncio
class IConverter(zope.interface.Interface):
class IConverter(ABC):
description = zope.interface.Attribute("""A short description of the purpose of the converter""")
@property
@abstractmethod
def description(self):
"""A short description of the purpose of the converter"""
async def convert(self):
raise NotImplementedError('Converter must derive this class!')
@abstractmethod
async def convert(self, ctx):
"""The actual converter implementation"""
class Converter:
__slots__ = ['p', 'argument']
def __init__(self, p, argument):
self.p = p
self.argument = argument
@implementer(IConverter)
class CredentialsConverter(Converter):
class CredentialsConverter(IConverter):
description = """Used for obtaining login credentials from XML login data"""
async def convert(self):
username = self.argument[0][0].text
password = self.argument[0][1].text
async def convert(self, ctx):
username = ctx.argument[0][0].text
password = ctx.argument[0][1].text
return username, password
@implementer(IConverter)
class VersionChkConverter(Converter):
class VersionChkConverter(IConverter):
description = """Used for checking the verChk version number"""
async def convert(self):
return self.argument[0].get('v')
async def convert(self, ctx):
return ctx.argument[0].get('v')
@implementer(IConverter)
class ConnectedPenguinConverter(Converter):
class ConnectedPenguinConverter(IConverter):
description = """Converts a penguin ID into a live penguin instance
or none if the player is offline"""
async def convert(self):
penguin_id = int(self.argument)
if penguin_id in self.p.server.penguins_by_id:
return self.p.server.penguins_by_id[penguin_id]
async def convert(self, ctx):
penguin_id = int(ctx.argument)
if penguin_id in ctx.p.server.penguins_by_id:
return ctx.p.server.penguins_by_id[penguin_id]
return None
@implementer(IConverter)
class ConnectedIglooConverter(Converter):
class ConnectedIglooConverter(IConverter):
description = """Converts a penguin ID into a live igloo instance or
none if it's not available"""
async def convert(self):
igloo_id = int(self.argument)
if igloo_id in self.p.server.igloo_map:
return self.p.server.igloo_map[igloo_id]
async def convert(self, ctx):
igloo_id = int(ctx.argument)
if igloo_id in ctx.p.server.igloo_map:
return ctx.p.server.igloo_map[igloo_id]
return None
@implementer(IConverter)
class RoomConverter(Converter):
class RoomConverter(IConverter):
description = """Converts a room ID into a Houdini.Data.Room instance"""
async def convert(self):
room_id = int(self.argument)
if room_id in self.p.server.rooms:
return self.p.server.rooms[room_id]
async def convert(self, ctx):
room_id = int(ctx.argument)
if room_id in ctx.p.server.rooms:
return ctx.p.server.rooms[room_id]
return None
@implementer(IConverter)
class ItemConverter(Converter):
class ItemConverter(IConverter):
description = """Converts an item ID into a Houdini.Data.Item instance"""
async def convert(self):
item_id = int(self.argument)
if item_id in self.p.server.items:
return self.p.server.items[item_id]
async def convert(self, ctx):
item_id = int(ctx.argument)
if item_id in ctx.p.server.items:
return ctx.p.server.items[item_id]
return None
@implementer(IConverter)
class IglooConverter(Converter):
class IglooConverter(IConverter):
description = """Converts an igloo ID into a Houdini.Data.Igloo instance"""
async def convert(self):
igloo_id = int(self.argument)
if igloo_id in self.p.server.igloos:
return self.p.server.igloos[igloo_id]
async def convert(self, ctx):
igloo_id = int(ctx.argument)
if igloo_id in ctx.p.server.igloos:
return ctx.p.server.igloos[igloo_id]
return None
@implementer(IConverter)
class FurnitureConverter(Converter):
class FurnitureConverter(IConverter):
description = """Converts a furniture ID into a Houdini.Data.Furniture instance"""
async def convert(self):
furniture_id = int(self.argument)
if furniture_id in self.p.server.furniture:
return self.p.server.furniture[furniture_id]
async def convert(self, ctx):
furniture_id = int(ctx.argument)
if furniture_id in ctx.p.server.furniture:
return ctx.p.server.furniture[furniture_id]
return None
@implementer(IConverter)
class FlooringConverter(Converter):
class FlooringConverter(IConverter):
description = """Converts a flooring ID into a Houdini.Data.Flooring instance"""
async def convert(self):
flooring_id = int(self.argument)
if flooring_id in self.p.server.flooring:
return self.p.server.flooring[flooring_id]
async def convert(self, ctx):
flooring_id = int(ctx.argument)
if flooring_id in ctx.p.server.flooring:
return ctx.p.server.flooring[flooring_id]
return None
@implementer(IConverter)
class StampConverter(Converter):
class StampConverter(IConverter):
description = """Converts a stamp ID into a Houdini.Data.Stamp instance"""
async def convert(self):
stamp_id = int(self.argument)
if stamp_id in self.p.server.stamps:
return self.p.server.stamps[stamp_id]
async def convert(self, ctx):
stamp_id = int(ctx.argument)
if stamp_id in ctx.p.server.stamps:
return ctx.p.server.stamps[stamp_id]
return None
@implementer(IConverter)
class VerticalConverter(Converter):
class SeparatorConverter(IConverter):
description = """Converts vertically separated values into an int list"""
__slots__ = ['separator', 'mapper']
async def convert(self):
return map(int, self.argument.split('|'))
description = """Converts strings separated by char into a list of type"""
def __init__(self, separator='|', mapper=int):
self.separator = separator
self.mapper = mapper
async def convert(self, ctx):
return map(self.mapper, ctx.argument.split(self.separator))
@implementer(IConverter)
class CommaConverter(Converter):
class UnionConverter(IConverter):
description = """Converts comma separated values into an int list"""
__slots__ = ['types']
async def convert(self):
return map(int, self.argument.split(','))
description = """Converts union type into argument"""
def __init__(self, *types, skip_none=False):
self.types = types
self.skip_none = skip_none
async def convert(self, ctx):
for converter in self.types:
try:
result = await do_conversion(converter, ctx)
if not self.skip_none or result is not None:
return result
except ValueError:
continue
class GreedyConverter(IConverter):
__slots__ = ['target']
description = """Converts until it can't any longer"""
def __init__(self, target=int):
self.target = target
async def convert(self, ctx):
converted = []
try:
converted.append(await do_conversion(self.target, ctx))
for ctx.argument in ctx.arguments:
converted.append(await do_conversion(self.target, ctx))
except ValueError:
return converted
return converted
class OptionalConverter(IConverter):
__slots__ = ['target']
description = """Tries to convert but ignores if it can't"""
def __init__(self, target=int):
self.target = target
async def convert(self, ctx):
try:
return await do_conversion(self.target, ctx)
except ValueError:
return ctx.component.default
class _ConverterContext:
__slots__ = ['component', 'arguments', 'argument', 'p']
def __init__(self, component, arguments, argument, p):
self.component = component
self.arguments = arguments
self.argument = argument
self.p = p
def get_converter(component):
if component.annotation is component.empty:
return str
return component.annotation
async def do_conversion(converter, ctx):
if issubclass(type(converter), IConverter) and not isinstance(converter, IConverter):
converter = converter()
if isinstance(converter, IConverter):
if asyncio.iscoroutinefunction(converter.convert):
return await converter.convert(ctx)
return converter.convert(ctx)
return converter(ctx.argument)

View File

@ -110,39 +110,53 @@ class _Listener:
class _XTListener(_Listener):
__slots__ = ['pre_login']
__slots__ = ['pre_login', 'rest_raw', 'keywords']
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.pre_login = kwargs.get('pre_login')
self.rest_raw = kwargs.get('rest_raw', False)
self.keywords = len(inspect.getfullargspec(self.handler).kwonlyargs)
if self.rest_raw:
self.components = self.components[:-1]
async def __call__(self, p, packet_data):
if not self.pre_login and not p.joined_world:
p.logger.warn('{} tried sending XT packet before authentication!'.format(p.peer_name))
await p.close()
return
await super().__call__(p, packet_data)
super().__call__(p, packet_data)
handler_call_arguments = [self.instance] if self.instance is not None else []
handler_call_arguments = [self.plugin] if self.plugin is not None else []
handler_call_arguments += [self.packet, p] if self.pass_packet else [p]
handler_call_keywords = {}
arguments = iter(packet_data)
for index, component in enumerate(self.components):
if component.default is not component.empty:
handler_call_arguments.append(component.default)
next(arguments)
elif component.kind == component.POSITIONAL_OR_KEYWORD:
component_data = next(arguments)
converter = get_converter(component)
handler_call_arguments.append(await do_conversion(converter, p, component_data))
elif component.kind == component.VAR_POSITIONAL:
for component_data in arguments:
converter = get_converter(component)
handler_call_arguments.append(await do_conversion(converter, p, component_data))
break
return await self.handler(*handler_call_arguments)
arguments = iter(packet_data[:-self.keywords])
ctx = _ConverterContext(None, arguments, None, p)
for ctx.component in self.components:
if ctx.component.annotation is ctx.component.empty and ctx.component.default is not ctx.component.empty:
handler_call_arguments.append(ctx.component.default)
next(ctx.arguments)
elif ctx.component.kind == ctx.component.POSITIONAL_OR_KEYWORD:
ctx.argument = next(ctx.arguments)
converter = get_converter(ctx.component)
handler_call_arguments.append(await do_conversion(converter, ctx))
elif ctx.component.kind == ctx.component.VAR_POSITIONAL:
for argument in ctx.arguments:
ctx.argument = argument
converter = get_converter(ctx.component)
handler_call_arguments.append(await do_conversion(converter, ctx))
elif ctx.component.kind == ctx.component.KEYWORD_ONLY:
ctx.argument = packet_data[-self.keywords:][len(handler_call_keywords)]
converter = get_converter(ctx.component)
handler_call_keywords[ctx.component.name] = await do_conversion(converter, ctx)
if self.rest_raw:
handler_call_arguments.append(list(ctx.arguments))
return await self.handler(*handler_call_arguments, **handler_call_keywords)
elif not len(list(ctx.arguments)):
return await self.handler(*handler_call_arguments, **handler_call_keywords)
class _XMLListener(_Listener):