Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-124096: Enable REPL virtual terminal support on Windows #124119

Merged
merged 13 commits into from
Feb 23, 2025
67 changes: 58 additions & 9 deletions Lib/_pyrepl/windows_console.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
from .console import Event, Console
from .trace import trace
from .utils import wlen
from .windows_eventqueue import EventQueue

try:
from ctypes import GetLastError, WinDLL, windll, WinError # type: ignore[attr-defined]
Expand Down Expand Up @@ -94,7 +95,9 @@ def __init__(self, err: int | None, descr: str | None = None) -> None:
0x83: "f20", # VK_F20
}

# Console escape codes: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences
# Virtual terminal output sequences
# Reference: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences#output-sequences
# Check `windows_eventqueue.py` for input sequences
ERASE_IN_LINE = "\x1b[K"
MOVE_LEFT = "\x1b[{}D"
MOVE_RIGHT = "\x1b[{}C"
Expand All @@ -110,6 +113,12 @@ def __init__(self, err: int | None, descr: str | None = None) -> None:
class _error(Exception):
pass

def _supports_vt():
try:
import nt
return nt._supports_virtual_terminal()
except (ImportError, AttributeError):
return False

class WindowsConsole(Console):
def __init__(
Expand All @@ -121,17 +130,29 @@ def __init__(
):
super().__init__(f_in, f_out, term, encoding)

self.__vt_support = _supports_vt()

if self.__vt_support:
trace('console supports virtual terminal')

# Save original console modes so we can recover on cleanup.
original_input_mode = DWORD()
GetConsoleMode(InHandle, original_input_mode)
trace(f'saved original input mode 0x{original_input_mode.value:x}')
self.__original_input_mode = original_input_mode.value

SetConsoleMode(
OutHandle,
ENABLE_WRAP_AT_EOL_OUTPUT
| ENABLE_PROCESSED_OUTPUT
| ENABLE_VIRTUAL_TERMINAL_PROCESSING,
)

self.screen: list[str] = []
self.width = 80
self.height = 25
self.__offset = 0
self.event_queue: deque[Event] = deque()
self.event_queue = EventQueue(encoding)
try:
self.out = io._WindowsConsoleIO(self.output_fd, "w") # type: ignore[attr-defined]
except ValueError:
Expand Down Expand Up @@ -295,6 +316,12 @@ def _enable_blinking(self):
def _disable_blinking(self):
self.__write("\x1b[?12l")

def _enable_bracketed_paste(self) -> None:
self.__write("\x1b[?2004h")

def _disable_bracketed_paste(self) -> None:
self.__write("\x1b[?2004l")

def __write(self, text: str) -> None:
if "\x1a" in text:
text = ''.join(["^Z" if x == '\x1a' else x for x in text])
Expand Down Expand Up @@ -324,8 +351,15 @@ def prepare(self) -> None:
self.__gone_tall = 0
self.__offset = 0

if self.__vt_support:
SetConsoleMode(InHandle, self.__original_input_mode | ENABLE_VIRTUAL_TERMINAL_INPUT)
self._enable_bracketed_paste()

def restore(self) -> None:
pass
if self.__vt_support:
# Recover to original mode before running REPL
self._disable_bracketed_paste()
SetConsoleMode(InHandle, self.__original_input_mode)

def _move_relative(self, x: int, y: int) -> None:
"""Moves relative to the current posxy"""
Expand All @@ -346,7 +380,7 @@ def move_cursor(self, x: int, y: int) -> None:
raise ValueError(f"Bad cursor position {x}, {y}")

if y < self.__offset or y >= self.__offset + self.height:
self.event_queue.insert(0, Event("scroll", ""))
self.event_queue.insert(Event("scroll", ""))
else:
self._move_relative(x, y)
self.posxy = x, y
Expand Down Expand Up @@ -394,10 +428,8 @@ def get_event(self, block: bool = True) -> Event | None:
"""Return an Event instance. Returns None if |block| is false
and there is no event pending, otherwise waits for the
completion of an event."""
if self.event_queue:
return self.event_queue.pop()

while True:
while self.event_queue.empty():
rec = self._read_input(block)
if rec is None:
return None
Expand Down Expand Up @@ -428,20 +460,25 @@ def get_event(self, block: bool = True) -> Event | None:
key = f"ctrl {key}"
elif key_event.dwControlKeyState & ALT_ACTIVE:
# queue the key, return the meta command
self.event_queue.insert(0, Event(evt="key", data=key, raw=key))
self.event_queue.insert(Event(evt="key", data=key, raw=key))
return Event(evt="key", data="\033") # keymap.py uses this for meta
return Event(evt="key", data=key, raw=key)
if block:
continue

return None
elif self.__vt_support:
# If virtual terminal is enabled, scanning VT sequences
self.event_queue.push(rec.Event.KeyEvent.uChar.UnicodeChar)
continue

if key_event.dwControlKeyState & ALT_ACTIVE:
# queue the key, return the meta command
self.event_queue.insert(0, Event(evt="key", data=key, raw=raw_key))
self.event_queue.insert(Event(evt="key", data=key, raw=raw_key))
return Event(evt="key", data="\033") # keymap.py uses this for meta

return Event(evt="key", data=key, raw=raw_key)
return self.event_queue.get()

def push_char(self, char: int | bytes) -> None:
"""
Expand Down Expand Up @@ -563,6 +600,13 @@ class INPUT_RECORD(Structure):
MOUSE_EVENT = 0x02
WINDOW_BUFFER_SIZE_EVENT = 0x04

ENABLE_PROCESSED_INPUT = 0x0001
ENABLE_LINE_INPUT = 0x0002
ENABLE_ECHO_INPUT = 0x0004
ENABLE_MOUSE_INPUT = 0x0010
ENABLE_INSERT_MODE = 0x0020
ENABLE_VIRTUAL_TERMINAL_INPUT = 0x0200

ENABLE_PROCESSED_OUTPUT = 0x01
ENABLE_WRAP_AT_EOL_OUTPUT = 0x02
ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x04
Expand Down Expand Up @@ -594,6 +638,10 @@ class INPUT_RECORD(Structure):
]
ScrollConsoleScreenBuffer.restype = BOOL

GetConsoleMode = _KERNEL32.GetConsoleMode
GetConsoleMode.argtypes = [HANDLE, POINTER(DWORD)]
GetConsoleMode.restype = BOOL

SetConsoleMode = _KERNEL32.SetConsoleMode
SetConsoleMode.argtypes = [HANDLE, DWORD]
SetConsoleMode.restype = BOOL
Expand All @@ -620,6 +668,7 @@ def _win_only(*args, **kwargs):
GetStdHandle = _win_only
GetConsoleScreenBufferInfo = _win_only
ScrollConsoleScreenBuffer = _win_only
GetConsoleMode = _win_only
SetConsoleMode = _win_only
ReadConsoleInput = _win_only
GetNumberOfConsoleInputEvents = _win_only
Expand Down
119 changes: 119 additions & 0 deletions Lib/_pyrepl/windows_eventqueue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""
Windows event and VT sequence scanner, similar to `unix_eventqueue.py`
"""

from collections import deque

from . import keymap
from .console import Event
from .trace import trace
import os

# Reference: https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences#input-sequences
VT_MAP: dict[bytes, str] = {
b'\x1b[A': 'up',
b'\x1b[B': 'down',
b'\x1b[C': 'right',
b'\x1b[D': 'left',
b'\x1b[1;5D': 'ctrl left',
b'\x1b[1;5C': 'ctrl right',

b'\x1b[H': 'home',
b'\x1b[F': 'end',

b'\x7f': 'backspace',
b'\x1b[2~': 'insert',
b'\x1b[3~': 'delete',
b'\x1b[5~': 'page up',
b'\x1b[6~': 'page down',

b'\x1bOP': 'f1',
b'\x1bOQ': 'f2',
b'\x1bOR': 'f3',
b'\x1bOS': 'f4',
b'\x1b[15~': 'f5',
b'\x1b[17~': 'f6',
b'\x1b[18~': 'f7',
b'\x1b[19~': 'f8',
b'\x1b[20~': 'f9',
b'\x1b[21~': 'f10',
b'\x1b[23~': 'f11',
b'\x1b[24~': 'f12',
}

class EventQueue:
def __init__(self, encoding: str) -> None:
self.compiled_keymap = keymap.compile_keymap(VT_MAP)
self.keymap = self.compiled_keymap
trace("keymap {k!r}", k=self.keymap)
self.encoding = encoding
self.events: deque[Event] = deque()
self.buf = bytearray()

def get(self) -> Event | None:
"""
Retrieves the next event from the queue.
"""
if self.events:
return self.events.popleft()
else:
return None

def empty(self) -> bool:
"""
Checks if the queue is empty.
"""
return not self.events

def flush_buf(self) -> bytearray:
"""
Flushes the buffer and returns its contents.
"""
old = self.buf
self.buf = bytearray()
return old

def insert(self, event: Event) -> None:
"""
Inserts an event into the queue.
"""
trace('added event {event}', event=event)
self.events.append(event)

def push(self, char: int | bytes) -> None:
"""
Processes a character by updating the buffer and handling special key mappings.
"""
ord_char = char if isinstance(char, int) else ord(char)
char = bytes(bytearray((ord_char,)))
self.buf.append(ord_char)
if char in self.keymap:
if self.keymap is self.compiled_keymap:
#sanity check, buffer is empty when a special key comes
assert len(self.buf) == 1
k = self.keymap[char]
trace('found map {k!r}', k=k)
if isinstance(k, dict):
self.keymap = k
else:
self.insert(Event('key', k, self.flush_buf()))
self.keymap = self.compiled_keymap

elif self.buf and self.buf[0] == 27: # escape
# escape sequence not recognized by our keymap: propagate it
# outside so that i can be recognized as an M-... key (see also
# the docstring in keymap.py
trace('unrecognized escape sequence, propagating...')
self.keymap = self.compiled_keymap
self.insert(Event('key', '\033', bytearray(b'\033')))
for _c in self.flush_buf()[1:]:
self.push(_c)

else:
try:
decoded = bytes(self.buf).decode(self.encoding)
except UnicodeError:
return
else:
self.insert(Event('key', decoded, self.flush_buf()))
self.keymap = self.compiled_keymap
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Turn on virtual terminal mode and enable barcketed paste in REPL Windows
console. (If the terminal does not support bracketed paste, enabling it
does nothing.)
Loading