-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
implement packet classes for encoding and decoding
- Loading branch information
Showing
10 changed files
with
298 additions
and
136 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
from mcclient.packet.inbound import InboundPacket, IncompletePacket | ||
from mcclient.packet.outbound import OutboundPacket |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
import socket | ||
import struct | ||
|
||
from mcclient.packet.varint import read, pack, unpack | ||
|
||
|
||
class IncompletePacket(Exception): | ||
def __init__(self, size, missing): | ||
super().__init__(f"Incomplete packet: missing {missing} from {size} bytes.") | ||
|
||
|
||
class InboundPacket: | ||
""" | ||
Represents an inbound packet received from a socket. | ||
""" | ||
|
||
def __init__(self, sock: socket.socket): | ||
""" | ||
Initializes the InboundPacket by receiving data from a socket. | ||
Args: | ||
sock (socket.socket): The socket from which the packet is received. | ||
""" | ||
|
||
self.sock = sock | ||
self.length = read(self.sock) | ||
self.id = read(self.sock) | ||
id_size = len(pack(self.id)) | ||
|
||
remaining = self.length - id_size | ||
self.data = b"" | ||
while len(self.data) < remaining: | ||
chunk = self.sock.recv(remaining - len(self.data)) | ||
if chunk == b"": | ||
packet_size = len(pack(self.length)) + id_size + self.length | ||
raise IncompletePacket(packet_size, self.length - id_size - len(self.data)) | ||
|
||
self.data += chunk | ||
|
||
def read_int(self) -> int: | ||
""" | ||
Reads a 4-byte integer from the data. | ||
Returns: | ||
int: The read integer value. | ||
""" | ||
return struct.unpack('>i', self.read_bytes(4))[0] | ||
|
||
def read_short(self) -> int: | ||
""" | ||
Reads a 2-byte short integer from the data. | ||
Returns: | ||
int: The read short integer value. | ||
""" | ||
return struct.unpack('>h', self.read_bytes(2))[0] | ||
|
||
def read_ushort(self) -> int: | ||
""" | ||
Reads a 2-byte unsigned short integer from the data. | ||
Returns: | ||
int: The read unsigned short integer value. | ||
""" | ||
return struct.unpack('>H', self.read_bytes(2))[0] | ||
|
||
def read_long(self) -> int: | ||
""" | ||
Reads an 8-byte long integer from the data. | ||
Returns: | ||
int: The read long integer value. | ||
""" | ||
return struct.unpack('>q', self.read_bytes(8))[0] | ||
|
||
def read_varint(self) -> int: | ||
""" | ||
Reads a variable-length integer (Varint) from the data. | ||
Returns: | ||
int: The read Varint value. | ||
""" | ||
varint, size = unpack(self.data) | ||
self.read_bytes(size) | ||
return varint | ||
|
||
def read_bool(self) -> bool: | ||
""" | ||
Reads a boolean value from the data. | ||
Returns: | ||
bool: The read boolean value. | ||
""" | ||
value = self.data[0] == 1 | ||
self.data = self.data[1:] | ||
return value | ||
|
||
def read_string(self) -> str: | ||
""" | ||
Reads a string from the data. | ||
Returns: | ||
str: The read string. | ||
""" | ||
length = self.read_varint() | ||
string = self.data[:length].decode('utf-8') | ||
self.data = self.data[length:] | ||
return string | ||
|
||
def read_bytes(self, length: int) -> bytes: | ||
""" | ||
Reads a specified number of bytes from the data. | ||
Args: | ||
length (int): The number of bytes to read. | ||
Returns: | ||
bytes: The read bytes. | ||
""" | ||
bytes_data = self.data[:length] | ||
self.data = self.data[length:] | ||
return bytes_data |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
import socket | ||
import struct | ||
|
||
from mcclient.packet.varint import pack | ||
|
||
|
||
class OutboundPacket: | ||
""" | ||
Represents an outbound packet that can be sent over a socket. | ||
""" | ||
|
||
def __init__(self, packet_id: int): | ||
""" | ||
Initializes the OutboundPacket with the specified packet ID. | ||
Args: | ||
packet_id (int): The packet ID representing the type of packet. | ||
""" | ||
self.id = packet_id | ||
self.data = b"" | ||
|
||
def write_int(self, value: int) -> None: | ||
""" | ||
Writes a 4-byte integer to the packet. | ||
Args: | ||
value (int): The integer value to write. | ||
""" | ||
self.data += struct.pack('>i', value) | ||
|
||
def write_short(self, value: int) -> None: | ||
""" | ||
Writes a 2-byte short integer to the packet. | ||
Args: | ||
value (int): The short integer value to write. | ||
""" | ||
self.data += struct.pack('>h', value) | ||
|
||
def write_ushort(self, value: int) -> None: | ||
""" | ||
Writes a 2-byte unsigned short integer to the packet. | ||
Args: | ||
value (int): The unsigned short integer value to write. | ||
""" | ||
self.data += struct.pack('>H', value) | ||
|
||
def write_long(self, value: int) -> None: | ||
""" | ||
Writes an 8-byte long integer to the packet. | ||
Args: | ||
value (int): The long integer value to write. | ||
""" | ||
self.data += struct.pack('>q', value) | ||
|
||
def write_varint(self, varint: int) -> None: | ||
""" | ||
Writes a variable-length integer (VarInt) to the packet. | ||
Args: | ||
varint (int): The VarInt value to write. | ||
""" | ||
self.data += pack(varint) | ||
|
||
def write_bool(self, value: bool) -> None: | ||
""" | ||
Writes a boolean value to the packet as a single byte. | ||
Args: | ||
value (bool): The boolean value to write. True is represented | ||
as 0x01, and False as 0x00. | ||
""" | ||
self.data += b"\x01" if value else b"\x00" | ||
|
||
def write_string(self, string: str) -> None: | ||
""" | ||
Writes a UTF-8 encoded string to the packet. | ||
Args: | ||
string (str): The string to write. | ||
""" | ||
self.write_varint(len(string)) | ||
self.data += string.encode('utf-8') | ||
|
||
def write_bytes(self, data: bytes) -> None: | ||
""" | ||
Writes raw bytes directly to the packet. | ||
Args: | ||
data (bytes): The bytes to write. | ||
""" | ||
self.data += data | ||
|
||
def pack(self) -> bytes: | ||
""" | ||
Packs the packet data, including the packet ID and length, into a | ||
single byte sequence. | ||
Returns: | ||
bytes: The packed packet ready for transmission. | ||
""" | ||
data = pack(self.id) + self.data | ||
return pack(len(data)) + data | ||
|
||
def write(self, sock: socket.socket) -> None: | ||
""" | ||
Sends the packet over the specified socket. | ||
This method packs the packet, including the packet ID and length, | ||
and then sends it to the remote socket. | ||
Args: | ||
sock (socket.socket): The socket to send the packet over. | ||
""" | ||
sock.send(self.pack()) |
Oops, something went wrong.