feat(tui): improve key input handling and add cursor APIs

- Rewrite Unix key input to use non-blocking I/O via fcntl
- Fix key repeat/hold by reading Python's buffer before select()
- Add SIGWINCH handler for window resize detection
- Implement cursor APIs: clear, clrtoeol, clrtobot, clearok, timeout
- Return KEY_RESIZE on terminal resize events
This commit is contained in:
Xuehai Pan 2026-02-02 02:01:28 +08:00
parent fa93507dce
commit 3ba255dd31

View file

@ -410,6 +410,7 @@ class _TerminalState: # pylint: disable=too-many-instance-attributes
self.nodelay_mode: bool = False
self.leaveok_mode: bool = False
self.cursor_visible: int = 1
self.input_timeout_ms: int = -1 # -1 = blocking, 0 = non-blocking, >0 = timeout in ms
# Color support
self.colors: int = 256
@ -444,6 +445,10 @@ class _TerminalState: # pylint: disable=too-many-instance-attributes
self.cursor_y: int = 0
self.cursor_x: int = 0
# Resize state
self.resize_pending: bool = False
self._old_sigwinch_handler: object = None
# Windows console state (for restoration)
self._windows_console_handle: int | None = None
self._windows_original_mode: int | None = None
@ -505,6 +510,37 @@ class _TerminalState: # pylint: disable=too-many-instance-attributes
except (ImportError, OSError, ValueError):
pass # not a tty or termios not available
# Set up SIGWINCH handler for window resize detection
self._setup_sigwinch_handler()
def _setup_sigwinch_handler(self) -> None:
"""Set up SIGWINCH signal handler for window resize detection."""
import signal # pylint: disable=import-outside-toplevel
if not hasattr(signal, 'SIGWINCH'):
return # Windows doesn't have SIGWINCH
def sigwinch_handler(signum: int, frame: object) -> None: # pylint: disable=unused-argument
self.resize_pending = True
try:
self._old_sigwinch_handler = signal.signal(signal.SIGWINCH, sigwinch_handler)
except (OSError, ValueError):
pass # signal handling not supported
def _restore_sigwinch_handler(self) -> None:
"""Restore the original SIGWINCH handler."""
import signal # pylint: disable=import-outside-toplevel
if not hasattr(signal, 'SIGWINCH'):
return
if self._old_sigwinch_handler is not None:
try:
signal.signal(signal.SIGWINCH, self._old_sigwinch_handler) # type: ignore[arg-type]
except (OSError, ValueError):
pass
def set_cbreak_unix(self, enable: bool) -> None:
"""Configure Unix terminal for cbreak mode (no line buffering, no echo)."""
if self._unix_stdin_fd is None:
@ -540,6 +576,8 @@ class _TerminalState: # pylint: disable=too-many-instance-attributes
)
except (ImportError, OSError):
pass
# Restore original SIGWINCH handler
self._restore_sigwinch_handler()
def write(self, data: str) -> None:
"""Write data to stdout."""
@ -776,88 +814,100 @@ class _TerminalState: # pylint: disable=too-many-instance-attributes
return KEY_MOUSE
def _read_key_unix(self) -> int: # pylint: disable=too-many-branches,too-many-return-statements
"""Read key on Unix using select."""
import select # pylint: disable=import-outside-toplevel
"""Read key on Unix using non-blocking I/O."""
import fcntl # pylint: disable=import-outside-toplevel
if _sys.stdin is None:
return -1
# Guard against non-TTY stdin (IDE, piped input) where select() may fail
try:
_sys.stdin.fileno()
fd = _sys.stdin.fileno()
except (OSError, ValueError, AttributeError):
return -1
# Set non-blocking mode for entire function to handle buffered data
# This is critical for key repeat - Python buffers multiple escape sequences
old_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, old_flags | _os.O_NONBLOCK)
try:
rlist, _, _ = select.select([_sys.stdin], [], [], 0)
except (OSError, ValueError):
return -1
if not rlist:
return -1
return self._read_key_unix_nonblock(fd)
finally:
fcntl.fcntl(fd, fcntl.F_SETFL, old_flags)
ch = _sys.stdin.read(1)
# pylint: disable=too-many-return-statements,too-many-branches
def _read_key_unix_nonblock(self, fd: int) -> int:
"""Read key with file descriptor already in non-blocking mode."""
import select # pylint: disable=import-outside-toplevel
# Try to read from Python's buffer first
ch = self._read_nonblock(_sys.stdin, retries=1)
# If no buffered data, check if new data available on file descriptor
if not ch:
return -1
try:
rlist, _, _ = select.select([fd], [], [], 0)
except (OSError, ValueError):
return -1
if not rlist:
return -1
ch = self._read_nonblock(_sys.stdin, retries=1)
if not ch:
return -1
code = ord(ch)
# Handle escape sequences
if code == 27: # ESC # pylint: disable=too-many-nested-blocks
# Use deadline-based timeout (100ms total) to handle slow connections (SSH, etc.)
deadline = _time.monotonic() + 0.1
remaining = 0.1
rlist, _, _ = select.select([_sys.stdin], [], [], remaining)
if rlist:
seq = _sys.stdin.read(1)
if code == 27: # ESC
seq = self._read_nonblock(_sys.stdin)
if seq:
if seq == '[':
# CSI sequence - could be mouse or key
# Read complete sequence until terminator (letter or ~)
# Limit iterations to prevent DoS from malformed input
# CSI sequence - read until terminator
csi_seq = '['
max_seq_len = 32 # CSI sequences are typically <20 chars
found_terminator = False
for _ in range(max_seq_len):
remaining = deadline - _time.monotonic()
if remaining <= 0:
break
rlist, _, _ = select.select([_sys.stdin], [], [], remaining)
if not rlist:
break
next_ch = _sys.stdin.read(1)
for _ in range(32):
next_ch = self._read_nonblock(_sys.stdin)
if not next_ch:
break
csi_seq += next_ch
if next_ch == '<':
# SGR mouse sequence: \033[<button;x;y[Mm]
return self._parse_sgr_mouse(_sys.stdin)
# Check for sequence terminator (letter A-Z, a-z, or ~)
if next_ch.isalpha() or next_ch == '~':
found_terminator = True
break
if found_terminator:
return self._map_escape_sequence(csi_seq)
# Incomplete sequence (timeout) - queue consumed bytes, return ESC
for ch in csi_seq:
self.input_queue.append(ord(ch))
if seq == 'O':
# SS3 sequence (F1-F4 keys)
remaining = deadline - _time.monotonic()
if remaining > 0:
rlist, _, _ = select.select([_sys.stdin], [], [], remaining)
if rlist:
next_ch = _sys.stdin.read(1)
if next_ch:
return self._map_escape_sequence('O' + next_ch)
# Timeout or no char, queue 'O' and return ESC
return self._map_escape_sequence(csi_seq)
# Incomplete - queue and return ESC
for c in csi_seq:
self.input_queue.append(ord(c))
elif seq == 'O':
# SS3 sequence
next_ch = self._read_nonblock(_sys.stdin)
if next_ch:
return self._map_escape_sequence('O' + next_ch)
self.input_queue.append(ord('O'))
return 27
# Alt+key: queue the consumed character, return ESC
if seq:
else:
# Alt+key
self.input_queue.append(ord(seq))
return 27
return code
def _read_nonblock(self, stdin: SupportsRead[str], retries: int = 3) -> str:
"""Read a single character in non-blocking mode with brief retries.
Args:
stdin: The stdin file object
retries: Number of retry attempts (each ~1ms apart) for escape sequences
"""
for _ in range(retries):
try:
ch = stdin.read(1)
if ch:
return ch
except (BlockingIOError, OSError):
pass
# Brief sleep to allow more bytes to arrive (escape sequences come in bursts)
if retries > 1:
_time.sleep(0.001) # 1ms
return ''
def _parse_sgr_mouse(self, stdin: SupportsRead[str]) -> int: # pylint: disable=too-many-return-statements
"""Parse SGR extended mouse sequence and queue the event."""
import select # pylint: disable=import-outside-toplevel
@ -2082,16 +2132,37 @@ class CursesWindow: # pylint: disable=too-many-public-methods
_terminal.screen_buffer[y][x + i] = (char, attr)
def clear(self) -> None:
raise NotImplementedError
"""Clear screen and move cursor to (0, 0)."""
assert _terminal is not None
# Clear screen buffer
_terminal.screen_buffer = [
[(' ', 0) for _ in range(_terminal.cols)] for _ in range(_terminal.lines)
]
# Move cursor to home position
_terminal.cursor_y = 0
_terminal.cursor_x = 0
def clearok(self, yes: int) -> None:
raise NotImplementedError
pass # no-op: we always do full redraws
def clrtobot(self) -> None:
raise NotImplementedError
"""Clear from cursor to end of screen."""
assert _terminal is not None
y, x = _terminal.cursor_y, _terminal.cursor_x
# Clear from cursor to end of current line
for i in range(x, _terminal.cols):
_terminal.screen_buffer[y][i] = (' ', 0)
# Clear all lines below
for row in range(y + 1, _terminal.lines):
for col in range(_terminal.cols):
_terminal.screen_buffer[row][col] = (' ', 0)
def clrtoeol(self) -> None:
raise NotImplementedError
"""Clear from cursor to end of line."""
assert _terminal is not None
y, x = _terminal.cursor_y, _terminal.cursor_x
for i in range(x, _terminal.cols):
_terminal.screen_buffer[y][i] = (' ', 0)
def cursyncup(self) -> None:
raise NotImplementedError
@ -2142,10 +2213,16 @@ class CursesWindow: # pylint: disable=too-many-public-methods
@_overload
def getch(self, y: int, x: int) -> int: ...
# pylint: disable-next=too-many-branches
# pylint: disable-next=too-many-branches,too-many-return-statements
def getch(self, *yx: int) -> int: # type: ignore[misc]
assert _terminal is not None
# Check for pending window resize
if _terminal.resize_pending:
_terminal.resize_pending = False
_terminal.update_size()
return KEY_RESIZE
# Move cursor if position specified (with bounds check like move())
if len(yx) == 2:
y, x = yx[0], yx[1]
@ -2168,17 +2245,15 @@ class CursesWindow: # pylint: disable=too-many-public-methods
# Unix: try to use select() for efficient blocking wait
# But guard against invalid fileno (piped stdin, some IDEs, etc.)
try:
import select # pylint: disable=import-outside-toplevel
# Test if stdin has a valid fileno for select
_sys.stdin.fileno()
use_select = True
except (ValueError, OSError, AttributeError, ImportError):
except (ValueError, OSError, AttributeError):
# stdin doesn't support fileno() or it's invalid - fall back to polling
use_select = False
if use_select:
import select # pylint: disable=import-outside-toplevel
import select as select_module # pylint: disable=import-outside-toplevel
while True:
# Check input queue first
@ -2186,7 +2261,7 @@ class CursesWindow: # pylint: disable=too-many-public-methods
return _terminal.input_queue.popleft()
try:
# Block until input is available (with 100ms timeout for responsiveness)
rlist, _, _ = select.select([_sys.stdin], [], [], 0.1)
rlist, _, _ = select_module.select([_sys.stdin], [], [], 0.1)
if rlist:
key = _terminal.read_key_nonblocking()
if key != -1:
@ -2562,7 +2637,26 @@ class CursesWindow: # pylint: disable=too-many-public-methods
raise NotImplementedError
def timeout(self, delay: int) -> None:
raise NotImplementedError
"""Set blocking behavior for getch().
Args:
delay: If negative, getch() blocks indefinitely.
If zero, getch() is non-blocking.
If positive, getch() blocks for at most delay milliseconds.
"""
assert _terminal is not None
if delay < 0:
self._nodelay_mode = False
_terminal.nodelay_mode = False
_terminal.input_timeout_ms = -1
elif delay == 0:
self._nodelay_mode = True
_terminal.nodelay_mode = True
_terminal.input_timeout_ms = 0
else:
self._nodelay_mode = False
_terminal.nodelay_mode = False
_terminal.input_timeout_ms = delay
def touchline(self, start: int, count: int, changed: bool = True) -> None:
raise NotImplementedError