Skip to content

Commit

Permalink
connection socket methods in connection (#454)
Browse files Browse the repository at this point in the history
* connecto socket methods in connection

* fix and docstring

* fix

* Fix

* fix

* ensure connectino get's released

* no coveralls

* fix
  • Loading branch information
sonic182 authored Jan 21, 2024
1 parent 3a65485 commit 992fe6b
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 27 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ jobs:
COV_CORE_CONFIG: .coveragerc
COV_CORE_DATAFILE: .coverage.eager

- name: Coveralls
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
run: |
coveralls --service=github
# - name: Coveralls
# env:
# GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
# run: |
# coveralls --service=github
37 changes: 20 additions & 17 deletions aiosonic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,17 +208,20 @@ async def json(self, json_decoder=loads) -> dict:

async def read_chunks(self) -> AsyncIterator[bytes]:
"""Read chunks from chunked response."""
while True and not self.chunks_readed:
chunk_size = int((await self.connection.reader.readline()).rstrip(), 16)
if not chunk_size:
# read last CRLF
await self.connection.reader.readline()
# free connection
await self.connection.release()
break
chunk = await self.connection.reader.readexactly(chunk_size + 2)
yield chunk[:-2]
self.chunks_readed = True
assert self.connection
try:
while True and not self.chunks_readed:
chunk_size = int((await self.connection.readline()).rstrip(), 16)
if not chunk_size:
# read last CRLF
await self.connection.readline()
break
chunk = await self.connection.readexactly(chunk_size + 2)
yield chunk[:-2]
self.chunks_readed = True
finally:
# Ensure the conn get's released
await self.connection.release()

def __del__(self):
# clean it
Expand Down Expand Up @@ -328,7 +331,7 @@ def _handle_chunk(chunk: bytes, connection: Connection):
if not connection.writer:
raise MissingWriterException("missing writer in connection")

connection.writer.write(chunk_size.encode() + chunk + _NEW_LINE.encode())
connection.write(chunk_size.encode() + chunk + _NEW_LINE.encode())


async def _send_chunks(connection: Connection, body: BodyType):
Expand All @@ -344,7 +347,7 @@ async def _send_chunks(connection: Connection, body: BodyType):

if not connection.writer:
raise MissingWriterException("missing writer in connection")
connection.writer.write(("0" + _NEW_LINE * 2).encode())
connection.write(("0" + _NEW_LINE * 2).encode())


async def _send_multipart(
Expand Down Expand Up @@ -428,20 +431,20 @@ async def _do_request(
if not connection.writer or not connection.reader:
raise ConnectionError("Not connection writer or reader")

connection.writer.write(to_send)
connection.write(to_send)

if body:
if isinstance(body, (AsyncIterator, Iterator)):
await _send_chunks(connection, body)
else:
connection.writer.write(body)
connection.write(body)

response = HttpResponse()
response._set_request_meta(urlparsed)

# get response code and version
try:
line = await wait_for(connection.reader.readuntil(), timeouts.sock_read)
line = await wait_for(connection.readuntil(), timeouts.sock_read)
if not line:
raise HttpParsingError(f"response line parsing error: {line}")
response._set_response_initial(line)
Expand All @@ -463,7 +466,7 @@ async def _do_request(
response.compressed = response.headers.get("content-encoding", "")

if size:
response._set_body(await connection.reader.readexactly(int(size)))
response._set_body(await connection.readexactly(int(size)))

if chunked:
connection.block_until_read_chunks()
Expand Down
55 changes: 52 additions & 3 deletions aiosonic/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,40 @@
import h2.events

from aiosonic.connectors import TCPConnector

from aiosonic.exceptions import HttpParsingError
from aiosonic.exceptions import (
HttpParsingError,
MissingReaderException,
MissingWriterException,
)
from aiosonic.http2 import Http2Handler
from aiosonic.tcp_helpers import keepalive_flags
from aiosonic.types import ParsedBodyType


class Connection:
"""Connection class."""
"""Connection class.
This class represents a connection to a remote server, managing the communication
through a socket. It is designed to handle both HTTP/1.1 and HTTP/2 protocols.
Attributes:
connector (TCPConnector): An instance of the TCPConnector class responsible
for managing the connection pool.
reader (Optional[StreamReader]): A StreamReader for reading data from the socket.
writer (Optional[StreamWriter]): A StreamWriter for efficiently writing data
to the socket.
keep (bool): A flag indicating whether the connection should be kept alive.
key (Optional[str]): A key identifying the connection based on the hostname and port.
blocked (bool): A flag indicating whether the connection is currently blocked,
meaning it is in use and should not be reinserted into the pool until all
data has been read.
temp_key (Optional[str]): A temporary key used during the connection setup process.
requests_count (int): The count of requests made over the connection.
h2conn (Optional[h2.connection.H2Connection]): An instance of the H2Connection
class representing the HTTP/2 connection.
h2handler (Optional[Http2Handler]): An instance of the Http2Handler class
responsible for handling HTTP/2 requests.
"""

def __init__(self, connector: TCPConnector) -> None:
self.connector = connector
Expand All @@ -45,6 +70,30 @@ async def connect(
"""Connet with timeout."""
await self._connect(urlparsed, verify, ssl_context, dns_info, http2)

def write(self, data: bytes):
"""Write data in the socket."""
if not self.writer:
raise MissingWriterException("writer not set.")
self.writer.write(data)

async def readline(self):
"""Read data until line break"""
if not self.reader:
raise MissingReaderException("reader not set.")
return await self.reader.readline()

async def readexactly(self, size: int):
"""Read exactly size of bytes"""
if not self.reader:
raise MissingReaderException("reader not set.")
return await self.reader.readexactly(size)

async def readuntil(self, separator: bytes = b'\n'):
"""Read until separator"""
if not self.reader:
raise MissingReaderException("reader not set.")
return await self.reader.readuntil(separator)

async def _connect(
self,
urlparsed: ParseResult,
Expand Down
3 changes: 3 additions & 0 deletions aiosonic/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
class MissingWriterException(Exception):
pass

class MissingReaderException(Exception):
pass


# timeouts
class BaseTimeout(Exception):
Expand Down
4 changes: 3 additions & 1 deletion aiosonic/http_parser.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Pure python HTTP parser."""

from typing import TYPE_CHECKING, AsyncIterator, Dict, Iterator, List
from urllib.parse import ParseResult, urlencode, urlparse

Expand Down Expand Up @@ -26,7 +28,7 @@ async def parse_headers_iterator(connection: Connection):
"""Transform loop to iterator."""
while True:
# StreamReader already buffers data reading so it is efficient.
res_data = await connection.reader.readline()
res_data = await connection.readline()
if b": " not in res_data and b":" not in res_data:
break
yield res_data
Expand Down
2 changes: 1 addition & 1 deletion test-requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ aiohttp>=3.9.0b1
# (see https://github.com/jazzband/pip-tools/issues/1326)
asgiref<3.5.0
black
coveralls
# coveralls
django<4.0.0
click<8.1.0
httpx
Expand Down

0 comments on commit 992fe6b

Please sign in to comment.