From 3ba255dd31ac5d86ae6ebf0e123ab0da9de7143c Mon Sep 17 00:00:00 2001 From: Xuehai Pan Date: Mon, 2 Feb 2026 02:01:28 +0800 Subject: [PATCH] feat(tui): improve key input handling and add cursor APIs - Rewrite Unix key input to use non-blocking I/O via fcntl - Fix key repeat/hold by reading Python's buffer before select() - Add SIGWINCH handler for window resize detection - Implement cursor APIs: clear, clrtoeol, clrtobot, clearok, timeout - Return KEY_RESIZE on terminal resize events --- nvitop/tui/library/curses/_curses.py | 220 +++++++++++++++++++-------- 1 file changed, 157 insertions(+), 63 deletions(-) diff --git a/nvitop/tui/library/curses/_curses.py b/nvitop/tui/library/curses/_curses.py index 4332675..feb77ec 100644 --- a/nvitop/tui/library/curses/_curses.py +++ b/nvitop/tui/library/curses/_curses.py @@ -410,6 +410,7 @@ class _TerminalState: # pylint: disable=too-many-instance-attributes self.nodelay_mode: bool = False self.leaveok_mode: bool = False self.cursor_visible: int = 1 + self.input_timeout_ms: int = -1 # -1 = blocking, 0 = non-blocking, >0 = timeout in ms # Color support self.colors: int = 256 @@ -444,6 +445,10 @@ class _TerminalState: # pylint: disable=too-many-instance-attributes self.cursor_y: int = 0 self.cursor_x: int = 0 + # Resize state + self.resize_pending: bool = False + self._old_sigwinch_handler: object = None + # Windows console state (for restoration) self._windows_console_handle: int | None = None self._windows_original_mode: int | None = None @@ -505,6 +510,37 @@ class _TerminalState: # pylint: disable=too-many-instance-attributes except (ImportError, OSError, ValueError): pass # not a tty or termios not available + # Set up SIGWINCH handler for window resize detection + self._setup_sigwinch_handler() + + def _setup_sigwinch_handler(self) -> None: + """Set up SIGWINCH signal handler for window resize detection.""" + import signal # pylint: disable=import-outside-toplevel + + if not hasattr(signal, 'SIGWINCH'): + return # Windows doesn't have SIGWINCH + + def sigwinch_handler(signum: int, frame: object) -> None: # pylint: disable=unused-argument + self.resize_pending = True + + try: + self._old_sigwinch_handler = signal.signal(signal.SIGWINCH, sigwinch_handler) + except (OSError, ValueError): + pass # signal handling not supported + + def _restore_sigwinch_handler(self) -> None: + """Restore the original SIGWINCH handler.""" + import signal # pylint: disable=import-outside-toplevel + + if not hasattr(signal, 'SIGWINCH'): + return + + if self._old_sigwinch_handler is not None: + try: + signal.signal(signal.SIGWINCH, self._old_sigwinch_handler) # type: ignore[arg-type] + except (OSError, ValueError): + pass + def set_cbreak_unix(self, enable: bool) -> None: """Configure Unix terminal for cbreak mode (no line buffering, no echo).""" if self._unix_stdin_fd is None: @@ -540,6 +576,8 @@ class _TerminalState: # pylint: disable=too-many-instance-attributes ) except (ImportError, OSError): pass + # Restore original SIGWINCH handler + self._restore_sigwinch_handler() def write(self, data: str) -> None: """Write data to stdout.""" @@ -776,88 +814,100 @@ class _TerminalState: # pylint: disable=too-many-instance-attributes return KEY_MOUSE def _read_key_unix(self) -> int: # pylint: disable=too-many-branches,too-many-return-statements - """Read key on Unix using select.""" - import select # pylint: disable=import-outside-toplevel + """Read key on Unix using non-blocking I/O.""" + import fcntl # pylint: disable=import-outside-toplevel if _sys.stdin is None: return -1 - # Guard against non-TTY stdin (IDE, piped input) where select() may fail try: - _sys.stdin.fileno() + fd = _sys.stdin.fileno() except (OSError, ValueError, AttributeError): return -1 + # Set non-blocking mode for entire function to handle buffered data + # This is critical for key repeat - Python buffers multiple escape sequences + old_flags = fcntl.fcntl(fd, fcntl.F_GETFL) + fcntl.fcntl(fd, fcntl.F_SETFL, old_flags | _os.O_NONBLOCK) try: - rlist, _, _ = select.select([_sys.stdin], [], [], 0) - except (OSError, ValueError): - return -1 - if not rlist: - return -1 + return self._read_key_unix_nonblock(fd) + finally: + fcntl.fcntl(fd, fcntl.F_SETFL, old_flags) - ch = _sys.stdin.read(1) + # pylint: disable=too-many-return-statements,too-many-branches + def _read_key_unix_nonblock(self, fd: int) -> int: + """Read key with file descriptor already in non-blocking mode.""" + import select # pylint: disable=import-outside-toplevel + + # Try to read from Python's buffer first + ch = self._read_nonblock(_sys.stdin, retries=1) + + # If no buffered data, check if new data available on file descriptor if not ch: - return -1 + try: + rlist, _, _ = select.select([fd], [], [], 0) + except (OSError, ValueError): + return -1 + if not rlist: + return -1 + ch = self._read_nonblock(_sys.stdin, retries=1) + if not ch: + return -1 code = ord(ch) # Handle escape sequences - if code == 27: # ESC # pylint: disable=too-many-nested-blocks - # Use deadline-based timeout (100ms total) to handle slow connections (SSH, etc.) - deadline = _time.monotonic() + 0.1 - remaining = 0.1 - rlist, _, _ = select.select([_sys.stdin], [], [], remaining) - if rlist: - seq = _sys.stdin.read(1) + if code == 27: # ESC + seq = self._read_nonblock(_sys.stdin) + if seq: if seq == '[': - # CSI sequence - could be mouse or key - # Read complete sequence until terminator (letter or ~) - # Limit iterations to prevent DoS from malformed input + # CSI sequence - read until terminator csi_seq = '[' - max_seq_len = 32 # CSI sequences are typically <20 chars - found_terminator = False - for _ in range(max_seq_len): - remaining = deadline - _time.monotonic() - if remaining <= 0: - break - rlist, _, _ = select.select([_sys.stdin], [], [], remaining) - if not rlist: - break - next_ch = _sys.stdin.read(1) + for _ in range(32): + next_ch = self._read_nonblock(_sys.stdin) if not next_ch: break csi_seq += next_ch if next_ch == '<': - # SGR mouse sequence: \033[ 0: - rlist, _, _ = select.select([_sys.stdin], [], [], remaining) - if rlist: - next_ch = _sys.stdin.read(1) - if next_ch: - return self._map_escape_sequence('O' + next_ch) - # Timeout or no char, queue 'O' and return ESC + return self._map_escape_sequence(csi_seq) + # Incomplete - queue and return ESC + for c in csi_seq: + self.input_queue.append(ord(c)) + elif seq == 'O': + # SS3 sequence + next_ch = self._read_nonblock(_sys.stdin) + if next_ch: + return self._map_escape_sequence('O' + next_ch) self.input_queue.append(ord('O')) return 27 - # Alt+key: queue the consumed character, return ESC - if seq: + else: + # Alt+key self.input_queue.append(ord(seq)) return 27 return code + def _read_nonblock(self, stdin: SupportsRead[str], retries: int = 3) -> str: + """Read a single character in non-blocking mode with brief retries. + + Args: + stdin: The stdin file object + retries: Number of retry attempts (each ~1ms apart) for escape sequences + """ + for _ in range(retries): + try: + ch = stdin.read(1) + if ch: + return ch + except (BlockingIOError, OSError): + pass + # Brief sleep to allow more bytes to arrive (escape sequences come in bursts) + if retries > 1: + _time.sleep(0.001) # 1ms + return '' + def _parse_sgr_mouse(self, stdin: SupportsRead[str]) -> int: # pylint: disable=too-many-return-statements """Parse SGR extended mouse sequence and queue the event.""" import select # pylint: disable=import-outside-toplevel @@ -2082,16 +2132,37 @@ class CursesWindow: # pylint: disable=too-many-public-methods _terminal.screen_buffer[y][x + i] = (char, attr) def clear(self) -> None: - raise NotImplementedError + """Clear screen and move cursor to (0, 0).""" + assert _terminal is not None + # Clear screen buffer + _terminal.screen_buffer = [ + [(' ', 0) for _ in range(_terminal.cols)] for _ in range(_terminal.lines) + ] + # Move cursor to home position + _terminal.cursor_y = 0 + _terminal.cursor_x = 0 def clearok(self, yes: int) -> None: - raise NotImplementedError + pass # no-op: we always do full redraws def clrtobot(self) -> None: - raise NotImplementedError + """Clear from cursor to end of screen.""" + assert _terminal is not None + y, x = _terminal.cursor_y, _terminal.cursor_x + # Clear from cursor to end of current line + for i in range(x, _terminal.cols): + _terminal.screen_buffer[y][i] = (' ', 0) + # Clear all lines below + for row in range(y + 1, _terminal.lines): + for col in range(_terminal.cols): + _terminal.screen_buffer[row][col] = (' ', 0) def clrtoeol(self) -> None: - raise NotImplementedError + """Clear from cursor to end of line.""" + assert _terminal is not None + y, x = _terminal.cursor_y, _terminal.cursor_x + for i in range(x, _terminal.cols): + _terminal.screen_buffer[y][i] = (' ', 0) def cursyncup(self) -> None: raise NotImplementedError @@ -2142,10 +2213,16 @@ class CursesWindow: # pylint: disable=too-many-public-methods @_overload def getch(self, y: int, x: int) -> int: ... - # pylint: disable-next=too-many-branches + # pylint: disable-next=too-many-branches,too-many-return-statements def getch(self, *yx: int) -> int: # type: ignore[misc] assert _terminal is not None + # Check for pending window resize + if _terminal.resize_pending: + _terminal.resize_pending = False + _terminal.update_size() + return KEY_RESIZE + # Move cursor if position specified (with bounds check like move()) if len(yx) == 2: y, x = yx[0], yx[1] @@ -2168,17 +2245,15 @@ class CursesWindow: # pylint: disable=too-many-public-methods # Unix: try to use select() for efficient blocking wait # But guard against invalid fileno (piped stdin, some IDEs, etc.) try: - import select # pylint: disable=import-outside-toplevel - # Test if stdin has a valid fileno for select _sys.stdin.fileno() use_select = True - except (ValueError, OSError, AttributeError, ImportError): + except (ValueError, OSError, AttributeError): # stdin doesn't support fileno() or it's invalid - fall back to polling use_select = False if use_select: - import select # pylint: disable=import-outside-toplevel + import select as select_module # pylint: disable=import-outside-toplevel while True: # Check input queue first @@ -2186,7 +2261,7 @@ class CursesWindow: # pylint: disable=too-many-public-methods return _terminal.input_queue.popleft() try: # Block until input is available (with 100ms timeout for responsiveness) - rlist, _, _ = select.select([_sys.stdin], [], [], 0.1) + rlist, _, _ = select_module.select([_sys.stdin], [], [], 0.1) if rlist: key = _terminal.read_key_nonblocking() if key != -1: @@ -2562,7 +2637,26 @@ class CursesWindow: # pylint: disable=too-many-public-methods raise NotImplementedError def timeout(self, delay: int) -> None: - raise NotImplementedError + """Set blocking behavior for getch(). + + Args: + delay: If negative, getch() blocks indefinitely. + If zero, getch() is non-blocking. + If positive, getch() blocks for at most delay milliseconds. + """ + assert _terminal is not None + if delay < 0: + self._nodelay_mode = False + _terminal.nodelay_mode = False + _terminal.input_timeout_ms = -1 + elif delay == 0: + self._nodelay_mode = True + _terminal.nodelay_mode = True + _terminal.input_timeout_ms = 0 + else: + self._nodelay_mode = False + _terminal.nodelay_mode = False + _terminal.input_timeout_ms = delay def touchline(self, start: int, count: int, changed: bool = True) -> None: raise NotImplementedError