This commit is contained in:
The Anh Nguyen 2025-11-28 17:14:18 +08:00 committed by GitHub
commit d6869cbfa0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 1393 additions and 19 deletions

README.md

@@ -37,6 +37,7 @@ An interactive NVIDIA-GPU process viewer and beyond, the one-stop solution for G
- [Device and Process Status](#device-and-process-status)
- [Resource Monitor](#resource-monitor)
- [For Docker Users](#for-docker-users)
- [For Kubernetes Users](#for-kubernetes-users)
- [For SSH Users](#for-ssh-users)
- [Command Line Options and Environment Variables](#command-line-options-and-environment-variables)
- [Keybindings for Monitor Mode](#keybindings-for-monitor-mode)
@@ -98,6 +99,7 @@ An interactive NVIDIA-GPU process viewer and beyond, the one-stop solution for G
- get host process information using the cross-platform library [psutil](https://github.com/giampaolo/psutil) instead of calling `ps -p <pid>` in a subprocess. (vs. [nvidia-htop](https://github.com/peci1/nvidia-htop) & [py3nvml](https://github.com/fbcotter/py3nvml))
- written in pure Python, easy to install with `pip`. (vs. [nvtop](https://github.com/Syllo/nvtop))
- **Integrable**: easy to integrate into other applications, more than monitoring. (vs. [nvidia-htop](https://github.com/peci1/nvidia-htop) & [nvtop](https://github.com/Syllo/nvtop))
- **Kubernetes support**: display pod name, namespace, container information, and GPU resource requests/limits for processes running in Kubernetes clusters
<p align="center">
<img width="100%" src="https://user-images.githubusercontent.com/16078332/129374533-fe06c01a-630d-4994-b54b-821cccd0d33c.png" alt="Windows">
@@ -115,6 +117,7 @@ An interactive NVIDIA-GPU process viewer and beyond, the one-stop solution for G
- NVIDIA Management Library (NVML)
- nvidia-ml-py
- psutil
- kubernetes (for Kubernetes pod information, optional)
- curses<sup>[*](#curses)</sup> (with `libncursesw`)
**NOTE:** The [NVIDIA Management Library (*NVML*)](https://developer.nvidia.com/nvidia-management-library-nvml) is a C-based programmatic interface for monitoring and managing various states. The runtime version of the NVML library ships with the NVIDIA display driver (available at [Download Drivers | NVIDIA](https://www.nvidia.com/Download/index.aspx)), or can be downloaded as part of the NVIDIA CUDA Toolkit (available at [CUDA Toolkit | NVIDIA Developer](https://developer.nvidia.com/cuda-downloads)). The lists of OS platforms and NVIDIA-GPUs supported by the NVML library can be found in the [NVML API Reference](https://docs.nvidia.com/deploy/nvml-api/nvml-api-reference.html).
@@ -325,6 +328,143 @@ docker compose --project-directory=nvitop-exporter/grafana up --build --detach
See [`nvitop-exporter`](./nvitop-exporter/README.md) for more details.
#### For Kubernetes Users
`nvitop` supports Kubernetes integration and can display pod information for processes running in containers. When running inside a Kubernetes cluster, `nvitop` will automatically detect the environment and show:
- **Pod Name**: Name of the Kubernetes pod
- **Pod Namespace**: Kubernetes namespace the pod belongs to
- **Pod UID**: Unique identifier for the pod
- **Container Name**: Name of the container running the process
- **Container ID**: Unique container identifier
- **Node Name**: Kubernetes node where the pod is running
- **Pod Labels**: Kubernetes labels applied to the pod
- **NVIDIA GPU Requests**: Number of GPUs requested by the container
- **NVIDIA GPU Limits**: GPU limits set for the container
**Running as DaemonSet:**
```yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: nvitop
namespace: kube-system
spec:
selector:
matchLabels:
name: nvitop
template:
metadata:
labels:
name: nvitop
spec:
serviceAccountName: nvitop
hostPID: true
tolerations:
- key: nvidia.com/gpu
operator: Exists
effect: NoSchedule
containers:
- name: nvitop
image: nvitop:latest
command: ["sleep", "infinity"]
runtimeClassName: nvidia
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: nvitop
namespace: kube-system
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: nvitop
rules:
- apiGroups: [""]
resources: ["pods"]
verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: nvitop
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: nvitop
subjects:
- kind: ServiceAccount
name: nvitop
namespace: kube-system
```
**Key Requirements for Kubernetes:**
- `hostPID: true` for process visibility (access to the host `/proc` filesystem)
- NVIDIA Container Toolkit or NVIDIA device plugin
- Service account with appropriate RBAC permissions (optional, for pod details)
**Environment Variables:**
- `KUBECONFIG`: Path to kubeconfig file for cluster access
**Usage Examples:**
```bash
# Run nvitop in the pod
kubectl exec -n kube-system -it <nvitop-pod> -- nvitop
# Monitor all nodes
kubectl get pods -n kube-system -l name=nvitop -o wide
# View logs
kubectl logs -n kube-system -l name=nvitop
```
#### Local Monitoring with KUBECONFIG
You can also run `nvitop` on your local machine and monitor GPU processes on Kubernetes cluster nodes using a kubeconfig:
```bash
# Set your kubeconfig environment
export KUBECONFIG=~/.kube/config
# Run nvitop locally - it will try to match processes to pods
nvitop
# Or use a specific context
KUBECONFIG=~/.kube/production-config nvitop
```
**How it works:**
1. `nvitop` detects local GPU processes
2. Extracts container information from `/proc/<pid>/cgroup`
3. Uses container ID to find matching pods via Kubernetes API
4. Displays pod information alongside process details
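The matching in steps 2-3 can be sketched as follows (a minimal illustration: the patterns assume the systemd cgroup driver with `cri-containerd` scope names, and `match_pod` is an illustrative name, not part of nvitop):

```python
from __future__ import annotations

import re

# Systemd cgroup driver encodes the pod UID as "pod<uid>.slice" and the
# container ID in a "cri-containerd-<id>.scope" entry.
POD_UID_RE = re.compile(r'pod([a-f0-9_-]+)\.slice')
CONTAINER_ID_RE = re.compile(r'cri-[^-]+-([a-f0-9]{12,})')


def match_pod(cgroup_text: str) -> dict[str, str | None] | None:
    """Extract pod UID and container ID from /proc/<pid>/cgroup contents."""
    pod_uid = container_id = None
    for line in cgroup_text.splitlines():
        # cgroup v2 lines look like "0::<path>"; v1 lines are
        # "<id>:<controllers>:<path>".
        path = line.split('::', 1)[1] if '::' in line else line.split(':', 2)[-1]
        if 'kubepods' not in path:
            continue
        if m := POD_UID_RE.search(path):
            pod_uid = m.group(1)
        if m := CONTAINER_ID_RE.search(path):
            container_id = m.group(1)
    if container_id is None:
        return None
    # Kubernetes UIDs use dashes; systemd slice names encode them with underscores.
    return {
        'pod_uid': pod_uid.replace('_', '-') if pod_uid else None,
        'container_id': container_id,
    }


sample = (
    '0::/kubepods.slice/kubepods-burstable.slice/'
    'kubepods-burstable-pod1234abcd_5678_90ef_a1b2_c3d4e5f60789.slice/'
    'cri-containerd-0123456789abcdef0123456789abcdef.scope'
)
print(match_pod(sample))
```

The recovered container ID is then looked up against the `containerStatuses` reported by the Kubernetes API to identify the owning pod.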
**Requirements for local monitoring:**
- Valid kubeconfig with cluster access
- Network connectivity to Kubernetes API server
- GPU processes running in containers on accessible nodes
- Access to host proc filesystem via `--pid=host`
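These requirements can be checked up front with a short preflight script (a sketch; `preflight` and `kubeconfig_candidates` are illustrative names, not part of nvitop, and the script degrades gracefully when the `kubernetes` package or a cluster is unavailable):

```python
from __future__ import annotations

import os


def kubeconfig_candidates() -> list[str]:
    """Kubeconfig paths in the order they are tried: $KUBECONFIG, then ~/.kube/config."""
    paths = [os.getenv('KUBECONFIG'), os.path.expanduser('~/.kube/config')]
    return [p for p in paths if p and os.path.isfile(p)]


def preflight() -> str:
    """Report why Kubernetes pod information would be unavailable, or 'ok'."""
    candidates = kubeconfig_candidates()
    if not candidates:
        return 'no kubeconfig found'
    try:
        from kubernetes import config
        from kubernetes.client import CoreV1Api
    except ImportError:
        return 'kubernetes package not installed'
    try:
        config.load_kube_config(config_file=candidates[0])
        CoreV1Api().list_namespaced_pod(namespace='default', limit=1)
    except Exception as exc:  # report the failure instead of crashing the monitor
        return f'cluster unreachable: {exc}'
    return f'ok: using {candidates[0]}'


if __name__ == '__main__':
    print(preflight())
```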
#### For Docker Users with Kubernetes
Build and run the Docker image with [nvidia-container-toolkit](https://github.com/NVIDIA/nvidia-container-toolkit) and kubeconfig:
```bash
docker run -it --rm --runtime=nvidia --gpus=all --pid=host \
-v ~/.kube:/root/.kube:ro \
ghcr.io/xuehaipan/nvitop:latest
```
**NOTE:** When running the container, remember to add the `--pid=host` option and mount your kubeconfig so that Kubernetes pod information is available.
#### For SSH Users
Run `nvitop` directly on the SSH session instead of a login shell:


@@ -2772,6 +2772,70 @@
"refresh": 1,
"regex": "",
"type": "query"
},
{
"current": {},
"definition": "label_values(pod_name)",
"description": "Kubernetes pod name",
"includeAll": true,
"multi": true,
"name": "pod_name",
"options": [],
"query": {
"qryType": 1,
"query": "label_values(pod_name)"
},
"refresh": 1,
"regex": "",
"type": "query"
},
{
"current": {},
"definition": "label_values(pod_namespace)",
"description": "Kubernetes namespace",
"includeAll": true,
"multi": true,
"name": "pod_namespace",
"options": [],
"query": {
"qryType": 1,
"query": "label_values(pod_namespace)"
},
"refresh": 1,
"regex": "",
"type": "query"
},
{
"current": {},
"definition": "label_values(container_name)",
"description": "Kubernetes container name",
"includeAll": true,
"multi": true,
"name": "container_name",
"options": [],
"query": {
"qryType": 1,
"query": "label_values(container_name)"
},
"refresh": 1,
"regex": "",
"type": "query"
},
{
"current": {},
"definition": "label_values(node_name)",
"description": "Kubernetes node name",
"includeAll": true,
"multi": true,
"name": "node_name",
"options": [],
"query": {
"qryType": 1,
"query": "label_values(node_name)"
},
"refresh": 1,
"regex": "",
"type": "query"
}
]
},


@@ -178,7 +178,11 @@ def main() -> int: # pylint: disable=too-many-locals,too-many-statements
else:
cprint(f'INFO: GPU {device.index}: {name} (UUID: {uuid})', file=sys.stderr)
exporter = PrometheusExporter(devices, hostname=args.hostname, interval=args.interval)
exporter = PrometheusExporter(
devices,
hostname=args.hostname,
interval=args.interval,
)
try:
start_wsgi_server(port=args.port, addr=args.bind_address)


@@ -407,70 +407,261 @@ class PrometheusExporter: # pylint: disable=too-many-instance-attributes
self.process_info = Info(
name='process_info',
documentation='Process information.',
labelnames=['hostname', 'index', 'devicename', 'uuid', 'pid', 'username'],
labelnames=[
'hostname',
'index',
'devicename',
'uuid',
'pid',
'username',
'pod_name',
'pod_namespace',
'pod_uid',
'container_name',
'container_id',
'node_name',
],
registry=self.registry,
)
self.process_running_time = Gauge(
name='process_running_time',
documentation='Process running time (s).',
unit='Second',
labelnames=['hostname', 'index', 'devicename', 'uuid', 'pid', 'username'],
labelnames=[
'hostname',
'index',
'devicename',
'uuid',
'pid',
'username',
'pod_name',
'pod_namespace',
'pod_uid',
'container_name',
'container_id',
'node_name',
],
registry=self.registry,
)
self.process_cpu_percent = Gauge(
name='process_cpu_percent',
documentation='Process CPU percent (%).',
unit='Percentage',
labelnames=['hostname', 'index', 'devicename', 'uuid', 'pid', 'username'],
labelnames=[
'hostname',
'index',
'devicename',
'uuid',
'pid',
'username',
'pod_name',
'pod_namespace',
'pod_uid',
'container_name',
'container_id',
'node_name',
],
registry=self.registry,
)
self.process_rss_memory = Gauge(
name='process_rss_memory',
documentation='Process memory resident set size (MiB).',
unit='MiB',
labelnames=['hostname', 'index', 'devicename', 'uuid', 'pid', 'username'],
labelnames=[
'hostname',
'index',
'devicename',
'uuid',
'pid',
'username',
'pod_name',
'pod_namespace',
'pod_uid',
'container_name',
'container_id',
'node_name',
],
registry=self.registry,
)
self.process_memory_percent = Gauge(
name='process_memory_percent',
documentation='Process memory percent (%).',
unit='Percentage',
labelnames=['hostname', 'index', 'devicename', 'uuid', 'pid', 'username'],
labelnames=[
'hostname',
'index',
'devicename',
'uuid',
'pid',
'username',
'pod_name',
'pod_namespace',
'pod_uid',
'container_name',
'container_id',
'node_name',
],
registry=self.registry,
)
self.process_gpu_memory = Gauge(
name='process_gpu_memory',
documentation='Process GPU memory (MiB).',
unit='MiB',
labelnames=['hostname', 'index', 'devicename', 'uuid', 'pid', 'username'],
labelnames=[
'hostname',
'index',
'devicename',
'uuid',
'pid',
'username',
'pod_name',
'pod_namespace',
'pod_uid',
'container_name',
'container_id',
'node_name',
],
registry=self.registry,
)
self.process_gpu_sm_utilization = Gauge(
name='process_gpu_sm_utilization',
documentation='Process GPU SM utilization (%).',
unit='Percentage',
labelnames=['hostname', 'index', 'devicename', 'uuid', 'pid', 'username'],
labelnames=[
'hostname',
'index',
'devicename',
'uuid',
'pid',
'username',
'pod_name',
'pod_namespace',
'pod_uid',
'container_name',
'container_id',
'node_name',
],
registry=self.registry,
)
self.process_gpu_memory_utilization = Gauge(
name='process_gpu_memory_utilization',
documentation='Process GPU memory utilization (%).',
unit='Percentage',
labelnames=['hostname', 'index', 'devicename', 'uuid', 'pid', 'username'],
labelnames=[
'hostname',
'index',
'devicename',
'uuid',
'pid',
'username',
'pod_name',
'pod_namespace',
'pod_uid',
'container_name',
'container_id',
'node_name',
],
registry=self.registry,
)
self.process_gpu_encoder_utilization = Gauge(
name='process_gpu_encoder_utilization',
documentation='Process GPU encoder utilization (%).',
unit='Percentage',
labelnames=['hostname', 'index', 'devicename', 'uuid', 'pid', 'username'],
labelnames=[
'hostname',
'index',
'devicename',
'uuid',
'pid',
'username',
'pod_name',
'pod_namespace',
'pod_uid',
'container_name',
'container_id',
'node_name',
],
registry=self.registry,
)
self.process_gpu_decoder_utilization = Gauge(
name='process_gpu_decoder_utilization',
documentation='Process GPU decoder utilization (%).',
unit='Percentage',
labelnames=['hostname', 'index', 'devicename', 'uuid', 'pid', 'username'],
labelnames=[
'hostname',
'index',
'devicename',
'uuid',
'pid',
'username',
'pod_name',
'pod_namespace',
'pod_uid',
'container_name',
'container_id',
'node_name',
],
registry=self.registry,
)
# Kubernetes-specific metrics
self.process_pod_labels = Info(
name='process_pod_labels',
documentation='Kubernetes pod labels for the process.',
labelnames=[
'hostname',
'index',
'devicename',
'uuid',
'pid',
'username',
'pod_name',
'pod_namespace',
'pod_uid',
'container_name',
'container_id',
'node_name',
],
registry=self.registry,
)
self.process_nvidia_gpu_requests = Gauge(
name='process_nvidia_gpu_requests',
documentation='NVIDIA GPU requests for the process pod.',
unit='Count',
labelnames=[
'hostname',
'index',
'devicename',
'uuid',
'pid',
'username',
'pod_name',
'pod_namespace',
'pod_uid',
'container_name',
'container_id',
'node_name',
],
registry=self.registry,
)
self.process_nvidia_gpu_limits = Gauge(
name='process_nvidia_gpu_limits',
documentation='NVIDIA GPU limits for the process pod.',
unit='Count',
labelnames=[
'hostname',
'index',
'devicename',
'uuid',
'pid',
'username',
'pod_name',
'pod_namespace',
'pod_uid',
'container_name',
'container_id',
'node_name',
],
registry=self.registry,
)
@@ -605,6 +796,24 @@ class PrometheusExporter: # pylint: disable=too-many-instance-attributes
host_snapshot = host_snapshots[pid, username] = process.host_snapshot()
else:
host_snapshot = host_snapshots[pid, username]
# Collect Kubernetes information (always enabled)
try:
k8s_pod_name = process.pod_name()
k8s_pod_namespace = process.pod_namespace()
k8s_pod_uid = process.pod_uid()
k8s_container_name = process.container_name()
k8s_container_id = process.container_id()
k8s_node_name = process.node_name()
k8s_pod_labels = process.pod_labels()
k8s_gpu_requests = process.nvidia_gpu_requests()
k8s_gpu_limits = process.nvidia_gpu_limits()
except (ImportError, OSError, AttributeError, KeyError, ValueError):
k8s_pod_name = k8s_pod_namespace = k8s_pod_uid = 'N/A'
k8s_container_name = k8s_container_id = k8s_node_name = 'N/A'
k8s_pod_labels = {}
k8s_gpu_requests = k8s_gpu_limits = 0
self.process_info.labels(
hostname=self.hostname,
index=index,
@@ -612,6 +821,12 @@ class PrometheusExporter: # pylint: disable=too-many-instance-attributes
uuid=uuid,
pid=pid,
username=username,
pod_name=k8s_pod_name,
pod_namespace=k8s_pod_namespace,
pod_uid=k8s_pod_uid,
container_name=k8s_container_name,
container_id=k8s_container_id,
node_name=k8s_node_name,
).info(
{
'status': host_snapshot.status,
@@ -655,10 +870,73 @@ class PrometheusExporter: # pylint: disable=too-many-instance-attributes
uuid=uuid,
pid=pid,
username=username,
pod_name=k8s_pod_name,
pod_namespace=k8s_pod_namespace,
pod_uid=k8s_pod_uid,
container_name=k8s_container_name,
container_id=k8s_container_id,
node_name=k8s_node_name,
).set(value)
# Set Kubernetes-specific metrics
self.process_pod_labels.labels(
hostname=self.hostname,
index=index,
devicename=name,
uuid=uuid,
pid=pid,
username=username,
pod_name=k8s_pod_name,
pod_namespace=k8s_pod_namespace,
pod_uid=k8s_pod_uid,
container_name=k8s_container_name,
container_id=k8s_container_id,
node_name=k8s_node_name,
).info(k8s_pod_labels if k8s_pod_labels else {})
self.process_nvidia_gpu_requests.labels(
hostname=self.hostname,
index=index,
devicename=name,
uuid=uuid,
pid=pid,
username=username,
pod_name=k8s_pod_name,
pod_namespace=k8s_pod_namespace,
pod_uid=k8s_pod_uid,
container_name=k8s_container_name,
container_id=k8s_container_id,
node_name=k8s_node_name,
).set(k8s_gpu_requests)
self.process_nvidia_gpu_limits.labels(
hostname=self.hostname,
index=index,
devicename=name,
uuid=uuid,
pid=pid,
username=username,
pod_name=k8s_pod_name,
pod_namespace=k8s_pod_namespace,
pod_uid=k8s_pod_uid,
container_name=k8s_container_name,
container_id=k8s_container_id,
node_name=k8s_node_name,
).set(k8s_gpu_limits)
alive_pids.update(host_snapshots)
for pid, username in previous_alive_pids.difference(alive_pids):
# For dead processes the Kubernetes info is no longer available, so remove
# the series using the N/A placeholder labels
k8s_na_values = {
'pod_name': 'N/A',
'pod_namespace': 'N/A',
'pod_uid': 'N/A',
'container_name': 'N/A',
'container_id': 'N/A',
'node_name': 'N/A',
}
for collector in (
self.process_info,
self.process_running_time,
@@ -670,6 +948,9 @@ class PrometheusExporter: # pylint: disable=too-many-instance-attributes
self.process_gpu_memory_utilization,
self.process_gpu_encoder_utilization,
self.process_gpu_decoder_utilization,
self.process_pod_labels,
self.process_nvidia_gpu_requests,
self.process_nvidia_gpu_limits,
):
try:
collector.remove(
@@ -679,6 +960,12 @@ class PrometheusExporter: # pylint: disable=too-many-instance-attributes
uuid,
pid,
username,
k8s_na_values['pod_name'],
k8s_na_values['pod_namespace'],
k8s_na_values['pod_uid'],
k8s_na_values['container_name'],
k8s_na_values['container_id'],
k8s_na_values['node_name'],
)
except KeyError: # noqa: PERF203
pass


@@ -21,6 +21,7 @@ from nvitop.api import (
collector,
device,
host,
kubernetes,
libcuda,
libcudart,
libnvml,
@@ -39,6 +40,14 @@ from nvitop.api.device import (
normalize_cuda_visible_devices,
parse_cuda_visible_devices,
)
from nvitop.api.kubernetes import (
KubernetesClient,
KubernetesError,
KubernetesInfo,
extract_pod_from_pid,
get_kubernetes_info,
is_kubernetes_environment,
)
from nvitop.api.libnvml import NVMLError, nvmlCheckReturn
from nvitop.api.process import GpuProcess, HostProcess, command_join
from nvitop.api.utils import ( # explicitly export these to appease mypy
@@ -84,6 +93,14 @@ __all__ = [ # noqa: RUF022
'HostProcess',
'GpuProcess',
'command_join',
# nvitop.api.kubernetes
'kubernetes',
'KubernetesClient',
'KubernetesError',
'KubernetesInfo',
'is_kubernetes_environment',
'extract_pod_from_pid',
'get_kubernetes_info',
# nvitop.api.collector
'take_snapshots',
'collect_in_background',

nvitop/api/kubernetes.py (new file)

@@ -0,0 +1,717 @@
"""Kubernetes integration module for extracting pod information from processes."""
from __future__ import annotations
import os
import re
import threading
from contextlib import suppress
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from nvitop.api.utils import NA, NaType, memoize_when_activated
if TYPE_CHECKING:
from typing_extensions import Self
try:
from kubernetes import config
from kubernetes.client import CoreV1Api
KUBERNETES_AVAILABLE = True
except ImportError:
KUBERNETES_AVAILABLE = False
config = None
CoreV1Api = None
__all__ = [
'KubernetesClient',
'KubernetesInfo',
'extract_pod_from_pid',
'get_kubernetes_client',
'get_kubernetes_info',
'is_kubernetes_environment',
]
def _ensure_kubernetes_available() -> None:
"""Raise ImportError if Kubernetes package is not available."""
if not KUBERNETES_AVAILABLE:
raise ImportError('kubernetes package not available')
@dataclass
class KubernetesInfo:
"""Container for Kubernetes pod and container information."""
pod_name: str | NaType
pod_namespace: str | NaType
pod_uid: str | NaType
container_name: str | NaType
container_id: str | NaType
node_name: str | NaType
# Group related metadata to reduce attribute count
metadata: dict[str, Any] | NaType
@property
def pod_labels(self) -> dict[str, str] | NaType:
"""Get pod labels from metadata."""
if isinstance(self.metadata, dict):
return self.metadata.get('labels', {})
return NA
@property
def nvidia_gpu_requests(self) -> int | NaType:
"""Get NVIDIA GPU requests from metadata."""
if isinstance(self.metadata, dict):
return self.metadata.get('gpu_requests', NA)
return NA
@property
def nvidia_gpu_limits(self) -> int | NaType:
"""Get NVIDIA GPU limits from metadata."""
if isinstance(self.metadata, dict):
return self.metadata.get('gpu_limits', NA)
return NA
class KubernetesError(Exception):
"""Exception raised for Kubernetes-related errors."""
def is_kubernetes_environment() -> bool:
"""Check if the current process is running in a Kubernetes environment.
Returns:
True if running in Kubernetes, False otherwise.
"""
if os.getenv('KUBERNETES_SERVICE_HOST') is not None:
return True
# Check for the Kubernetes service account token mounted at the standard in-cluster path
k8s_serviceaccount_path = '/var/run/secrets/kubernetes.io/serviceaccount/token'
if os.path.isfile(k8s_serviceaccount_path):
return True
try:
if os.path.isfile('/proc/1/cgroup'):
with open('/proc/1/cgroup', encoding='utf-8') as f:
cgroup_content = f.read()
if (
'docker' in cgroup_content
or 'containerd' in cgroup_content
or 'crio' in cgroup_content
):
return True
except OSError:
pass
return False
def extract_pod_from_pid(pid: int) -> dict[str, str | None] | None:
"""Extract pod and container information from process PID using /proc filesystem.
Args:
pid: Process ID to extract information from.
Returns:
Dictionary containing pod info or None if not found.
"""
try:
cgroup_path = f'/proc/{pid}/cgroup'
if not os.path.isfile(cgroup_path):
return None
container_id = None
pod_uid = None
with open(cgroup_path, encoding='utf-8') as f:
for line in f:
line = line.strip()
# cgroup v2 entries look like "0::<path>"; v1 entries are "<id>:<controllers>:<path>"
if '::' in line:
_, cgroup_entry = line.split('::', 1)
else:
parts = line.split(':')
if len(parts) < 3:
continue
cgroup_entry = parts[2]
if 'kubepods' in cgroup_entry:
# Pod UID as encoded by the systemd cgroup driver ("pod<uid>.slice")
pod_uid_pattern = r'pod([a-f0-9_-]+)\.slice'
pod_match = re.search(pod_uid_pattern, cgroup_entry)
if pod_match:
pod_uid = pod_match.group(1)
# Container ID from CRI scope names (e.g. "cri-containerd-<id>.scope")
container_id_pattern = r'cri-[^-]+-([a-f0-9]{12,})'
container_match = re.search(container_id_pattern, cgroup_entry)
if container_match:
container_id = container_match.group(1)
if container_id is None:
return None
return {
'container_id': container_id,
'pod_uid': pod_uid,
'pod_name': None,
'namespace': None,
}
except (OSError, ValueError):
return None
class KubernetesClient:
"""Minimal Kubernetes API client for pod information retrieval."""
_instance: KubernetesClient | None = None
_lock: threading.Lock = threading.Lock()
def __new__(
cls,
kubeconfig_path: str | None = None,
context: str | None = None,
use_incluster_config: bool = True,
) -> Self:
"""Singleton pattern for Kubernetes client with configuration support."""
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance # type: ignore[return-value]
def __init__(
self,
kubeconfig_path: str | None = None,
context: str | None = None,
use_incluster_config: bool = True,
) -> None:
"""Initialize the Kubernetes client with optional kubeconfig support.
Args:
kubeconfig_path: Path to kubeconfig file (defaults to ~/.kube/config or KUBECONFIG)
context: Kubernetes context to use (defaults to current-context)
use_incluster_config: Whether to fall back to in-cluster config
"""
self._kubeconfig_path = kubeconfig_path
self._context = context
self._use_incluster_config = use_incluster_config
if not hasattr(self, '_initialized'):
self._initialized = True
self._k8s_loaded = False
self._load_error: str | None = None
self._setup_client()
def _setup_client(self) -> None:
try:
_ensure_kubernetes_available()
if self._kubeconfig_path:
config.load_kube_config(
config_file=self._kubeconfig_path,
context=self._context,
)
else:
load_kwargs = {'context': self._context} if self._context else {}
env_paths = [
os.getenv('KUBECONFIG'),
os.path.expanduser('~/.kube/config'),
]
for path in env_paths:
if path and os.path.isfile(path):
config.load_kube_config(config_file=path, **load_kwargs)
break
else:
if self._use_incluster_config or not is_kubernetes_environment():
config.load_config(**load_kwargs)
self._k8s_loaded = True
except (ImportError, OSError, KeyError) as e:
self._load_error = str(e)
self._k8s_loaded = False
@property
def is_available(self) -> bool:
"""Check if Kubernetes API is available."""
return self._k8s_loaded
@staticmethod
def list_available_contexts(kubeconfig_path: str | None = None) -> list[str]:
"""List all available contexts in kubeconfig file.
Args:
kubeconfig_path: Path to kubeconfig file (defaults to KUBECONFIG or ~/.kube/config).
Returns:
List of context names, empty list if kubeconfig is not available or invalid.
"""
try:
if not KUBERNETES_AVAILABLE:
return []
if kubeconfig_path is None:
kubeconfig_path = os.getenv('KUBECONFIG') or os.path.expanduser('~/.kube/config')
if not os.path.isfile(kubeconfig_path):
return []
contexts, _ = config.list_kube_config_contexts(config_file=kubeconfig_path)
return [ctx['name'] for ctx in contexts]
except (ImportError, OSError, KeyError, ValueError):
return []
@staticmethod
def get_current_context(kubeconfig_path: str | None = None) -> str | None:
"""Get the currently active context from kubeconfig.
Args:
kubeconfig_path: Path to kubeconfig file (defaults to KUBECONFIG or ~/.kube/config).
Returns:
Current context name, or None if not available.
"""
try:
if not KUBERNETES_AVAILABLE:
return None
if kubeconfig_path is None:
kubeconfig_path = os.getenv('KUBECONFIG') or os.path.expanduser('~/.kube/config')
if not os.path.isfile(kubeconfig_path):
return None
_, current_context = config.list_kube_config_contexts(
config_file=kubeconfig_path,
)
return current_context.get('name') if current_context else None
except (ImportError, OSError, KeyError, ValueError):
return None
def extract_nvidia_gpu_resources(
self,
pod_spec: dict,
container_name: str | None = None,
_container_id: str | None = None,
) -> tuple[int, int]:
"""Extract NVIDIA GPU resources from pod specification.
Args:
pod_spec: Pod specification dictionary from Kubernetes API.
container_name: Specific container name to extract from (if None, uses first container).
_container_id: Unused; reserved for future matching by container ID.
Returns:
Tuple of (gpu_requests, gpu_limits) as integers.
"""
containers = pod_spec.get('containers', [])
if container_name:
containers = [c for c in containers if c.get('name') == container_name]
container = containers[0] if containers else {}
resources = container.get('resources', {})
requests = resources.get('requests', {})
limits = resources.get('limits', {})
gpu_requests = 0
gpu_limits = 0
if 'nvidia.com/gpu' in requests:
try:
gpu_requests = int(requests['nvidia.com/gpu'])
except (ValueError, TypeError):
gpu_requests = 0
if 'nvidia.com/gpu' in limits:
try:
gpu_limits = int(limits['nvidia.com/gpu'])
except (ValueError, TypeError):
gpu_limits = 0
return gpu_requests, gpu_limits
def _get_pods_from_namespace(self, api: Any, namespace: str) -> list:
"""List pods in a namespace, returning an empty list on API errors."""
with suppress(ImportError, OSError, KeyError, ValueError):
pods = api.list_namespaced_pod(namespace=namespace)
return pods.items
return []
def _search_pods_in_namespaces(
self,
api: Any,
namespaces: list[str],
pod_uid: str,
convert_uid: bool = True,
) -> KubernetesInfo | None:
"""Search for pod in list of namespaces without try-except in inner loop."""
for namespace in namespaces:
pods = self._get_pods_from_namespace(api, namespace)
for pod in pods:
pod_info = self._extract_pod_info(pod, pod_uid, convert_uid)
if pod_info is not None:
return pod_info
return None
def _extract_pod_info(
self,
pod: Any,
pod_uid: str,
convert_uid: bool = True,
) -> KubernetesInfo | None:
"""Extract pod information safely without exceptions in loops."""
try:
# Convert cgroup pod UID (underscores) to Kubernetes UID (dashes) if needed
target_uid = pod_uid.replace('_', '-') if convert_uid else pod_uid
if pod.metadata.uid == target_uid:
gpu_requests, gpu_limits = self.extract_nvidia_gpu_resources(
pod.spec.to_dict(),
)
return KubernetesInfo(
pod_name=pod.metadata.name,
pod_namespace=pod.metadata.namespace,
pod_uid=pod.metadata.uid,
container_name=NA,
container_id=NA,
node_name=pod.spec.node_name,
metadata={
'labels': pod.metadata.labels or {},
'gpu_requests': gpu_requests,
'gpu_limits': gpu_limits,
},
)
except (ImportError, OSError, KeyError, ValueError, AttributeError):
pass
return None
def find_container_name_by_id(self, pod: Any, container_id: str) -> str | None:
"""Find container name by container ID using pod status information.
Args:
pod: Kubernetes pod object from API.
container_id: Container ID to match (can be short or full ID).
Returns:
Container name if found, None otherwise.
"""
try:
if hasattr(pod.status, 'container_statuses') and pod.status.container_statuses:
for container_status in pod.status.container_statuses:
if hasattr(container_status, 'container_id') and container_status.container_id:
k8s_container_id = container_status.container_id
if '://' in k8s_container_id:
k8s_container_id = k8s_container_id.split('://', 1)[1]
if (
k8s_container_id == container_id
or k8s_container_id.startswith(container_id)
or container_id.startswith(k8s_container_id[:12])
):
return (
container_status.name if hasattr(container_status, 'name') else None
)
except (AttributeError, TypeError, KeyError):
pass
return None
@memoize_when_activated
def get_pod_info(
self,
pod_name: str,
namespace: str | None = None,
) -> KubernetesInfo:
"""Get pod information using official Kubernetes client.
Args:
pod_name: Name of the pod.
namespace: Namespace of the pod (defaults to current namespace).
Returns:
KubernetesInfo object with pod details.
"""
if not self.is_available:
return KubernetesInfo(NA, NA, NA, NA, NA, NA, NA)
try:
_ensure_kubernetes_available()
api = CoreV1Api()
pod = api.read_namespaced_pod(
name=pod_name,
namespace=namespace or 'default',
)
metadata = pod.metadata
spec = pod.spec
gpu_requests, gpu_limits = self.extract_nvidia_gpu_resources(
spec.to_dict(),
)
return KubernetesInfo(
pod_name=metadata.name,
pod_namespace=metadata.namespace,
pod_uid=metadata.uid,
                container_name=NA,  # Would need additional logic to determine container
                container_id=NA,
                node_name=spec.node_name,
                metadata={
                    'labels': metadata.labels or {},
                    'gpu_requests': gpu_requests,
                    'gpu_limits': gpu_limits,
                },
            )
        except (ImportError, OSError, KeyError, ValueError):
            return KubernetesInfo(NA, NA, NA, NA, NA, NA, NA)

    @memoize_when_activated
    def get_pod_by_uid(self, pod_uid: str) -> KubernetesInfo:
        """Get pod information by UID using the official Kubernetes client.

        Args:
            pod_uid: UID of the pod.

        Returns:
            KubernetesInfo object with pod details.
        """
        if not self.is_available:
            return KubernetesInfo(NA, NA, NA, NA, NA, NA, NA)

        try:
            _ensure_kubernetes_available()
            api = CoreV1Api()

            # First try common namespaces
            common_namespaces = ['default', 'kube-system', 'kube-public']
            result = self._search_pods_in_namespaces(
                api,
                common_namespaces,
                pod_uid,
                convert_uid=True,
            )
            if result is not None:
                return result

            # If not found, try all namespaces
            try:
                namespaces = api.list_namespace()
                namespace_list = [ns.metadata.name for ns in namespaces.items]
                result = self._search_pods_in_namespaces(
                    api,
                    namespace_list,
                    pod_uid,
                    convert_uid=True,
                )
                if result is not None:
                    return result
            except (ImportError, OSError, KeyError, ValueError):
                # Fallback to listing all pods
                pods = api.list_pod_for_all_namespaces()
                for pod in pods.items:
                    pod_info = self._extract_pod_info(pod, pod_uid, convert_uid=False)
                    if pod_info is not None:
                        return pod_info

            return KubernetesInfo(NA, NA, NA, NA, NA, NA, NA)
        except (ImportError, OSError, KeyError, ValueError):
            return KubernetesInfo(NA, NA, NA, NA, NA, NA, NA)


class _KubernetesClientSingleton:
    """Thread-safe singleton for the Kubernetes client."""

    _instance: KubernetesClient | None = None
    _lock = threading.Lock()

    @classmethod
    def get_client(
        cls,
        kubeconfig_path: str | None = None,
        context: str | None = None,
        use_incluster_config: bool = True,
    ) -> KubernetesClient:
        """Get the global Kubernetes client instance with optional configuration.

        Args:
            kubeconfig_path: Path to kubeconfig file.
            context: Kubernetes context to use.
            use_incluster_config: Whether to fall back to in-cluster config.

        Returns:
            KubernetesClient instance.
        """
        if cls._instance is None:
            with cls._lock:
                if cls._instance is None:
                    cls._instance = KubernetesClient(
                        kubeconfig_path=kubeconfig_path,
                        context=context,
                        use_incluster_config=use_incluster_config,
                    )
        return cls._instance

    @classmethod
    def reset_instance(cls) -> None:
        """Reset the singleton instance. Useful for testing."""
        with cls._lock:
            cls._instance = None


def _get_kubernetes_client(
    kubeconfig_path: str | None = None,
    context: str | None = None,
    use_incluster_config: bool = True,
) -> KubernetesClient:
    """Get the global Kubernetes client instance with optional configuration.

    Args:
        kubeconfig_path: Path to kubeconfig file.
        context: Kubernetes context to use.
        use_incluster_config: Whether to fall back to in-cluster config.

    Returns:
        KubernetesClient instance.
    """
    return _KubernetesClientSingleton.get_client(
        kubeconfig_path=kubeconfig_path,
        context=context,
        use_incluster_config=use_incluster_config,
    )


def get_kubernetes_client(
    kubeconfig_path: str | None = None,
    context: str | None = None,
    use_incluster_config: bool = True,
) -> KubernetesClient:
    """Get a configured Kubernetes client instance.

    Args:
        kubeconfig_path: Path to kubeconfig file (defaults to KUBECONFIG or ~/.kube/config).
        context: Kubernetes context to use (defaults to current-context).
        use_incluster_config: Whether to fall back to in-cluster config.

    Returns:
        Configured KubernetesClient instance.

    Examples:
        >>> client = get_kubernetes_client()  # Use default kubeconfig
        >>> client = get_kubernetes_client(context="prod")  # Use specific context
        >>> client = get_kubernetes_client("/path/to/config", "staging")  # Use file and context
    """
    return KubernetesClient(
        kubeconfig_path=kubeconfig_path,
        context=context,
        use_incluster_config=use_incluster_config,
    )


_container_pod_cache: dict[str, KubernetesInfo] = {}
_cache_lock: threading.Lock = threading.Lock()


@memoize_when_activated
def get_kubernetes_info(pid: int) -> KubernetesInfo:
    """Get Kubernetes information for a given process PID.

    Args:
        pid: Process ID to get Kubernetes information for.

    Returns:
        KubernetesInfo object with pod/container details.
    """
    pod_info = extract_pod_from_pid(pid)
    if pod_info is None:
        return KubernetesInfo(NA, NA, NA, NA, NA, NA, NA)

    container_id = pod_info.get('container_id')
    if container_id:
        with _cache_lock:
            if container_id in _container_pod_cache:
                return _container_pod_cache[container_id]

    client = _get_kubernetes_client()
    pod_uid = pod_info.get('pod_uid')
    if pod_uid and client.is_available:
        k8s_info = client.get_pod_by_uid(pod_uid)
        if container_id and container_id is not NA and k8s_info.pod_name is not NA:
            try:
                _ensure_kubernetes_available()
                api = CoreV1Api()
                pod = api.read_namespaced_pod(
                    name=k8s_info.pod_name,
                    namespace=k8s_info.pod_namespace,
                )
                container_name = client.find_container_name_by_id(pod, container_id)
                if container_name:
                    gpu_requests, gpu_limits = client.extract_nvidia_gpu_resources(
                        pod.spec.to_dict(),
                        container_name=container_name,
                    )
                    k8s_info = KubernetesInfo(
                        pod_name=k8s_info.pod_name,
                        pod_namespace=k8s_info.pod_namespace,
                        pod_uid=k8s_info.pod_uid,
                        container_name=container_name,
                        container_id=k8s_info.container_id,
                        node_name=k8s_info.node_name,
                        metadata={
                            'labels': k8s_info.metadata.get('labels', {})
                            if isinstance(k8s_info.metadata, dict)
                            else {},
                            'gpu_requests': gpu_requests,
                            'gpu_limits': gpu_limits,
                        },
                    )
                    if container_id:
                        with _cache_lock:
                            _container_pod_cache[container_id] = k8s_info
            except (ImportError, OSError, KeyError, ValueError):
                pass
        if k8s_info.container_id is NA:
            k8s_info.container_id = container_id or NA
        return k8s_info

    basic_info = KubernetesInfo(
        pod_name=pod_info.get('pod_name') or NA,
        pod_namespace=pod_info.get('namespace') or NA,
        pod_uid=pod_info.get('pod_uid') or NA,
        container_name=NA,
        container_id=container_id or NA,
        node_name=NA,
        metadata={},
    )
    if container_id:
        with _cache_lock:
            _container_pod_cache[container_id] = basic_info
    return basic_info
@@ -42,6 +42,27 @@ from nvitop.api.utils import (
)

# Optional Kubernetes integration
try:
    from nvitop.api import kubernetes
    from nvitop.api.kubernetes import KubernetesInfo
except ImportError:
    kubernetes = None  # type: ignore[assignment]

    def kubernetes_info_fallback(**kwargs: Any) -> Any:
        """Fallback function for when kubernetes module is not available.

        Args:
            **kwargs: Arbitrary keyword arguments to be added to the fallback object.

        Returns:
            A dynamic object with the provided kwargs as attributes.
        """
        return type('KubernetesInfo', (), kwargs)()

    KubernetesInfo = kubernetes_info_fallback  # type: ignore[misc, assignment]


if TYPE_CHECKING:
    from collections.abc import Callable, Generator, Iterable

    from typing_extensions import Self  # Python 3.11+
@@ -131,7 +152,11 @@ def auto_garbage_clean(
            except KeyError:
                pass
            # See also `GpuProcess.failsafe`
            if fallback is _RAISE or not getattr(_USE_FALLBACK_WHEN_RAISE, 'value', False):
            if fallback is _RAISE or not getattr(
                _USE_FALLBACK_WHEN_RAISE,
                'value',
                False,
            ):
                raise
            if isinstance(fallback, tuple):
                if isinstance(ex, host.AccessDenied) and fallback == ('No Such Process',):
@@ -317,7 +342,9 @@ class HostProcess(host.Process, ABC):
            host.AccessDenied:
                If the user does not have read privilege to the process' status file.
        """
        return datetime.datetime.now() - datetime.datetime.fromtimestamp(self.create_time())
        return datetime.datetime.now() - datetime.datetime.fromtimestamp(
            self.create_time(),
        )

    def running_time_human(self) -> str:
        """Return the elapsed time this process has been running in human-readable format.
@@ -413,10 +440,77 @@ class HostProcess(host.Process, ABC):
        try:
            self.cmdline.cache_activate(self)  # type: ignore[attr-defined]
            self.running_time.cache_activate(self)  # type: ignore[attr-defined]
            self._get_kubernetes_info.cache_activate(self)  # type: ignore[attr-defined]
            yield
        finally:
            self.cmdline.cache_deactivate(self)  # type: ignore[attr-defined]
            self.running_time.cache_deactivate(self)  # type: ignore[attr-defined]
            self._get_kubernetes_info.cache_deactivate(self)  # type: ignore[attr-defined]

    # Kubernetes integration methods
    @memoize_when_activated
    def _get_kubernetes_info(self) -> KubernetesInfo:
        """Get cached Kubernetes information for this process."""
        if kubernetes is not None:
            try:
                return kubernetes.get_kubernetes_info(self.pid)
            except (ImportError, kubernetes.KubernetesError, OSError):
                pass
        return KubernetesInfo(
            pod_name=NA,
            pod_namespace=NA,
            pod_uid=NA,
            container_name=NA,
            container_id=NA,
            node_name=NA,
            metadata={},
        )

    @auto_garbage_clean(fallback=NA)
    def pod_name(self) -> str | NaType:
        """Get the Kubernetes pod name if running in a pod."""
        return self._get_kubernetes_info().pod_name

    @auto_garbage_clean(fallback=NA)
    def pod_namespace(self) -> str | NaType:
        """Get the Kubernetes pod namespace if running in a pod."""
        return self._get_kubernetes_info().pod_namespace

    @auto_garbage_clean(fallback=NA)
    def pod_uid(self) -> str | NaType:
        """Get the Kubernetes pod UID if running in a pod."""
        return self._get_kubernetes_info().pod_uid

    @auto_garbage_clean(fallback=NA)
    def container_name(self) -> str | NaType:
        """Get the container name if running in a container."""
        return self._get_kubernetes_info().container_name

    @auto_garbage_clean(fallback=NA)
    def container_id(self) -> str | NaType:
        """Get the container ID if running in a container."""
        return self._get_kubernetes_info().container_id

    @auto_garbage_clean(fallback=NA)
    def node_name(self) -> str | NaType:
        """Get the Kubernetes node name if running in a pod."""
        return self._get_kubernetes_info().node_name

    @auto_garbage_clean(fallback=NA)
    def pod_labels(self) -> dict[str, str] | NaType:
        """Get the Kubernetes pod labels if running in a pod."""
        return self._get_kubernetes_info().pod_labels

    @auto_garbage_clean(fallback=NA)
    def nvidia_gpu_requests(self) -> int | NaType:
        """Get the number of NVIDIA GPUs requested by this process's container."""
        return self._get_kubernetes_info().nvidia_gpu_requests

    @auto_garbage_clean(fallback=NA)
    def nvidia_gpu_limits(self) -> int | NaType:
        """Get the number of NVIDIA GPUs limited to this process's container."""
        return self._get_kubernetes_info().nvidia_gpu_limits

    def as_snapshot(
        self,
@@ -551,7 +645,9 @@ class GpuProcess:  # pylint: disable=too-many-instance-attributes,too-many-public-methods
    def __hash__(self) -> int:
        """Return a hash value of the GPU process."""
        if self._hash is None:  # pylint: disable=access-member-before-definition
            self._hash = hash(self._ident)  # pylint: disable=attribute-defined-outside-init
            self._hash = hash(
                self._ident,
            )  # pylint: disable=attribute-defined-outside-init
        return self._hash

    def __getattr__(self, name: str) -> Any | Callable[..., Any]:
@@ -639,7 +735,10 @@ class GpuProcess:  # pylint: disable=too-many-instance-attributes,too-many-public-methods
        self._gpu_memory_human = bytes2human(self.gpu_memory())
        memory_total = self.device.memory_total()
        gpu_memory_percent = NA
        if libnvml.nvmlCheckReturn(memory_used, int) and libnvml.nvmlCheckReturn(memory_total, int):
        if libnvml.nvmlCheckReturn(memory_used, int) and libnvml.nvmlCheckReturn(
            memory_total,
            int,
        ):
            gpu_memory_percent = round(100.0 * memory_used / memory_total, 1)  # type: ignore[assignment]
        self._gpu_memory_percent = gpu_memory_percent
@@ -1002,6 +1101,43 @@ class GpuProcess:  # pylint: disable=too-many-instance-attributes,too-many-public-methods
            gpu_decoder_utilization=self.gpu_decoder_utilization(),
        )

    # Kubernetes integration methods - delegate to host process
    def pod_name(self) -> str | NaType:
        """Get the Kubernetes pod name if running in a pod."""
        return self.host.pod_name()

    def pod_namespace(self) -> str | NaType:
        """Get the Kubernetes pod namespace if running in a pod."""
        return self.host.pod_namespace()

    def pod_uid(self) -> str | NaType:
        """Get the Kubernetes pod UID if running in a pod."""
        return self.host.pod_uid()

    def container_name(self) -> str | NaType:
        """Get the container name if running in a container."""
        return self.host.container_name()

    def container_id(self) -> str | NaType:
        """Get the container ID if running in a container."""
        return self.host.container_id()

    def node_name(self) -> str | NaType:
        """Get the Kubernetes node name if running in a pod."""
        return self.host.node_name()

    def pod_labels(self) -> dict[str, str] | NaType:
        """Get the Kubernetes pod labels if running in a pod."""
        return self.host.pod_labels()

    def nvidia_gpu_requests(self) -> int | NaType:
        """Get the number of NVIDIA GPUs requested by this process's container."""
        return self.host.nvidia_gpu_requests()

    def nvidia_gpu_limits(self) -> int | NaType:
        """Get the number of NVIDIA GPUs limited to this process's container."""
        return self.host.nvidia_gpu_limits()

    @classmethod
    def take_snapshots(  # batched version of `as_snapshot`
        cls,

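The `oneshot()` context above activates result caching on `cmdline`, `running_time`, and the new `_get_kubernetes_info` through `memoize_when_activated`. A simplified, self-contained stand-in for that decorator (nvitop's real implementation differs in details such as per-instance cache plumbing and the activate/deactivate API) could look like:

```python
import functools


def memoize_when_activated(method):
    """Cache a zero-argument method's result only while caching is activated.

    Mirrors the idea behind psutil's oneshot(): outside the activated window,
    every call recomputes; inside it, the first result is reused.
    """

    @functools.wraps(method)
    def wrapper(self):
        try:
            return self._cache[method.__name__]  # cache hit while activated
        except (AttributeError, KeyError):
            value = method(self)
            if getattr(self, '_cache_active', False):
                if not hasattr(self, '_cache'):
                    self._cache = {}
                self._cache[method.__name__] = value
            return value

    return wrapper


class Proc:
    """Toy process object counting how often the expensive lookup runs."""

    def __init__(self):
        self.calls = 0

    @memoize_when_activated
    def info(self):
        self.calls += 1  # stands in for an expensive /proc or API lookup
        return self.calls
```

With `p = Proc()`, two calls to `p.info()` recompute twice; after setting `p._cache_active = True`, further calls compute once and then serve the cached value.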
@@ -364,8 +364,14 @@ class ProcessPanel(BaseSelectablePanel):  # pylint: disable=too-many-instance-attributes
        time_length = max(4, max((len(p.running_time_human) for p in snapshots), default=4))
        for snapshot in snapshots:
            if hasattr(snapshot, 'pod_name') and snapshot.pod_name not in ('N/A', '', None):
                k8s_info = f'[{snapshot.pod_name}/{snapshot.pod_namespace}]'
            else:
                k8s_info = 'N/A'
            snapshot.host_info = WideString(
                '{:>5} {:>5} {} {}'.format(
                '{:<20} {:>5} {:>5} {} {}'.format(
                    k8s_info,
                    snapshot.cpu_percent_string.replace('%', ''),
                    snapshot.memory_percent_string.replace('%', ''),
                    ' ' * (time_length - len(snapshot.running_time_human))
@@ -386,11 +392,12 @@
            time.sleep(self.SNAPSHOT_INTERVAL)

    def header_lines(self) -> list[str]:
        pod_headers = ['POD', *self.host_headers]
        header = [
            '╒' + '═' * (self.width - 2) + '╕',
            '│ {} │'.format('Processes:'.ljust(self.width - 4)),
            r'│ GPU PID USER GPU-MEM %SM %GMBW {} │'.format(
                ' '.join(self.host_headers).ljust(self.width - 46),
                ' '.join(pod_headers).ljust(self.width - 46),
            ),
            '╞' + '═' * (self.width - 2) + '╡',
        ]

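The widened `'{:<20} …'` format string above reserves a left-aligned, 20-character column for the pod tag ahead of the existing host columns. A standalone illustration of that layout, using made-up values in place of a real snapshot's fields:

```python
# Hypothetical values standing in for a snapshot's display strings.
k8s_info = '[trainer-0/default]'  # '[pod/namespace]' tag, 19 characters
cpu_percent, memory_percent = '12.3', '4.5'  # CPU% and MEM%, '%' already stripped

# '{:<20}' pads the pod tag to a fixed 20-character column, so the
# right-aligned CPU/MEM columns that follow stay aligned across rows.
row = '{:<20} {:>5} {:>5}'.format(k8s_info, cpu_percent, memory_percent)
print(repr(row))
```

Tags longer than 20 characters would overflow the column and push the rest of the row to the right, which is why a fixed width is chosen to match the `POD` header added in `header_lines`.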
@@ -49,6 +49,7 @@ dependencies = [
    # Sync with nvitop/version.py and requirements.txt
    "nvidia-ml-py >= 11.450.51, < 13.581.0a0",
    "psutil >= 5.6.6",
    "kubernetes >= 28.0.0, < 35.0.0",
    "colorama >= 0.4.0; platform_system == 'Windows'",
    "windows-curses >= 2.2.0; platform_system == 'Windows'",
]
@@ -100,8 +101,8 @@ messages-control.disable = [
    "duplicate-code",
    "wrong-import-order",
]
spelling.spelling-dict = "en_US"
spelling.spelling-private-dict-file = "docs/source/spelling_wordlist.txt"
# spelling.spelling-dict = "en_US"  # Disabled due to missing dictionary
# spelling.spelling-private-dict-file = "docs/source/spelling_wordlist.txt"

[tool.codespell]
ignore-words = "docs/source/spelling_wordlist.txt"

@@ -1,5 +1,6 @@
# Sync with pyproject.toml and nvitop/version.py
nvidia-ml-py >= 11.450.51, < 13.581.0a0
psutil >= 5.6.6
kubernetes >= 28.0.0, < 35.0.0
colorama >= 0.4.0; platform_system == 'Windows'
windows-curses >= 2.2.0; platform_system == 'Windows'
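Note that the commit pins `kubernetes` as a hard requirement here, even though the code guards the import with a `try`/`except ImportError` fallback. That guard pattern, sketched standalone (the module name below is deliberately nonexistent so the fallback path is exercised):

```python
from typing import Any

try:
    # Deliberately nonexistent module: this import fails, exercising the fallback.
    import nonexistent_k8s_module as kubernetes  # type: ignore[import-not-found]
except ImportError:
    kubernetes = None  # callers test `kubernetes is not None` before using it

    def kubernetes_info_fallback(**kwargs: Any) -> Any:
        """Build a throwaway object whose attributes mirror the given kwargs."""
        return type('KubernetesInfo', (), kwargs)()

    KubernetesInfo = kubernetes_info_fallback

info = KubernetesInfo(pod_name='N/A', node_name='N/A')
print(info.pod_name)  # N/A
```

The dynamic `type(...)` call gives callers an attribute-compatible stand-in without importing the real info class, so accessor code can run unchanged when the dependency is absent.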