diff --git a/.flake8 b/.flake8 index bee04d3..a194617 100644 --- a/.flake8 +++ b/.flake8 @@ -20,6 +20,7 @@ per-file-ignores = # F401: module imported but unused # intentionally unused imports __init__.py: F401 + nvitop/api/host.py: F401 # SIM113: use enumarate # false positive nvitop/gui/screens/main/process.py: SIM113 diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 125d942..f1d16ef 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -72,7 +72,7 @@ jobs: python -m venv venv && ( source venv/bin/activate && - python -m pip install --upgrade pip setuptools pre-commit pylint[spelling] + python -m pip install --upgrade pip setuptools pre-commit pylint[spelling] mypy typing-extensions python -m pip install -r requirements.txt && python -m pre_commit install --install-hooks && python -m pre_commit run --all-files && diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index 5f713fc..8c5a8cd 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -69,7 +69,7 @@ jobs: - name: Install linters run: | - python -m pip install --upgrade pre-commit pylint[spelling] + python -m pip install --upgrade pre-commit pylint[spelling] mypy typing-extensions - name: pre-commit run: | diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c422db1..b56f308 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -25,7 +25,7 @@ repos: - id: debug-statements - id: double-quote-string-fixer - repo: https://github.com/charliermarsh/ruff-pre-commit - rev: v0.0.262 + rev: v0.0.263 hooks: - id: ruff args: [--fix, --exit-non-zero-on-fix] @@ -38,10 +38,10 @@ repos: hooks: - id: black - repo: https://github.com/asottile/pyupgrade - rev: v3.3.1 + rev: v3.3.2 hooks: - id: pyupgrade - args: [--py37-plus] + args: [--py37-plus] # sync with requires-python - repo: https://github.com/pycqa/flake8 rev: 6.0.0 hooks: @@ -59,8 +59,8 @@ repos: - repo: 
https://github.com/codespell-project/codespell rev: v2.2.4 hooks: - - id: codespell - additional_dependencies: [".[toml]"] + - id: codespell + additional_dependencies: [".[toml]"] - repo: local hooks: - id: pylint @@ -80,3 +80,11 @@ repos: ^nvitop/callbacks/| ^docs/ ) + - repo: local + hooks: + - id: mypy + name: mypy + entry: mypy + language: system + types_or: [python, pyi] + require_serial: true diff --git a/CHANGELOG.md b/CHANGELOG.md index 922ee47..37e97ea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- +- Add `mypy` integration and update type annotations by [@XuehaiPan](https://github.com/XuehaiPan) in [#73](https://github.com/XuehaiPan/nvitop/pull/73). ### Changed @@ -33,7 +33,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed -- Further isolate the `CUDA_VISIBLE_DEVICE` parser in a subprocess by [@XuehaiPan](https://github.com/XuehaiPan) in [#70](https://github.com/XuehaiPan/nvitop/pull/70). +- Further isolate the `CUDA_VISIBLE_DEVICES` parser in a subprocess by [@XuehaiPan](https://github.com/XuehaiPan) in [#70](https://github.com/XuehaiPan/nvitop/pull/70). ------ diff --git a/README.md b/README.md index 0aa898e..d19764b 100644 --- a/README.md +++ b/README.md @@ -686,6 +686,7 @@ for device in devices: ```python In [1]: from nvitop import take_snapshots, Device ...: import os + ...: os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID' ...: os.environ['CUDA_VISIBLE_DEVICES'] = '1,0' # comma-separated integers or UUID strings In [2]: take_snapshots() # equivalent to `take_snapshots(Device.all())` @@ -763,6 +764,7 @@ Please refer to section [Low-level APIs](#low-level-apis) for more information. 
```python In [1]: from nvitop import ResourceMetricCollector, Device ...: import os + ...: os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID' ...: os.environ['CUDA_VISIBLE_DEVICES'] = '3,2,1,0' # comma-separated integers or UUID strings In [2]: collector = ResourceMetricCollector() # log all devices and descendant processes of the current process on the GPUs @@ -983,6 +985,7 @@ In [1]: from nvitop import ( ...: NA, ...: ) ...: import os + ...: os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID' ...: os.environ['CUDA_VISIBLE_DEVICES'] = '9,8,7,6' # comma-separated integers or UUID strings In [2]: Device.driver_version() diff --git a/docs/source/spelling_wordlist.txt b/docs/source/spelling_wordlist.txt index eb9fe73..a5528ea 100644 --- a/docs/source/spelling_wordlist.txt +++ b/docs/source/spelling_wordlist.txt @@ -140,3 +140,4 @@ noqa uptime ot oT +mypy diff --git a/nvitop/api/collector.py b/nvitop/api/collector.py index 940d864..ea81578 100644 --- a/nvitop/api/collector.py +++ b/nvitop/api/collector.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
# ============================================================================== - """Resource metrics collectors.""" from __future__ import annotations @@ -26,7 +25,7 @@ import os import threading import time from collections import OrderedDict, defaultdict -from typing import Callable, Hashable, Iterable, NamedTuple +from typing import Callable, Generator, Iterable, NamedTuple, TypeVar from weakref import WeakSet from nvitop.api import host @@ -46,7 +45,10 @@ class SnapshotResult(NamedTuple): # pylint: disable=missing-class-docstring timer = time.monotonic -def _unique(iterable: Iterable[Hashable]) -> list[Hashable]: +_T = TypeVar('_T') + + +def _unique(iterable: Iterable[_T]) -> list[_T]: return list(OrderedDict.fromkeys(iterable).keys()) @@ -78,6 +80,7 @@ def take_snapshots( Examples: >>> from nvitop import take_snapshots, Device >>> import os + >>> os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID' >>> os.environ['CUDA_VISIBLE_DEVICES'] = '1,0' >>> take_snapshots() # equivalent to `take_snapshots(Device.all())` @@ -145,8 +148,8 @@ def take_snapshots( if isinstance(gpu_processes, GpuProcess): gpu_processes = [gpu_processes] - if gpu_processes is not None: - if gpu_processes: # is not False or is a non-empty list/tuple + if gpu_processes is not None and gpu_processes is not True: + if gpu_processes: # is a non-empty list/tuple gpu_processes = list(gpu_processes) process_devices = _unique(process.device for process in gpu_processes) for device in process_devices: @@ -161,7 +164,7 @@ def take_snapshots( if devices is None: physical_devices = Device.all() devices = [] - leaf_devices = [] + leaf_devices: list[Device] = [] for physical_device in physical_devices: devices.append(physical_device) mig_devices = physical_device.mig_devices() @@ -176,7 +179,7 @@ def take_snapshots( itertools.chain.from_iterable(device.processes().values() for device in leaf_devices), ) - devices = [device.as_snapshot() for device in devices] + devices = [device.as_snapshot() for device in 
devices] # type: ignore[union-attr] gpu_processes = GpuProcess.take_snapshots(gpu_processes, failsafe=True) return SnapshotResult(devices, gpu_processes) @@ -254,22 +257,22 @@ def collect_in_background( def target() -> None: if on_start is not None: - on_start(collector) + on_start(collector) # type: ignore[arg-type] try: - with collector(tag): + with collector(tag): # type: ignore[misc] try: - next_snapshot = timer() + interval - while on_collect(collector.collect()): + next_snapshot = timer() + interval # type: ignore[operator] + while on_collect(collector.collect()): # type: ignore[union-attr] time.sleep(max(0.0, next_snapshot - timer())) - next_snapshot += interval + next_snapshot += interval # type: ignore[operator] except KeyboardInterrupt: pass finally: if on_stop is not None: - on_stop(collector) + on_stop(collector) # type: ignore[arg-type] daemon = threading.Thread(target=target, name=tag, daemon=True) - daemon.collector = collector + daemon.collector = collector # type: ignore[attr-defined] if start: daemon.start() return daemon @@ -304,6 +307,7 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes Examples: >>> import os + >>> os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID' >>> os.environ['CUDA_VISIBLE_DEVICES'] = '3,2,1,0' >>> from nvitop import ResourceMetricCollector, Device @@ -404,13 +408,13 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes if devices is None: devices = Device.all() - root_pids = {os.getpid()} if root_pids is None else set(root_pids) + root_pids: set[int] = {os.getpid()} if root_pids is None else set(root_pids) - self.interval = interval + self.interval: float = interval - self.devices = list(devices) - self.all_devices = [] - self.leaf_devices = [] + self.devices: list[Device] = list(devices) + self.all_devices: list[Device] = [] + self.leaf_devices: list[Device] = [] for device in self.devices: self.all_devices.append(device) mig_devices = device.mig_devices() @@ -420,21 
+424,23 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes else: self.leaf_devices.append(device) - self.root_pids = root_pids - self._positive_processes = WeakSet(HostProcess(pid) for pid in self.root_pids) - self._negative_processes = WeakSet() + self.root_pids: set[int] = root_pids + self._positive_processes: WeakSet[HostProcess] = WeakSet( + HostProcess(pid) for pid in self.root_pids + ) + self._negative_processes: WeakSet[HostProcess] = WeakSet() - self._last_timestamp = timer() - 2.0 * self.interval - self._lock = threading.RLock() - self._metric_buffer = None - self._tags = set() + self._last_timestamp: float = timer() - 2.0 * self.interval + self._lock: threading.RLock = threading.RLock() + self._metric_buffer: _MetricBuffer | None = None + self._tags: set[str] = set() - self._daemon = threading.Thread( + self._daemon: threading.Thread = threading.Thread( name='gpu_metric_collector_daemon', target=self._target, daemon=True, ) - self._daemon_running = threading.Event() + self._daemon_running: threading.Event = threading.Event() def activate(self, tag: str) -> ResourceMetricCollector: """Start a new metric collection with the given tag. @@ -500,7 +506,7 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes if buffer.tag == tag: self._metric_buffer = buffer.prev break - buffer = buffer.prev + buffer = buffer.prev # type: ignore[assignment] if self._metric_buffer is None: self._daemon_running.clear() @@ -510,7 +516,7 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes stop = deactivate @contextlib.contextmanager - def context(self, tag: str) -> ResourceMetricCollector: + def context(self, tag: str) -> Generator[ResourceMetricCollector, None, None]: """A context manager for starting and stopping resource metric collection. 
Args: @@ -578,7 +584,7 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes buffer.clear() if buffer.tag == tag: break - buffer = buffer.prev + buffer = buffer.prev # type: ignore[assignment] def collect(self) -> dict[str, float]: """Get the average resource consumption during collection.""" @@ -665,7 +671,7 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes def take_snapshots(self) -> SnapshotResult: """Take snapshots of the current resource metrics and update the metric buffer.""" if len(self.root_pids) > 0: - all_gpu_processes = [] + all_gpu_processes: list[GpuProcess] = [] for device in self.leaf_devices: all_gpu_processes.extend(device.processes().values()) @@ -685,7 +691,7 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes positive = True break try: - p = p.parent() + p = p.parent() # type: ignore[assignment] except host.PsutilError: break if positive: @@ -700,8 +706,8 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes timestamp = timer() metrics = {} - devices = [device.as_snapshot() for device in self.all_devices] - gpu_processes = GpuProcess.take_snapshots(gpu_processes, failsafe=True) + device_snapshots = [device.as_snapshot() for device in self.all_devices] + gpu_process_snapshots = GpuProcess.take_snapshots(gpu_processes, failsafe=True) metrics.update( { @@ -722,23 +728,23 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes ) device_identifiers = {} - for device in devices: - identifier = f'gpu:{device.index}' - if isinstance(device.real, CudaDevice): - identifier = f'cuda:{device.cuda_index} ({identifier})' - device_identifiers[device.real] = identifier + for device_snapshot in device_snapshots: + identifier = f'gpu:{device_snapshot.index}' + if isinstance(device_snapshot.real, CudaDevice): + identifier = f'cuda:{device_snapshot.cuda_index} ({identifier})' + device_identifiers[device_snapshot.real] = 
identifier for attr, name, unit in self.DEVICE_METRICS: - value = float(getattr(device, attr)) / unit + value = float(getattr(device_snapshot, attr)) / unit metrics[f'{identifier}/{name}'] = value - for process in gpu_processes: - device_identifier = device_identifiers[process.device] - identifier = f'pid:{process.pid}' + for process_snapshot in gpu_process_snapshots: + device_identifier = device_identifiers[process_snapshot.device] + identifier = f'pid:{process_snapshot.pid}' for attr, scope, name, unit in self.PROCESS_METRICS: scope = scope or device_identifier - value = float(getattr(process, attr)) / unit + value = float(getattr(process_snapshot, attr)) / unit metrics[f'{identifier}/{scope}/{name}'] = value with self._lock: @@ -746,7 +752,7 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes self._metric_buffer.add(metrics, timestamp=timestamp) self._last_timestamp = timestamp - return SnapshotResult(devices, gpu_processes) + return SnapshotResult(device_snapshots, gpu_process_snapshots) def _target(self) -> None: self._daemon_running.wait() @@ -762,17 +768,20 @@ class _MetricBuffer: # pylint: disable=missing-class-docstring,missing-function collector: ResourceMetricCollector, prev: _MetricBuffer | None = None, ) -> None: - self.collector = collector - self.prev = prev + self.collector: ResourceMetricCollector = collector + self.prev: _MetricBuffer | None = prev - self.tag = tag + self.tag: str = tag + self.key_prefix: str if self.prev is not None: self.key_prefix = f'{self.prev.key_prefix}/{self.tag}' else: self.key_prefix = self.tag self.last_timestamp = self.start_timestamp = timer() - self.buffer = defaultdict(lambda: _StatisticsMaintainer(self.last_timestamp)) + self.buffer: defaultdict[str, _StatisticsMaintainer] = defaultdict( + lambda: _StatisticsMaintainer(self.last_timestamp), + ) self.len = 0 @@ -817,13 +826,13 @@ class _MetricBuffer: # pylint: disable=missing-class-docstring,missing-function class _StatisticsMaintainer: 
# pylint: disable=missing-class-docstring,missing-function-docstring def __init__(self, timestamp: float) -> None: - self.start_timestamp = timestamp - self.last_timestamp = None - self.integral = None - self.last_value = None - self.min_value = None - self.max_value = None - self.has_nan = False + self.start_timestamp: float = timestamp + self.last_timestamp: float = math.nan + self.integral: float | None = None + self.last_value: float | None = None + self.min_value: float | None = None + self.max_value: float | None = None + self.has_nan: bool = False def add(self, value: float, timestamp: float | None = None) -> None: if timestamp is None: @@ -837,30 +846,32 @@ class _StatisticsMaintainer: # pylint: disable=missing-class-docstring,missing- self.integral = value * (timestamp - self.start_timestamp) self.last_value = self.min_value = self.max_value = value else: - self.integral += (value + self.last_value) * (timestamp - self.last_timestamp) / 2.0 + # pylint: disable-next=line-too-long + self.integral += (value + self.last_value) * (timestamp - self.last_timestamp) / 2.0 # type: ignore[operator] self.last_value = value - self.min_value = min(self.min_value, value) - self.max_value = max(self.max_value, value) + self.min_value = min(self.min_value, value) # type: ignore[type-var] + self.max_value = max(self.max_value, value) # type: ignore[type-var] self.last_timestamp = timestamp def mean(self) -> float: + if self.integral is None: + return math.nan + if self.has_nan: - if self.integral is None: - return math.nan return self.integral / (self.last_timestamp - self.start_timestamp) timestamp = timer() - integral = self.integral + self.last_value * (timestamp - self.last_timestamp) + integral = self.integral + self.last_value * (timestamp - self.last_timestamp) # type: ignore[operator] return integral / (timestamp - self.start_timestamp) def min(self) -> float: - if self.has_nan and self.min_value is None: + if self.min_value is None: return math.nan return 
self.min_value def max(self) -> float: - if self.has_nan and self.max_value is None: + if self.max_value is None: return math.nan return self.max_value diff --git a/nvitop/api/device.py b/nvitop/api/device.py index fcd3e3f..fa41546 100644 --- a/nvitop/api/device.py +++ b/nvitop/api/device.py @@ -69,6 +69,7 @@ Examples: ) >>> import os + >>> os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID' >>> os.environ['CUDA_VISIBLE_DEVICES'] = '3,2,1,0' >>> CudaDevice.count() # number of NVIDIA GPUs visible to CUDA applications @@ -113,13 +114,17 @@ import sys import textwrap import threading from collections import OrderedDict -from typing import Any, Callable, Iterable, NamedTuple +from typing import TYPE_CHECKING, Any, Callable, Generator, Hashable, Iterable, NamedTuple, overload from nvitop.api import libcuda, libcudart, libnvml from nvitop.api.process import GpuProcess from nvitop.api.utils import NA, NaType, Snapshot, boolify, bytes2human, memoize_when_activated +if TYPE_CHECKING: + from typing_extensions import Literal # Python 3.8+ + + __all__ = [ 'Device', 'PhysicalDevice', @@ -158,7 +163,7 @@ class UtilizationRates(NamedTuple): # in percentage # pylint: disable=missing-c decoder: int | NaType -_VALUE_OMITTED = object() +_VALUE_OMITTED: str = object() # type: ignore[assignment] class Device: # pylint: disable=too-many-instance-attributes,too-many-public-methods @@ -226,7 +231,7 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me # GPU UUID : `GPU-` # MIG UUID : `MIG-GPU-//` # MIG UUID (R470+): `MIG-` - UUID_PATTERN = re.compile( + UUID_PATTERN: re.Pattern = re.compile( r"""^ # full match (?:(?PMIG)-)? # prefix for MIG UUID (?:(?PGPU)-)? 
# prefix for GPU UUID @@ -243,10 +248,12 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me flags=re.VERBOSE, ) - GPU_PROCESS_CLASS = GpuProcess - cuda = None # defined in below + GPU_PROCESS_CLASS: type[GpuProcess] = GpuProcess + cuda: type[CudaDevice] = None # type: ignore[assignment] # defined in below """Shortcut for class :class:`CudaDevice`.""" + _nvml_index: int | tuple[int, int] + @classmethod def is_available(cls) -> bool: """Test whether there are any devices and the NVML library is successfully loaded.""" @@ -347,7 +354,7 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me @classmethod def all(cls) -> list[PhysicalDevice]: """Return a list of all physical devices in the system.""" - return cls.from_indices() + return cls.from_indices() # type: ignore[return-value] @classmethod def from_indices( @@ -388,7 +395,7 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me if isinstance(indices, int): indices = [indices] - return list(map(cls, indices)) + return list(map(cls, indices)) # type: ignore[arg-type] @staticmethod def from_cuda_visible_devices() -> list[CudaDevice]: @@ -408,8 +415,9 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me """ # pylint: disable=line-too-long visible_device_indices = Device.parse_cuda_visible_devices() - cuda_devices = [] - for cuda_index, device_index in enumerate(visible_device_indices): + device_index: int | tuple[int, int] + cuda_devices: list[CudaDevice] = [] + for cuda_index, device_index in enumerate(visible_device_indices): # type: ignore[assignment] cuda_devices.append(CudaDevice(cuda_index, nvml_index=device_index)) return cuda_devices @@ -552,7 +560,7 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me if cls is not Device: return super().__new__(cls) - match = None + match: re.Match | None = None if isinstance(index, str): match = cls.UUID_PATTERN.match(index) if 
match is not None: # passed by UUID @@ -608,17 +616,17 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me arg.encode() if isinstance(arg, str) else arg for arg in (index, uuid, bus_id) ) - self._name = NA - self._uuid = NA - self._bus_id = NA - self._memory_total = NA - self._memory_total_human = NA - self._is_mig_device = None - self._cuda_index = None - self._cuda_compute_capability = None + self._name: str = NA + self._uuid: str = NA + self._bus_id: str = NA + self._memory_total: int | NaType = NA + self._memory_total_human: str = NA + self._is_mig_device: bool | None = None + self._cuda_index: int | None = None + self._cuda_compute_capability: tuple[int, int] | NaType | None = None if index is not None: - self._nvml_index = index + self._nvml_index = index # type: ignore[assignment] try: self._handle = libnvml.nvmlQuery( 'nvmlDeviceGetHandleByIndex', @@ -647,21 +655,21 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me ) except libnvml.NVMLError_GpuIsLost: self._handle = None - self._nvml_index = NA + self._nvml_index = NA # type: ignore[assignment] self._name = 'ERROR: GPU is Lost' except libnvml.NVMLError_Unknown: self._handle = None - self._nvml_index = NA + self._nvml_index = NA # type: ignore[assignment] self._name = 'ERROR: Unknown' else: self._nvml_index = libnvml.nvmlQuery('nvmlDeviceGetIndex', self._handle) - self._max_clock_infos = ClockInfos(graphics=NA, sm=NA, memory=NA, video=NA) - self._timestamp = 0 - self._lock = threading.RLock() + self._max_clock_infos: ClockInfos = ClockInfos(graphics=NA, sm=NA, memory=NA, video=NA) + self._timestamp: int = 0 + self._lock: threading.RLock = threading.RLock() - self._ident = (self.index, self.uuid()) - self._hash = None + self._ident: tuple[Hashable, str] = (self.index, self.uuid()) + self._hash: int | None = None def __repr__(self) -> str: """Return a string representation of the device.""" @@ -709,7 +717,7 @@ class Device: # pylint: 
disable=too-many-instance-attributes,too-many-public-me (8, 6) """ # pylint: disable=line-too-long try: - return super().__getattr__(name) + return super().__getattr__(name) # type: ignore[misc] except AttributeError: if name == '_cache': raise @@ -779,7 +787,7 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me An int for the physical device index. For MIG devices, returns the index of the parent physical device. """ - return self._nvml_index # will be overridden in MigDevice + return self._nvml_index # type: ignore[return-value] # will be overridden in MigDevice @property def handle(self) -> libnvml.c_nvmlDevice_t: @@ -800,7 +808,7 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me if self._cuda_index is None: visible_device_indices = self.parse_cuda_visible_devices() try: - self._cuda_index = visible_device_indices.index(self.index) + self._cuda_index = visible_device_indices.index(self.index) # type: ignore[arg-type] except ValueError as ex: raise RuntimeError( f'CUDA Error: Device(index={self.index}) is not visible to CUDA applications', @@ -1628,13 +1636,15 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me """ return self.is_mig_device() or not self.is_mig_mode_enabled() - def to_leaf_devices(self) -> list[PhysicalDevice | MigDevice | CudaDevice]: + def to_leaf_devices( + self, + ) -> list[PhysicalDevice] | list[MigDevice] | list[CudaDevice] | list[CudaMigDevice]: """Return a list of leaf devices. Note that a CUDA device is always a leaf device. 
""" if isinstance(self, CudaDevice) or self.is_leaf_device(): - return [self] + return [self] # type: ignore[return-value] return self.mig_devices() def processes(self) -> dict[int, GpuProcess]: @@ -1656,7 +1666,7 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me else: # Used GPU memory is `N/A` on Windows Display Driver Model (WDDM) # or on MIG-enabled GPUs - gpu_memory = NA + gpu_memory = NA # type: ignore[assignment] found_na = True proc = processes[p.pid] = self.GPU_PROCESS_CLASS( pid=p.pid, @@ -1740,7 +1750,7 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me # Modified from psutil (https://github.com/giampaolo/psutil) @contextlib.contextmanager - def oneshot(self) -> contextlib.AbstractContextManager: + def oneshot(self) -> Generator[None, None, None]: """A utility context manager which considerably speeds up the retrieval of multiple device information at the same time. Internally different device info (e.g. memory_info, utilization_rates, ...) 
may be fetched @@ -1781,22 +1791,22 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me yield else: try: - self.memory_info.cache_activate(self) - self.bar1_memory_info.cache_activate(self) - self.utilization_rates.cache_activate(self) - self.clock_infos.cache_activate(self) - self.max_clock_infos.cache_activate(self) - self.power_usage.cache_activate(self) - self.power_limit.cache_activate(self) + self.memory_info.cache_activate(self) # type: ignore[attr-defined] + self.bar1_memory_info.cache_activate(self) # type: ignore[attr-defined] + self.utilization_rates.cache_activate(self) # type: ignore[attr-defined] + self.clock_infos.cache_activate(self) # type: ignore[attr-defined] + self.max_clock_infos.cache_activate(self) # type: ignore[attr-defined] + self.power_usage.cache_activate(self) # type: ignore[attr-defined] + self.power_limit.cache_activate(self) # type: ignore[attr-defined] yield finally: - self.memory_info.cache_deactivate(self) - self.bar1_memory_info.cache_deactivate(self) - self.utilization_rates.cache_deactivate(self) - self.clock_infos.cache_deactivate(self) - self.max_clock_infos.cache_deactivate(self) - self.power_usage.cache_deactivate(self) - self.power_limit.cache_deactivate(self) + self.memory_info.cache_deactivate(self) # type: ignore[attr-defined] + self.bar1_memory_info.cache_deactivate(self) # type: ignore[attr-defined] + self.utilization_rates.cache_deactivate(self) # type: ignore[attr-defined] + self.clock_infos.cache_deactivate(self) # type: ignore[attr-defined] + self.max_clock_infos.cache_deactivate(self) # type: ignore[attr-defined] + self.power_usage.cache_deactivate(self) # type: ignore[attr-defined] + self.power_limit.cache_deactivate(self) # type: ignore[attr-defined] class PhysicalDevice(Device): @@ -1805,6 +1815,10 @@ class PhysicalDevice(Device): This is the real GPU installed in the system. 
""" + _nvml_index: int + index: int + nvml_index: int + @property def physical_index(self) -> int: """Zero based index of the GPU. Can change at each boot. @@ -1864,13 +1878,16 @@ class PhysicalDevice(Device): class MigDevice(Device): # pylint: disable=too-many-instance-attributes """Class for MIG devices.""" + _nvml_index: tuple[int, int] + nvml_index: tuple[int, int] + @classmethod def count(cls) -> int: """The number of total MIG devices aggregated over all physical devices.""" return len(cls.all()) @classmethod - def all(cls) -> list[MigDevice]: + def all(cls) -> list[MigDevice]: # type: ignore[override] """Return a list of MIG devices aggregated over all physical devices.""" mig_devices = [] for device in PhysicalDevice.all(): @@ -1878,7 +1895,7 @@ class MigDevice(Device): # pylint: disable=too-many-instance-attributes return mig_devices @classmethod - def from_indices( # pylint: disable=signature-differs + def from_indices( # type: ignore[override] # pylint: disable=signature-differs cls, indices: Iterable[tuple[int, int]], ) -> list[MigDevice]: @@ -1929,19 +1946,19 @@ class MigDevice(Device): # pylint: disable=too-many-instance-attributes index, uuid = (arg.encode() if isinstance(arg, str) else arg for arg in (index, uuid)) - self._name = NA - self._uuid = NA - self._bus_id = NA - self._memory_total = NA - self._memory_total_human = NA - self._gpu_instance_id = NA - self._compute_instance_id = NA - self._is_mig_device = True - self._cuda_index = None - self._cuda_compute_capability = None + self._name: str = NA + self._uuid: str = NA + self._bus_id: str = NA + self._memory_total: int | NaType = NA + self._memory_total_human: str = NA + self._gpu_instance_id: int | NaType = NA + self._compute_instance_id: int | NaType = NA + self._is_mig_device: bool = True + self._cuda_index: int | None = None + self._cuda_compute_capability: tuple[int, int] | NaType | None = None if index is not None: - self._nvml_index = index + self._nvml_index = index # type: 
ignore[assignment] self._handle = None parent = _get_global_physical_device() @@ -2047,7 +2064,7 @@ class MigDevice(Device): # pylint: disable=too-many-instance-attributes The attributes are defined in :attr:`SNAPSHOT_KEYS`. """ snapshot = super().as_snapshot() - snapshot.mig_index = self.mig_index + snapshot.mig_index = self.mig_index # type: ignore[attr-defined] return snapshot @@ -2074,6 +2091,7 @@ class CudaDevice(Device): Examples: >>> import os + >>> os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID' >>> os.environ['CUDA_VISIBLE_DEVICES'] = '3,2,1,0' >>> CudaDevice.count() # number of NVIDIA GPUs visible to CUDA applications @@ -2123,6 +2141,10 @@ class CudaDevice(Device): If the index is out of range for the given ``CUDA_VISIBLE_DEVICES`` environment variable. """ # pylint: disable=line-too-long + _nvml_index: int + index: int + nvml_index: int + @classmethod def is_available(cls) -> bool: """Test whether there are any CUDA-capable devices available.""" @@ -2137,7 +2159,7 @@ class CudaDevice(Device): return 0 @classmethod - def all(cls) -> list[CudaDevice]: + def all(cls) -> list[CudaDevice]: # type: ignore[override] """All CUDA visible devices. Note: @@ -2146,7 +2168,7 @@ class CudaDevice(Device): return cls.from_indices() @classmethod - def from_indices( + def from_indices( # type: ignore[override] cls, indices: int | Iterable[int] | None = None, ) -> list[CudaDevice]: @@ -2184,7 +2206,7 @@ class CudaDevice(Device): *, nvml_index: int | tuple[int, int] | None = None, uuid: str | None = None, - ) -> Device: + ) -> CudaDevice: """Create a new instance of CudaDevice. The type of the result is determined by the given argument. @@ -2209,16 +2231,22 @@ class CudaDevice(Device): RuntimeError: If the index is out of range for the given ``CUDA_VISIBLE_DEVICES`` environment variable. 
""" + if nvml_index is not None and uuid is not None: + raise TypeError( + f'CudaDevice(cuda_index=None, nvml_index=None, uuid=None) takes 1 non-None arguments ' + f'but (cuda_index, nvml_index, uuid) = {(cuda_index, nvml_index, uuid)!r} were given', + ) + if cuda_index is not None and nvml_index is None and uuid is None: cuda_visible_devices = cls.parse_cuda_visible_devices() if not isinstance(cuda_index, int) or not 0 <= cuda_index < len(cuda_visible_devices): raise RuntimeError(f'CUDA Error: invalid device ordinal: {cuda_index!r}.') nvml_index = cuda_visible_devices[cuda_index] - if not isinstance(nvml_index, int) or is_mig_device_uuid(uuid): - return super().__new__(CudaMigDevice, index=nvml_index, uuid=uuid) + if (nvml_index is not None and not isinstance(nvml_index, int)) or is_mig_device_uuid(uuid): + return super().__new__(CudaMigDevice, index=nvml_index, uuid=uuid) # type: ignore[return-value] - return super().__new__(cls, index=nvml_index, uuid=uuid) + return super().__new__(cls, index=nvml_index, uuid=uuid) # type: ignore[return-value] def __init__( self, @@ -2251,13 +2279,13 @@ class CudaDevice(Device): raise RuntimeError(f'CUDA Error: invalid device ordinal: {cuda_index!r}.') nvml_index = cuda_visible_devices[cuda_index] - super().__init__(index=nvml_index, uuid=uuid) + super().__init__(index=nvml_index, uuid=uuid) # type: ignore[arg-type] if cuda_index is None: cuda_index = super().cuda_index - self._cuda_index = cuda_index + self._cuda_index: int = cuda_index - self._ident = ((self._cuda_index, self.index), self.uuid()) + self._ident: tuple[Hashable, str] = ((self._cuda_index, self.index), self.uuid()) def __repr__(self) -> str: """Return a string representation of the CUDA device.""" @@ -2279,7 +2307,7 @@ class CudaDevice(Device): The attributes are defined in :attr:`SNAPSHOT_KEYS`. 
""" snapshot = super().as_snapshot() - snapshot.cuda_index = self.cuda_index + snapshot.cuda_index = self.cuda_index # type: ignore[attr-defined] return snapshot @@ -2288,9 +2316,13 @@ Device.cuda = CudaDevice """Shortcut for class :class:`CudaDevice`.""" -class CudaMigDevice(CudaDevice, MigDevice): +class CudaMigDevice(CudaDevice, MigDevice): # type: ignore[misc] """Class for CUDA devices that are MIG devices.""" + _nvml_index: tuple[int, int] # type: ignore[assignment] + index: tuple[int, int] # type: ignore[assignment] + nvml_index: tuple[int, int] # type: ignore[assignment] + def is_mig_device_uuid(uuid: str | None) -> bool: """Return :data:`True` if the argument is a MIG device UUID, otherwise, return :data:`False`.""" @@ -2327,6 +2359,7 @@ def parse_cuda_visible_devices( Examples: >>> import os + >>> os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID' >>> os.environ['CUDA_VISIBLE_DEVICES'] = '6,5' >>> parse_cuda_visible_devices() # parse the `CUDA_VISIBLE_DEVICES` environment variable to NVML indices [6, 5] @@ -2383,6 +2416,7 @@ def normalize_cuda_visible_devices(cuda_visible_devices: str | None = _VALUE_OMI Examples: >>> import os + >>> os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID' >>> os.environ['CUDA_VISIBLE_DEVICES'] = '6,5' >>> normalize_cuda_visible_devices() # normalize the `CUDA_VISIBLE_DEVICES` environment variable to UUID strings 'GPU-849d5a8d-610e-eeea-1fd4-81ff44a23794,GPU-18ef14e9-dec6-1d7e-1284-3010c6ce98b1' @@ -2420,7 +2454,7 @@ def normalize_cuda_visible_devices(cuda_visible_devices: str | None = _VALUE_OMI class _PhysicalDeviceAttrs(NamedTuple): - index: int + index: int # type: ignore[assignment] name: str uuid: str support_mig_mode: bool @@ -2460,7 +2494,7 @@ def _does_any_device_support_mig_mode(uuids: Iterable[str] | None = None) -> boo @contextlib.contextmanager -def _global_physical_device(device: PhysicalDevice) -> PhysicalDevice: +def _global_physical_device(device: PhysicalDevice) -> Generator[PhysicalDevice, None, None]: global 
_GLOBAL_PHYSICAL_DEVICE # pylint: disable=global-statement with _GLOBAL_PHYSICAL_DEVICE_LOCK: @@ -2473,13 +2507,29 @@ def _global_physical_device(device: PhysicalDevice) -> PhysicalDevice: def _get_global_physical_device() -> PhysicalDevice: with _GLOBAL_PHYSICAL_DEVICE_LOCK: - return _GLOBAL_PHYSICAL_DEVICE + return _GLOBAL_PHYSICAL_DEVICE # type: ignore[return-value] + + +@overload +def _parse_cuda_visible_devices( + cuda_visible_devices: str | None, + format: Literal['index'], # pylint: disable=redefined-builtin +) -> list[int] | list[tuple[int, int]]: + ... + + +@overload +def _parse_cuda_visible_devices( + cuda_visible_devices: str | None, + format: Literal['uuid'], # pylint: disable=redefined-builtin +) -> list[str]: + ... @functools.lru_cache() def _parse_cuda_visible_devices( # pylint: disable=too-many-branches,too-many-statements cuda_visible_devices: str | None = None, - format: str = 'index', # pylint: disable=redefined-builtin + format: Literal['index', 'uuid'] = 'index', # pylint: disable=redefined-builtin ) -> list[int] | list[tuple[int, int]] | list[str]: """The underlining implementation for :meth:`parse_cuda_visible_devices`. 
The result will be cached.""" assert format in ('index', 'uuid') @@ -2487,7 +2537,7 @@ def _parse_cuda_visible_devices( # pylint: disable=too-many-branches,too-many-s try: physical_device_attrs = _get_all_physical_device_attrs() except libnvml.NVMLError: - return [] + return [] # type: ignore[return-value] gpu_uuids = set(physical_device_attrs) try: @@ -2532,6 +2582,10 @@ def _parse_cuda_visible_devices( # pylint: disable=too-many-branches,too-many-s if cuda_visible_devices is None: cuda_visible_devices = ','.join(physical_device_attrs.keys()) + devices: list[Device] = [] + presented: set[str] = set() + use_integer_identifiers: bool | None = None + def from_index_or_uuid(index_or_uuid: int | str) -> Device: nonlocal use_integer_identifiers @@ -2562,12 +2616,9 @@ def _parse_cuda_visible_devices( # pylint: disable=too-many-branches,too-many-s identifier = identifier[:offset] return identifier - devices = [] - presented = set() - use_integer_identifiers = None for identifier in map(strip_identifier, cuda_visible_devices.split(',')): if identifier in presented: - return [] # duplicate identifiers found + return [] # type: ignore[return-value] # duplicate identifiers found try: device = from_index_or_uuid(identifier) @@ -2600,7 +2651,7 @@ def _parse_cuda_visible_devices( # pylint: disable=too-many-branches,too-many-s if format == 'uuid': return [device.uuid() for device in devices] - return [device.index for device in devices] + return [device.index for device in devices] # type: ignore[return-value] def _parse_cuda_visible_devices_to_uuids( @@ -2655,7 +2706,7 @@ def _parse_cuda_visible_devices_to_uuids( def _cuda_visible_devices_parser( - cuda_visible_devices: str, + cuda_visible_devices: str | None, queue: mp.SimpleQueue, verbose: bool = True, ) -> None: diff --git a/nvitop/api/host.py b/nvitop/api/host.py index 591925a..c711627 100644 --- a/nvitop/api/host.py +++ b/nvitop/api/host.py @@ -43,13 +43,21 @@ __all__ = [name for name in _psutil.__all__ if not 
name.startswith('_')] + [ __all__[__all__.index('Error')] = 'PsutilError' -PsutilError = Error # make alias # noqa: F405 -del Error # noqa: F821 # pylint: disable=undefined-variable +PsutilError = Error = _psutil.Error # make alias +del Error cpu_percent = _psutil.cpu_percent virtual_memory = _psutil.virtual_memory swap_memory = _psutil.swap_memory +Process = _psutil.Process +NoSuchProcess = _psutil.NoSuchProcess +ZombieProcess = _psutil.ZombieProcess +AccessDenied = _psutil.AccessDenied +POSIX = _psutil.POSIX +WINDOWS = _psutil.WINDOWS +LINUX = _psutil.LINUX +MACOS = _psutil.MACOS if hasattr(_psutil, 'getloadavg'): @@ -60,7 +68,7 @@ if hasattr(_psutil, 'getloadavg'): else: - def load_average() -> None: + def load_average() -> None: # type: ignore[misc] """Get the system load average.""" return @@ -95,7 +103,7 @@ def reverse_ppid_map() -> dict[int, list[int]]: # pylint: disable=function-rede return tree -if LINUX: # noqa: F405 +if LINUX: WSL = _os.getenv('WSL_DISTRO_NAME', default=None) if WSL is not None and WSL == '': WSL = 'WSL' diff --git a/nvitop/api/libcuda.py b/nvitop/api/libcuda.py index dfd1b98..6aa3dbc 100644 --- a/nvitop/api/libcuda.py +++ b/nvitop/api/libcuda.py @@ -26,7 +26,13 @@ import platform as _platform import string as _string import sys as _sys import threading as _threading +from typing import TYPE_CHECKING as _TYPE_CHECKING from typing import Any as _Any +from typing import Callable as _Callable + + +if _TYPE_CHECKING: + from typing_extensions import TypeAlias as _TypeAlias # Python 3.10+ # pylint: disable-next=missing-class-docstring,too-few-public-methods @@ -34,9 +40,9 @@ class _struct_c_CUdevice_t(_ctypes.Structure): pass # opaque handle -_c_CUdevice_t = _ctypes.POINTER(_struct_c_CUdevice_t) +_c_CUdevice_t: _TypeAlias = _ctypes.POINTER(_struct_c_CUdevice_t) # type: ignore[valid-type] -_CUresult_t = _ctypes.c_uint +_CUresult_t: _TypeAlias = _ctypes.c_uint # Error codes # # pylint: disable=line-too-long @@ -215,8 +221,8 @@ CUDA_ERROR_UNKNOWN 
= 999 class CUDAError(Exception): """Base exception class for CUDA driver query errors.""" - _value_class_mapping = {} - _errcode_to_string = { # List of currently known error codes + _value_class_mapping: dict[int, type[CUDAError]] = {} + _errcode_to_string: dict[int, str] = { # List of currently known error codes CUDA_ERROR_NOT_INITIALIZED: 'Initialization error.', CUDA_ERROR_NOT_FOUND: 'Named symbol not found.', CUDA_ERROR_INVALID_VALUE: 'Invalid argument.', @@ -227,7 +233,8 @@ class CUDAError(Exception): CUDA_ERROR_COMPAT_NOT_SUPPORTED_ON_DEVICE: 'Forward compatibility was attempted on non supported Hardware.', CUDA_ERROR_INVALID_CONTEXT: 'Invalid device context.', } # fmt:skip - _errcode_to_name = {} + _errcode_to_name: dict[int, str] = {} + value: int def __new__(cls, value: int) -> CUDAError: """Map value to a proper subclass of :class:`CUDAError`.""" @@ -295,8 +302,8 @@ def _extract_cuda_errors_as_classes() -> None: class_name = f'CUDAError_{pascal_case}' err_val = getattr(this_module, err_name) - def gen_new(value): - def new(cls): + def gen_new(value: int) -> _Callable[[type[CUDAError]], CUDAError]: + def new(cls: type[CUDAError]) -> CUDAError: return CUDAError.__new__(cls, value) return new @@ -317,6 +324,24 @@ def _extract_cuda_errors_as_classes() -> None: CUDAError._errcode_to_name[err_val] = err_name +# Add explicit references to appease linters +class __CUDAError(CUDAError): + value: int + + def __new__(cls) -> CUDAError: # type: ignore[misc,empty-body] + ... 
+ + +CUDAError_NotInitialized: type[__CUDAError] +CUDAError_NotFound: type[__CUDAError] +CUDAError_InvalidValue: type[__CUDAError] +CUDAError_NoDevice: type[__CUDAError] +CUDAError_InvalidDevice: type[__CUDAError] +CUDAError_SystemDriverMismatch: type[__CUDAError] +CUDAError_Deinitialized: type[__CUDAError] +CUDAError_CompatNotSupportedOnDevice: type[__CUDAError] +CUDAError_InvalidContext: type[__CUDAError] + _extract_cuda_errors_as_classes() del _extract_cuda_errors_as_classes @@ -328,14 +353,14 @@ def _cudaCheckReturn(ret: _Any) -> _Any: # Function access # -__cudaLib = None -__initialized = False -__libLoadLock = _threading.Lock() +__cudaLib: _ctypes.CDLL | None = None +__initialized: bool = False +__libLoadLock: _threading.Lock = _threading.Lock() # Function pointers are cached to prevent unnecessary libLoadLock locking -__cudaGetFunctionPointer_cache = {} +__cudaGetFunctionPointer_cache: dict[str, _ctypes._CFuncPtr] = {} # type: ignore[name-defined] -def __cudaGetFunctionPointer(name: str) -> _ctypes._CFuncPtr: +def __cudaGetFunctionPointer(name: str) -> _ctypes._CFuncPtr: # type: ignore[name-defined] """Get the function pointer from the CUDA driver library. 
Raises: @@ -658,11 +683,10 @@ def cuDeviceGetUuid(device: _c_CUdevice_t) -> str: except CUDAError_NotFound: # noqa: F821 # pylint: disable=undefined-variable fn = __cudaGetFunctionPointer('cuDeviceGetUuid') - ubyte_array = _ctypes.c_ubyte * 16 - uuid = ubyte_array() + uuid = _ctypes.create_string_buffer(16) ret = fn(uuid, device) _cudaCheckReturn(ret) - uuid = ''.join(map('{:02x}'.format, uuid)) + uuid = ''.join(map('{:02x}'.format, uuid.value)) return '-'.join((uuid[:8], uuid[8:12], uuid[12:16], uuid[16:20], uuid[20:32])) @@ -682,11 +706,10 @@ def cuDeviceGetUuid_v2(device: _c_CUdevice_t) -> str: """ fn = __cudaGetFunctionPointer('cuDeviceGetUuid_v2') - ubyte_array = _ctypes.c_ubyte * 16 - uuid = ubyte_array() + uuid = _ctypes.create_string_buffer(16) ret = fn(uuid, device) _cudaCheckReturn(ret) - uuid = ''.join(map('{:0x}'.format, uuid.value)) + uuid = ''.join(map('{:02x}'.format, uuid.value)) return '-'.join((uuid[:8], uuid[8:12], uuid[12:16], uuid[16:20], uuid[20:32])) diff --git a/nvitop/api/libcudart.py b/nvitop/api/libcudart.py index fc7043f..634de54 100644 --- a/nvitop/api/libcudart.py +++ b/nvitop/api/libcudart.py @@ -27,6 +27,7 @@ import platform as _platform import sys as _sys import threading as _threading from typing import Any as _Any +from typing import Callable as _Callable _cudaError_t = _ctypes.c_int @@ -266,8 +267,8 @@ cudaErrorUnknown = 999 class cudaError(Exception): """Base exception class for CUDA driver query errors.""" - _value_class_mapping = {} - _errcode_to_string = { # List of currently known error codes + _value_class_mapping: dict[int, type[cudaError]] = {} + _errcode_to_string: dict[int, str] = { # List of currently known error codes cudaErrorInitializationError: 'Initialization error.', cudaErrorSymbolNotFound: 'Named symbol not found.', cudaErrorInvalidValue: 'Invalid argument.', @@ -278,7 +279,8 @@ class cudaError(Exception): cudaErrorCompatNotSupportedOnDevice: 'Forward compatibility was attempted on non supported Hardware.', 
cudaErrorDeviceUninitialized: 'Invalid device context.', } # fmt:skip - _errcode_to_name = {} + _errcode_to_name: dict[int, str] = {} + value: int def __new__(cls, value: int) -> cudaError: """Map value to a proper subclass of :class:`cudaError`.""" @@ -349,8 +351,8 @@ def _extract_cuda_errors_as_classes() -> None: class_name = err_name.replace('cudaError', 'cudaError_') err_val = getattr(this_module, err_name) - def gen_new(value): - def new(cls): + def gen_new(value: int) -> _Callable[[type[cudaError]], cudaError]: + def new(cls: type[cudaError]) -> cudaError: return cudaError.__new__(cls, value) return new @@ -371,6 +373,24 @@ def _extract_cuda_errors_as_classes() -> None: cudaError._errcode_to_name[err_val] = err_name +# Add explicit references to appease linters +class __cudaError(cudaError): + value: int + + def __new__(cls) -> cudaError: # type: ignore[misc,empty-body] + ... + + +cudaError_InitializationError: type[__cudaError] +cudaError_SymbolNotFound: type[__cudaError] +cudaError_InvalidValue: type[__cudaError] +cudaError_NoDevice: type[__cudaError] +cudaError_InvalidDevice: type[__cudaError] +cudaError_SystemDriverMismatch: type[__cudaError] +cudaError_CudartUnloading: type[__cudaError] +cudaError_CompatNotSupportedOnDevice: type[__cudaError] +cudaError_DeviceUninitialized: type[__cudaError] + _extract_cuda_errors_as_classes() del _extract_cuda_errors_as_classes @@ -382,13 +402,13 @@ def _cudaCheckReturn(ret: _Any) -> _Any: # Function access # -__cudaLib = None -__libLoadLock = _threading.Lock() +__cudaLib: _ctypes.CDLL | None = None +__libLoadLock: _threading.Lock = _threading.Lock() # Function pointers are cached to prevent unnecessary libLoadLock locking -__cudaGetFunctionPointer_cache = {} +__cudaGetFunctionPointer_cache: dict[str, _ctypes._CFuncPtr] = {} # type: ignore[name-defined] -def __cudaGetFunctionPointer(name: str) -> _ctypes._CFuncPtr: +def __cudaGetFunctionPointer(name: str) -> _ctypes._CFuncPtr: # type: ignore[name-defined] """Get the 
function pointer from the CUDA Runtime library. Raises: diff --git a/nvitop/api/libnvml.py b/nvitop/api/libnvml.py index 8e45a3b..14f6a35 100644 --- a/nvitop/api/libnvml.py +++ b/nvitop/api/libnvml.py @@ -31,6 +31,7 @@ import sys as _sys import threading as _threading from types import FunctionType as _FunctionType from types import ModuleType as _ModuleType +from typing import TYPE_CHECKING as _TYPE_CHECKING from typing import Any as _Any from typing import Callable as _Callable @@ -38,11 +39,16 @@ from typing import Callable as _Callable # https://pypi.org/project/nvidia-ml-py import pynvml as _pynvml from pynvml import * # noqa: F403 # pylint: disable=wildcard-import,unused-wildcard-import +from pynvml import nvmlDeviceGetPciInfo # appease mypy # noqa: F401 # pylint: disable=unused-import from nvitop.api.utils import NA from nvitop.api.utils import colored as __colored +if _TYPE_CHECKING: + from typing_extensions import TypeAlias as _TypeAlias # Python 3.10+ + + __all__ = [ # will be updated in below 'NA', 'nvmlCheckReturn', @@ -63,10 +69,10 @@ if not callable(getattr(_pynvml, 'nvmlInitWithFlags', None)): # Members from `pynvml` ############################################################################ -NVMLError = _pynvml.NVMLError +NVMLError: type[_pynvml.NVMLError] = _pynvml.NVMLError NVMLError.__doc__ = """Base exception class for NVML query errors.""" NVMLError.__new__.__doc__ = """Map value to a proper subclass of :class:`NVMLError`.""" -nvmlExceptionClass = _pynvml.nvmlExceptionClass +nvmlExceptionClass: _Callable[[int], type[_pynvml.NVMLError]] = _pynvml.nvmlExceptionClass nvmlExceptionClass.__doc__ = """Map value to a proper subclass of :class:`NVMLError`.""" # Load members from module `pynvml` and register them in `__all__` and globals. @@ -161,26 +167,40 @@ del ( _sphinx_doc, ) + # 5. 
Add explicit references to appease linters # pylint: disable=no-member -c_nvmlDevice_t = _pynvml.c_nvmlDevice_t -NVMLError_FunctionNotFound = _pynvml.NVMLError_FunctionNotFound -NVMLError_GpuIsLost = _pynvml.NVMLError_GpuIsLost -NVMLError_InvalidArgument = _pynvml.NVMLError_InvalidArgument -NVMLError_LibraryNotFound = _pynvml.NVMLError_LibraryNotFound -NVMLError_NoPermission = _pynvml.NVMLError_NoPermission -NVMLError_NotFound = _pynvml.NVMLError_NotFound -NVMLError_NotSupported = _pynvml.NVMLError_NotSupported -NVMLError_Unknown = _pynvml.NVMLError_Unknown +c_nvmlDevice_t: _TypeAlias = _pynvml.c_nvmlDevice_t +NVMLError_FunctionNotFound: _TypeAlias = _pynvml.NVMLError_FunctionNotFound +NVMLError_GpuIsLost: _TypeAlias = _pynvml.NVMLError_GpuIsLost +NVMLError_InvalidArgument: _TypeAlias = _pynvml.NVMLError_InvalidArgument +NVMLError_LibraryNotFound: _TypeAlias = _pynvml.NVMLError_LibraryNotFound +NVMLError_NoPermission: _TypeAlias = _pynvml.NVMLError_NoPermission +NVMLError_NotFound: _TypeAlias = _pynvml.NVMLError_NotFound +NVMLError_NotSupported: _TypeAlias = _pynvml.NVMLError_NotSupported +NVMLError_Unknown: _TypeAlias = _pynvml.NVMLError_Unknown +NVML_CLOCK_GRAPHICS: int = _pynvml.NVML_CLOCK_GRAPHICS +NVML_CLOCK_SM: int = _pynvml.NVML_CLOCK_SM +NVML_CLOCK_MEM: int = _pynvml.NVML_CLOCK_MEM +NVML_CLOCK_VIDEO: int = _pynvml.NVML_CLOCK_VIDEO +NVML_TEMPERATURE_GPU: int = _pynvml.NVML_TEMPERATURE_GPU +NVML_DRIVER_WDDM: int = _pynvml.NVML_DRIVER_WDDM +NVML_DRIVER_WDM: int = _pynvml.NVML_DRIVER_WDM +NVML_MEMORY_ERROR_TYPE_UNCORRECTED: int = _pynvml.NVML_MEMORY_ERROR_TYPE_UNCORRECTED +NVML_VOLATILE_ECC: int = _pynvml.NVML_VOLATILE_ECC +NVML_COMPUTEMODE_DEFAULT: int = _pynvml.NVML_COMPUTEMODE_DEFAULT +NVML_COMPUTEMODE_EXCLUSIVE_THREAD: int = _pynvml.NVML_COMPUTEMODE_EXCLUSIVE_THREAD +NVML_COMPUTEMODE_PROHIBITED: int = _pynvml.NVML_COMPUTEMODE_PROHIBITED +NVML_COMPUTEMODE_EXCLUSIVE_PROCESS: int = _pynvml.NVML_COMPUTEMODE_EXCLUSIVE_PROCESS # pylint: enable=no-member # New 
members in `libnvml` ######################################################################### -__flags = [] -__initialized = False -__lock = _threading.Lock() +__flags: list[int] = [] +__initialized: bool = False +__lock: _threading.Lock = _threading.Lock() -LOGGER = _logging.getLogger(__name__) +LOGGER: _logging.Logger = _logging.getLogger(__name__) try: LOGGER.setLevel(_os.getenv('LOGLEVEL', default='WARNING').upper()) except (ValueError, TypeError): @@ -197,9 +217,9 @@ if not LOGGER.hasHandlers() and LOGGER.isEnabledFor(_logging.DEBUG): LOGGER.addHandler(_file_handler) del _formatter, _stream_handler, _file_handler -UNKNOWN_FUNCTIONS = {} -UNKNOWN_FUNCTIONS_CACHE_SIZE = 1024 -VERSIONED_PATTERN = _re.compile(r'^(?P\w+)(?P_v(\d)+)$') +UNKNOWN_FUNCTIONS: dict[str, tuple[_Callable | str, NVMLError_FunctionNotFound]] = {} +UNKNOWN_FUNCTIONS_CACHE_SIZE: int = 1024 +VERSIONED_PATTERN: _re.Pattern = _re.compile(r'^(?P\w+)(?P_v(\d)+)$') def _lazy_init() -> None: @@ -389,10 +409,14 @@ def nvmlQuery( except AttributeError as e1: raise NVMLError_FunctionNotFound from e1 - retval = func(*args, **kwargs) + retval = func(*args, **kwargs) # type: ignore[operator] except NVMLError_FunctionNotFound as e2: if not ignore_function_not_found: - identifier = _inspect.getsource(func) if func.__name__ == '' else repr(func) + identifier = ( + func + if isinstance(func, str) + else (_inspect.getsource(func) if func.__name__ == '' else repr(func)) + ) with __lock: if ( identifier not in UNKNOWN_FUNCTIONS @@ -431,7 +455,7 @@ def nvmlCheckReturn( # Patch layers for backward compatibility ########################################################## -__patched_backward_compatibility_layers = False +__patched_backward_compatibility_layers: bool = False def __patch_backward_compatibility_layers() -> None: @@ -441,9 +465,9 @@ def __patch_backward_compatibility_layers() -> None: return function_name_mapping_lock = _threading.Lock() - function_name_mapping = {} + function_name_mapping: dict[str, 
str] = {} - def function_mapping_update(mapping): + def function_mapping_update(mapping: dict[str, str]) -> dict[str, str]: with function_name_mapping_lock: mapping = dict(mapping) for name, mapped_name in function_name_mapping.items(): @@ -452,10 +476,12 @@ def __patch_backward_compatibility_layers() -> None: function_name_mapping.update(mapping) return mapping - def with_mapped_function_name(): - def wrapper(nvmlGetFunctionPointer): + def with_mapped_function_name() -> None: + def wrapper( + nvmlGetFunctionPointer: _Callable[[str], _ctypes._CFuncPtr], # type: ignore[name-defined] + ) -> _Callable[[str], _ctypes._CFuncPtr]: # type: ignore[name-defined] @_functools.wraps(nvmlGetFunctionPointer) - def wrapped(name): + def wrapped(name: str) -> _ctypes._CFuncPtr: # type: ignore[name-defined] mapped_name = function_name_mapping.get(name, name) return nvmlGetFunctionPointer(mapped_name) @@ -467,10 +493,18 @@ def __patch_backward_compatibility_layers() -> None: ), ) - def patch_function_pointers_when_fail(names, callback): - def wrapper(nvmlGetFunctionPointer): + def patch_function_pointers_when_fail( + names: set[str], + callback: _Callable[[str, set[str], Exception, _ModuleType, _ModuleType], str], + ) -> _Callable[ # type: ignore[name-defined] + [_Callable[[str], _ctypes._CFuncPtr]], + _Callable[[str], _ctypes._CFuncPtr], + ]: + def wrapper( + nvmlGetFunctionPointer: _Callable[[str], _ctypes._CFuncPtr], # type: ignore[name-defined] + ) -> _Callable[[str], _ctypes._CFuncPtr]: # type: ignore[name-defined] @_functools.wraps(nvmlGetFunctionPointer) - def wrapped(name): + def wrapped(name: str) -> _ctypes._CFuncPtr: # type: ignore[name-defined] try: return nvmlGetFunctionPointer(name) except NVMLError_FunctionNotFound as ex: @@ -483,12 +517,12 @@ def __patch_backward_compatibility_layers() -> None: return wrapper - def patch_process_info(): + def patch_process_info() -> None: # pylint: disable-next=protected-access,no-member PrintableStructure = 
_pynvml._PrintableStructure # pylint: disable-next=missing-class-docstring,too-few-public-methods - class c_nvmlProcessInfo_v1_t(PrintableStructure): + class c_nvmlProcessInfo_v1_t(PrintableStructure): # type: ignore[misc,valid-type] _fields_ = [ ('pid', _ctypes.c_uint), ('usedGpuMemory', _ctypes.c_ulonglong), @@ -498,7 +532,7 @@ def __patch_backward_compatibility_layers() -> None: } # pylint: disable-next=missing-class-docstring,too-few-public-methods - class c_nvmlProcessInfo_v2_t(PrintableStructure): + class c_nvmlProcessInfo_v2_t(PrintableStructure): # type: ignore[misc,valid-type] _fields_ = [ ('pid', _ctypes.c_uint), ('usedGpuMemory', _ctypes.c_ulonglong), @@ -521,12 +555,12 @@ def __patch_backward_compatibility_layers() -> None: } def patch_process_info_callback( - name, - names, # pylint: disable=unused-argument - exception, - pynvml, - modself, - ): + name: str, + names: set[str], # pylint: disable=unused-argument + exception: Exception, + pynvml: _ModuleType, + modself: _ModuleType, + ) -> str: if name in nvmlDeviceGetRunningProcesses_v3_v2: mapping = nvmlDeviceGetRunningProcesses_v3_v2 struct_type = c_nvmlProcessInfo_v2_t @@ -570,16 +604,20 @@ def __patch_backward_compatibility_layers() -> None: __patched_backward_compatibility_layers = True -_pynvml_installation_corrupted = not callable(getattr(_pynvml, '_nvmlGetFunctionPointer', None)) +_pynvml_installation_corrupted: bool = not callable( + getattr(_pynvml, '_nvmlGetFunctionPointer', None), +) if not _pynvml_installation_corrupted: __patch_backward_compatibility_layers() del __patch_backward_compatibility_layers -_pynvml_memory_v2_available = hasattr(_pynvml, 'nvmlMemory_v2') -_pynvml_get_memory_info_v2_available = _pynvml_memory_v2_available -_driver_get_memory_info_v2_available = None if not _pynvml_installation_corrupted else False +_pynvml_memory_v2_available: bool = hasattr(_pynvml, 'nvmlMemory_v2') +_pynvml_get_memory_info_v2_available: bool = _pynvml_memory_v2_available 
+_driver_get_memory_info_v2_available: bool | None = ( + None if not _pynvml_installation_corrupted else False +) # pylint: disable-next=function-redefined,too-many-branches diff --git a/nvitop/api/process.py b/nvitop/api/process.py index 0e0dd3e..d164579 100644 --- a/nvitop/api/process.py +++ b/nvitop/api/process.py @@ -27,7 +27,7 @@ import os import threading from abc import ABCMeta from types import FunctionType -from typing import TYPE_CHECKING, Any, Callable, Iterable +from typing import TYPE_CHECKING, Any, Callable, Generator, Iterable from weakref import WeakValueDictionary from nvitop.api import host, libnvml @@ -181,8 +181,14 @@ class HostProcess(host.Process, metaclass=ABCMeta): ) """ - INSTANCE_LOCK = threading.RLock() - INSTANCES = WeakValueDictionary() + INSTANCE_LOCK: threading.RLock = threading.RLock() + INSTANCES: WeakValueDictionary[int, HostProcess] = WeakValueDictionary() + + _pid: int + _super_gone: bool + _username: str | None + _ident: tuple + _lock: threading.RLock def __new__(cls, pid: int | None = None) -> HostProcess: """Return the cached instance of :class:`HostProcess`.""" @@ -378,7 +384,7 @@ class HostProcess(host.Process, metaclass=ABCMeta): return [HostProcess(child.pid) for child in super().children(recursive)] @contextlib.contextmanager - def oneshot(self) -> contextlib.AbstractContextManager: + def oneshot(self) -> Generator[None, None, None]: """A utility context manager which considerably speeds up the retrieval of multiple process information at the same time. Internally different process info (e.g. name, ppid, uids, gids, ...) 
may be fetched by using @@ -405,12 +411,12 @@ class HostProcess(host.Process, metaclass=ABCMeta): with super().oneshot(): # pylint: disable=no-member try: - self.cmdline.cache_activate(self) - self.running_time.cache_activate(self) + self.cmdline.cache_activate(self) # type: ignore[attr-defined] + self.running_time.cache_activate(self) # type: ignore[attr-defined] yield finally: - self.cmdline.cache_deactivate(self) - self.running_time.cache_deactivate(self) + self.cmdline.cache_deactivate(self) # type: ignore[attr-defined] + self.running_time.cache_deactivate(self) # type: ignore[attr-defined] def as_snapshot( self, @@ -441,13 +447,20 @@ class GpuProcess: # pylint: disable=too-many-instance-attributes,too-many-publi representing the same PID on the host but different GPU devices are different. """ - INSTANCE_LOCK = threading.RLock() - INSTANCES = WeakValueDictionary() + INSTANCE_LOCK: threading.RLock = threading.RLock() + INSTANCES: WeakValueDictionary[tuple[int, Device], GpuProcess] = WeakValueDictionary() + + _pid: int + _host: HostProcess + _device: Device + _username: str | None + _ident: tuple + _hash: int | None # pylint: disable-next=too-many-arguments def __new__( cls, - pid: int, + pid: int | None, device: Device, # pylint: disable=unused-argument gpu_memory: int | NaType | None = None, @@ -485,7 +498,7 @@ class GpuProcess: # pylint: disable=too-many-instance-attributes,too-many-publi # pylint: disable-next=too-many-arguments def __init__( self, - pid: int, # pylint: disable=unused-argument + pid: int | None, # pylint: disable=unused-argument device: Device, gpu_memory: int | NaType | None = None, gpu_instance_id: int | NaType | None = None, @@ -553,7 +566,7 @@ class GpuProcess: # pylint: disable=too-many-instance-attributes,too-many-publi If the user do not have read privilege to the process' status file. 
""" try: - return super().__getattr__(name) + return super().__getattr__(name) # type: ignore[misc] except AttributeError: if name == '_cache': raise @@ -627,7 +640,7 @@ class GpuProcess: # pylint: disable=too-many-instance-attributes,too-many-publi memory_total = self.device.memory_total() gpu_memory_percent = NA if libnvml.nvmlCheckReturn(memory_used, int) and libnvml.nvmlCheckReturn(memory_total, int): - gpu_memory_percent = round(100.0 * memory_used / memory_total, 1) + gpu_memory_percent = round(100.0 * memory_used / memory_total, 1) # type: ignore[assignment] self._gpu_memory_percent = gpu_memory_percent def set_gpu_utilization( @@ -651,7 +664,7 @@ class GpuProcess: # pylint: disable=too-many-instance-attributes,too-many-publi def update_gpu_status(self) -> int | NaType: """Update the GPU consumption status from a new NVML query.""" self.set_gpu_memory(NA) - self.set_gpu_utilization(NA, NA, NA, NA) + self.set_gpu_utilization(NA, NA, NA, NA) # type: ignore[arg-type] self.device.processes() return self.gpu_memory() @@ -989,8 +1002,10 @@ class GpuProcess: # pylint: disable=too-many-instance-attributes,too-many-publi If *failsafe* is :data:`True`, then if any method fails, the fallback value in :func:`auto_garbage_clean` will be used. """ - cache = {} - context = cls.failsafe if failsafe else contextlib.nullcontext + cache: dict[int, Snapshot] = {} + context: Callable[[], contextlib.AbstractContextManager[None]] = ( + cls.failsafe if failsafe else contextlib.nullcontext # type: ignore[assignment] + ) with context(): return [ process.as_snapshot(host_process_snapshot_cache=cache) for process in gpu_processes @@ -998,7 +1013,7 @@ class GpuProcess: # pylint: disable=too-many-instance-attributes,too-many-publi @classmethod @contextlib.contextmanager - def failsafe(cls) -> contextlib.AbstractContextManager: + def failsafe(cls) -> Generator[None, None, None]: """A context manager that enables fallback values for methods that fail. 
Examples: diff --git a/nvitop/api/utils.py b/nvitop/api/utils.py index f7b6d74..f9890ea 100644 --- a/nvitop/api/utils.py +++ b/nvitop/api/utils.py @@ -27,7 +27,7 @@ import os import re import sys import time -from typing import Any, Callable, Iterable +from typing import Any, Callable, Generator, Iterable, Iterator from psutil import WINDOWS @@ -70,12 +70,15 @@ except ImportError: text: str, color: str | None = None, on_color: str | None = None, - attrs: Iterable[str] = None, + attrs: Iterable[str] | None = None, + *, + no_color: bool | None = None, + force_color: bool | None = None, ) -> str: return text -COLOR = sys.stdout.isatty() +COLOR: bool = sys.stdout.isatty() def set_color(value: bool) -> None: @@ -94,7 +97,7 @@ def colored( text: str, color: str | None = None, on_color: str | None = None, - attrs: Iterable[str] = None, + attrs: Iterable[str] | None = None, ) -> str: """Colorize text with ANSI color escape codes. @@ -178,7 +181,7 @@ class NaType(str): """ return math.nan - def __add__(self, other: object) -> str | float: + def __add__(self, other: object) -> str | float: # type: ignore[override] """Return :data:`math.nan` if the operand is a number or uses string concatenation if the operand is a string (``NA + other``). A special case is when the operand is :const:`nvitop.NA` itself, the result is @@ -193,9 +196,11 @@ class NaType(str): >>> NA + 1.0 nan """ # pylint: disable=line-too-long - if isinstance(other, (int, float)) or other is NA: + if isinstance(other, (int, float)): return float(self) + other - return super().__add__(other) + if other is NA: + return float(self) + return super().__add__(other) # type: ignore[operator] def __radd__(self, other: object) -> str | float: """Return :data:`math.nan` if the operand is a number or uses string concatenation if the operand is a string (``other + NA``). 
@@ -223,8 +228,10 @@ class NaType(str): >>> NA + 1.0 nan """ - if isinstance(other, (int, float)) or other is NA: + if isinstance(other, (int, float)): return float(self) - other + if other is NA: + return float(self) return NotImplemented def __rsub__(self, other: object) -> float: @@ -241,7 +248,7 @@ class NaType(str): return other - float(self) return NotImplemented - def __mul__(self, other: object) -> float: + def __mul__(self, other: object) -> float: # type: ignore[override] """Return :data:`math.nan` if the operand is a number (``NA * other``). A special case is when the operand is :const:`nvitop.NA` itself, the result is also :data:`math.nan`. @@ -253,11 +260,13 @@ class NaType(str): >>> NA * NA nan """ - if isinstance(other, (int, float)) or other is NA: + if isinstance(other, (int, float)): return float(self) * other + if other is NA: + return float(self) return NotImplemented - def __rmul__(self, other: object) -> float: + def __rmul__(self, other: object) -> float: # type: ignore[override] """Return :data:`math.nan` if the operand is a number (``other * NA``). 
>>> 1024 * NA @@ -280,9 +289,13 @@ class NaType(str): ZeroDivisionError: float division by zero >>> NA / 0.0 ZeroDivisionError: float division by zero + >>> NA / NA + nan """ if isinstance(other, (int, float)): return float(self) / other + if other is NA: + return float(self) return NotImplemented def __rtruediv__(self, other: object) -> float: @@ -308,9 +321,13 @@ class NaType(str): ZeroDivisionError: float division by zero >>> NA / 0.0 ZeroDivisionError: float division by zero + >>> NA // NA + nan """ if isinstance(other, (int, float)): return float(self) // other + if other is NA: + return float(self) return NotImplemented def __rfloordiv__(self, other: object) -> float: @@ -325,7 +342,7 @@ class NaType(str): return other // float(self) return NotImplemented - def __mod__(self, other: object) -> float: + def __mod__(self, other: object) -> float: # type: ignore[override] """Return :data:`math.nan` if the operand is a number (``NA % other``). >>> NA % 1024 @@ -339,6 +356,8 @@ class NaType(str): """ if isinstance(other, (int, float)): return float(self) % other + if other is NA: + return float(self) return NotImplemented def __rmod__(self, other: object) -> float: @@ -421,25 +440,25 @@ class NaType(str): """The :const:`nvitop.NA` is always greater than any number, or uses the dictionary order for string.""" if isinstance(x, (int, float)): return False - return super().__lt__(x) + return super().__lt__(x) # type: ignore[operator] def __le__(self, x: object) -> bool: """The :const:`nvitop.NA` is always greater than any number, or uses the dictionary order for string.""" if isinstance(x, (int, float)): return False - return super().__le__(x) + return super().__le__(x) # type: ignore[operator] def __gt__(self, x: object) -> bool: """The :const:`nvitop.NA` is always greater than any number, or uses the dictionary order for string.""" if isinstance(x, (int, float)): return True - return super().__gt__(x) + return super().__gt__(x) # type: ignore[operator] def 
__ge__(self, x: object) -> bool: """The :const:`nvitop.NA` is always greater than any number, or uses the dictionary order for string.""" if isinstance(x, (int, float)): return True - return super().__ge__(x) + return super().__ge__(x) # type: ignore[operator] def __format__(self, format_spec: str) -> str: """Format :const:`nvitop.NA` according to ``format_spec``.""" @@ -459,22 +478,22 @@ NA.__doc__ = """The singleton instance of :class:`NaType`. The actual value is : NotApplicable = NA -KiB = 1 << 10 +KiB: int = 1 << 10 """Kibibyte (1024)""" -MiB = 1 << 20 +MiB: int = 1 << 20 """Mebibyte (1024 * 1024)""" -GiB = 1 << 30 +GiB: int = 1 << 30 """Gibibyte (1024 * 1024 * 1024)""" -TiB = 1 << 40 +TiB: int = 1 << 40 """Tebibyte (1024 * 1024 * 1024 * 1024)""" -PiB = 1 << 50 +PiB: int = 1 << 50 """Pebibyte (1024 * 1024 * 1024 * 1024 * 1024)""" -SIZE_UNITS = { +SIZE_UNITS: dict[str | None, int] = { None: 1, '': 1, 'B': 1, @@ -487,10 +506,10 @@ SIZE_UNITS = { 'MB': 1000**2, 'GB': 1000**3, 'TB': 1000**4, - 'PB': 1000**4, + 'PB': 1000**5, } """Units of storage and memory measurements.""" -SIZE_PATTERN = re.compile( +SIZE_PATTERN: re.Pattern = re.compile( r'^\s*\+?\s*(?P<size>\d+(?:\.\d+)?)\s*(?P<unit>[KMGTP]i?B?|B?)\s*$', flags=re.IGNORECASE, ) @@ -651,7 +670,7 @@ class Snapshot: If the attribute is not defined, fetches from the original object and makes a function call. 
""" try: - return super().__getattr__(name) + return super().__getattr__(name) # type: ignore[misc] except AttributeError: attribute = getattr(self.real, name) if callable(attribute): @@ -671,17 +690,15 @@ class Snapshot: """Support ``snapshot['name'] = value`` syntax.""" setattr(self, name, value) - def __iter__(self) -> Iterable[str]: + def __iter__(self) -> Iterator[str]: """Support ``for name in snapshot`` syntax and ``*`` tuple unpack ``[*snapshot]`` syntax.""" - def gen() -> str: - for name in self.__dict__: - if name not in ('real', 'timestamp'): - yield name + def gen() -> Generator[str, None, None]: + yield from (name for name in self.__dict__ if name not in ('real', 'timestamp')) return gen() - def keys(self) -> Iterable[str]: + def keys(self) -> Iterator[str]: # pylint: disable-next=line-too-long """Support ``**`` dictionary unpack ``{**snapshot}`` / ``dict(**snapshot)`` syntax and ``dict(snapshot)`` dictionary conversion.""" return iter(self) @@ -730,6 +747,6 @@ def memoize_when_activated(method: Callable[[Any], Any]) -> Callable[[Any], Any] except AttributeError: pass - wrapped.cache_activate = cache_activate - wrapped.cache_deactivate = cache_deactivate + wrapped.cache_activate = cache_activate # type: ignore[attr-defined] + wrapped.cache_deactivate = cache_deactivate # type: ignore[attr-defined] return wrapped diff --git a/nvitop/callbacks/keras.py b/nvitop/callbacks/keras.py index c480399..3ac0855 100644 --- a/nvitop/callbacks/keras.py +++ b/nvitop/callbacks/keras.py @@ -57,8 +57,7 @@ class GpuStatsLogger(Callback): # pylint: disable=too-many-instance-attributes ValueError: If NVIDIA driver is not installed, or the `gpus` argument does not match available devices. 
- Example:: - + Examples: >>> from tensorflow.python.keras.utils.multi_gpu_utils import multi_gpu_model >>> from tensorflow.python.keras.callbacks import TensorBoard >>> from nvitop.callbacks.keras import GpuStatsLogger diff --git a/nvitop/callbacks/pytorch_lightning.py b/nvitop/callbacks/pytorch_lightning.py index a37fa0c..0c49ce4 100644 --- a/nvitop/callbacks/pytorch_lightning.py +++ b/nvitop/callbacks/pytorch_lightning.py @@ -58,8 +58,7 @@ class GpuStatsLogger(Callback): # pylint: disable=too-many-instance-attributes MisconfigurationException: If NVIDIA driver is not installed, not running on GPUs, or ``Trainer`` has no logger. - Example:: - + Examples: >>> from pytorch_lightning import Trainer >>> from nvitop.callbacks.pytorch_lightning import GpuStatsLogger >>> gpu_stats = GpuStatsLogger() diff --git a/nvitop/cli.py b/nvitop/cli.py index 54092e0..e4a619e 100644 --- a/nvitop/cli.py +++ b/nvitop/cli.py @@ -237,29 +237,31 @@ def parse_arguments() -> argparse.Namespace: args.user.append(USERNAME) if args.gpu_util_thresh is None: try: - gpu_util_thresh = os.getenv('NVITOP_GPU_UTILIZATION_THRESHOLDS', None) - gpu_util_thresh = list(map(int, gpu_util_thresh.split(',')))[:2] + gpu_util_thresh = list( + map(int, os.getenv('NVITOP_GPU_UTILIZATION_THRESHOLDS', '').split(',')), + )[:2] if ( len(gpu_util_thresh) != 2 or min(gpu_util_thresh) <= 0 or max(gpu_util_thresh) >= 100 ): raise ValueError - except (ValueError, AttributeError): + except ValueError: pass else: args.gpu_util_thresh = gpu_util_thresh if args.mem_util_thresh is None: try: - mem_util_thresh = os.getenv('NVITOP_MEMORY_UTILIZATION_THRESHOLDS', None) - mem_util_thresh = list(map(int, mem_util_thresh.split(',')))[:2] + mem_util_thresh = list( + map(int, os.getenv('NVITOP_MEMORY_UTILIZATION_THRESHOLDS', '').split(',')), + )[:2] if ( len(mem_util_thresh) != 2 or min(mem_util_thresh) <= 0 or max(mem_util_thresh) >= 100 ): raise ValueError - except (ValueError, AttributeError): + except ValueError: pass else: 
args.mem_util_thresh = mem_util_thresh @@ -268,7 +270,7 @@ def parse_arguments() -> argparse.Namespace: # pylint: disable-next=too-many-branches,too-many-statements,too-many-locals -def main() -> None: +def main() -> int: """Main function for ``nvitop`` CLI.""" args = parse_arguments() @@ -307,9 +309,9 @@ def main() -> None: return 1 if args.gpu_util_thresh is not None: - Device.GPU_UTILIZATION_THRESHOLDS = tuple(sorted(args.gpu_util_thresh)) + Device.GPU_UTILIZATION_THRESHOLDS = tuple(sorted(args.gpu_util_thresh)) # type: ignore[assignment] if args.mem_util_thresh is not None: - Device.MEMORY_UTILIZATION_THRESHOLDS = tuple(sorted(args.mem_util_thresh)) + Device.MEMORY_UTILIZATION_THRESHOLDS = tuple(sorted(args.mem_util_thresh)) # type: ignore[assignment] if args.only is not None: indices = set(args.only) @@ -325,8 +327,8 @@ def main() -> None: for index in Device.parse_cuda_visible_devices() } else: - indices = range(device_count) - devices = Device.from_indices(sorted(set(indices))) + indices = set(range(device_count)) + devices = Device.from_indices(sorted(indices)) filters = [] if args.compute: @@ -366,12 +368,17 @@ def main() -> None: ui = UI(devices, filters, ascii=args.ascii) if not sys.stdout.isatty(): parent = HostProcess().parent() - grandparent = parent.parent() if parent is not None else None - if grandparent is not None and parent.name() == 'sh' and grandparent.name() == 'watch': - messages.append( - 'HINT: You are running `nvitop` under `watch` command. ' - 'Please try `nvitop -m` directly.', - ) + if parent is not None: + grandparent = parent.parent() + if ( + grandparent is not None + and parent.name() == 'sh' + and grandparent.name() == 'watch' + ): + messages.append( + 'HINT: You are running `nvitop` under `watch` command. 
' + 'Please try `nvitop -m` directly.', + ) ui.print() ui.destroy() @@ -383,7 +390,7 @@ def main() -> None: else 'ERROR: A FunctionNotFound error occurred while calling:', ] unknown_function_messages.extend( - f' nvmlQuery({func.__name__!r}, *args, **kwargs)' + f' nvmlQuery({(func.__name__ if not isinstance(func, str) else func)!r}, *args, **kwargs)' for func, _ in libnvml.UNKNOWN_FUNCTIONS.values() ) unknown_function_messages.append( diff --git a/nvitop/gui/library/device.py b/nvitop/gui/library/device.py index 381bcba..c8eb2da 100644 --- a/nvitop/gui/library/device.py +++ b/nvitop/gui/library/device.py @@ -8,7 +8,7 @@ from cachetools.func import ttl_cache from nvitop.api import NA from nvitop.api import MigDevice as MigDeviceBase from nvitop.api import PhysicalDevice as DeviceBase -from nvitop.api import Snapshot, libnvml, utilization2string +from nvitop.api import libnvml, utilization2string from nvitop.gui.library.process import GpuProcess @@ -75,7 +75,7 @@ class Device(DeviceBase): return self._snapshot @property - def snapshot(self) -> Snapshot: + def snapshot(self): if self._snapshot is None: self.as_snapshot() return self._snapshot diff --git a/nvitop/select.py b/nvitop/select.py index 6c37d2a..6bba248 100644 --- a/nvitop/select.py +++ b/nvitop/select.py @@ -62,12 +62,16 @@ import math import os import sys import warnings -from typing import Any, Iterable +from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence, overload -from nvitop.api import Device, GpuProcess, colored, human2bytes, libnvml +from nvitop.api import Device, GpuProcess, Snapshot, colored, human2bytes, libnvml from nvitop.version import __version__ +if TYPE_CHECKING: + from typing_extensions import Literal # Python 3.8+ + + __all__ = ['select_devices'] try: @@ -78,10 +82,70 @@ except ModuleNotFoundError: TTY = sys.stdout.isatty() +@overload +def select_devices( + devices: Iterable[Device] | None, + *, + format: Literal['index'], # pylint: disable=redefined-builtin + force_index: 
bool, + min_count: int, + max_count: int | None, + min_free_memory: int | str | None, + min_total_memory: int | str | None, + max_gpu_utilization: int | None, + max_memory_utilization: int | None, + tolerance: int, + free_accounts: list[str] | None, + sort: bool, + **kwargs: Any, +) -> list[int] | list[tuple[int, int]]: + ... + + +@overload +def select_devices( + devices: Iterable[Device] | None, + *, + format: Literal['uuid'], # pylint: disable=redefined-builtin + force_index: bool, + min_count: int, + max_count: int | None, + min_free_memory: int | str | None, + min_total_memory: int | str | None, + max_gpu_utilization: int | None, + max_memory_utilization: int | None, + tolerance: int, + free_accounts: list[str] | None, + sort: bool, + **kwargs: Any, +) -> list[int] | list[tuple[int, int]]: + ... + + +@overload +def select_devices( + devices: Iterable[Device] | None, + *, + format: Literal['device'], # pylint: disable=redefined-builtin + force_index: bool, + min_count: int, + max_count: int | None, + min_free_memory: int | str | None, + min_total_memory: int | str | None, + max_gpu_utilization: int | None, + max_memory_utilization: int | None, + tolerance: int, + free_accounts: list[str] | None, + sort: bool, + **kwargs: Any, +) -> list[Device]: + ... 
+ + def select_devices( # pylint: disable=too-many-branches,too-many-statements,too-many-locals,unused-argument devices: Iterable[Device] | None = None, *, - format: str = 'index', # pylint: disable=redefined-builtin + format: Literal['index', 'uuid', 'device'] = 'index', # pylint: disable=redefined-builtin force_index: bool = False, min_count: int = 0, max_count: int | None = None, @@ -90,10 +154,10 @@ def select_devices( # pylint: disable=too-many-branches,too-many-statements,too max_gpu_utilization: int | None = None, # in percentage max_memory_utilization: int | None = None, # in percentage tolerance: int = 0, # in percentage - free_accounts: list[str] = None, + free_accounts: list[str] | None = None, sort: bool = True, **kwargs: Any, -) -> list[int] | list[tuple[int, int]] | list[str]: +) -> list[int] | list[tuple[int, int]] | list[str] | list[Device]: """Select a subset of devices satisfying the specified criteria. Note: @@ -151,7 +215,7 @@ def select_devices( # pylint: disable=too-many-branches,too-many-statements,too if max_count is not None: if max_count == 0: - return [] + return [] # type: ignore[return-value] assert max_count >= min_count >= 0 free_accounts = set(free_accounts or []) @@ -164,11 +228,11 @@ def select_devices( # pylint: disable=too-many-branches,too-many-statements,too if isinstance(min_total_memory, str): min_total_memory = human2bytes(min_total_memory) - available_devices = [] + available_devices: list[Snapshot] = [] for device in devices: available_devices.extend(dev.as_snapshot() for dev in device.to_leaf_devices()) for device in available_devices: - device.loosen_constraints = 0 + device.loosen_constraints = 0 # type: ignore[attr-defined] if len(free_accounts) > 0: with GpuProcess.failsafe(): @@ -177,61 +241,53 @@ def select_devices( # pylint: disable=too-many-branches,too-many-statements,too for process in device.real.processes().values(): if process.username() in free_accounts: as_free_memory += process.gpu_memory() - 
device.memory_free += as_free_memory - device.memory_used -= as_free_memory + device.memory_free += as_free_memory # type: ignore[attr-defined] + device.memory_used -= as_free_memory # type: ignore[attr-defined] + + def filter_func( + criteria: Callable[[Snapshot], bool], + original_criteria: Callable[[Snapshot], bool], + ) -> Callable[[Snapshot], bool]: + def wrapped(device: Snapshot) -> bool: + device.loosen_constraints += int(not original_criteria(device)) # type: ignore[attr-defined] + return criteria(device) + + return wrapped if min_free_memory is not None: loosen_min_free_memory = min_free_memory * (1.0 - tolerance) - available_devices = filter( - lambda device: ( - device.memory_free >= loosen_min_free_memory, - setattr( - device, - 'loosen_constraints', - device.loosen_constraints + int(not device.memory_free >= min_free_memory), - ), - )[0], + available_devices = filter( # type: ignore[assignment] + filter_func( + lambda device: device.memory_free >= loosen_min_free_memory, + lambda device: device.memory_free >= min_free_memory, + ), available_devices, ) if min_total_memory is not None: loosen_min_total_memory = min_total_memory * (1.0 - tolerance) - available_devices = filter( - lambda device: ( - device.memory_total >= loosen_min_total_memory, - setattr( - device, - 'loosen_constraints', - device.loosen_constraints + int(not device.memory_total >= min_total_memory), - ), - )[0], + available_devices = filter( # type: ignore[assignment] + filter_func( + lambda device: device.memory_total >= loosen_min_total_memory, + lambda device: device.memory_total >= min_total_memory, + ), available_devices, ) if max_gpu_utilization is not None: loosen_max_gpu_utilization = max_gpu_utilization + 100.0 * tolerance - available_devices = filter( - lambda device: ( - device.gpu_utilization <= loosen_max_gpu_utilization, - setattr( - device, - 'loosen_constraints', - device.loosen_constraints - + int(not device.gpu_utilization <= max_gpu_utilization), - ), - )[0], + 
available_devices = filter( # type: ignore[assignment] + filter_func( + lambda device: device.gpu_utilization <= loosen_max_gpu_utilization, + lambda device: device.gpu_utilization <= max_gpu_utilization, + ), available_devices, ) if max_memory_utilization is not None: loosen_max_memory_utilization = max_memory_utilization + 100.0 * tolerance - available_devices = filter( - lambda device: ( - device.memory_utilization <= loosen_max_memory_utilization, - setattr( - device, - 'loosen_constraints', - device.loosen_constraints - + int(not device.memory_utilization <= max_memory_utilization), - ), - )[0], + available_devices = filter( # type: ignore[assignment] + filter_func( + lambda device: device.memory_utilization <= loosen_max_memory_utilization, + lambda device: device.memory_utilization <= max_memory_utilization, + ), available_devices, ) @@ -486,10 +542,11 @@ def parse_arguments() -> argparse.Namespace: return args -def main() -> None: +def main() -> int: """Main function for ``nvisel`` CLI.""" args = parse_arguments() + devices: Sequence[Device] try: if hasattr(args, 'inherit'): if args.inherit is not None: diff --git a/pyproject.toml b/pyproject.toml index 5829af9..6776fc6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -82,6 +82,31 @@ line_length = 100 lines_after_imports = 2 multi_line_output = 3 +[tool.mypy] +# Sync with requires-python +python_version = "3.8" # appease mypy for syntax errors in numpy stubs +pretty = true +show_error_codes = true +show_error_context = true +show_traceback = true +allow_redefinition = true +check_untyped_defs = true +disallow_incomplete_defs = false +disallow_untyped_defs = false +ignore_missing_imports = true +no_implicit_optional = true +strict_equality = true +strict_optional = true +warn_no_return = true +warn_redundant_casts = true +warn_unreachable = true +warn_unused_configs = true +warn_unused_ignores = true + +[[tool.mypy.overrides]] +module = ['nvitop.callbacks.*', 'nvitop.gui.*'] +ignore_errors = true +
[tool.pydocstyle] convention = "google" match-dir = '^(?!(gui|callbacks|docs))[^\.].*' @@ -90,6 +115,7 @@ match-dir = '^(?!(gui|callbacks|docs))[^\.].*' ignore-words = "docs/source/spelling_wordlist.txt" [tool.ruff] +# Sync with requires-python target-version = "py37" line-length = 100 show-source = true @@ -145,7 +171,6 @@ ignore = [ ] "nvitop/api/lib*.py" = [ "N", # pep8-naming - "ANN", # flake8-annotations ] "nvitop/callbacks/*.py" = [ "ANN", # flake8-annotations diff --git a/setup.py b/setup.py index 7524b24..7dac717 100755 --- a/setup.py +++ b/setup.py @@ -51,6 +51,8 @@ try: 'black >= 22.6.0', 'isort', 'pylint[spelling] >= 2.16.0', + 'mypy', + 'typing-extensions', 'pre-commit', ], 'cuda10': ['nvidia-ml-py == 11.450.51'],