diff --git a/airbyte_cdk/entrypoint.py b/airbyte_cdk/entrypoint.py index a5052a575..0a13cfebe 100644 --- a/airbyte_cdk/entrypoint.py +++ b/airbyte_cdk/entrypoint.py @@ -22,7 +22,7 @@ from airbyte_cdk.connector import TConfig from airbyte_cdk.exception_handler import init_uncaught_exception_handler -from airbyte_cdk.logger import init_logger +from airbyte_cdk.logger import PRINT_BUFFER, init_logger from airbyte_cdk.models import ( AirbyteConnectionStatus, AirbyteMessage, @@ -337,11 +337,11 @@ def launch(source: Source, args: List[str]) -> None: parsed_args = source_entrypoint.parse_args(args) # temporarily removes the PrintBuffer because we're seeing weird print behavior for concurrent syncs # Refer to: https://github.com/airbytehq/oncall/issues/6235 - # with PrintBuffer(): - for message in source_entrypoint.run(parsed_args): - # simply printing is creating issues for concurrent CDK as Python uses different two instructions to print: one for the message and - # the other for the break line. Adding `\n` to the message ensure that both are printed at the same time - print(f"{message}\n", end="", flush=True) + with PRINT_BUFFER: + for message in source_entrypoint.run(parsed_args): + # simply printing is creating issues for concurrent CDK as Python uses different two instructions to print: one for the message and + # the other for the break line. Adding `\n` to the message ensure that both are printed at the same time + print(f"{message}\n", end="") def _init_internal_request_filter() -> None: diff --git a/airbyte_cdk/logger.py b/airbyte_cdk/logger.py index 78061b605..13c3b4676 100644 --- a/airbyte_cdk/logger.py +++ b/airbyte_cdk/logger.py @@ -16,8 +16,11 @@ Level, Type, ) +from airbyte_cdk.utils import PrintBuffer from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets +PRINT_BUFFER = PrintBuffer(flush_interval=0.1) + LOGGING_CONFIG = { "version": 1, "disable_existing_loggers": False, @@ -27,7 +30,7 @@ "handlers": { "console": { "class": "logging.StreamHandler", - "stream": "ext://sys.stdout", + "stream": PRINT_BUFFER, "formatter": "airbyte", }, }, diff --git a/airbyte_cdk/utils/print_buffer.py b/airbyte_cdk/utils/print_buffer.py index ae5a2020c..3a66222dd 100644 --- a/airbyte_cdk/utils/print_buffer.py +++ b/airbyte_cdk/utils/print_buffer.py @@ -73,3 +73,7 @@ def __exit__( ) -> None: self.flush() sys.stdout, sys.stderr = self.old_stdout, self.old_stderr + + def flush_logger(self) -> None: + """Explicit flush that can be triggered by logger to synchronize with PrintBuffer.""" + self.flush()