mirror of
https://github.com/DarkflameUniverse/DarkflameServer.git
synced 2024-11-09 17:58:20 +00:00
Move utils to submodule
This commit is contained in:
parent
960b090702
commit
0bad9d7186
@ -1,427 +0,0 @@
|
||||
"""
|
||||
Module for sequential reading (ReadStream) and writing (WriteStream) from/to bytes.
|
||||
Also includes objects for converting datatypes from/to bytes, similar to the standard library struct module.
|
||||
"""
|
||||
|
||||
# https://github.com/lcdr/bitstream/blob/master/bitstream/__init__.py
|
||||
|
||||
import math
|
||||
import struct
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import AnyStr, ByteString, cast, Generic, overload, SupportsBytes, Type, TypeVar
|
||||
|
||||
T = TypeVar('T')
|
||||
|
||||
class _Struct(Generic[T]):
|
||||
_struct: struct.Struct
|
||||
|
||||
def __new__(cls, value: T) -> bytes:
|
||||
return cls._struct.pack(value)
|
||||
|
||||
def __str__(self) -> str:
|
||||
return "<Struct %s>" % _Struct._struct.format
|
||||
|
||||
@classmethod
|
||||
def deserialize(cls, stream: "ReadStream") -> T:
|
||||
return cast(T, cls._struct.unpack(stream.read(bytes, length=cls._struct.size))[0])
|
||||
|
||||
class IntStruct(_Struct[int]):
|
||||
pass
|
||||
|
||||
class UnsignedIntStruct(IntStruct):
|
||||
@classmethod
|
||||
def deserialize_compressed(cls, stream: "ReadStream") -> int:
|
||||
number_of_bytes = cls._struct.size
|
||||
current_byte = number_of_bytes - 1
|
||||
|
||||
while current_byte > 0:
|
||||
if stream.read(c_bit):
|
||||
current_byte -= 1
|
||||
else:
|
||||
# Read the rest of the bytes
|
||||
read = stream.read(bytes, length=current_byte + 1) + bytes(number_of_bytes - current_byte - 1)
|
||||
return cast(int, cls._struct.unpack(read)[0])
|
||||
|
||||
# All but the first bytes are 0. If the upper half of the last byte is a 0 (positive) or 16 (negative) then what we read will be a 1 and the remaining 4 bits.
|
||||
# Otherwise we read a 0 and the 8 bits
|
||||
if stream.read(c_bit):
|
||||
start = bytes([stream.read_bits(4)])
|
||||
else:
|
||||
start = stream.read(bytes, length=1)
|
||||
read = start + bytes(number_of_bytes - current_byte - 1)
|
||||
return cast(int, cls._struct.unpack(read)[0])
|
||||
|
||||
class SignedIntStruct(IntStruct):
|
||||
pass
|
||||
|
||||
class c_bool(_Struct[bool]):
|
||||
_struct = struct.Struct("<?")
|
||||
|
||||
class c_float(_Struct[float]):
|
||||
_struct = struct.Struct("<f")
|
||||
|
||||
class c_double(_Struct[float]):
|
||||
_struct = struct.Struct("<d")
|
||||
|
||||
class c_int(SignedIntStruct):
|
||||
_struct = struct.Struct("<i")
|
||||
|
||||
class c_uint(UnsignedIntStruct):
|
||||
_struct = struct.Struct("<I")
|
||||
|
||||
|
||||
class c_byte(SignedIntStruct):
|
||||
_struct = struct.Struct("<b")
|
||||
|
||||
class c_ubyte(UnsignedIntStruct):
|
||||
_struct = struct.Struct("<B")
|
||||
|
||||
class c_short(SignedIntStruct):
|
||||
_struct = struct.Struct("<h")
|
||||
|
||||
class c_ushort(UnsignedIntStruct):
|
||||
_struct = struct.Struct("<H")
|
||||
|
||||
class c_long(SignedIntStruct):
|
||||
_struct = struct.Struct("<l")
|
||||
|
||||
class c_ulong(UnsignedIntStruct):
|
||||
_struct = struct.Struct("<L")
|
||||
|
||||
class c_longlong(SignedIntStruct):
|
||||
_struct = struct.Struct("<q")
|
||||
|
||||
class c_ulonglong(UnsignedIntStruct):
|
||||
_struct = struct.Struct("<Q")
|
||||
|
||||
c_int8 = c_byte
|
||||
c_uint8 = c_ubyte
|
||||
c_int16 = c_short
|
||||
c_uint16 = c_ushort
|
||||
c_int32 = c_long
|
||||
c_uint32 = c_ulong
|
||||
c_int64 = c_longlong
|
||||
c_uint64 = c_ulonglong
|
||||
|
||||
class c_bit:
|
||||
def __init__(self, boolean: bool):
|
||||
self.value = boolean
|
||||
|
||||
class Serializable(ABC):
|
||||
"""By inheriting from this class you can create types which you can pass to the read/write bitstream functions."""
|
||||
@abstractmethod
|
||||
def serialize(self, stream: "WriteStream") -> None:
|
||||
"""Write this object to the bitstream."""
|
||||
pass
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def deserialize(cls, stream: "ReadStream") -> "Serializable":
|
||||
"""Create a new object from the bitstream."""
|
||||
pass
|
||||
|
||||
S = TypeVar('S', bound=Serializable)
|
||||
|
||||
class ReadStream:
|
||||
"""Allows simple sequential reading from bytes."""
|
||||
_data: bytes
|
||||
|
||||
def __init__(self, data: bytes, unlocked: bool=False):
|
||||
self._data = data
|
||||
self._unlocked = unlocked
|
||||
self._read_offset = 0
|
||||
|
||||
@property
|
||||
def read_offset(self) -> int:
|
||||
if not self._unlocked:
|
||||
raise RuntimeError("access to read offset on locked stream")
|
||||
return self._read_offset
|
||||
|
||||
@read_offset.setter
|
||||
def read_offset(self, value: int) -> None:
|
||||
if not self._unlocked:
|
||||
raise RuntimeError("access to read offset on locked stream")
|
||||
self._read_offset = value
|
||||
|
||||
def skip_read(self, byte_length: int) -> None:
|
||||
"""Skips reading byte_length number of bytes."""
|
||||
self._read_offset += byte_length * 8
|
||||
|
||||
@overload
|
||||
def read(self, arg_type: Type[_Struct[T]]) -> T:
|
||||
pass
|
||||
|
||||
@overload
|
||||
def read(self, arg_type: Type[c_bit]) -> bool:
|
||||
pass
|
||||
|
||||
@overload
|
||||
def read(self, arg_type: Type[S]) -> S:
|
||||
pass
|
||||
|
||||
@overload
|
||||
def read(self, arg_type: Type[bytes], length: int) -> bytes:
|
||||
pass
|
||||
|
||||
@overload
|
||||
def read(self, arg_type: Type[bytes], allocated_length: int=None, length_type: Type[UnsignedIntStruct]=None) -> bytes:
|
||||
pass
|
||||
|
||||
@overload
|
||||
def read(self, arg_type: Type[str], allocated_length: int=None, length_type: Type[UnsignedIntStruct]=None) -> str:
|
||||
pass
|
||||
|
||||
def read(self, arg_type, length=None, allocated_length=None, length_type=None):
|
||||
"""
|
||||
Read a value of type arg_type from the bitstream.
|
||||
allocated_length is for fixed-length strings.
|
||||
length_type is for variable-length strings.
|
||||
"""
|
||||
if issubclass(arg_type, _Struct):
|
||||
return arg_type.deserialize(self)
|
||||
if issubclass(arg_type, c_bit):
|
||||
return self._read_bit()
|
||||
if issubclass(arg_type, Serializable):
|
||||
return arg_type.deserialize(self)
|
||||
if allocated_length is not None or length_type is not None:
|
||||
return self._read_str(arg_type, allocated_length, length_type)
|
||||
if issubclass(arg_type, bytes):
|
||||
return self._read_bytes(length)
|
||||
raise TypeError(arg_type)
|
||||
|
||||
def _read_str(self, arg_type: Type[AnyStr], allocated_length: int=None, length_type: Type[UnsignedIntStruct]=None) -> AnyStr:
|
||||
if issubclass(arg_type, str):
|
||||
char_size = 2
|
||||
else:
|
||||
char_size = 1
|
||||
|
||||
if length_type is not None:
|
||||
# Variable-length string
|
||||
length = self.read(length_type)
|
||||
value = self._read_bytes(length*char_size)
|
||||
elif allocated_length is not None:
|
||||
# Fixed-length string
|
||||
value = self._read_bytes(allocated_length*char_size)
|
||||
# find null terminator
|
||||
for i in range(len(value)):
|
||||
char = value[i*char_size:(i+1)*char_size]
|
||||
if char == bytes(char_size):
|
||||
value = value[:i*char_size]
|
||||
break
|
||||
else:
|
||||
raise RuntimeError("String doesn't have null terminator")
|
||||
else:
|
||||
raise ValueError
|
||||
|
||||
if issubclass(arg_type, str):
|
||||
return value.decode("utf-16-le")
|
||||
return value
|
||||
|
||||
def _read_bit(self) -> bool:
|
||||
bit = self._data[self._read_offset // 8] & 0x80 >> self._read_offset % 8 != 0
|
||||
self._read_offset += 1
|
||||
return bit
|
||||
|
||||
def read_bits(self, number_of_bits: int) -> int:
|
||||
assert 0 < number_of_bits < 8
|
||||
|
||||
output = (self._data[self._read_offset // 8] << self._read_offset % 8) & 0xff # First half
|
||||
if self._read_offset % 8 != 0 and number_of_bits > 8 - self._read_offset % 8: # If we have a second half, we didn't read enough bytes in the first half
|
||||
output |= self._data[self._read_offset // 8 + 1] >> 8 - self._read_offset % 8 # Second half (overlaps byte boundary)
|
||||
output >>= 8 - number_of_bits
|
||||
self._read_offset += number_of_bits
|
||||
return output
|
||||
|
||||
def _read_bytes(self, length: int) -> bytes:
|
||||
if self._read_offset % 8 == 0:
|
||||
num_bytes_read = length
|
||||
else:
|
||||
num_bytes_read = length+1
|
||||
|
||||
# check whether there is enough left to read
|
||||
if len(self._data) - self._read_offset//8 < num_bytes_read:
|
||||
raise EOFError("Trying to read %i bytes but only %i remain" % (num_bytes_read, len(self._data) - self._read_offset // 8))
|
||||
|
||||
if self._read_offset % 8 == 0:
|
||||
output = self._data[self._read_offset // 8:self._read_offset // 8 + num_bytes_read]
|
||||
else:
|
||||
# data is shifted
|
||||
# clear the part before the struct
|
||||
|
||||
firstbyte = self._data[self._read_offset // 8] & ((1 << 8 - self._read_offset % 8) - 1)
|
||||
output = firstbyte.to_bytes(1, "big") + self._data[self._read_offset // 8 + 1:self._read_offset // 8 + num_bytes_read]
|
||||
# shift back
|
||||
output = (int.from_bytes(output, "big") >> (8 - self._read_offset % 8)).to_bytes(length, "big")
|
||||
self._read_offset += length * 8
|
||||
return output
|
||||
|
||||
def read_compressed(self, arg_type: Type[UnsignedIntStruct]) -> int:
|
||||
return arg_type.deserialize_compressed(self)
|
||||
|
||||
def read_remaining(self) -> bytes:
|
||||
return self._read_bytes(len(self._data) - int(math.ceil(self._read_offset / 8)))
|
||||
|
||||
def align_read(self) -> None:
|
||||
if self._read_offset % 8 != 0:
|
||||
self._read_offset += 8 - self._read_offset % 8
|
||||
|
||||
def all_read(self) -> bool:
|
||||
# This is not accurate to the bit, just to the byte
|
||||
return math.ceil(self._read_offset / 8) == len(self._data)
|
||||
|
||||
# Note: a ton of the logic here assumes that the write offset is never moved back, that is, that you never overwrite things
|
||||
# Doing so may break everything
|
||||
class WriteStream(SupportsBytes):
|
||||
"""Allows simple sequential writing to bytes."""
|
||||
_data: bytearray
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._data = bytearray()
|
||||
self._write_offset = 0
|
||||
self._was_cast_to_bytes = False
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
if self._was_cast_to_bytes:
|
||||
raise RuntimeError("WriteStream can only be cast to bytes once")
|
||||
self._was_cast_to_bytes = True
|
||||
return bytes(self._data)
|
||||
|
||||
@overload
|
||||
def write(self, arg: ByteString) -> None:
|
||||
pass
|
||||
|
||||
@overload
|
||||
def write(self, arg: _Struct) -> None:
|
||||
pass
|
||||
|
||||
@overload
|
||||
def write(self, arg: c_bit) -> None:
|
||||
pass
|
||||
|
||||
@overload
|
||||
def write(self, arg: Serializable) -> None:
|
||||
pass
|
||||
|
||||
@overload
|
||||
def write(self, arg: AnyStr, allocated_length: int=None, length_type: Type[UnsignedIntStruct]=None) -> None:
|
||||
pass
|
||||
|
||||
def write(self, arg, allocated_length=None, length_type=None):
|
||||
"""
|
||||
Write a value to the bitstream.
|
||||
allocated_length is for fixed-length strings.
|
||||
length_type is for variable-length strings.
|
||||
"""
|
||||
if isinstance(arg, c_bit):
|
||||
self._write_bit(arg.value)
|
||||
return
|
||||
if isinstance(arg, Serializable):
|
||||
arg.serialize(self)
|
||||
return
|
||||
if allocated_length is not None or length_type is not None:
|
||||
self._write_str(arg, allocated_length, length_type)
|
||||
return
|
||||
if isinstance(arg, (bytes, bytearray)):
|
||||
self._write_bytes(arg)
|
||||
return
|
||||
|
||||
raise TypeError(arg)
|
||||
|
||||
def _write_str(self, str_: AnyStr, allocated_length: int=None, length_type: Type[UnsignedIntStruct]=None) -> None:
|
||||
# possibly include default encoded length for non-variable-length strings (seems to be 33)
|
||||
if isinstance(str_, str):
|
||||
encoded_str = str_.encode("utf-16-le")
|
||||
else:
|
||||
encoded_str = str_
|
||||
|
||||
if length_type is not None:
|
||||
# Variable-length string
|
||||
self.write(length_type(len(str_))) # note: there's also a version that uses the length of the encoded string, should that be used?
|
||||
elif allocated_length is not None:
|
||||
# Fixed-length string
|
||||
# null terminator
|
||||
if isinstance(str_, str):
|
||||
char_size = 2
|
||||
else:
|
||||
char_size = 1
|
||||
|
||||
if len(str_)+1 > allocated_length:
|
||||
raise ValueError("String too long!")
|
||||
encoded_str += bytes(allocated_length*char_size-len(encoded_str))
|
||||
self._write_bytes(encoded_str)
|
||||
|
||||
def _write_bit(self, bit: bool) -> None:
|
||||
self._alloc_bits(1)
|
||||
if bit: # we don't actually have to do anything if the bit is 0
|
||||
self._data[self._write_offset//8] |= 0x80 >> self._write_offset % 8
|
||||
|
||||
self._write_offset += 1
|
||||
|
||||
def write_bits(self, value: int, number_of_bits: int) -> None:
|
||||
assert 0 < number_of_bits < 8
|
||||
self._alloc_bits(number_of_bits)
|
||||
|
||||
if number_of_bits < 8: # In the case of a partial byte, the bits are aligned from the right (bit 0) rather than the left (as in the normal internal representation)
|
||||
value = value << (8 - number_of_bits) & 0xff # Shift left to get the bits on the left, as in our internal representation
|
||||
if self._write_offset % 8 == 0:
|
||||
self._data[self._write_offset//8] = value
|
||||
else:
|
||||
self._data[self._write_offset//8] |= value >> self._write_offset % 8 # First half
|
||||
if 8 - self._write_offset % 8 < number_of_bits: # If we didn't write it all out in the first half (8 - self._write_offset % 8 is the number we wrote in the first half)
|
||||
self._data[self._write_offset//8 + 1] = (value << 8 - self._write_offset % 8) & 0xff # Second half (overlaps byte boundary)
|
||||
|
||||
self._write_offset += number_of_bits
|
||||
|
||||
def _write_bytes(self, byte_arg: bytes) -> None:
|
||||
if self._write_offset % 8 == 0:
|
||||
self._data[self._write_offset//8:self._write_offset//8+len(byte_arg)] = byte_arg
|
||||
else:
|
||||
# shift new input to current shift
|
||||
new = (int.from_bytes(byte_arg, "big") << (8 - self._write_offset % 8)).to_bytes(len(byte_arg)+1, "big")
|
||||
# update current byte
|
||||
self._data[self._write_offset//8] |= new[0]
|
||||
# add rest
|
||||
self._data[self._write_offset//8+1:self._write_offset//8+1+len(byte_arg)] = new[1:]
|
||||
self._write_offset += len(byte_arg)*8
|
||||
|
||||
@overload
|
||||
def write_compressed(self, byte_arg: UnsignedIntStruct) -> None:
|
||||
pass
|
||||
|
||||
@overload
|
||||
def write_compressed(self, byte_arg: bytes) -> None:
|
||||
pass
|
||||
|
||||
def write_compressed(self, byte_arg) -> None:
|
||||
current_byte = len(byte_arg) - 1
|
||||
|
||||
# Write upper bytes with a single 1
|
||||
# From high byte to low byte, if high byte is 0 then write 1. Otherwise write 0 and the remaining bytes
|
||||
while current_byte > 0:
|
||||
is_zero = byte_arg[current_byte] == 0
|
||||
self._write_bit(is_zero)
|
||||
if not is_zero:
|
||||
# Write the remainder of the data
|
||||
self._write_bytes(byte_arg[:current_byte + 1])
|
||||
return
|
||||
current_byte -= 1
|
||||
|
||||
# If the upper half of the last byte is 0 then write 1 and the remaining 4 bits. Otherwise write 0 and the 8 bits.
|
||||
|
||||
is_zero = byte_arg[0] & 0xF0 == 0x00
|
||||
self._write_bit(is_zero)
|
||||
if is_zero:
|
||||
self.write_bits(byte_arg[0], 4)
|
||||
else:
|
||||
self._write_bytes(byte_arg[:1])
|
||||
|
||||
def align_write(self) -> None:
|
||||
"""Align the write offset to the byte boundary."""
|
||||
if self._write_offset % 8 != 0:
|
||||
self._alloc_bits(8 - self._write_offset % 8)
|
||||
self._write_offset += 8 - self._write_offset % 8
|
||||
|
||||
def _alloc_bits(self, number_of_bits: int) -> None:
|
||||
bytes_to_allocate: int = math.ceil((self._write_offset + number_of_bits) / 8) - len(self._data)
|
||||
if bytes_to_allocate > 0:
|
||||
self._data += bytes(bytes_to_allocate)
|
@ -1,34 +0,0 @@
|
||||
import argparse
|
||||
import os.path
|
||||
import zlib
|
||||
|
||||
|
||||
def decompress(data):
|
||||
assert data[:5] == b"sd0\x01\xff"
|
||||
pos = 5
|
||||
out = b""
|
||||
while pos < len(data):
|
||||
length = int.from_bytes(data[pos:pos+4], "little")
|
||||
pos += 4
|
||||
out += zlib.decompress(data[pos:pos+length])
|
||||
pos += length
|
||||
return out
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("in_path")
|
||||
parser.add_argument(
|
||||
"--out_path", help="If not provided, output file is in the script directory")
|
||||
args = parser.parse_args()
|
||||
if args.out_path is None:
|
||||
filename, ext = os.path.splitext(os.path.basename(args.in_path))
|
||||
args.out_path = filename+"_decompressed"+ext
|
||||
|
||||
with open(args.in_path, "rb") as file:
|
||||
data = file.read()
|
||||
|
||||
with open(args.out_path, "wb") as file:
|
||||
file.write(decompress(data))
|
||||
|
||||
print("Decompressed file:", args.out_path)
|
@ -1,217 +0,0 @@
|
||||
"""Module for converting a FDB database to a SQLite database"""
|
||||
import argparse
|
||||
import os
|
||||
import sqlite3
|
||||
import struct
|
||||
from collections import OrderedDict
|
||||
|
||||
# There seems to be no difference between 4 and 8, but just in case there is I'm keeping that type info
|
||||
SQLITE_TYPE = {}
|
||||
SQLITE_TYPE[0] = "none"
|
||||
SQLITE_TYPE[1] = "int32"
|
||||
SQLITE_TYPE[3] = "real"
|
||||
SQLITE_TYPE[4] = "text_4"
|
||||
SQLITE_TYPE[5] = "int_bool"
|
||||
SQLITE_TYPE[6] = "int64"
|
||||
SQLITE_TYPE[8] = "text_8"
|
||||
|
||||
|
||||
def pointer_scope(func):
|
||||
"""The FDB format has a lot of pointers to structures, so this decorator automatically reads the pointer, seeks to the pointer position, calls the function, and seeks back."""
|
||||
|
||||
def wrapper(self, *args, **kwargs):
|
||||
pointer = kwargs.get("pointer")
|
||||
|
||||
if pointer == None:
|
||||
pointer = self._read_int32()
|
||||
else:
|
||||
del kwargs["pointer"]
|
||||
|
||||
if pointer == -1:
|
||||
return
|
||||
|
||||
current_pos = self.fdb.tell()
|
||||
self.fdb.seek(pointer)
|
||||
|
||||
result = func(self, *args, **kwargs)
|
||||
|
||||
self.fdb.seek(current_pos)
|
||||
return result
|
||||
return wrapper
|
||||
|
||||
# I'm using a class for this to save things like the fdb and the sqlite without using globals
|
||||
|
||||
|
||||
class convert:
|
||||
def __init__(self, in_file, out_file=None, add_link_info=False):
|
||||
self.add_link_info = add_link_info
|
||||
if out_file == None:
|
||||
out_file = os.path.splitext(os.path.basename(in_file))[
|
||||
0] + ".sqlite"
|
||||
|
||||
if os.path.exists(out_file):
|
||||
os.remove(out_file)
|
||||
|
||||
self.fdb = open(in_file, "rb")
|
||||
self.sqlite = sqlite3.connect(out_file)
|
||||
|
||||
self._read()
|
||||
print("-"*79)
|
||||
print("Finished converting database!")
|
||||
print("-"*79)
|
||||
|
||||
self.sqlite.commit()
|
||||
self.sqlite.close()
|
||||
self.fdb.close()
|
||||
|
||||
def _read(self):
|
||||
number_of_tables = self._read_int32()
|
||||
self._read_tables(number_of_tables)
|
||||
|
||||
@pointer_scope
|
||||
def _read_tables(self, number_of_tables):
|
||||
for table_struct_index in range(number_of_tables):
|
||||
table_name, number_of_columns = self._read_column_header()
|
||||
print("[%2i%%] Reading table %s" %
|
||||
(table_struct_index*100//number_of_tables, table_name))
|
||||
self._read_row_header(table_name, number_of_columns)
|
||||
|
||||
@pointer_scope
|
||||
def _read_column_header(self):
|
||||
number_of_columns = self._read_int32()
|
||||
table_name = self._read_string()
|
||||
columns = self._read_columns(number_of_columns)
|
||||
|
||||
sql = "create table if not exists '%s' (%s)" % \
|
||||
(table_name, ", ".join(
|
||||
["'%s' %s" % (col, SQLITE_TYPE[columns[col]]) for col in columns]))
|
||||
|
||||
self.sqlite.execute(sql)
|
||||
return table_name, len(columns)
|
||||
|
||||
@pointer_scope
|
||||
def _read_columns(self, number_of_columns):
|
||||
columns = OrderedDict()
|
||||
|
||||
for _ in range(number_of_columns):
|
||||
data_type = self._read_int32()
|
||||
name = self._read_string()
|
||||
columns[name] = data_type
|
||||
|
||||
if self.add_link_info:
|
||||
columns["_linked_from"] = 1
|
||||
columns["_does_link"] = 5
|
||||
columns["_invalid"] = 5
|
||||
|
||||
return columns
|
||||
|
||||
@pointer_scope
|
||||
def _read_row_header(self, table_name, number_of_columns):
|
||||
number_of_allocated_rows = self._read_int32()
|
||||
if number_of_allocated_rows != 0:
|
||||
# assert power of 2 allocation size
|
||||
assert number_of_allocated_rows & (
|
||||
number_of_allocated_rows - 1) == 0
|
||||
|
||||
self.sqlite.executemany("insert into '%s' values (%s)" % (table_name, ", ".join(
|
||||
["?"] * number_of_columns)), self._read_rows(number_of_allocated_rows, number_of_columns))
|
||||
|
||||
@pointer_scope
|
||||
def _read_rows(self, number_of_allocated_rows, number_of_columns):
|
||||
rowid = 0
|
||||
percent_read = -1 # -1 so 0% is displayed as new
|
||||
for row in range(number_of_allocated_rows):
|
||||
new_percent_read = row*100//number_of_allocated_rows
|
||||
if new_percent_read > percent_read:
|
||||
percent_read = new_percent_read
|
||||
print("[%2i%%] Reading rows" % percent_read, end="\r")
|
||||
|
||||
row_pointer = self._read_int32()
|
||||
if row_pointer == -1:
|
||||
if self.add_link_info:
|
||||
# invalid row
|
||||
yield (None,) * (number_of_columns-1) + (True,)
|
||||
rowid += 1
|
||||
else:
|
||||
linked_rows, rowid = self._read_row(rowid, pointer=row_pointer)
|
||||
for values in linked_rows:
|
||||
yield values
|
||||
|
||||
@pointer_scope
|
||||
def _read_row(self, rowid):
|
||||
rows = []
|
||||
linked_from = None
|
||||
while True:
|
||||
row_values = self._read_row_info()
|
||||
linked = self._read_int32()
|
||||
if self.add_link_info:
|
||||
row_values.append(linked_from)
|
||||
row_values.append(linked != -1)
|
||||
row_values.append(False) # valid row
|
||||
rows.append(row_values)
|
||||
|
||||
rowid += 1
|
||||
|
||||
if linked == -1:
|
||||
break
|
||||
|
||||
self.fdb.seek(linked)
|
||||
linked_from = rowid
|
||||
|
||||
return rows, rowid
|
||||
|
||||
@pointer_scope
|
||||
def _read_row_info(self):
|
||||
number_of_columns = self._read_int32()
|
||||
return self._read_row_values(number_of_columns)
|
||||
|
||||
@pointer_scope
|
||||
def _read_row_values(self, number_of_columns):
|
||||
values = []
|
||||
|
||||
for _ in range(number_of_columns):
|
||||
data_type = self._read_int32()
|
||||
if data_type == 0:
|
||||
assert self.fdb.read(4) == b"\0\0\0\0"
|
||||
value = None
|
||||
elif data_type == 1:
|
||||
value = self._read_int32()
|
||||
elif data_type == 3:
|
||||
value = struct.unpack("f", self.fdb.read(4))[0]
|
||||
elif data_type in (4, 8):
|
||||
value = self._read_string()
|
||||
elif data_type == 5:
|
||||
value = struct.unpack("?xxx", self.fdb.read(4))[0]
|
||||
elif data_type == 6:
|
||||
value = self._read_int64()
|
||||
else:
|
||||
raise NotImplementedError(data_type)
|
||||
|
||||
values.append(value)
|
||||
return values
|
||||
|
||||
def _read_int32(self):
|
||||
return struct.unpack("i", self.fdb.read(4))[0]
|
||||
|
||||
@pointer_scope
|
||||
def _read_string(self):
|
||||
str_bytes = bytearray()
|
||||
while True:
|
||||
byte = self.fdb.read(1)
|
||||
if byte == b"\0":
|
||||
break
|
||||
str_bytes += byte
|
||||
return str_bytes.decode("latin1")
|
||||
|
||||
@pointer_scope
|
||||
def _read_int64(self):
|
||||
return struct.unpack("q", self.fdb.read(8))[0]
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("fdb_path")
|
||||
parser.add_argument("--sqlite_path")
|
||||
parser.add_argument("--add_link_info", action="store_true")
|
||||
args = parser.parse_args()
|
||||
convert(args.fdb_path, args.sqlite_path, args.add_link_info)
|
@ -1,142 +0,0 @@
|
||||
import hashlib
|
||||
import os
|
||||
import struct
|
||||
import argparse
|
||||
import hashlib
|
||||
|
||||
import decompress_sd0
|
||||
from bitstream import c_bool, c_int, c_ubyte, c_uint, ReadStream
|
||||
|
||||
args = {}
|
||||
|
||||
|
||||
class PKExtractor:
|
||||
def load(self, path: str, output: str) -> None:
|
||||
self.records = {}
|
||||
|
||||
filenames = {}
|
||||
|
||||
for filename in ("trunk.txt", "hotfix.txt"):
|
||||
filenames.update(self._load_filehashes(
|
||||
os.path.join(path, "versions", filename)))
|
||||
print("Loaded hashes")
|
||||
pks = []
|
||||
for dir, _, files in os.walk(os.path.join(path, "client/res/pack")):
|
||||
for file in files:
|
||||
if file.endswith(".pk"):
|
||||
pks.append(os.path.join(dir, file))
|
||||
|
||||
for pk in pks:
|
||||
self._load_pk(pk, filenames)
|
||||
|
||||
for filename in sorted(self.records.keys()):
|
||||
print(filename)
|
||||
self._save_path(output, filename)
|
||||
|
||||
def _load_filehashes(self, path: str):
|
||||
filenames = {}
|
||||
with open(path) as file:
|
||||
for line in file.read().splitlines()[3:]:
|
||||
values = line.split(",")
|
||||
filenames[values[2]] = values[0]
|
||||
return filenames
|
||||
|
||||
def _load_pki(self, path: str):
|
||||
# unused, alternate way to get the list of pks
|
||||
with open(path, "rb") as file:
|
||||
stream = ReadStream(file.read())
|
||||
|
||||
assert stream.read(c_uint) == 3
|
||||
pack_files = []
|
||||
for _ in range(stream.read(c_uint)):
|
||||
pack_files.append(stream.read(
|
||||
bytes, length_type=c_uint).decode("latin1"))
|
||||
|
||||
for _ in range(stream.read(c_uint)):
|
||||
stream.skip_read(20)
|
||||
|
||||
assert stream.all_read()
|
||||
return pack_files
|
||||
|
||||
def _load_pk(self, path: str, filenames) -> None:
|
||||
with open(path, "rb") as file:
|
||||
assert file.read(7) == b"ndpk\x01\xff\x00"
|
||||
file.seek(-8, 2)
|
||||
number_of_records_address = struct.unpack("I", file.read(4))[0]
|
||||
unknown = struct.unpack("I", file.read(4))[0]
|
||||
if unknown != 0:
|
||||
print(unknown, path)
|
||||
file.seek(number_of_records_address)
|
||||
data = ReadStream(file.read()[:-8])
|
||||
|
||||
number_of_records = data.read(c_uint)
|
||||
for _ in range(number_of_records):
|
||||
pk_index = data.read(c_uint)
|
||||
unknown1 = data.read(c_int)
|
||||
unknown2 = data.read(c_int)
|
||||
original_size = data.read(c_uint)
|
||||
original_md5 = data.read(bytes, length=32).decode()
|
||||
unknown3 = data.read(c_uint)
|
||||
compressed_size = data.read(c_uint)
|
||||
compressed_md5 = data.read(bytes, length=32).decode()
|
||||
unknown4 = data.read(c_uint)
|
||||
data_position = data.read(c_uint)
|
||||
is_compressed = data.read(c_bool)
|
||||
unknown5 = data.read(c_ubyte)
|
||||
unknown6 = data.read(c_ubyte)
|
||||
unknown7 = data.read(c_ubyte)
|
||||
if original_md5 not in filenames:
|
||||
filenames[original_md5] = "unlisted/"+original_md5
|
||||
self.records[filenames[original_md5]
|
||||
] = path, data_position, is_compressed, original_size, original_md5, compressed_size, compressed_md5
|
||||
|
||||
def extract_data(self, path: str) -> bytes:
|
||||
pk_path, data_position, is_compressed, original_size, original_md5, compressed_size, compressed_md5 = self.records[
|
||||
path]
|
||||
|
||||
with open(pk_path, "rb") as file:
|
||||
file.seek(data_position)
|
||||
if is_compressed:
|
||||
data = file.read(compressed_size)
|
||||
else:
|
||||
data = file.read(original_size)
|
||||
assert file.read(5) == b"\xff\x00\x00\xdd\x00"
|
||||
|
||||
if is_compressed:
|
||||
assert hashlib.md5(data).hexdigest() == compressed_md5
|
||||
data = decompress_sd0.decompress(data)
|
||||
|
||||
assert hashlib.md5(data).hexdigest() == original_md5
|
||||
return data
|
||||
|
||||
def _save_path(self, outdir: str, path: str) -> None:
|
||||
original_md5 = self.records[path][4]
|
||||
|
||||
dir, filename = os.path.split(path)
|
||||
out = os.path.join(outdir, dir.lower())
|
||||
os.makedirs(out, exist_ok=True)
|
||||
out_file_path = os.path.join(out, filename.lower())
|
||||
|
||||
if os.path.isfile(out_file_path):
|
||||
with open(out_file_path, "rb") as f:
|
||||
file_hash = hashlib.md5()
|
||||
while chunk := f.read(8192):
|
||||
file_hash.update(chunk)
|
||||
|
||||
if file_hash.hexdigest() == original_md5:
|
||||
print("File %s already exists with correct md5 %s" %
|
||||
(path, original_md5))
|
||||
return
|
||||
|
||||
data = self.extract_data(path)
|
||||
with open(out_file_path, "wb") as file:
|
||||
file.write(data)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("client_path")
|
||||
parser.add_argument("output_path")
|
||||
args = parser.parse_args()
|
||||
app = PKExtractor()
|
||||
app.load(args.client_path, args.output_path)
|
@ -7,7 +7,7 @@ WORKDIR /setup
|
||||
# copy needed files from repo
|
||||
COPY resources/ resources/
|
||||
COPY migrations/cdserver/ migrations/cdserver
|
||||
ADD docker/*.py utils/
|
||||
ADD thirdparty/docker-utils/utils/*.py utils/
|
||||
|
||||
COPY docker/setup.sh /setup.sh
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user