Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 33 additions & 31 deletions UnityPy/streams/EndianBinaryReader.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import sys
from struct import Struct, unpack
import re
from typing import List, Union
from io import BytesIO, BufferedIOBase, IOBase, BufferedReader
from struct import Struct, unpack
from io import IOBase, BufferedReader

import builtins
from typing import Callable, List, Optional, Tuple, Union

reNot0 = re.compile(b"(.*?)\x00", re.S)

Expand Down Expand Up @@ -43,13 +45,13 @@ class EndianBinaryReader:

def __new__(
cls,
item: Union[bytes, bytearray, memoryview, BytesIO, str],
item: Union[bytes, bytearray, memoryview, IOBase, str],
endian: str = ">",
offset: int = 0,
):
if isinstance(item, (bytes, bytearray, memoryview)):
obj = super(EndianBinaryReader, cls).__new__(EndianBinaryReader_Memoryview)
elif isinstance(item, (IOBase, BufferedIOBase)):
elif isinstance(item, IOBase):
obj = super(EndianBinaryReader, cls).__new__(EndianBinaryReader_Streamable)
elif isinstance(item, str):
item = open(item, "rb")
Expand All @@ -75,17 +77,17 @@ def __new__(
obj.__init__(item, endian)
return obj

def __init__(self, item, endian=">", offset=0):
def __init__(self, item, endian: str = ">", offset: int = 0):
self.endian = endian
self.BaseOffset = offset
self.Position = 0

@property
def bytes(self):
def bytes(self) -> builtins.bytes:
# implemented by Streamable and Memoryview versions
return b""

def read(self, *args):
def read(self, *args) -> builtins.bytes:
# implemented by Streamable and Memoryview versions
return b""

Expand All @@ -95,7 +97,7 @@ def read_byte(self) -> int:
def read_u_byte(self) -> int:
return unpack(self.endian + "B", self.read(1))[0]

def read_bytes(self, num) -> bytes:
def read_bytes(self, num) -> builtins.bytes:
return self.read(num)

def read_short(self) -> int:
Expand Down Expand Up @@ -191,54 +193,54 @@ def read_color4(self) -> Color:
self.read_float(), self.read_float(), self.read_float(), self.read_float()
)

def read_byte_array(self) -> bytes:
def read_byte_array(self) -> builtins.bytes:
return self.read(self.read_int())

def read_matrix(self) -> Matrix4x4:
return Matrix4x4(self.read_float_array(16))

def read_array(self, command, length: int) -> list:
def read_array(self, command: Callable, length: int) -> list:
return [command() for _ in range(length)]

def read_array_struct(self, param: str, length: int = None) -> list:
def read_array_struct(self, param: str, length: Optional[int] = None) -> tuple:
if length is None:
length = self.read_int()
struct = Struct(f"{self.endian}{length}{param}")
return struct.unpack(self.read(struct.size))

def read_boolean_array(self, length: int = None) -> List[bool]:
def read_boolean_array(self, length: Optional[int] = None) -> Tuple[bool]:
return self.read_array_struct("?", length)

def read_u_byte_array(self, length: int = None) -> List[int]:
def read_u_byte_array(self, length: Optional[int] = None) -> Tuple[int]:
return self.read_array_struct("B", length)

def read_u_short_array(self, length: int = None) -> List[int]:
def read_u_short_array(self, length: Optional[int] = None) -> Tuple[int]:
return self.read_array_struct("h", length)

def read_short_array(self, length: int = None) -> List[int]:
def read_short_array(self, length: Optional[int] = None) -> Tuple[int]:
return self.read_array_struct("H", length)

def read_int_array(self, length: int = None) -> List[int]:
def read_int_array(self, length: Optional[int] = None) -> Tuple[int]:
return self.read_array_struct("i", length)

def read_u_int_array(self, length: int = None) -> List[int]:
def read_u_int_array(self, length: Optional[int] = None) -> Tuple[int]:
return self.read_array_struct("I", length)

def read_long_array(self, length: int = None) -> List[int]:
def read_long_array(self, length: Optional[int] = None) -> Tuple[int]:
return self.read_array_struct("q", length)

def read_u_long_array(self, length: int = None) -> List[int]:
def read_u_long_array(self, length: Optional[int] = None) -> Tuple[int]:
return self.read_array_struct("Q", length)

def read_u_int_array_array(self, length: int = None) -> List[List[int]]:
def read_u_int_array_array(self, length: Optional[int] = None) -> List[Tuple[int]]:
return self.read_array(
self.read_u_int_array, length if length is not None else self.read_int()
)

def read_float_array(self, length: int = None) -> List[float]:
def read_float_array(self, length: Optional[int] = None) -> Tuple[float]:
return self.read_array_struct("f", length)

def read_double_array(self, length: int = None) -> List[float]:
def read_double_array(self, length: Optional[int] = None) -> Tuple[float]:
return self.read_array_struct("d", length)

def read_string_array(self) -> List[str]:
Expand All @@ -259,7 +261,7 @@ def real_offset(self) -> int:
"""
return self.BaseOffset + self.Position

def read_the_rest(self, obj_start: int, obj_size: int) -> bytes:
def read_the_rest(self, obj_start: int, obj_size: int) -> builtins.bytes:
"""Returns the rest of the current reader bytes."""
return self.read_bytes(obj_size - (self.Position - obj_start))

Expand All @@ -268,7 +270,7 @@ class EndianBinaryReader_Memoryview(EndianBinaryReader):
__slots__ = ("view", "_endian", "BaseOffset", "Position", "Length")
view: memoryview

def __init__(self, view, endian=">", offset=0):
def __init__(self, view, endian: str = ">", offset: int = 0):
self._endian = ""
super().__init__(view, endian=endian, offset=offset)
self.view = memoryview(view)
Expand All @@ -293,20 +295,20 @@ def endian(self, value: str):
self._endian = value

@property
def bytes(self):
def bytes(self) -> memoryview:
return self.view

def dispose(self):
def dispose(self) -> None:
self.view.release()

def read(self, length: int):
def read(self, length: int) -> memoryview:
if not length:
return b""
return memoryview(b"")
ret = self.view[self.Position : self.Position + length]
self.Position += length
return ret

def read_aligned_string(self):
def read_aligned_string(self) -> str:
length = self.read_int()
if 0 < length <= self.Length - self.Position:
string_data = self.read_bytes(length)
Expand All @@ -315,7 +317,7 @@ def read_aligned_string(self):
return result
return ""

def read_string_to_null(self, max_length=32767) -> str:
def read_string_to_null(self, max_length: int = 32767) -> str:
match = reNot0.search(self.view, self.Position, self.Position + max_length)
if not match:
if self.Position + max_length >= self.Length:
Expand Down
35 changes: 20 additions & 15 deletions UnityPy/streams/EndianBinaryWriter.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
import io
from struct import pack
from io import BytesIO, IOBase

import builtins
from typing import Callable, Union

from ..math import Color, Matrix4x4, Quaternion, Vector2, Vector3, Vector4, Rectangle


class EndianBinaryWriter:
endian: str
Length: int
Position: int
stream: io.BufferedReader
stream: IOBase

def __init__(self, input_=b"", endian=">"):
def __init__(
self,
input_: Union[bytes, bytearray, IOBase] = b"",
endian: str = ">"
):
if isinstance(input_, (bytes, bytearray)):
self.stream = io.BytesIO(input_)
self.stream = BytesIO(input_)
self.stream.seek(0, 2)
elif isinstance(input_, io.IOBase):
elif isinstance(input_, IOBase):
self.stream = input_
else:
raise ValueError("Invalid input type - %s." % type(input_))
Expand All @@ -36,7 +42,6 @@ def Length(self) -> int:

def dispose(self):
self.stream.close()
pass

def write(self, *args):
if self.Position != self.stream.tell():
Expand All @@ -51,7 +56,7 @@ def write_byte(self, value: int):
def write_u_byte(self, value: int):
self.write(pack(self.endian + "B", value))

def write_bytes(self, value: bytes):
def write_bytes(self, value: builtins.bytes):
return self.write(value)

def write_short(self, value: int):
Expand Down Expand Up @@ -91,7 +96,7 @@ def write_aligned_string(self, value: str):
self.write(bstring)
self.align_stream(4)

def align_stream(self, alignment=4):
def align_stream(self, alignment:int = 4):
pos = self.stream.tell()
align = (alignment - pos % alignment) % alignment
self.write(b"\0" * align)
Expand Down Expand Up @@ -124,10 +129,10 @@ def write_rectangle_f(self, value: Rectangle):
self.write_float(value.height)

def write_color_uint(self, value: Color):
self.write_u_byte(value.R * 255)
self.write_u_byte(value.G * 255)
self.write_u_byte(value.B * 255)
self.write_u_byte(value.A * 255)
self.write_u_byte(int(value.R * 255))
self.write_u_byte(int(value.G * 255))
self.write_u_byte(int(value.B * 255))
self.write_u_byte(int(value.A * 255))

def write_color4(self, value: Color):
self.write_float(value.R)
Expand All @@ -139,13 +144,13 @@ def write_matrix(self, value: Matrix4x4):
for val in value.M:
self.write_float(val)

def write_array(self, command, value: list, write_length: bool = True):
def write_array(self, command: Callable, value: list, write_length: bool = True):
if write_length:
self.write_int(len(value))
for val in value:
command(val)

def write_byte_array(self, value: bytes):
def write_byte_array(self, value: builtins.bytes):
self.write_int(len(value))
self.write(value)

Expand Down