Skip to content

Commit

Permalink
fix: fix PrintBuffer logic
Browse files Browse the repository at this point in the history
Signed-off-by: Artem Inzhyyants <artem.inzhyyants@gmail.com>
  • Loading branch information
artem1205 committed Feb 13, 2025
1 parent 522caab commit 7f8e0ac
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 7 deletions.
12 changes: 6 additions & 6 deletions airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

from airbyte_cdk.connector import TConfig
from airbyte_cdk.exception_handler import init_uncaught_exception_handler
from airbyte_cdk.logger import init_logger
from airbyte_cdk.logger import PRINT_BUFFER, init_logger
from airbyte_cdk.models import (
AirbyteConnectionStatus,
AirbyteMessage,
Expand Down Expand Up @@ -337,11 +337,11 @@ def launch(source: Source, args: List[str]) -> None:
parsed_args = source_entrypoint.parse_args(args)
# temporarily removes the PrintBuffer because we're seeing weird print behavior for concurrent syncs
# Refer to: https://github.com/airbytehq/oncall/issues/6235
# with PrintBuffer():
for message in source_entrypoint.run(parsed_args):
# simply printing is creating issues for concurrent CDK as Python uses different two instructions to print: one for the message and
# the other for the break line. Adding `\n` to the message ensure that both are printed at the same time
print(f"{message}\n", end="", flush=True)
with PRINT_BUFFER:
for message in source_entrypoint.run(parsed_args):
# simply printing is creating issues for concurrent CDK as Python uses different two instructions to print: one for the message and
# the other for the break line. Adding `\n` to the message ensure that both are printed at the same time
print(f"{message}\n", end="")


def _init_internal_request_filter() -> None:
Expand Down
5 changes: 4 additions & 1 deletion airbyte_cdk/logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
Level,
Type,
)
from airbyte_cdk.utils import PrintBuffer
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets

PRINT_BUFFER = PrintBuffer(flush_interval=0.1)

LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
Expand All @@ -27,7 +30,7 @@
"handlers": {
"console": {
"class": "logging.StreamHandler",
"stream": "ext://sys.stdout",
"stream": PRINT_BUFFER,
"formatter": "airbyte",
},
},
Expand Down
4 changes: 4 additions & 0 deletions airbyte_cdk/utils/print_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,7 @@ def __exit__(
) -> None:
self.flush()
sys.stdout, sys.stderr = self.old_stdout, self.old_stderr

def flush_logger(self) -> None:
"""Explicit flush that can be triggered by logger to synchronize with PrintBuffer."""
self.flush()

0 comments on commit 7f8e0ac

Please sign in to comment.