feat(linter): mypy integration (#73)

This commit is contained in:
Xuehai Pan 2023-05-01 13:02:01 +08:00 committed by GitHub
parent 2408735f54
commit f0b055bfcf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 640 additions and 355 deletions

View file

@ -20,6 +20,7 @@ per-file-ignores =
# F401: module imported but unused
# intentionally unused imports
__init__.py: F401
nvitop/api/host.py: F401
# SIM113: use enumerate
# false positive
nvitop/gui/screens/main/process.py: SIM113

View file

@ -72,7 +72,7 @@ jobs:
python -m venv venv &&
(
source venv/bin/activate &&
python -m pip install --upgrade pip setuptools pre-commit pylint[spelling]
python -m pip install --upgrade pip setuptools pre-commit pylint[spelling] mypy typing-extensions
python -m pip install -r requirements.txt &&
python -m pre_commit install --install-hooks &&
python -m pre_commit run --all-files &&

View file

@ -69,7 +69,7 @@ jobs:
- name: Install linters
run: |
python -m pip install --upgrade pre-commit pylint[spelling]
python -m pip install --upgrade pre-commit pylint[spelling] mypy typing-extensions
- name: pre-commit
run: |

View file

@ -25,7 +25,7 @@ repos:
- id: debug-statements
- id: double-quote-string-fixer
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.0.262
rev: v0.0.263
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
@ -38,10 +38,10 @@ repos:
hooks:
- id: black
- repo: https://github.com/asottile/pyupgrade
rev: v3.3.1
rev: v3.3.2
hooks:
- id: pyupgrade
args: [--py37-plus]
args: [--py37-plus] # sync with requires-python
- repo: https://github.com/pycqa/flake8
rev: 6.0.0
hooks:
@ -59,8 +59,8 @@ repos:
- repo: https://github.com/codespell-project/codespell
rev: v2.2.4
hooks:
- id: codespell
additional_dependencies: [".[toml]"]
- id: codespell
additional_dependencies: [".[toml]"]
- repo: local
hooks:
- id: pylint
@ -80,3 +80,11 @@ repos:
^nvitop/callbacks/|
^docs/
)
- repo: local
hooks:
- id: mypy
name: mypy
entry: mypy
language: system
types_or: [python, pyi]
require_serial: true

View file

@ -13,7 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
-
- Add `mypy` integration and update type annotations by [@XuehaiPan](https://github.com/XuehaiPan) in [#73](https://github.com/XuehaiPan/nvitop/pull/73).
### Changed
@ -33,7 +33,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Further isolate the `CUDA_VISIBLE_DEVICE` parser in a subprocess by [@XuehaiPan](https://github.com/XuehaiPan) in [#70](https://github.com/XuehaiPan/nvitop/pull/70).
- Further isolate the `CUDA_VISIBLE_DEVICES` parser in a subprocess by [@XuehaiPan](https://github.com/XuehaiPan) in [#70](https://github.com/XuehaiPan/nvitop/pull/70).
------

View file

@ -686,6 +686,7 @@ for device in devices:
```python
In [1]: from nvitop import take_snapshots, Device
...: import os
...: os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
...: os.environ['CUDA_VISIBLE_DEVICES'] = '1,0' # comma-separated integers or UUID strings
In [2]: take_snapshots() # equivalent to `take_snapshots(Device.all())`
@ -763,6 +764,7 @@ Please refer to section [Low-level APIs](#low-level-apis) for more information.
```python
In [1]: from nvitop import ResourceMetricCollector, Device
...: import os
...: os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
...: os.environ['CUDA_VISIBLE_DEVICES'] = '3,2,1,0' # comma-separated integers or UUID strings
In [2]: collector = ResourceMetricCollector() # log all devices and descendant processes of the current process on the GPUs
@ -983,6 +985,7 @@ In [1]: from nvitop import (
...: NA,
...: )
...: import os
...: os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
...: os.environ['CUDA_VISIBLE_DEVICES'] = '9,8,7,6' # comma-separated integers or UUID strings
In [2]: Device.driver_version()

View file

@ -140,3 +140,4 @@ noqa
uptime
ot
oT
mypy

View file

@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Resource metrics collectors."""
from __future__ import annotations
@ -26,7 +25,7 @@ import os
import threading
import time
from collections import OrderedDict, defaultdict
from typing import Callable, Hashable, Iterable, NamedTuple
from typing import Callable, Generator, Iterable, NamedTuple, TypeVar
from weakref import WeakSet
from nvitop.api import host
@ -46,7 +45,10 @@ class SnapshotResult(NamedTuple): # pylint: disable=missing-class-docstring
timer = time.monotonic
def _unique(iterable: Iterable[Hashable]) -> list[Hashable]:
_T = TypeVar('_T')
def _unique(iterable: Iterable[_T]) -> list[_T]:
return list(OrderedDict.fromkeys(iterable).keys())
@ -78,6 +80,7 @@ def take_snapshots(
Examples:
>>> from nvitop import take_snapshots, Device
>>> import os
>>> os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
>>> os.environ['CUDA_VISIBLE_DEVICES'] = '1,0'
>>> take_snapshots() # equivalent to `take_snapshots(Device.all())`
@ -145,8 +148,8 @@ def take_snapshots(
if isinstance(gpu_processes, GpuProcess):
gpu_processes = [gpu_processes]
if gpu_processes is not None:
if gpu_processes: # is not False or is a non-empty list/tuple
if gpu_processes is not None and gpu_processes is not True:
if gpu_processes: # is a non-empty list/tuple
gpu_processes = list(gpu_processes)
process_devices = _unique(process.device for process in gpu_processes)
for device in process_devices:
@ -161,7 +164,7 @@ def take_snapshots(
if devices is None:
physical_devices = Device.all()
devices = []
leaf_devices = []
leaf_devices: list[Device] = []
for physical_device in physical_devices:
devices.append(physical_device)
mig_devices = physical_device.mig_devices()
@ -176,7 +179,7 @@ def take_snapshots(
itertools.chain.from_iterable(device.processes().values() for device in leaf_devices),
)
devices = [device.as_snapshot() for device in devices]
devices = [device.as_snapshot() for device in devices] # type: ignore[union-attr]
gpu_processes = GpuProcess.take_snapshots(gpu_processes, failsafe=True)
return SnapshotResult(devices, gpu_processes)
@ -254,22 +257,22 @@ def collect_in_background(
def target() -> None:
if on_start is not None:
on_start(collector)
on_start(collector) # type: ignore[arg-type]
try:
with collector(tag):
with collector(tag): # type: ignore[misc]
try:
next_snapshot = timer() + interval
while on_collect(collector.collect()):
next_snapshot = timer() + interval # type: ignore[operator]
while on_collect(collector.collect()): # type: ignore[union-attr]
time.sleep(max(0.0, next_snapshot - timer()))
next_snapshot += interval
next_snapshot += interval # type: ignore[operator]
except KeyboardInterrupt:
pass
finally:
if on_stop is not None:
on_stop(collector)
on_stop(collector) # type: ignore[arg-type]
daemon = threading.Thread(target=target, name=tag, daemon=True)
daemon.collector = collector
daemon.collector = collector # type: ignore[attr-defined]
if start:
daemon.start()
return daemon
@ -304,6 +307,7 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes
Examples:
>>> import os
>>> os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
>>> os.environ['CUDA_VISIBLE_DEVICES'] = '3,2,1,0'
>>> from nvitop import ResourceMetricCollector, Device
@ -404,13 +408,13 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes
if devices is None:
devices = Device.all()
root_pids = {os.getpid()} if root_pids is None else set(root_pids)
root_pids: set[int] = {os.getpid()} if root_pids is None else set(root_pids)
self.interval = interval
self.interval: float = interval
self.devices = list(devices)
self.all_devices = []
self.leaf_devices = []
self.devices: list[Device] = list(devices)
self.all_devices: list[Device] = []
self.leaf_devices: list[Device] = []
for device in self.devices:
self.all_devices.append(device)
mig_devices = device.mig_devices()
@ -420,21 +424,23 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes
else:
self.leaf_devices.append(device)
self.root_pids = root_pids
self._positive_processes = WeakSet(HostProcess(pid) for pid in self.root_pids)
self._negative_processes = WeakSet()
self.root_pids: set[int] = root_pids
self._positive_processes: WeakSet[HostProcess] = WeakSet(
HostProcess(pid) for pid in self.root_pids
)
self._negative_processes: WeakSet[HostProcess] = WeakSet()
self._last_timestamp = timer() - 2.0 * self.interval
self._lock = threading.RLock()
self._metric_buffer = None
self._tags = set()
self._last_timestamp: float = timer() - 2.0 * self.interval
self._lock: threading.RLock = threading.RLock()
self._metric_buffer: _MetricBuffer | None = None
self._tags: set[str] = set()
self._daemon = threading.Thread(
self._daemon: threading.Thread = threading.Thread(
name='gpu_metric_collector_daemon',
target=self._target,
daemon=True,
)
self._daemon_running = threading.Event()
self._daemon_running: threading.Event = threading.Event()
def activate(self, tag: str) -> ResourceMetricCollector:
"""Start a new metric collection with the given tag.
@ -500,7 +506,7 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes
if buffer.tag == tag:
self._metric_buffer = buffer.prev
break
buffer = buffer.prev
buffer = buffer.prev # type: ignore[assignment]
if self._metric_buffer is None:
self._daemon_running.clear()
@ -510,7 +516,7 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes
stop = deactivate
@contextlib.contextmanager
def context(self, tag: str) -> ResourceMetricCollector:
def context(self, tag: str) -> Generator[ResourceMetricCollector, None, None]:
"""A context manager for starting and stopping resource metric collection.
Args:
@ -578,7 +584,7 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes
buffer.clear()
if buffer.tag == tag:
break
buffer = buffer.prev
buffer = buffer.prev # type: ignore[assignment]
def collect(self) -> dict[str, float]:
"""Get the average resource consumption during collection."""
@ -665,7 +671,7 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes
def take_snapshots(self) -> SnapshotResult:
"""Take snapshots of the current resource metrics and update the metric buffer."""
if len(self.root_pids) > 0:
all_gpu_processes = []
all_gpu_processes: list[GpuProcess] = []
for device in self.leaf_devices:
all_gpu_processes.extend(device.processes().values())
@ -685,7 +691,7 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes
positive = True
break
try:
p = p.parent()
p = p.parent() # type: ignore[assignment]
except host.PsutilError:
break
if positive:
@ -700,8 +706,8 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes
timestamp = timer()
metrics = {}
devices = [device.as_snapshot() for device in self.all_devices]
gpu_processes = GpuProcess.take_snapshots(gpu_processes, failsafe=True)
device_snapshots = [device.as_snapshot() for device in self.all_devices]
gpu_process_snapshots = GpuProcess.take_snapshots(gpu_processes, failsafe=True)
metrics.update(
{
@ -722,23 +728,23 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes
)
device_identifiers = {}
for device in devices:
identifier = f'gpu:{device.index}'
if isinstance(device.real, CudaDevice):
identifier = f'cuda:{device.cuda_index} ({identifier})'
device_identifiers[device.real] = identifier
for device_snapshot in device_snapshots:
identifier = f'gpu:{device_snapshot.index}'
if isinstance(device_snapshot.real, CudaDevice):
identifier = f'cuda:{device_snapshot.cuda_index} ({identifier})'
device_identifiers[device_snapshot.real] = identifier
for attr, name, unit in self.DEVICE_METRICS:
value = float(getattr(device, attr)) / unit
value = float(getattr(device_snapshot, attr)) / unit
metrics[f'{identifier}/{name}'] = value
for process in gpu_processes:
device_identifier = device_identifiers[process.device]
identifier = f'pid:{process.pid}'
for process_snapshot in gpu_process_snapshots:
device_identifier = device_identifiers[process_snapshot.device]
identifier = f'pid:{process_snapshot.pid}'
for attr, scope, name, unit in self.PROCESS_METRICS:
scope = scope or device_identifier
value = float(getattr(process, attr)) / unit
value = float(getattr(process_snapshot, attr)) / unit
metrics[f'{identifier}/{scope}/{name}'] = value
with self._lock:
@ -746,7 +752,7 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes
self._metric_buffer.add(metrics, timestamp=timestamp)
self._last_timestamp = timestamp
return SnapshotResult(devices, gpu_processes)
return SnapshotResult(device_snapshots, gpu_process_snapshots)
def _target(self) -> None:
self._daemon_running.wait()
@ -762,17 +768,20 @@ class _MetricBuffer: # pylint: disable=missing-class-docstring,missing-function
collector: ResourceMetricCollector,
prev: _MetricBuffer | None = None,
) -> None:
self.collector = collector
self.prev = prev
self.collector: ResourceMetricCollector = collector
self.prev: _MetricBuffer | None = prev
self.tag = tag
self.tag: str = tag
self.key_prefix: str
if self.prev is not None:
self.key_prefix = f'{self.prev.key_prefix}/{self.tag}'
else:
self.key_prefix = self.tag
self.last_timestamp = self.start_timestamp = timer()
self.buffer = defaultdict(lambda: _StatisticsMaintainer(self.last_timestamp))
self.buffer: defaultdict[str, _StatisticsMaintainer] = defaultdict(
lambda: _StatisticsMaintainer(self.last_timestamp),
)
self.len = 0
@ -817,13 +826,13 @@ class _MetricBuffer: # pylint: disable=missing-class-docstring,missing-function
class _StatisticsMaintainer: # pylint: disable=missing-class-docstring,missing-function-docstring
def __init__(self, timestamp: float) -> None:
self.start_timestamp = timestamp
self.last_timestamp = None
self.integral = None
self.last_value = None
self.min_value = None
self.max_value = None
self.has_nan = False
self.start_timestamp: float = timestamp
self.last_timestamp: float = math.nan
self.integral: float | None = None
self.last_value: float | None = None
self.min_value: float | None = None
self.max_value: float | None = None
self.has_nan: bool = False
def add(self, value: float, timestamp: float | None = None) -> None:
if timestamp is None:
@ -837,30 +846,32 @@ class _StatisticsMaintainer: # pylint: disable=missing-class-docstring,missing-
self.integral = value * (timestamp - self.start_timestamp)
self.last_value = self.min_value = self.max_value = value
else:
self.integral += (value + self.last_value) * (timestamp - self.last_timestamp) / 2.0
# pylint: disable-next=line-too-long
self.integral += (value + self.last_value) * (timestamp - self.last_timestamp) / 2.0 # type: ignore[operator]
self.last_value = value
self.min_value = min(self.min_value, value)
self.max_value = max(self.max_value, value)
self.min_value = min(self.min_value, value) # type: ignore[type-var]
self.max_value = max(self.max_value, value) # type: ignore[type-var]
self.last_timestamp = timestamp
def mean(self) -> float:
if self.integral is None:
return math.nan
if self.has_nan:
if self.integral is None:
return math.nan
return self.integral / (self.last_timestamp - self.start_timestamp)
timestamp = timer()
integral = self.integral + self.last_value * (timestamp - self.last_timestamp)
integral = self.integral + self.last_value * (timestamp - self.last_timestamp) # type: ignore[operator]
return integral / (timestamp - self.start_timestamp)
def min(self) -> float:
if self.has_nan and self.min_value is None:
if self.min_value is None:
return math.nan
return self.min_value
def max(self) -> float:
if self.has_nan and self.max_value is None:
if self.max_value is None:
return math.nan
return self.max_value

View file

@ -69,6 +69,7 @@ Examples:
)
>>> import os
>>> os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
>>> os.environ['CUDA_VISIBLE_DEVICES'] = '3,2,1,0'
>>> CudaDevice.count() # number of NVIDIA GPUs visible to CUDA applications
@ -113,13 +114,17 @@ import sys
import textwrap
import threading
from collections import OrderedDict
from typing import Any, Callable, Iterable, NamedTuple
from typing import TYPE_CHECKING, Any, Callable, Generator, Hashable, Iterable, NamedTuple, overload
from nvitop.api import libcuda, libcudart, libnvml
from nvitop.api.process import GpuProcess
from nvitop.api.utils import NA, NaType, Snapshot, boolify, bytes2human, memoize_when_activated
if TYPE_CHECKING:
from typing_extensions import Literal # Python 3.8+
__all__ = [
'Device',
'PhysicalDevice',
@ -158,7 +163,7 @@ class UtilizationRates(NamedTuple): # in percentage # pylint: disable=missing-c
decoder: int | NaType
_VALUE_OMITTED = object()
_VALUE_OMITTED: str = object() # type: ignore[assignment]
class Device: # pylint: disable=too-many-instance-attributes,too-many-public-methods
@ -226,7 +231,7 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me
# GPU UUID : `GPU-<GPU-UUID>`
# MIG UUID : `MIG-GPU-<GPU-UUID>/<GPU instance ID>/<compute instance ID>`
# MIG UUID (R470+): `MIG-<MIG-UUID>`
UUID_PATTERN = re.compile(
UUID_PATTERN: re.Pattern = re.compile(
r"""^ # full match
(?:(?P<MigMode>MIG)-)? # prefix for MIG UUID
(?:(?P<GpuUuid>GPU)-)? # prefix for GPU UUID
@ -243,10 +248,12 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me
flags=re.VERBOSE,
)
GPU_PROCESS_CLASS = GpuProcess
cuda = None # defined in below
GPU_PROCESS_CLASS: type[GpuProcess] = GpuProcess
cuda: type[CudaDevice] = None # type: ignore[assignment] # defined in below
"""Shortcut for class :class:`CudaDevice`."""
_nvml_index: int | tuple[int, int]
@classmethod
def is_available(cls) -> bool:
"""Test whether there are any devices and the NVML library is successfully loaded."""
@ -347,7 +354,7 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me
@classmethod
def all(cls) -> list[PhysicalDevice]:
"""Return a list of all physical devices in the system."""
return cls.from_indices()
return cls.from_indices() # type: ignore[return-value]
@classmethod
def from_indices(
@ -388,7 +395,7 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me
if isinstance(indices, int):
indices = [indices]
return list(map(cls, indices))
return list(map(cls, indices)) # type: ignore[arg-type]
@staticmethod
def from_cuda_visible_devices() -> list[CudaDevice]:
@ -408,8 +415,9 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me
""" # pylint: disable=line-too-long
visible_device_indices = Device.parse_cuda_visible_devices()
cuda_devices = []
for cuda_index, device_index in enumerate(visible_device_indices):
device_index: int | tuple[int, int]
cuda_devices: list[CudaDevice] = []
for cuda_index, device_index in enumerate(visible_device_indices): # type: ignore[assignment]
cuda_devices.append(CudaDevice(cuda_index, nvml_index=device_index))
return cuda_devices
@ -552,7 +560,7 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me
if cls is not Device:
return super().__new__(cls)
match = None
match: re.Match | None = None
if isinstance(index, str):
match = cls.UUID_PATTERN.match(index)
if match is not None: # passed by UUID
@ -608,17 +616,17 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me
arg.encode() if isinstance(arg, str) else arg for arg in (index, uuid, bus_id)
)
self._name = NA
self._uuid = NA
self._bus_id = NA
self._memory_total = NA
self._memory_total_human = NA
self._is_mig_device = None
self._cuda_index = None
self._cuda_compute_capability = None
self._name: str = NA
self._uuid: str = NA
self._bus_id: str = NA
self._memory_total: int | NaType = NA
self._memory_total_human: str = NA
self._is_mig_device: bool | None = None
self._cuda_index: int | None = None
self._cuda_compute_capability: tuple[int, int] | NaType | None = None
if index is not None:
self._nvml_index = index
self._nvml_index = index # type: ignore[assignment]
try:
self._handle = libnvml.nvmlQuery(
'nvmlDeviceGetHandleByIndex',
@ -647,21 +655,21 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me
)
except libnvml.NVMLError_GpuIsLost:
self._handle = None
self._nvml_index = NA
self._nvml_index = NA # type: ignore[assignment]
self._name = 'ERROR: GPU is Lost'
except libnvml.NVMLError_Unknown:
self._handle = None
self._nvml_index = NA
self._nvml_index = NA # type: ignore[assignment]
self._name = 'ERROR: Unknown'
else:
self._nvml_index = libnvml.nvmlQuery('nvmlDeviceGetIndex', self._handle)
self._max_clock_infos = ClockInfos(graphics=NA, sm=NA, memory=NA, video=NA)
self._timestamp = 0
self._lock = threading.RLock()
self._max_clock_infos: ClockInfos = ClockInfos(graphics=NA, sm=NA, memory=NA, video=NA)
self._timestamp: int = 0
self._lock: threading.RLock = threading.RLock()
self._ident = (self.index, self.uuid())
self._hash = None
self._ident: tuple[Hashable, str] = (self.index, self.uuid())
self._hash: int | None = None
def __repr__(self) -> str:
"""Return a string representation of the device."""
@ -709,7 +717,7 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me
(8, 6)
""" # pylint: disable=line-too-long
try:
return super().__getattr__(name)
return super().__getattr__(name) # type: ignore[misc]
except AttributeError:
if name == '_cache':
raise
@ -779,7 +787,7 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me
An int for the physical device index. For MIG devices, returns the index of the parent
physical device.
"""
return self._nvml_index # will be overridden in MigDevice
return self._nvml_index # type: ignore[return-value] # will be overridden in MigDevice
@property
def handle(self) -> libnvml.c_nvmlDevice_t:
@ -800,7 +808,7 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me
if self._cuda_index is None:
visible_device_indices = self.parse_cuda_visible_devices()
try:
self._cuda_index = visible_device_indices.index(self.index)
self._cuda_index = visible_device_indices.index(self.index) # type: ignore[arg-type]
except ValueError as ex:
raise RuntimeError(
f'CUDA Error: Device(index={self.index}) is not visible to CUDA applications',
@ -1628,13 +1636,15 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me
"""
return self.is_mig_device() or not self.is_mig_mode_enabled()
def to_leaf_devices(self) -> list[PhysicalDevice | MigDevice | CudaDevice]:
def to_leaf_devices(
self,
) -> list[PhysicalDevice] | list[MigDevice] | list[CudaDevice] | list[CudaMigDevice]:
"""Return a list of leaf devices.
Note that a CUDA device is always a leaf device.
"""
if isinstance(self, CudaDevice) or self.is_leaf_device():
return [self]
return [self] # type: ignore[return-value]
return self.mig_devices()
def processes(self) -> dict[int, GpuProcess]:
@ -1656,7 +1666,7 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me
else:
# Used GPU memory is `N/A` on Windows Display Driver Model (WDDM)
# or on MIG-enabled GPUs
gpu_memory = NA
gpu_memory = NA # type: ignore[assignment]
found_na = True
proc = processes[p.pid] = self.GPU_PROCESS_CLASS(
pid=p.pid,
@ -1740,7 +1750,7 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me
# Modified from psutil (https://github.com/giampaolo/psutil)
@contextlib.contextmanager
def oneshot(self) -> contextlib.AbstractContextManager:
def oneshot(self) -> Generator[None, None, None]:
"""A utility context manager which considerably speeds up the retrieval of multiple device information at the same time.
Internally different device info (e.g. memory_info, utilization_rates, ...) may be fetched
@ -1781,22 +1791,22 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me
yield
else:
try:
self.memory_info.cache_activate(self)
self.bar1_memory_info.cache_activate(self)
self.utilization_rates.cache_activate(self)
self.clock_infos.cache_activate(self)
self.max_clock_infos.cache_activate(self)
self.power_usage.cache_activate(self)
self.power_limit.cache_activate(self)
self.memory_info.cache_activate(self) # type: ignore[attr-defined]
self.bar1_memory_info.cache_activate(self) # type: ignore[attr-defined]
self.utilization_rates.cache_activate(self) # type: ignore[attr-defined]
self.clock_infos.cache_activate(self) # type: ignore[attr-defined]
self.max_clock_infos.cache_activate(self) # type: ignore[attr-defined]
self.power_usage.cache_activate(self) # type: ignore[attr-defined]
self.power_limit.cache_activate(self) # type: ignore[attr-defined]
yield
finally:
self.memory_info.cache_deactivate(self)
self.bar1_memory_info.cache_deactivate(self)
self.utilization_rates.cache_deactivate(self)
self.clock_infos.cache_deactivate(self)
self.max_clock_infos.cache_deactivate(self)
self.power_usage.cache_deactivate(self)
self.power_limit.cache_deactivate(self)
self.memory_info.cache_deactivate(self) # type: ignore[attr-defined]
self.bar1_memory_info.cache_deactivate(self) # type: ignore[attr-defined]
self.utilization_rates.cache_deactivate(self) # type: ignore[attr-defined]
self.clock_infos.cache_deactivate(self) # type: ignore[attr-defined]
self.max_clock_infos.cache_deactivate(self) # type: ignore[attr-defined]
self.power_usage.cache_deactivate(self) # type: ignore[attr-defined]
self.power_limit.cache_deactivate(self) # type: ignore[attr-defined]
class PhysicalDevice(Device):
@ -1805,6 +1815,10 @@ class PhysicalDevice(Device):
This is the real GPU installed in the system.
"""
_nvml_index: int
index: int
nvml_index: int
@property
def physical_index(self) -> int:
"""Zero based index of the GPU. Can change at each boot.
@ -1864,13 +1878,16 @@ class PhysicalDevice(Device):
class MigDevice(Device): # pylint: disable=too-many-instance-attributes
"""Class for MIG devices."""
_nvml_index: tuple[int, int]
nvml_index: tuple[int, int]
@classmethod
def count(cls) -> int:
"""The number of total MIG devices aggregated over all physical devices."""
return len(cls.all())
@classmethod
def all(cls) -> list[MigDevice]:
def all(cls) -> list[MigDevice]: # type: ignore[override]
"""Return a list of MIG devices aggregated over all physical devices."""
mig_devices = []
for device in PhysicalDevice.all():
@ -1878,7 +1895,7 @@ class MigDevice(Device): # pylint: disable=too-many-instance-attributes
return mig_devices
@classmethod
def from_indices( # pylint: disable=signature-differs
def from_indices( # type: ignore[override] # pylint: disable=signature-differs
cls,
indices: Iterable[tuple[int, int]],
) -> list[MigDevice]:
@ -1929,19 +1946,19 @@ class MigDevice(Device): # pylint: disable=too-many-instance-attributes
index, uuid = (arg.encode() if isinstance(arg, str) else arg for arg in (index, uuid))
self._name = NA
self._uuid = NA
self._bus_id = NA
self._memory_total = NA
self._memory_total_human = NA
self._gpu_instance_id = NA
self._compute_instance_id = NA
self._is_mig_device = True
self._cuda_index = None
self._cuda_compute_capability = None
self._name: str = NA
self._uuid: str = NA
self._bus_id: str = NA
self._memory_total: int | NaType = NA
self._memory_total_human: str = NA
self._gpu_instance_id: int | NaType = NA
self._compute_instance_id: int | NaType = NA
self._is_mig_device: bool = True
self._cuda_index: int | None = None
self._cuda_compute_capability: tuple[int, int] | NaType | None = None
if index is not None:
self._nvml_index = index
self._nvml_index = index # type: ignore[assignment]
self._handle = None
parent = _get_global_physical_device()
@ -2047,7 +2064,7 @@ class MigDevice(Device): # pylint: disable=too-many-instance-attributes
The attributes are defined in :attr:`SNAPSHOT_KEYS`.
"""
snapshot = super().as_snapshot()
snapshot.mig_index = self.mig_index
snapshot.mig_index = self.mig_index # type: ignore[attr-defined]
return snapshot
@ -2074,6 +2091,7 @@ class CudaDevice(Device):
Examples:
>>> import os
>>> os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
>>> os.environ['CUDA_VISIBLE_DEVICES'] = '3,2,1,0'
>>> CudaDevice.count() # number of NVIDIA GPUs visible to CUDA applications
@ -2123,6 +2141,10 @@ class CudaDevice(Device):
If the index is out of range for the given ``CUDA_VISIBLE_DEVICES`` environment variable.
""" # pylint: disable=line-too-long
_nvml_index: int
index: int
nvml_index: int
@classmethod
def is_available(cls) -> bool:
"""Test whether there are any CUDA-capable devices available."""
@ -2137,7 +2159,7 @@ class CudaDevice(Device):
return 0
@classmethod
def all(cls) -> list[CudaDevice]:
def all(cls) -> list[CudaDevice]: # type: ignore[override]
"""All CUDA visible devices.
Note:
@ -2146,7 +2168,7 @@ class CudaDevice(Device):
return cls.from_indices()
@classmethod
def from_indices(
def from_indices( # type: ignore[override]
cls,
indices: int | Iterable[int] | None = None,
) -> list[CudaDevice]:
@ -2184,7 +2206,7 @@ class CudaDevice(Device):
*,
nvml_index: int | tuple[int, int] | None = None,
uuid: str | None = None,
) -> Device:
) -> CudaDevice:
"""Create a new instance of CudaDevice.
The type of the result is determined by the given argument.
@ -2209,16 +2231,22 @@ class CudaDevice(Device):
RuntimeError:
If the index is out of range for the given ``CUDA_VISIBLE_DEVICES`` environment variable.
"""
if nvml_index is not None and uuid is not None:
raise TypeError(
f'CudaDevice(cuda_index=None, nvml_index=None, uuid=None) takes 1 non-None arguments '
f'but (cuda_index, nvml_index, uuid) = {(cuda_index, nvml_index, uuid)!r} were given',
)
if cuda_index is not None and nvml_index is None and uuid is None:
cuda_visible_devices = cls.parse_cuda_visible_devices()
if not isinstance(cuda_index, int) or not 0 <= cuda_index < len(cuda_visible_devices):
raise RuntimeError(f'CUDA Error: invalid device ordinal: {cuda_index!r}.')
nvml_index = cuda_visible_devices[cuda_index]
if not isinstance(nvml_index, int) or is_mig_device_uuid(uuid):
return super().__new__(CudaMigDevice, index=nvml_index, uuid=uuid)
if (nvml_index is not None and not isinstance(nvml_index, int)) or is_mig_device_uuid(uuid):
return super().__new__(CudaMigDevice, index=nvml_index, uuid=uuid) # type: ignore[return-value]
return super().__new__(cls, index=nvml_index, uuid=uuid)
return super().__new__(cls, index=nvml_index, uuid=uuid) # type: ignore[return-value]
def __init__(
self,
@ -2251,13 +2279,13 @@ class CudaDevice(Device):
raise RuntimeError(f'CUDA Error: invalid device ordinal: {cuda_index!r}.')
nvml_index = cuda_visible_devices[cuda_index]
super().__init__(index=nvml_index, uuid=uuid)
super().__init__(index=nvml_index, uuid=uuid) # type: ignore[arg-type]
if cuda_index is None:
cuda_index = super().cuda_index
self._cuda_index = cuda_index
self._cuda_index: int = cuda_index
self._ident = ((self._cuda_index, self.index), self.uuid())
self._ident: tuple[Hashable, str] = ((self._cuda_index, self.index), self.uuid())
def __repr__(self) -> str:
"""Return a string representation of the CUDA device."""
@ -2279,7 +2307,7 @@ class CudaDevice(Device):
The attributes are defined in :attr:`SNAPSHOT_KEYS`.
"""
snapshot = super().as_snapshot()
snapshot.cuda_index = self.cuda_index
snapshot.cuda_index = self.cuda_index # type: ignore[attr-defined]
return snapshot
@ -2288,9 +2316,13 @@ Device.cuda = CudaDevice
"""Shortcut for class :class:`CudaDevice`."""
class CudaMigDevice(CudaDevice, MigDevice):
class CudaMigDevice(CudaDevice, MigDevice): # type: ignore[misc]
"""Class for CUDA devices that are MIG devices."""
_nvml_index: tuple[int, int] # type: ignore[assignment]
index: tuple[int, int] # type: ignore[assignment]
nvml_index: tuple[int, int] # type: ignore[assignment]
def is_mig_device_uuid(uuid: str | None) -> bool:
"""Return :data:`True` if the argument is a MIG device UUID, otherwise, return :data:`False`."""
@ -2327,6 +2359,7 @@ def parse_cuda_visible_devices(
Examples:
>>> import os
>>> os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
>>> os.environ['CUDA_VISIBLE_DEVICES'] = '6,5'
>>> parse_cuda_visible_devices() # parse the `CUDA_VISIBLE_DEVICES` environment variable to NVML indices
[6, 5]
@ -2383,6 +2416,7 @@ def normalize_cuda_visible_devices(cuda_visible_devices: str | None = _VALUE_OMI
Examples:
>>> import os
>>> os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
>>> os.environ['CUDA_VISIBLE_DEVICES'] = '6,5'
>>> normalize_cuda_visible_devices() # normalize the `CUDA_VISIBLE_DEVICES` environment variable to UUID strings
'GPU-849d5a8d-610e-eeea-1fd4-81ff44a23794,GPU-18ef14e9-dec6-1d7e-1284-3010c6ce98b1'
@ -2420,7 +2454,7 @@ def normalize_cuda_visible_devices(cuda_visible_devices: str | None = _VALUE_OMI
class _PhysicalDeviceAttrs(NamedTuple):
index: int
index: int # type: ignore[assignment]
name: str
uuid: str
support_mig_mode: bool
@ -2460,7 +2494,7 @@ def _does_any_device_support_mig_mode(uuids: Iterable[str] | None = None) -> boo
@contextlib.contextmanager
def _global_physical_device(device: PhysicalDevice) -> PhysicalDevice:
def _global_physical_device(device: PhysicalDevice) -> Generator[PhysicalDevice, None, None]:
global _GLOBAL_PHYSICAL_DEVICE # pylint: disable=global-statement
with _GLOBAL_PHYSICAL_DEVICE_LOCK:
@ -2473,13 +2507,29 @@ def _global_physical_device(device: PhysicalDevice) -> PhysicalDevice:
def _get_global_physical_device() -> PhysicalDevice:
with _GLOBAL_PHYSICAL_DEVICE_LOCK:
return _GLOBAL_PHYSICAL_DEVICE
return _GLOBAL_PHYSICAL_DEVICE # type: ignore[return-value]
@overload
def _parse_cuda_visible_devices(
cuda_visible_devices: str | None,
format: Literal['index'], # pylint: disable=redefined-builtin
) -> list[int] | list[tuple[int, int]]:
...
@overload
def _parse_cuda_visible_devices(
cuda_visible_devices: str | None,
format: Literal['uuid'], # pylint: disable=redefined-builtin
) -> list[str]:
...
@functools.lru_cache()
def _parse_cuda_visible_devices( # pylint: disable=too-many-branches,too-many-statements
cuda_visible_devices: str | None = None,
format: str = 'index', # pylint: disable=redefined-builtin
format: Literal['index', 'uuid'] = 'index', # pylint: disable=redefined-builtin
) -> list[int] | list[tuple[int, int]] | list[str]:
"""The underlining implementation for :meth:`parse_cuda_visible_devices`. The result will be cached."""
assert format in ('index', 'uuid')
@ -2487,7 +2537,7 @@ def _parse_cuda_visible_devices( # pylint: disable=too-many-branches,too-many-s
try:
physical_device_attrs = _get_all_physical_device_attrs()
except libnvml.NVMLError:
return []
return [] # type: ignore[return-value]
gpu_uuids = set(physical_device_attrs)
try:
@ -2532,6 +2582,10 @@ def _parse_cuda_visible_devices( # pylint: disable=too-many-branches,too-many-s
if cuda_visible_devices is None:
cuda_visible_devices = ','.join(physical_device_attrs.keys())
devices: list[Device] = []
presented: set[str] = set()
use_integer_identifiers: bool | None = None
def from_index_or_uuid(index_or_uuid: int | str) -> Device:
nonlocal use_integer_identifiers
@ -2562,12 +2616,9 @@ def _parse_cuda_visible_devices( # pylint: disable=too-many-branches,too-many-s
identifier = identifier[:offset]
return identifier
devices = []
presented = set()
use_integer_identifiers = None
for identifier in map(strip_identifier, cuda_visible_devices.split(',')):
if identifier in presented:
return [] # duplicate identifiers found
return [] # type: ignore[return-value] # duplicate identifiers found
try:
device = from_index_or_uuid(identifier)
@ -2600,7 +2651,7 @@ def _parse_cuda_visible_devices( # pylint: disable=too-many-branches,too-many-s
if format == 'uuid':
return [device.uuid() for device in devices]
return [device.index for device in devices]
return [device.index for device in devices] # type: ignore[return-value]
def _parse_cuda_visible_devices_to_uuids(
@ -2655,7 +2706,7 @@ def _parse_cuda_visible_devices_to_uuids(
def _cuda_visible_devices_parser(
cuda_visible_devices: str,
cuda_visible_devices: str | None,
queue: mp.SimpleQueue,
verbose: bool = True,
) -> None:

View file

@ -43,13 +43,21 @@ __all__ = [name for name in _psutil.__all__ if not name.startswith('_')] + [
__all__[__all__.index('Error')] = 'PsutilError'
PsutilError = Error # make alias # noqa: F405
del Error # noqa: F821 # pylint: disable=undefined-variable
PsutilError = Error = _psutil.Error # make alias
del Error
cpu_percent = _psutil.cpu_percent
virtual_memory = _psutil.virtual_memory
swap_memory = _psutil.swap_memory
Process = _psutil.Process
NoSuchProcess = _psutil.NoSuchProcess
ZombieProcess = _psutil.ZombieProcess
AccessDenied = _psutil.AccessDenied
POSIX = _psutil.POSIX
WINDOWS = _psutil.WINDOWS
LINUX = _psutil.LINUX
MACOS = _psutil.MACOS
if hasattr(_psutil, 'getloadavg'):
@ -60,7 +68,7 @@ if hasattr(_psutil, 'getloadavg'):
else:
def load_average() -> None:
def load_average() -> None: # type: ignore[misc]
"""Get the system load average."""
return
@ -95,7 +103,7 @@ def reverse_ppid_map() -> dict[int, list[int]]: # pylint: disable=function-rede
return tree
if LINUX: # noqa: F405
if LINUX:
WSL = _os.getenv('WSL_DISTRO_NAME', default=None)
if WSL is not None and WSL == '':
WSL = 'WSL'

View file

@ -26,7 +26,13 @@ import platform as _platform
import string as _string
import sys as _sys
import threading as _threading
from typing import TYPE_CHECKING as _TYPE_CHECKING
from typing import Any as _Any
from typing import Callable as _Callable
if _TYPE_CHECKING:
from typing_extensions import TypeAlias as _TypeAlias # Python 3.10+
# pylint: disable-next=missing-class-docstring,too-few-public-methods
@ -34,9 +40,9 @@ class _struct_c_CUdevice_t(_ctypes.Structure):
pass # opaque handle
_c_CUdevice_t = _ctypes.POINTER(_struct_c_CUdevice_t)
_c_CUdevice_t: _TypeAlias = _ctypes.POINTER(_struct_c_CUdevice_t) # type: ignore[valid-type]
_CUresult_t = _ctypes.c_uint
_CUresult_t: _TypeAlias = _ctypes.c_uint
# Error codes #
# pylint: disable=line-too-long
@ -215,8 +221,8 @@ CUDA_ERROR_UNKNOWN = 999
class CUDAError(Exception):
"""Base exception class for CUDA driver query errors."""
_value_class_mapping = {}
_errcode_to_string = { # List of currently known error codes
_value_class_mapping: dict[int, type[CUDAError]] = {}
_errcode_to_string: dict[int, str] = { # List of currently known error codes
CUDA_ERROR_NOT_INITIALIZED: 'Initialization error.',
CUDA_ERROR_NOT_FOUND: 'Named symbol not found.',
CUDA_ERROR_INVALID_VALUE: 'Invalid argument.',
@ -227,7 +233,8 @@ class CUDAError(Exception):
CUDA_ERROR_COMPAT_NOT_SUPPORTED_ON_DEVICE: 'Forward compatibility was attempted on non supported Hardware.',
CUDA_ERROR_INVALID_CONTEXT: 'Invalid device context.',
} # fmt:skip
_errcode_to_name = {}
_errcode_to_name: dict[int, str] = {}
value: int
def __new__(cls, value: int) -> CUDAError:
"""Map value to a proper subclass of :class:`CUDAError`."""
@ -295,8 +302,8 @@ def _extract_cuda_errors_as_classes() -> None:
class_name = f'CUDAError_{pascal_case}'
err_val = getattr(this_module, err_name)
def gen_new(value):
def new(cls):
def gen_new(value: int) -> _Callable[[type[CUDAError]], CUDAError]:
def new(cls: type[CUDAError]) -> CUDAError:
return CUDAError.__new__(cls, value)
return new
@ -317,6 +324,24 @@ def _extract_cuda_errors_as_classes() -> None:
CUDAError._errcode_to_name[err_val] = err_name
# Add explicit references to appease linters
class __CUDAError(CUDAError):
value: int
def __new__(cls) -> CUDAError: # type: ignore[misc,empty-body]
...
CUDAError_NotInitialized: type[__CUDAError]
CUDAError_NotFound: type[__CUDAError]
CUDAError_InvalidValue: type[__CUDAError]
CUDAError_NoDevice: type[__CUDAError]
CUDAError_InvalidDevice: type[__CUDAError]
CUDAError_SystemDriverMismatch: type[__CUDAError]
CUDAError_Deinitialized: type[__CUDAError]
CUDAError_CompatNotSupportedOnDevice: type[__CUDAError]
CUDAError_InvalidContext: type[__CUDAError]
_extract_cuda_errors_as_classes()
del _extract_cuda_errors_as_classes
@ -328,14 +353,14 @@ def _cudaCheckReturn(ret: _Any) -> _Any:
# Function access #
__cudaLib = None
__initialized = False
__libLoadLock = _threading.Lock()
__cudaLib: _ctypes.CDLL | None = None
__initialized: bool = False
__libLoadLock: _threading.Lock = _threading.Lock()
# Function pointers are cached to prevent unnecessary libLoadLock locking
__cudaGetFunctionPointer_cache = {}
__cudaGetFunctionPointer_cache: dict[str, _ctypes._CFuncPtr] = {} # type: ignore[name-defined]
def __cudaGetFunctionPointer(name: str) -> _ctypes._CFuncPtr:
def __cudaGetFunctionPointer(name: str) -> _ctypes._CFuncPtr: # type: ignore[name-defined]
"""Get the function pointer from the CUDA driver library.
Raises:
@ -658,11 +683,10 @@ def cuDeviceGetUuid(device: _c_CUdevice_t) -> str:
except CUDAError_NotFound: # noqa: F821 # pylint: disable=undefined-variable
fn = __cudaGetFunctionPointer('cuDeviceGetUuid')
ubyte_array = _ctypes.c_ubyte * 16
uuid = ubyte_array()
uuid = _ctypes.create_string_buffer(16)
ret = fn(uuid, device)
_cudaCheckReturn(ret)
uuid = ''.join(map('{:02x}'.format, uuid))
uuid = ''.join(map('{:02x}'.format, uuid.value))
return '-'.join((uuid[:8], uuid[8:12], uuid[12:16], uuid[16:20], uuid[20:32]))
@ -682,11 +706,10 @@ def cuDeviceGetUuid_v2(device: _c_CUdevice_t) -> str:
"""
fn = __cudaGetFunctionPointer('cuDeviceGetUuid_v2')
ubyte_array = _ctypes.c_ubyte * 16
uuid = ubyte_array()
uuid = _ctypes.create_string_buffer(16)
ret = fn(uuid, device)
_cudaCheckReturn(ret)
uuid = ''.join(map('{:0x}'.format, uuid.value))
uuid = ''.join(map('{:02x}'.format, uuid.value))
return '-'.join((uuid[:8], uuid[8:12], uuid[12:16], uuid[16:20], uuid[20:32]))

View file

@ -27,6 +27,7 @@ import platform as _platform
import sys as _sys
import threading as _threading
from typing import Any as _Any
from typing import Callable as _Callable
_cudaError_t = _ctypes.c_int
@ -266,8 +267,8 @@ cudaErrorUnknown = 999
class cudaError(Exception):
"""Base exception class for CUDA driver query errors."""
_value_class_mapping = {}
_errcode_to_string = { # List of currently known error codes
_value_class_mapping: dict[int, type[cudaError]] = {}
_errcode_to_string: dict[int, str] = { # List of currently known error codes
cudaErrorInitializationError: 'Initialization error.',
cudaErrorSymbolNotFound: 'Named symbol not found.',
cudaErrorInvalidValue: 'Invalid argument.',
@ -278,7 +279,8 @@ class cudaError(Exception):
cudaErrorCompatNotSupportedOnDevice: 'Forward compatibility was attempted on non supported Hardware.',
cudaErrorDeviceUninitialized: 'Invalid device context.',
} # fmt:skip
_errcode_to_name = {}
_errcode_to_name: dict[int, str] = {}
value: int
def __new__(cls, value: int) -> cudaError:
"""Map value to a proper subclass of :class:`cudaError`."""
@ -349,8 +351,8 @@ def _extract_cuda_errors_as_classes() -> None:
class_name = err_name.replace('cudaError', 'cudaError_')
err_val = getattr(this_module, err_name)
def gen_new(value):
def new(cls):
def gen_new(value: int) -> _Callable[[type[cudaError]], cudaError]:
def new(cls: type[cudaError]) -> cudaError:
return cudaError.__new__(cls, value)
return new
@ -371,6 +373,24 @@ def _extract_cuda_errors_as_classes() -> None:
cudaError._errcode_to_name[err_val] = err_name
# Add explicit references to appease linters
class __cudaError(cudaError):
value: int
def __new__(cls) -> cudaError: # type: ignore[misc,empty-body]
...
cudaError_InitializationError: type[__cudaError]
cudaError_SymbolNotFound: type[__cudaError]
cudaError_InvalidValue: type[__cudaError]
cudaError_NoDevice: type[__cudaError]
cudaError_InvalidDevice: type[__cudaError]
cudaError_SystemDriverMismatch: type[__cudaError]
cudaError_CudartUnloading: type[__cudaError]
cudaError_CompatNotSupportedOnDevice: type[__cudaError]
cudaError_DeviceUninitialized: type[__cudaError]
_extract_cuda_errors_as_classes()
del _extract_cuda_errors_as_classes
@ -382,13 +402,13 @@ def _cudaCheckReturn(ret: _Any) -> _Any:
# Function access #
__cudaLib = None
__libLoadLock = _threading.Lock()
__cudaLib: _ctypes.CDLL | None = None
__libLoadLock: _threading.Lock = _threading.Lock()
# Function pointers are cached to prevent unnecessary libLoadLock locking
__cudaGetFunctionPointer_cache = {}
__cudaGetFunctionPointer_cache: dict[str, _ctypes._CFuncPtr] = {} # type: ignore[name-defined]
def __cudaGetFunctionPointer(name: str) -> _ctypes._CFuncPtr:
def __cudaGetFunctionPointer(name: str) -> _ctypes._CFuncPtr: # type: ignore[name-defined]
"""Get the function pointer from the CUDA Runtime library.
Raises:

View file

@ -31,6 +31,7 @@ import sys as _sys
import threading as _threading
from types import FunctionType as _FunctionType
from types import ModuleType as _ModuleType
from typing import TYPE_CHECKING as _TYPE_CHECKING
from typing import Any as _Any
from typing import Callable as _Callable
@ -38,11 +39,16 @@ from typing import Callable as _Callable
# https://pypi.org/project/nvidia-ml-py
import pynvml as _pynvml
from pynvml import * # noqa: F403 # pylint: disable=wildcard-import,unused-wildcard-import
from pynvml import nvmlDeviceGetPciInfo # appease mypy # noqa: F401 # pylint: disable=unused-import
from nvitop.api.utils import NA
from nvitop.api.utils import colored as __colored
if _TYPE_CHECKING:
from typing_extensions import TypeAlias as _TypeAlias # Python 3.10+
__all__ = [ # will be updated in below
'NA',
'nvmlCheckReturn',
@ -63,10 +69,10 @@ if not callable(getattr(_pynvml, 'nvmlInitWithFlags', None)):
# Members from `pynvml` ############################################################################
NVMLError = _pynvml.NVMLError
NVMLError: type[_pynvml.NVMLError] = _pynvml.NVMLError
NVMLError.__doc__ = """Base exception class for NVML query errors."""
NVMLError.__new__.__doc__ = """Map value to a proper subclass of :class:`NVMLError`."""
nvmlExceptionClass = _pynvml.nvmlExceptionClass
nvmlExceptionClass: _Callable[[int], type[_pynvml.NVMLError]] = _pynvml.nvmlExceptionClass
nvmlExceptionClass.__doc__ = """Map value to a proper subclass of :class:`NVMLError`."""
# Load members from module `pynvml` and register them in `__all__` and globals.
@ -161,26 +167,40 @@ del (
_sphinx_doc,
)
# 5. Add explicit references to appease linters
# pylint: disable=no-member
c_nvmlDevice_t = _pynvml.c_nvmlDevice_t
NVMLError_FunctionNotFound = _pynvml.NVMLError_FunctionNotFound
NVMLError_GpuIsLost = _pynvml.NVMLError_GpuIsLost
NVMLError_InvalidArgument = _pynvml.NVMLError_InvalidArgument
NVMLError_LibraryNotFound = _pynvml.NVMLError_LibraryNotFound
NVMLError_NoPermission = _pynvml.NVMLError_NoPermission
NVMLError_NotFound = _pynvml.NVMLError_NotFound
NVMLError_NotSupported = _pynvml.NVMLError_NotSupported
NVMLError_Unknown = _pynvml.NVMLError_Unknown
c_nvmlDevice_t: _TypeAlias = _pynvml.c_nvmlDevice_t
NVMLError_FunctionNotFound: _TypeAlias = _pynvml.NVMLError_FunctionNotFound
NVMLError_GpuIsLost: _TypeAlias = _pynvml.NVMLError_GpuIsLost
NVMLError_InvalidArgument: _TypeAlias = _pynvml.NVMLError_InvalidArgument
NVMLError_LibraryNotFound: _TypeAlias = _pynvml.NVMLError_LibraryNotFound
NVMLError_NoPermission: _TypeAlias = _pynvml.NVMLError_NoPermission
NVMLError_NotFound: _TypeAlias = _pynvml.NVMLError_NotFound
NVMLError_NotSupported: _TypeAlias = _pynvml.NVMLError_NotSupported
NVMLError_Unknown: _TypeAlias = _pynvml.NVMLError_Unknown
NVML_CLOCK_GRAPHICS: int = _pynvml.NVML_CLOCK_GRAPHICS
NVML_CLOCK_SM: int = _pynvml.NVML_CLOCK_SM
NVML_CLOCK_MEM: int = _pynvml.NVML_CLOCK_MEM
NVML_CLOCK_VIDEO: int = _pynvml.NVML_CLOCK_VIDEO
NVML_TEMPERATURE_GPU: int = _pynvml.NVML_TEMPERATURE_GPU
NVML_DRIVER_WDDM: int = _pynvml.NVML_DRIVER_WDDM
NVML_DRIVER_WDM: int = _pynvml.NVML_DRIVER_WDM
NVML_MEMORY_ERROR_TYPE_UNCORRECTED: int = _pynvml.NVML_MEMORY_ERROR_TYPE_UNCORRECTED
NVML_VOLATILE_ECC: int = _pynvml.NVML_VOLATILE_ECC
NVML_COMPUTEMODE_DEFAULT: int = _pynvml.NVML_COMPUTEMODE_DEFAULT
NVML_COMPUTEMODE_EXCLUSIVE_THREAD: int = _pynvml.NVML_COMPUTEMODE_EXCLUSIVE_THREAD
NVML_COMPUTEMODE_PROHIBITED: int = _pynvml.NVML_COMPUTEMODE_PROHIBITED
NVML_COMPUTEMODE_EXCLUSIVE_PROCESS: int = _pynvml.NVML_COMPUTEMODE_EXCLUSIVE_PROCESS
# pylint: enable=no-member
# New members in `libnvml` #########################################################################
__flags = []
__initialized = False
__lock = _threading.Lock()
__flags: list[int] = []
__initialized: bool = False
__lock: _threading.Lock = _threading.Lock()
LOGGER = _logging.getLogger(__name__)
LOGGER: _logging.Logger = _logging.getLogger(__name__)
try:
LOGGER.setLevel(_os.getenv('LOGLEVEL', default='WARNING').upper())
except (ValueError, TypeError):
@ -197,9 +217,9 @@ if not LOGGER.hasHandlers() and LOGGER.isEnabledFor(_logging.DEBUG):
LOGGER.addHandler(_file_handler)
del _formatter, _stream_handler, _file_handler
UNKNOWN_FUNCTIONS = {}
UNKNOWN_FUNCTIONS_CACHE_SIZE = 1024
VERSIONED_PATTERN = _re.compile(r'^(?P<name>\w+)(?P<suffix>_v(\d)+)$')
UNKNOWN_FUNCTIONS: dict[str, tuple[_Callable | str, NVMLError_FunctionNotFound]] = {}
UNKNOWN_FUNCTIONS_CACHE_SIZE: int = 1024
VERSIONED_PATTERN: _re.Pattern = _re.compile(r'^(?P<name>\w+)(?P<suffix>_v(\d)+)$')
def _lazy_init() -> None:
@ -389,10 +409,14 @@ def nvmlQuery(
except AttributeError as e1:
raise NVMLError_FunctionNotFound from e1
retval = func(*args, **kwargs)
retval = func(*args, **kwargs) # type: ignore[operator]
except NVMLError_FunctionNotFound as e2:
if not ignore_function_not_found:
identifier = _inspect.getsource(func) if func.__name__ == '<lambda>' else repr(func)
identifier = (
func
if isinstance(func, str)
else (_inspect.getsource(func) if func.__name__ == '<lambda>' else repr(func))
)
with __lock:
if (
identifier not in UNKNOWN_FUNCTIONS
@ -431,7 +455,7 @@ def nvmlCheckReturn(
# Patch layers for backward compatibility ##########################################################
__patched_backward_compatibility_layers = False
__patched_backward_compatibility_layers: bool = False
def __patch_backward_compatibility_layers() -> None:
@ -441,9 +465,9 @@ def __patch_backward_compatibility_layers() -> None:
return
function_name_mapping_lock = _threading.Lock()
function_name_mapping = {}
function_name_mapping: dict[str, str] = {}
def function_mapping_update(mapping):
def function_mapping_update(mapping: dict[str, str]) -> dict[str, str]:
with function_name_mapping_lock:
mapping = dict(mapping)
for name, mapped_name in function_name_mapping.items():
@ -452,10 +476,12 @@ def __patch_backward_compatibility_layers() -> None:
function_name_mapping.update(mapping)
return mapping
def with_mapped_function_name():
def wrapper(nvmlGetFunctionPointer):
def with_mapped_function_name() -> None:
def wrapper(
nvmlGetFunctionPointer: _Callable[[str], _ctypes._CFuncPtr], # type: ignore[name-defined]
) -> _Callable[[str], _ctypes._CFuncPtr]: # type: ignore[name-defined]
@_functools.wraps(nvmlGetFunctionPointer)
def wrapped(name):
def wrapped(name: str) -> _ctypes._CFuncPtr: # type: ignore[name-defined]
mapped_name = function_name_mapping.get(name, name)
return nvmlGetFunctionPointer(mapped_name)
@ -467,10 +493,18 @@ def __patch_backward_compatibility_layers() -> None:
),
)
def patch_function_pointers_when_fail(names, callback):
def wrapper(nvmlGetFunctionPointer):
def patch_function_pointers_when_fail(
names: set[str],
callback: _Callable[[str, set[str], Exception, _ModuleType, _ModuleType], str],
) -> _Callable[ # type: ignore[name-defined]
[_Callable[[str], _ctypes._CFuncPtr]],
_Callable[[str], _ctypes._CFuncPtr],
]:
def wrapper(
nvmlGetFunctionPointer: _Callable[[str], _ctypes._CFuncPtr], # type: ignore[name-defined]
) -> _Callable[[str], _ctypes._CFuncPtr]: # type: ignore[name-defined]
@_functools.wraps(nvmlGetFunctionPointer)
def wrapped(name):
def wrapped(name: str) -> _ctypes._CFuncPtr: # type: ignore[name-defined]
try:
return nvmlGetFunctionPointer(name)
except NVMLError_FunctionNotFound as ex:
@ -483,12 +517,12 @@ def __patch_backward_compatibility_layers() -> None:
return wrapper
def patch_process_info():
def patch_process_info() -> None:
# pylint: disable-next=protected-access,no-member
PrintableStructure = _pynvml._PrintableStructure
# pylint: disable-next=missing-class-docstring,too-few-public-methods
class c_nvmlProcessInfo_v1_t(PrintableStructure):
class c_nvmlProcessInfo_v1_t(PrintableStructure): # type: ignore[misc,valid-type]
_fields_ = [
('pid', _ctypes.c_uint),
('usedGpuMemory', _ctypes.c_ulonglong),
@ -498,7 +532,7 @@ def __patch_backward_compatibility_layers() -> None:
}
# pylint: disable-next=missing-class-docstring,too-few-public-methods
class c_nvmlProcessInfo_v2_t(PrintableStructure):
class c_nvmlProcessInfo_v2_t(PrintableStructure): # type: ignore[misc,valid-type]
_fields_ = [
('pid', _ctypes.c_uint),
('usedGpuMemory', _ctypes.c_ulonglong),
@ -521,12 +555,12 @@ def __patch_backward_compatibility_layers() -> None:
}
def patch_process_info_callback(
name,
names, # pylint: disable=unused-argument
exception,
pynvml,
modself,
):
name: str,
names: set[str], # pylint: disable=unused-argument
exception: Exception,
pynvml: _ModuleType,
modself: _ModuleType,
) -> str:
if name in nvmlDeviceGetRunningProcesses_v3_v2:
mapping = nvmlDeviceGetRunningProcesses_v3_v2
struct_type = c_nvmlProcessInfo_v2_t
@ -570,16 +604,20 @@ def __patch_backward_compatibility_layers() -> None:
__patched_backward_compatibility_layers = True
_pynvml_installation_corrupted = not callable(getattr(_pynvml, '_nvmlGetFunctionPointer', None))
_pynvml_installation_corrupted: bool = not callable(
getattr(_pynvml, '_nvmlGetFunctionPointer', None),
)
if not _pynvml_installation_corrupted:
__patch_backward_compatibility_layers()
del __patch_backward_compatibility_layers
_pynvml_memory_v2_available = hasattr(_pynvml, 'nvmlMemory_v2')
_pynvml_get_memory_info_v2_available = _pynvml_memory_v2_available
_driver_get_memory_info_v2_available = None if not _pynvml_installation_corrupted else False
_pynvml_memory_v2_available: bool = hasattr(_pynvml, 'nvmlMemory_v2')
_pynvml_get_memory_info_v2_available: bool = _pynvml_memory_v2_available
_driver_get_memory_info_v2_available: bool | None = (
None if not _pynvml_installation_corrupted else False
)
# pylint: disable-next=function-redefined,too-many-branches

View file

@ -27,7 +27,7 @@ import os
import threading
from abc import ABCMeta
from types import FunctionType
from typing import TYPE_CHECKING, Any, Callable, Iterable
from typing import TYPE_CHECKING, Any, Callable, Generator, Iterable
from weakref import WeakValueDictionary
from nvitop.api import host, libnvml
@ -181,8 +181,14 @@ class HostProcess(host.Process, metaclass=ABCMeta):
)
"""
INSTANCE_LOCK = threading.RLock()
INSTANCES = WeakValueDictionary()
INSTANCE_LOCK: threading.RLock = threading.RLock()
INSTANCES: WeakValueDictionary[int, HostProcess] = WeakValueDictionary()
_pid: int
_super_gone: bool
_username: str | None
_ident: tuple
_lock: threading.RLock
def __new__(cls, pid: int | None = None) -> HostProcess:
"""Return the cached instance of :class:`HostProcess`."""
@ -378,7 +384,7 @@ class HostProcess(host.Process, metaclass=ABCMeta):
return [HostProcess(child.pid) for child in super().children(recursive)]
@contextlib.contextmanager
def oneshot(self) -> contextlib.AbstractContextManager:
def oneshot(self) -> Generator[None, None, None]:
"""A utility context manager which considerably speeds up the retrieval of multiple process information at the same time.
Internally different process info (e.g. name, ppid, uids, gids, ...) may be fetched by using
@ -405,12 +411,12 @@ class HostProcess(host.Process, metaclass=ABCMeta):
with super().oneshot():
# pylint: disable=no-member
try:
self.cmdline.cache_activate(self)
self.running_time.cache_activate(self)
self.cmdline.cache_activate(self) # type: ignore[attr-defined]
self.running_time.cache_activate(self) # type: ignore[attr-defined]
yield
finally:
self.cmdline.cache_deactivate(self)
self.running_time.cache_deactivate(self)
self.cmdline.cache_deactivate(self) # type: ignore[attr-defined]
self.running_time.cache_deactivate(self) # type: ignore[attr-defined]
def as_snapshot(
self,
@ -441,13 +447,20 @@ class GpuProcess: # pylint: disable=too-many-instance-attributes,too-many-publi
representing the same PID on the host but different GPU devices are different.
"""
INSTANCE_LOCK = threading.RLock()
INSTANCES = WeakValueDictionary()
INSTANCE_LOCK: threading.RLock = threading.RLock()
INSTANCES: WeakValueDictionary[tuple[int, Device], GpuProcess] = WeakValueDictionary()
_pid: int
_host: HostProcess
_device: Device
_username: str | None
_ident: tuple
_hash: int | None
# pylint: disable-next=too-many-arguments
def __new__(
cls,
pid: int,
pid: int | None,
device: Device,
# pylint: disable=unused-argument
gpu_memory: int | NaType | None = None,
@ -485,7 +498,7 @@ class GpuProcess: # pylint: disable=too-many-instance-attributes,too-many-publi
# pylint: disable-next=too-many-arguments
def __init__(
self,
pid: int, # pylint: disable=unused-argument
pid: int | None, # pylint: disable=unused-argument
device: Device,
gpu_memory: int | NaType | None = None,
gpu_instance_id: int | NaType | None = None,
@ -553,7 +566,7 @@ class GpuProcess: # pylint: disable=too-many-instance-attributes,too-many-publi
If the user do not have read privilege to the process' status file.
"""
try:
return super().__getattr__(name)
return super().__getattr__(name) # type: ignore[misc]
except AttributeError:
if name == '_cache':
raise
@ -627,7 +640,7 @@ class GpuProcess: # pylint: disable=too-many-instance-attributes,too-many-publi
memory_total = self.device.memory_total()
gpu_memory_percent = NA
if libnvml.nvmlCheckReturn(memory_used, int) and libnvml.nvmlCheckReturn(memory_total, int):
gpu_memory_percent = round(100.0 * memory_used / memory_total, 1)
gpu_memory_percent = round(100.0 * memory_used / memory_total, 1) # type: ignore[assignment]
self._gpu_memory_percent = gpu_memory_percent
def set_gpu_utilization(
@ -651,7 +664,7 @@ class GpuProcess: # pylint: disable=too-many-instance-attributes,too-many-publi
def update_gpu_status(self) -> int | NaType:
"""Update the GPU consumption status from a new NVML query."""
self.set_gpu_memory(NA)
self.set_gpu_utilization(NA, NA, NA, NA)
self.set_gpu_utilization(NA, NA, NA, NA) # type: ignore[arg-type]
self.device.processes()
return self.gpu_memory()
@ -989,8 +1002,10 @@ class GpuProcess: # pylint: disable=too-many-instance-attributes,too-many-publi
If *failsafe* is :data:`True`, then if any method fails, the fallback value in
:func:`auto_garbage_clean` will be used.
"""
cache = {}
context = cls.failsafe if failsafe else contextlib.nullcontext
cache: dict[int, Snapshot] = {}
context: Callable[[], contextlib.AbstractContextManager[None]] = (
cls.failsafe if failsafe else contextlib.nullcontext # type: ignore[assignment]
)
with context():
return [
process.as_snapshot(host_process_snapshot_cache=cache) for process in gpu_processes
@ -998,7 +1013,7 @@ class GpuProcess: # pylint: disable=too-many-instance-attributes,too-many-publi
@classmethod
@contextlib.contextmanager
def failsafe(cls) -> contextlib.AbstractContextManager:
def failsafe(cls) -> Generator[None, None, None]:
"""A context manager that enables fallback values for methods that fail.
Examples:

View file

@ -27,7 +27,7 @@ import os
import re
import sys
import time
from typing import Any, Callable, Iterable
from typing import Any, Callable, Generator, Iterable, Iterator
from psutil import WINDOWS
@ -70,12 +70,15 @@ except ImportError:
text: str,
color: str | None = None,
on_color: str | None = None,
attrs: Iterable[str] = None,
attrs: Iterable[str] | None = None,
*,
no_color: bool | None = None,
force_color: bool | None = None,
) -> str:
return text
COLOR = sys.stdout.isatty()
COLOR: bool = sys.stdout.isatty()
def set_color(value: bool) -> None:
@ -94,7 +97,7 @@ def colored(
text: str,
color: str | None = None,
on_color: str | None = None,
attrs: Iterable[str] = None,
attrs: Iterable[str] | None = None,
) -> str:
"""Colorize text with ANSI color escape codes.
@ -178,7 +181,7 @@ class NaType(str):
"""
return math.nan
def __add__(self, other: object) -> str | float:
def __add__(self, other: object) -> str | float: # type: ignore[override]
"""Return :data:`math.nan` if the operand is a number or uses string concatenation if the operand is a string (``NA + other``).
A special case is when the operand is :const:`nvitop.NA` itself, the result is
@ -193,9 +196,11 @@ class NaType(str):
>>> NA + 1.0
nan
""" # pylint: disable=line-too-long
if isinstance(other, (int, float)) or other is NA:
if isinstance(other, (int, float)):
return float(self) + other
return super().__add__(other)
if other is NA:
return float(self)
return super().__add__(other) # type: ignore[operator]
def __radd__(self, other: object) -> str | float:
"""Return :data:`math.nan` if the operand is a number or uses string concatenation if the operand is a string (``other + NA``).
@ -223,8 +228,10 @@ class NaType(str):
>>> NA + 1.0
nan
"""
if isinstance(other, (int, float)) or other is NA:
if isinstance(other, (int, float)):
return float(self) - other
if other is NA:
return float(self)
return NotImplemented
def __rsub__(self, other: object) -> float:
@ -241,7 +248,7 @@ class NaType(str):
return other - float(self)
return NotImplemented
def __mul__(self, other: object) -> float:
def __mul__(self, other: object) -> float: # type: ignore[override]
"""Return :data:`math.nan` if the operand is a number (``NA * other``).
A special case is when the operand is :const:`nvitop.NA` itself, the result is also :data:`math.nan`.
@ -253,11 +260,13 @@ class NaType(str):
>>> NA * NA
nan
"""
if isinstance(other, (int, float)) or other is NA:
if isinstance(other, (int, float)):
return float(self) * other
if other is NA:
return float(self)
return NotImplemented
def __rmul__(self, other: object) -> float:
def __rmul__(self, other: object) -> float: # type: ignore[override]
"""Return :data:`math.nan` if the operand is a number (``other * NA``).
>>> 1024 * NA
@ -280,9 +289,13 @@ class NaType(str):
ZeroDivisionError: float division by zero
>>> NA / 0.0
ZeroDivisionError: float division by zero
>>> NA / NA
nan
"""
if isinstance(other, (int, float)):
return float(self) / other
if other is NA:
return float(self)
return NotImplemented
def __rtruediv__(self, other: object) -> float:
@ -308,9 +321,13 @@ class NaType(str):
ZeroDivisionError: float division by zero
>>> NA / 0.0
ZeroDivisionError: float division by zero
>>> NA // NA
nan
"""
if isinstance(other, (int, float)):
return float(self) // other
if other is NA:
return float(self)
return NotImplemented
def __rfloordiv__(self, other: object) -> float:
@ -325,7 +342,7 @@ class NaType(str):
return other // float(self)
return NotImplemented
def __mod__(self, other: object) -> float:
def __mod__(self, other: object) -> float: # type: ignore[override]
"""Return :data:`math.nan` if the operand is a number (``NA % other``).
>>> NA % 1024
@ -339,6 +356,8 @@ class NaType(str):
"""
if isinstance(other, (int, float)):
return float(self) % other
if other is NA:
return float(self)
return NotImplemented
def __rmod__(self, other: object) -> float:
@ -421,25 +440,25 @@ class NaType(str):
"""The :const:`nvitop.NA` is always greater than any number, or uses the dictionary order for string."""
if isinstance(x, (int, float)):
return False
return super().__lt__(x)
return super().__lt__(x) # type: ignore[operator]
def __le__(self, x: object) -> bool:
"""The :const:`nvitop.NA` is always greater than any number, or uses the dictionary order for string."""
if isinstance(x, (int, float)):
return False
return super().__le__(x)
return super().__le__(x) # type: ignore[operator]
def __gt__(self, x: object) -> bool:
"""The :const:`nvitop.NA` is always greater than any number, or uses the dictionary order for string."""
if isinstance(x, (int, float)):
return True
return super().__gt__(x)
return super().__gt__(x) # type: ignore[operator]
def __ge__(self, x: object) -> bool:
"""The :const:`nvitop.NA` is always greater than any number, or uses the dictionary order for string."""
if isinstance(x, (int, float)):
return True
return super().__ge__(x)
return super().__ge__(x) # type: ignore[operator]
def __format__(self, format_spec: str) -> str:
"""Format :const:`nvitop.NA` according to ``format_spec``."""
@ -459,22 +478,22 @@ NA.__doc__ = """The singleton instance of :class:`NaType`. The actual value is :
NotApplicable = NA
KiB = 1 << 10
KiB: int = 1 << 10
"""Kibibyte (1024)"""
MiB = 1 << 20
MiB: int = 1 << 20
"""Mebibyte (1024 * 1024)"""
GiB = 1 << 30
GiB: int = 1 << 30
"""Gibibyte (1024 * 1024 * 1024)"""
TiB = 1 << 40
TiB: int = 1 << 40
"""Tebibyte (1024 * 1024 * 1024 * 1024)"""
PiB = 1 << 50
PiB: int = 1 << 50
"""Pebibyte (1024 * 1024 * 1024 * 1024 * 1024)"""
SIZE_UNITS = {
SIZE_UNITS: dict[str | None, int] = {
None: 1,
'': 1,
'B': 1,
@ -487,10 +506,10 @@ SIZE_UNITS = {
'MB': 1000**2,
'GB': 1000**3,
'TB': 1000**4,
'PB': 1000**4,
'PB': 1000**5,
}
"""Units of storage and memory measurements."""
SIZE_PATTERN = re.compile(
SIZE_PATTERN: re.Pattern = re.compile(
r'^\s*\+?\s*(?P<size>\d+(?:\.\d+)?)\s*(?P<unit>[KMGTP]i?B?|B?)\s*$',
flags=re.IGNORECASE,
)
@ -651,7 +670,7 @@ class Snapshot:
If the attribute is not defined, fetches from the original object and makes a function call.
"""
try:
return super().__getattr__(name)
return super().__getattr__(name) # type: ignore[misc]
except AttributeError:
attribute = getattr(self.real, name)
if callable(attribute):
@ -671,17 +690,15 @@ class Snapshot:
"""Support ``snapshot['name'] = value`` syntax."""
setattr(self, name, value)
def __iter__(self) -> Iterable[str]:
def __iter__(self) -> Iterator[str]:
"""Support ``for name in snapshot`` syntax and ``*`` tuple unpack ``[*snapshot]`` syntax."""
def gen() -> str:
for name in self.__dict__:
if name not in ('real', 'timestamp'):
yield name
def gen() -> Generator[str, None, None]:
yield from (name for name in self.__dict__ if name not in ('real', 'timestamp'))
return gen()
def keys(self) -> Iterable[str]:
def keys(self) -> Iterator[str]:
# pylint: disable-next=line-too-long
"""Support ``**`` dictionary unpack ``{**snapshot}`` / ``dict(**snapshot)`` syntax and ``dict(snapshot)`` dictionary conversion."""
return iter(self)
@ -730,6 +747,6 @@ def memoize_when_activated(method: Callable[[Any], Any]) -> Callable[[Any], Any]
except AttributeError:
pass
wrapped.cache_activate = cache_activate
wrapped.cache_deactivate = cache_deactivate
wrapped.cache_activate = cache_activate # type: ignore[attr-defined]
wrapped.cache_deactivate = cache_deactivate # type: ignore[attr-defined]
return wrapped

View file

@ -57,8 +57,7 @@ class GpuStatsLogger(Callback): # pylint: disable=too-many-instance-attributes
ValueError:
If NVIDIA driver is not installed, or the `gpus` argument does not match available devices.
Example::
Examples:
>>> from tensorflow.python.keras.utils.multi_gpu_utils import multi_gpu_model
>>> from tensorflow.python.keras.callbacks import TensorBoard
>>> from nvitop.callbacks.keras import GpuStatsLogger

View file

@ -58,8 +58,7 @@ class GpuStatsLogger(Callback): # pylint: disable=too-many-instance-attributes
MisconfigurationException:
If NVIDIA driver is not installed, not running on GPUs, or ``Trainer`` has no logger.
Example::
Examples:
>>> from pytorch_lightning import Trainer
>>> from nvitop.callbacks.pytorch_lightning import GpuStatsLogger
>>> gpu_stats = GpuStatsLogger()

View file

@ -237,29 +237,31 @@ def parse_arguments() -> argparse.Namespace:
args.user.append(USERNAME)
if args.gpu_util_thresh is None:
try:
gpu_util_thresh = os.getenv('NVITOP_GPU_UTILIZATION_THRESHOLDS', None)
gpu_util_thresh = list(map(int, gpu_util_thresh.split(',')))[:2]
gpu_util_thresh = list(
map(int, os.getenv('NVITOP_GPU_UTILIZATION_THRESHOLDS', '').split(',')),
)[:2]
if (
len(gpu_util_thresh) != 2
or min(gpu_util_thresh) <= 0
or max(gpu_util_thresh) >= 100
):
raise ValueError
except (ValueError, AttributeError):
except ValueError:
pass
else:
args.gpu_util_thresh = gpu_util_thresh
if args.mem_util_thresh is None:
try:
mem_util_thresh = os.getenv('NVITOP_MEMORY_UTILIZATION_THRESHOLDS', None)
mem_util_thresh = list(map(int, mem_util_thresh.split(',')))[:2]
mem_util_thresh = list(
map(int, os.getenv('NVITOP_MEMORY_UTILIZATION_THRESHOLDS', '').split(',')),
)[:2]
if (
len(mem_util_thresh) != 2
or min(mem_util_thresh) <= 0
or max(mem_util_thresh) >= 100
):
raise ValueError
except (ValueError, AttributeError):
except ValueError:
pass
else:
args.mem_util_thresh = mem_util_thresh
@ -268,7 +270,7 @@ def parse_arguments() -> argparse.Namespace:
# pylint: disable-next=too-many-branches,too-many-statements,too-many-locals
def main() -> None:
def main() -> int:
"""Main function for ``nvitop`` CLI."""
args = parse_arguments()
@ -307,9 +309,9 @@ def main() -> None:
return 1
if args.gpu_util_thresh is not None:
Device.GPU_UTILIZATION_THRESHOLDS = tuple(sorted(args.gpu_util_thresh))
Device.GPU_UTILIZATION_THRESHOLDS = tuple(sorted(args.gpu_util_thresh)) # type: ignore[assignment]
if args.mem_util_thresh is not None:
Device.MEMORY_UTILIZATION_THRESHOLDS = tuple(sorted(args.mem_util_thresh))
Device.MEMORY_UTILIZATION_THRESHOLDS = tuple(sorted(args.mem_util_thresh)) # type: ignore[assignment]
if args.only is not None:
indices = set(args.only)
@ -325,8 +327,8 @@ def main() -> None:
for index in Device.parse_cuda_visible_devices()
}
else:
indices = range(device_count)
devices = Device.from_indices(sorted(set(indices)))
indices = set(range(device_count))
devices = Device.from_indices(sorted(indices))
filters = []
if args.compute:
@ -366,12 +368,17 @@ def main() -> None:
ui = UI(devices, filters, ascii=args.ascii)
if not sys.stdout.isatty():
parent = HostProcess().parent()
grandparent = parent.parent() if parent is not None else None
if grandparent is not None and parent.name() == 'sh' and grandparent.name() == 'watch':
messages.append(
'HINT: You are running `nvitop` under `watch` command. '
'Please try `nvitop -m` directly.',
)
if parent is not None:
grandparent = parent.parent()
if (
grandparent is not None
and parent.name() == 'sh'
and grandparent.name() == 'watch'
):
messages.append(
'HINT: You are running `nvitop` under `watch` command. '
'Please try `nvitop -m` directly.',
)
ui.print()
ui.destroy()
@ -383,7 +390,7 @@ def main() -> None:
else 'ERROR: A FunctionNotFound error occurred while calling:',
]
unknown_function_messages.extend(
f' nvmlQuery({func.__name__!r}, *args, **kwargs)'
f' nvmlQuery({(func.__name__ if not isinstance(func, str) else func)!r}, *args, **kwargs)'
for func, _ in libnvml.UNKNOWN_FUNCTIONS.values()
)
unknown_function_messages.append(

View file

@ -8,7 +8,7 @@ from cachetools.func import ttl_cache
from nvitop.api import NA
from nvitop.api import MigDevice as MigDeviceBase
from nvitop.api import PhysicalDevice as DeviceBase
from nvitop.api import Snapshot, libnvml, utilization2string
from nvitop.api import libnvml, utilization2string
from nvitop.gui.library.process import GpuProcess
@ -75,7 +75,7 @@ class Device(DeviceBase):
return self._snapshot
@property
def snapshot(self) -> Snapshot:
def snapshot(self):
if self._snapshot is None:
self.as_snapshot()
return self._snapshot

View file

@ -62,12 +62,16 @@ import math
import os
import sys
import warnings
from typing import Any, Iterable
from typing import TYPE_CHECKING, Any, Callable, Iterable, Sequence, overload
from nvitop.api import Device, GpuProcess, colored, human2bytes, libnvml
from nvitop.api import Device, GpuProcess, Snapshot, colored, human2bytes, libnvml
from nvitop.version import __version__
if TYPE_CHECKING:
from typing_extensions import Literal # Python 3.8+
__all__ = ['select_devices']
try:
@ -78,10 +82,70 @@ except ModuleNotFoundError:
TTY = sys.stdout.isatty()
@overload
def select_devices(
devices: Iterable[Device] | None,
*,
format: Literal['index'], # pylint: disable=redefined-builtin
force_index: bool,
min_count: int,
max_count: int | None,
min_free_memory: int | str | None,
min_total_memory: int | str | None,
max_gpu_utilization: int | None,
max_memory_utilization: int | None,
tolerance: int,
free_accounts: list[str] | None,
sort: bool,
**kwargs: Any,
) -> list[int] | list[tuple[int, int]]:
...
@overload
def select_devices(
devices: Iterable[Device] | None,
*,
format: Literal['uuid'], # pylint: disable=redefined-builtin
force_index: bool,
min_count: int,
max_count: int | None,
min_free_memory: int | str | None,
min_total_memory: int | str | None,
max_gpu_utilization: int | None,
max_memory_utilization: int | None,
tolerance: int,
free_accounts: list[str] | None,
sort: bool,
**kwargs: Any,
) -> list[int] | list[tuple[int, int]]:
...
@overload
def select_devices(
devices: Iterable[Device] | None,
*,
format: Literal['device'], # pylint: disable=redefined-builtin
force_index: bool,
min_count: int,
max_count: int | None,
min_free_memory: int | str | None,
min_total_memory: int | str | None,
max_gpu_utilization: int | None,
max_memory_utilization: int | None,
tolerance: int,
free_accounts: list[str] | None,
sort: bool,
**kwargs: Any,
) -> list[Device]:
...
def select_devices( # pylint: disable=too-many-branches,too-many-statements,too-many-locals,unused-argument
devices: Iterable[Device] | None = None,
*,
format: str = 'index', # pylint: disable=redefined-builtin
format: Literal['index', 'uuid', 'device'] = 'index', # pylint: disable=redefined-builtin
force_index: bool = False,
min_count: int = 0,
max_count: int | None = None,
@ -90,10 +154,10 @@ def select_devices( # pylint: disable=too-many-branches,too-many-statements,too
max_gpu_utilization: int | None = None, # in percentage
max_memory_utilization: int | None = None, # in percentage
tolerance: int = 0, # in percentage
free_accounts: list[str] = None,
free_accounts: list[str] | None = None,
sort: bool = True,
**kwargs: Any,
) -> list[int] | list[tuple[int, int]] | list[str]:
) -> list[int] | list[tuple[int, int]] | list[str] | list[Device]:
"""Select a subset of devices satisfying the specified criteria.
Note:
@ -151,7 +215,7 @@ def select_devices( # pylint: disable=too-many-branches,too-many-statements,too
if max_count is not None:
if max_count == 0:
return []
return [] # type: ignore[return-value]
assert max_count >= min_count >= 0
free_accounts = set(free_accounts or [])
@ -164,11 +228,11 @@ def select_devices( # pylint: disable=too-many-branches,too-many-statements,too
if isinstance(min_total_memory, str):
min_total_memory = human2bytes(min_total_memory)
available_devices = []
available_devices: list[Snapshot] = []
for device in devices:
available_devices.extend(dev.as_snapshot() for dev in device.to_leaf_devices())
for device in available_devices:
device.loosen_constraints = 0
device.loosen_constraints = 0 # type: ignore[attr-defined]
if len(free_accounts) > 0:
with GpuProcess.failsafe():
@ -177,61 +241,53 @@ def select_devices( # pylint: disable=too-many-branches,too-many-statements,too
for process in device.real.processes().values():
if process.username() in free_accounts:
as_free_memory += process.gpu_memory()
device.memory_free += as_free_memory
device.memory_used -= as_free_memory
device.memory_free += as_free_memory # type: ignore[attr-defined]
device.memory_used -= as_free_memory # type: ignore[attr-defined]
def filter_func(
criteria: Callable[[Snapshot], bool],
original_criteria: Callable[[Snapshot], bool],
) -> Callable[[Snapshot], bool]:
def wrapped(device: Snapshot) -> bool:
device.loosen_constraints += int(not original_criteria(device)) # type: ignore[attr-defined]
return criteria(device)
return wrapped
if min_free_memory is not None:
loosen_min_free_memory = min_free_memory * (1.0 - tolerance)
available_devices = filter(
lambda device: (
device.memory_free >= loosen_min_free_memory,
setattr(
device,
'loosen_constraints',
device.loosen_constraints + int(not device.memory_free >= min_free_memory),
),
)[0],
available_devices = filter( # type: ignore[assignment]
filter_func(
lambda device: device.memory_free >= loosen_min_free_memory,
lambda device: device.memory_free >= min_free_memory,
),
available_devices,
)
if min_total_memory is not None:
loosen_min_total_memory = min_total_memory * (1.0 - tolerance)
available_devices = filter(
lambda device: (
device.memory_total >= loosen_min_total_memory,
setattr(
device,
'loosen_constraints',
device.loosen_constraints + int(not device.memory_total >= min_total_memory),
),
)[0],
available_devices = filter( # type: ignore[assignment]
filter_func(
lambda device: device.memory_total >= loosen_min_total_memory,
lambda device: device.memory_total >= min_total_memory,
),
available_devices,
)
if max_gpu_utilization is not None:
loosen_max_gpu_utilization = max_gpu_utilization + 100.0 * tolerance
available_devices = filter(
lambda device: (
device.gpu_utilization <= loosen_max_gpu_utilization,
setattr(
device,
'loosen_constraints',
device.loosen_constraints
+ int(not device.gpu_utilization <= max_gpu_utilization),
),
)[0],
available_devices = filter( # type: ignore[assignment]
filter_func(
lambda device: device.gpu_utilization <= loosen_max_gpu_utilization,
lambda device: device.gpu_utilization <= max_gpu_utilization,
),
available_devices,
)
if max_memory_utilization is not None:
loosen_max_memory_utilization = max_memory_utilization + 100.0 * tolerance
available_devices = filter(
lambda device: (
device.memory_utilization <= loosen_max_memory_utilization,
setattr(
device,
'loosen_constraints',
device.loosen_constraints
+ int(not device.memory_utilization <= max_memory_utilization),
),
)[0],
available_devices = filter( # type: ignore[assignment]
filter_func(
lambda device: device.memory_utilization <= loosen_max_memory_utilization,
lambda device: device.memory_utilization <= max_memory_utilization,
),
available_devices,
)
@ -486,10 +542,11 @@ def parse_arguments() -> argparse.Namespace:
return args
def main() -> None:
def main() -> int:
"""Main function for ``nvisel`` CLI."""
args = parse_arguments()
devices: Sequence[Device]
try:
if hasattr(args, 'inherit'):
if args.inherit is not None:

View file

@ -82,6 +82,31 @@ line_length = 100
lines_after_imports = 2
multi_line_output = 3
[tool.mypy]
# Sync with requires-python
python_version = 3.8 # appease mypy for syntax errors in numpy stubs
pretty = true
show_error_codes = true
show_error_context = true
show_traceback = true
allow_redefinition = true
check_untyped_defs = true
disallow_incomplete_defs = false
disallow_untyped_defs = false
ignore_missing_imports = true
no_implicit_optional = true
strict_equality = true
strict_optional = true
warn_no_return = true
warn_redundant_casts = true
warn_unreachable = true
warn_unused_configs = true
warn_unused_ignores = true
[[tool.mypy.overrides]]
module = ['nvitop.callbacks.*', 'nvitop.gui.*']
ignore_errors= true
[tool.pydocstyle]
convention = "google"
match-dir = '^(?!(gui|callbacks|docs))[^\.].*'
@ -90,6 +115,7 @@ match-dir = '^(?!(gui|callbacks|docs))[^\.].*'
ignore-words = "docs/source/spelling_wordlist.txt"
[tool.ruff]
# Sync with requires-python
target-version = "py37"
line-length = 100
show-source = true
@ -145,7 +171,6 @@ ignore = [
]
"nvitop/api/lib*.py" = [
"N", # pep8-naming
"ANN", # flake8-annotations
]
"nvitop/callbacks/*.py" = [
"ANN", # flake8-annotations

View file

@ -51,6 +51,8 @@ try:
'black >= 22.6.0',
'isort',
'pylint[spelling] >= 2.16.0',
'mypy',
'typing-extensions',
'pre-commit',
],
'cuda10': ['nvidia-ml-py == 11.450.51'],