diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8a40fb1..d006f4c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -25,7 +25,7 @@ repos: - id: debug-statements - id: double-quote-string-fixer - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.0.280 + rev: v0.0.284 hooks: - id: ruff args: [--fix, --exit-non-zero-on-fix] @@ -38,12 +38,12 @@ repos: hooks: - id: black - repo: https://github.com/asottile/pyupgrade - rev: v3.9.0 + rev: v3.10.1 hooks: - id: pyupgrade args: [--py37-plus] # sync with requires-python - repo: https://github.com/pycqa/flake8 - rev: 6.0.0 + rev: 6.1.0 hooks: - id: flake8 additional_dependencies: diff --git a/.pylintrc b/.pylintrc index 8b7365a..ee0583f 100644 --- a/.pylintrc +++ b/.pylintrc @@ -190,7 +190,9 @@ good-names=i, fg, bg, n, - ui + ui, + tx, + rx # Good variable names regexes, separated by a comma. If names match any regex, # they will always be accepted diff --git a/CHANGELOG.md b/CHANGELOG.md index 4ba22fa..61cf1ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,7 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added -- +- Add device APIs to query PCIe and NVLink throughput by [@XuehaiPan](https://github.com/XuehaiPan) in [#87](https://github.com/XuehaiPan/nvitop/pull/87). ### Changed diff --git a/README.md b/README.md index 3bc91b0..93db153 100644 --- a/README.md +++ b/README.md @@ -1095,6 +1095,11 @@ Out[16]: PhysicalDeviceSnapshot( memory_utilization=7, # in percentage (NOTE: this is the utilization rate of GPU memory bandwidth) mig_mode='N/A', name='GeForce RTX 2080 Ti', + pcie_rx_throughput=1000, # in KiB/s + pcie_rx_throughput_human='1000KiB/s', + pcie_throughput=ThroughputInfo(tx=1000, rx=1000), # in KiB/s + pcie_tx_throughput=1000, # in KiB/s + pcie_tx_throughput_human='1000KiB/s', performance_state='P2', persistence_mode='Disabled', power_limit=250000, # in milliwatts (mW) diff --git a/docs/source/spelling_wordlist.txt b/docs/source/spelling_wordlist.txt index 86a820f..106df4f 100644 --- a/docs/source/spelling_wordlist.txt +++ b/docs/source/spelling_wordlist.txt @@ -146,3 +146,8 @@ MPS KMD conf Unallocated +KiB +tx +rx +ThroughputInfo +pytorch diff --git a/nvitop/api/collector.py b/nvitop/api/collector.py index 2180164..227c781 100644 --- a/nvitop/api/collector.py +++ b/nvitop/api/collector.py @@ -396,7 +396,7 @@ class ResourceMetricCollector: # pylint: disable=too-many-instance-attributes self, devices: Iterable[Device] | None = None, root_pids: Iterable[int] | None = None, - interval: int | float = 1.0, + interval: float = 1.0, ) -> None: """Initialize the resource metric collector.""" if isinstance(interval, (int, float)) and interval > 0: diff --git a/nvitop/api/device.py b/nvitop/api/device.py index 5d3b92a..bcf0b6f 100644 --- a/nvitop/api/device.py +++ b/nvitop/api/device.py @@ -142,6 +142,7 @@ from nvitop.api.utils import ( if TYPE_CHECKING: from typing_extensions import Literal # Python 3.8+ + from typing_extensions import Self # Python 3.11+ __all__ = [ @@ -182,6 +183,21 @@ class UtilizationRates(NamedTuple): # in percentage # pylint: disable=missing-c decoder: int | NaType +class ThroughputInfo(NamedTuple): # in KiB/s # pylint: disable=missing-class-docstring + tx: int | NaType + rx: int | NaType + + @property + def transmit(self) -> int | NaType: + """Alias of :attr:`tx`.""" + return self.tx + + @property + def receive(self) -> int | NaType: + """Alias of :attr:`rx`.""" + return self.rx + + _VALUE_OMITTED: str = object() # type: ignore[assignment] @@ -547,7 +563,7 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me *, uuid: str | None = None, bus_id: str | None = None, - ) -> Device: + ) -> Self: """Create a new instance of Device. The type of the result is determined by the given argument. @@ -577,8 +593,10 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me ) if cls is not Device: + # Use the subclass type if the type is explicitly specified return super().__new__(cls) + # Auto subclass type inference logic goes here when `cls` is `Device` (e.g., calls `Device(...)`) match: re.Match | None = None if isinstance(index, str): match = cls.UUID_PATTERN.match(index) @@ -601,10 +619,10 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me f'index for MIG device must be a tuple of two integers ' f'but index = {index!r} was given', ) - return super().__new__(MigDevice) + return super().__new__(MigDevice) # type: ignore[return-value] elif uuid is not None and match is not None and match.group('MigMode') is not None: - return super().__new__(MigDevice) - return super().__new__(PhysicalDevice) + return super().__new__(MigDevice) # type: ignore[return-value] + return super().__new__(PhysicalDevice) # type: ignore[return-value] def __init__( self, @@ -640,6 +658,8 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me self._bus_id: str = NA self._memory_total: int | NaType = NA self._memory_total_human: str = NA + self._nvlink_link_count: int | None = None + self._nvlink_throughput_counters: tuple[tuple[int | NaType, int]] | None = None self._is_mig_device: bool | None = None self._cuda_index: int | None = None self._cuda_compute_capability: tuple[int, int] | NaType | None = None @@ -1411,11 +1431,410 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me power_usage = self.power_usage() power_limit = self.power_limit() if libnvml.nvmlCheckReturn(power_usage, int): - power_usage = f'{round(power_usage / 1000.0)}W' + power_usage = f'{round(power_usage / 1000)}W' # type: ignore[assignment] if libnvml.nvmlCheckReturn(power_limit, int): - power_limit = f'{round(power_limit / 1000.0)}W' + power_limit = f'{round(power_limit / 1000)}W' # type: ignore[assignment] return f'{power_usage} / {power_limit}' + def pcie_throughput(self) -> ThroughputInfo: # in KiB/s + """The current PCIe throughput in KiB/s. + + This function is querying a byte counter over a 20ms interval and thus is the PCIe + throughput over that interval. + + Returns: ThroughputInfo(tx, rx) + A named tuple with current PCIe throughput in KiB/s, the item could be + :const:`nvitop.NA` when not applicable. + """ + return ThroughputInfo(tx=self.pcie_tx_throughput(), rx=self.pcie_rx_throughput()) + + @memoize_when_activated + def pcie_tx_throughput(self) -> int | NaType: # in KiB/s + """The current PCIe transmit throughput in KiB/s. + + This function is querying a byte counter over a 20ms interval and thus is the PCIe + throughput over that interval. + + Returns: Union[int, NaType] + The current PCIe transmit throughput in KiB/s, or :const:`nvitop.NA` when not applicable. + """ + return libnvml.nvmlQuery( + 'nvmlDeviceGetPcieThroughput', + self.handle, + libnvml.NVML_PCIE_UTIL_RX_BYTES, + ) + + @memoize_when_activated + def pcie_rx_throughput(self) -> int | NaType: # in KiB/s + """The current PCIe receive throughput in KiB/s. + + This function is querying a byte counter over a 20ms interval and thus is the PCIe + throughput over that interval. + + Returns: Union[int, NaType] + The current PCIe receive throughput in KiB/s, or :const:`nvitop.NA` when not applicable. + """ + return libnvml.nvmlQuery( + 'nvmlDeviceGetPcieThroughput', + self.handle, + libnvml.NVML_PCIE_UTIL_RX_BYTES, + ) + + def pcie_tx_throughput_human(self) -> str | NaType: # in human readable + """The current PCIe transmit throughput in human readable format. + + This function is querying a byte counter over a 20ms interval and thus is the PCIe + throughput over that interval. + + Returns: Union[str, NaType] + The current PCIe transmit throughput in human readable format, or :const:`nvitop.NA` + when not applicable. + """ + tx = self.pcie_tx_throughput() + if libnvml.nvmlCheckReturn(tx, int): + return f'{bytes2human(tx * 1024)}/s' + return NA + + def pcie_rx_throughput_human(self) -> str | NaType: # in human readable + """The current PCIe receive throughput in human readable format. + + This function is querying a byte counter over a 20ms interval and thus is the PCIe + throughput over that interval. + + Returns: Union[str, NaType] + The current PCIe receive throughput in human readable format, or :const:`nvitop.NA` when + not applicable. + """ + rx = self.pcie_rx_throughput() + if libnvml.nvmlCheckReturn(rx, int): + return f'{bytes2human(rx * 1024)}/s' + return NA + + def nvlink_link_count(self) -> int: + """The number of NVLinks that the GPU has. + + Returns: Union[int, NaType] + The number of NVLinks that the GPU has. + """ + if self._nvlink_link_count is None: + ((nvlink_link_count, _),) = libnvml.nvmlQueryFieldValues( + self.handle, + [libnvml.NVML_FI_DEV_NVLINK_LINK_COUNT], + ) + if libnvml.nvmlCheckReturn(nvlink_link_count, int): + self._nvlink_link_count = nvlink_link_count # type: ignore[assignment] + else: + self._nvlink_link_count = 0 + return self._nvlink_link_count # type: ignore[return-value] + + @memoize_when_activated + def nvlink_throughput( + self, + interval: float | None = None, + ) -> list[ThroughputInfo]: # in KiB/s + """The current NVLink throughput for each NVLink in KiB/s. + + This function is querying data counters between methods calls and thus is the NVLink + throughput over that interval. For the first call, the function is blocking for 20ms to get + the first data counters. + + Args: + interval (Optional[float]): + The interval in seconds between two calls to get the NVLink throughput. If + ``interval`` is a positive number, compares throughput counters before and after the + interval (blocking). If ``interval`` is :const`0.0` or :data:`None`, compares + throughput counters since the last call, returning immediately (non-blocking). + + Returns: List[ThroughputInfo(tx, rx)] + A list of named tuples with current NVLink throughput for each NVLink in KiB/s, the item + could be :const:`nvitop.NA` when not applicable. + """ + nvlink_link_count = self.nvlink_link_count() + if nvlink_link_count == 0: + return [] + + def query_nvlink_throughput_counters() -> tuple[tuple[int | NaType, int]]: + return tuple( # type: ignore[return-value] + libnvml.nvmlQueryFieldValues( + self.handle, + [ # type: ignore[arg-type] + (libnvml.NVML_FI_DEV_NVLINK_THROUGHPUT_DATA_TX, i) + for i in range(nvlink_link_count) + ] + + [ + (libnvml.NVML_FI_DEV_NVLINK_THROUGHPUT_DATA_RX, i) + for i in range(nvlink_link_count) + ], + ), + ) + + if interval is not None: + if not interval >= 0.0: + raise ValueError('`interval` must be a non-negative number, got {interval!r}.') + if interval > 0.0: + self._nvlink_throughput_counters = query_nvlink_throughput_counters() + time.sleep(interval) + + if self._nvlink_throughput_counters is None: + self._nvlink_throughput_counters = query_nvlink_throughput_counters() + time.sleep(0.02) # 20ms + + old_throughput_counters = self._nvlink_throughput_counters + new_throughput_counters = query_nvlink_throughput_counters() + + throughputs: list[int | NaType] = [] + for (old_counter, old_timestamp), (new_counter, new_timestamp) in zip( + old_throughput_counters, + new_throughput_counters, + ): + if ( + libnvml.nvmlCheckReturn(old_counter, int) + and libnvml.nvmlCheckReturn(new_counter, int) + and new_timestamp > old_timestamp + ): + throughputs.append( + round( + 1_000_000 * (new_counter - old_counter) / (new_timestamp - old_timestamp), + ), + ) + else: + throughputs.append(NA) + + self._nvlink_throughput_counters = new_throughput_counters + return [ + ThroughputInfo(tx=tx, rx=rx) + for tx, rx in zip(throughputs[:nvlink_link_count], throughputs[nvlink_link_count:]) + ] + + def nvlink_mean_throughput( + self, + interval: float | None = None, + ) -> ThroughputInfo: # in KiB/s + """The mean NVLink throughput for all NVLinks in KiB/s. + + This function is querying data counters between methods calls and thus is the NVLink + throughput over that interval. For the first call, the function is blocking for 20ms to get + the first data counters. + + Args: + interval (Optional[float]): + The interval in seconds between two calls to get the NVLink throughput. If + ``interval`` is a positive number, compares throughput counters before and after the + interval (blocking). If ``interval`` is :const`0.0` or :data:`None`, compares + throughput counters since the last call, returning immediately (non-blocking). + + Returns: ThroughputInfo(tx, rx) + A named tuple with the mean NVLink throughput for all NVLinks in KiB/s, the item could + be :const:`nvitop.NA` when not applicable. + """ + tx_throughputs = [] + rx_throughputs = [] + for tx, rx in self.nvlink_throughput(interval=interval): + if libnvml.nvmlCheckReturn(tx, int): + tx_throughputs.append(tx) + if libnvml.nvmlCheckReturn(rx, int): + rx_throughputs.append(rx) + return ThroughputInfo( + tx=round(sum(tx_throughputs) / len(tx_throughputs)) if tx_throughputs else NA, + rx=round(sum(rx_throughputs) / len(rx_throughputs)) if rx_throughputs else NA, + ) + + def nvlink_tx_throughput( + self, + interval: float | None = None, + ) -> list[int | NaType]: # in KiB/s + """The current NVLink transmit data throughput in KiB/s for each NVLink. + + This function is querying data counters between methods calls and thus is the NVLink + throughput over that interval. For the first call, the function is blocking for 20ms to get + the first data counters. + + Args: + interval (Optional[float]): + The interval in seconds between two calls to get the NVLink throughput. If + ``interval`` is a positive number, compares throughput counters before and after the + interval (blocking). If ``interval`` is :const`0.0` or :data:`None`, compares + throughput counters since the last call, returning immediately (non-blocking). + + Returns: List[Union[int, NaType]] + The current NVLink transmit data throughput in KiB/s for each NVLink, or + :const:`nvitop.NA` when not applicable. + """ + return [tx for tx, _ in self.nvlink_throughput(interval=interval)] + + def nvlink_mean_tx_throughput( + self, + interval: float | None = None, + ) -> int | NaType: # in KiB/s + """The mean NVLink transmit data throughput for all NVLinks in KiB/s. + + This function is querying data counters between methods calls and thus is the NVLink + throughput over that interval. For the first call, the function is blocking for 20ms to get + the first data counters. + + Args: + interval (Optional[float]): + The interval in seconds between two calls to get the NVLink throughput. If + ``interval`` is a positive number, compares throughput counters before and after the + interval (blocking). If ``interval`` is :const`0.0` or :data:`None`, compares + throughput counters since the last call, returning immediately (non-blocking). + + Returns: Union[int, NaType] + The mean NVLink transmit data throughput for all NVLinks in KiB/s, or + :const:`nvitop.NA` when not applicable. + """ + return self.nvlink_mean_throughput(interval=interval).tx + + def nvlink_rx_throughput( + self, + interval: float | None = None, + ) -> list[int | NaType]: # in KiB/s + """The current NVLink receive data throughput for each NVLink in KiB/s. + + This function is querying data counters between methods calls and thus is the NVLink + throughput over that interval. For the first call, the function is blocking for 20ms to get + the first data counters. + + Args: + interval (Optional[float]): + The interval in seconds between two calls to get the NVLink throughput. If + ``interval`` is a positive number, compares throughput counters before and after the + interval (blocking). If ``interval`` is :const`0.0` or :data:`None`, compares + throughput counters since the last call, returning immediately (non-blocking). + + Returns: Union[int, NaType] + The current NVLink receive data throughput for each NVLink in KiB/s, or + :const:`nvitop.NA` when not applicable. + """ + return [rx for _, rx in self.nvlink_throughput(interval=interval)] + + def nvlink_mean_rx_throughput( + self, + interval: float | None = None, + ) -> int | NaType: # in KiB/s + """The mean NVLink receive data throughput for all NVLinks in KiB/s. + + This function is querying data counters between methods calls and thus is the NVLink + throughput over that interval. For the first call, the function is blocking for 20ms to get + the first data counters. + + Args: + interval (Optional[float]): + The interval in seconds between two calls to get the NVLink throughput. If + ``interval`` is a positive number, compares throughput counters before and after the + interval (blocking). If ``interval`` is :const`0.0` or :data:`None`, compares + throughput counters since the last call, returning immediately (non-blocking). + + Returns: Union[int, NaType] + The mean NVLink receive data throughput for all NVLinks in KiB/s, or + :const:`nvitop.NA` when not applicable. + """ + return self.nvlink_mean_throughput(interval=interval).rx + + def nvlink_tx_throughput_human( + self, + interval: float | None = None, + ) -> list[str | NaType]: # in human readable + """The current NVLink transmit data throughput for each NVLink in human readable format. + + This function is querying data counters between methods calls and thus is the NVLink + throughput over that interval. For the first call, the function is blocking for 20ms to get + the first data counters. + + Args: + interval (Optional[float]): + The interval in seconds between two calls to get the NVLink throughput. If + ``interval`` is a positive number, compares throughput counters before and after the + interval (blocking). If ``interval`` is :const`0.0` or :data:`None`, compares + throughput counters since the last call, returning immediately (non-blocking). + + Returns: Union[str, NaType] + The current NVLink transmit data throughput for each NVLink in human readable format, or + :const:`nvitop.NA` when not applicable. + """ + return [ + f'{bytes2human(tx * 1024)}/s' if libnvml.nvmlCheckReturn(tx, int) else NA + for tx in self.nvlink_tx_throughput(interval=interval) + ] + + def nvlink_mean_tx_throughput_human( + self, + interval: float | None = None, + ) -> str | NaType: # in human readable + """The mean NVLink transmit data throughput for all NVLinks in human readable format. + + This function is querying data counters between methods calls and thus is the NVLink + throughput over that interval. For the first call, the function is blocking for 20ms to get + the first data counters. + + Args: + interval (Optional[float]): + The interval in seconds between two calls to get the NVLink throughput. If + ``interval`` is a positive number, compares throughput counters before and after the + interval (blocking). If ``interval`` is :const`0.0` or :data:`None`, compares + throughput counters since the last call, returning immediately (non-blocking). + + Returns: Union[str, NaType] + The mean NVLink transmit data throughput for all NVLinks in human readable format, or + :const:`nvitop.NA` when not applicable. + """ + mean_tx = self.nvlink_mean_tx_throughput(interval=interval) + if libnvml.nvmlCheckReturn(mean_tx, int): + return f'{bytes2human(mean_tx * 1024)}/s' + return NA + + def nvlink_rx_throughput_human( + self, + interval: float | None = None, + ) -> list[str | NaType]: # in human readable + """The current NVLink receive data throughput for each NVLink in human readable format. + + This function is querying data counters between methods calls and thus is the NVLink + throughput over that interval. For the first call, the function is blocking for 20ms to get + the first data counters. + + Args: + interval (Optional[float]): + The interval in seconds between two calls to get the NVLink throughput. If + ``interval`` is a positive number, compares throughput counters before and after the + interval (blocking). If ``interval`` is :const`0.0` or :data:`None`, compares + throughput counters since the last call, returning immediately (non-blocking). + + Returns: Union[str, NaType] + The current NVLink receive data throughput for each NVLink in human readable format, or + :const:`nvitop.NA` when not applicable. + """ + return [ + f'{bytes2human(rx * 1024)}/s' if libnvml.nvmlCheckReturn(rx, int) else NA + for rx in self.nvlink_rx_throughput(interval=interval) + ] + + def nvlink_mean_rx_throughput_human( + self, + interval: float | None = None, + ) -> str | NaType: # in human readable + """The mean NVLink receive data throughput for all NVLinks in human readable format. + + This function is querying data counters between methods calls and thus is the NVLink + throughput over that interval. For the first call, the function is blocking for 20ms to get + the first data counters. + + Args: + interval (Optional[float]): + The interval in seconds between two calls to get the NVLink throughput. If + ``interval`` is a positive number, compares throughput counters before and after the + interval (blocking). If ``interval`` is :const`0.0` or :data:`None`, compares + throughput counters since the last call, returning immediately (non-blocking). + + Returns: Union[str, NaType] + The mean NVLink receive data throughput for all NVLinks in human readable format, or + :const:`nvitop.NA` when not applicable. + """ + mean_rx = self.nvlink_mean_rx_throughput(interval=interval) + if libnvml.nvmlCheckReturn(mean_rx, int): + return f'{bytes2human(mean_rx * 1024)}/s' + return NA + def display_active(self) -> str | NaType: """A flag that indicates whether a display is initialized on the GPU's (e.g. memory is allocated on the device for display). @@ -1758,6 +2177,11 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me 'power_usage', 'power_limit', 'power_status', + 'pcie_throughput', + 'pcie_tx_throughput', + 'pcie_rx_throughput', + 'pcie_tx_throughput_human', + 'pcie_rx_throughput_human', 'display_active', 'display_mode', 'current_driver_model', @@ -1974,6 +2398,8 @@ class MigDevice(Device): # pylint: disable=too-many-instance-attributes self._memory_total_human: str = NA self._gpu_instance_id: int | NaType = NA self._compute_instance_id: int | NaType = NA + self._nvlink_link_count: int | None = None + self._nvlink_throughput_counters: tuple[tuple[int | NaType, int]] | None = None self._is_mig_device: bool = True self._cuda_index: int | None = None self._cuda_compute_capability: tuple[int, int] | NaType | None = None @@ -2230,7 +2656,7 @@ class CudaDevice(Device): *, nvml_index: int | tuple[int, int] | None = None, uuid: str | None = None, - ) -> CudaDevice: + ) -> Self: """Create a new instance of CudaDevice. The type of the result is determined by the given argument. @@ -2267,10 +2693,14 @@ class CudaDevice(Device): raise RuntimeError(f'CUDA Error: invalid device ordinal: {cuda_index!r}.') nvml_index = cuda_visible_devices[cuda_index] + if cls is not CudaDevice: + # Use the subclass type if the type is explicitly specified + return super().__new__(cls, index=nvml_index, uuid=uuid) + + # Auto subclass type inference logic goes here when `cls` is `CudaDevice` (e.g., calls `CudaDevice(...)`) if (nvml_index is not None and not isinstance(nvml_index, int)) or is_mig_device_uuid(uuid): return super().__new__(CudaMigDevice, index=nvml_index, uuid=uuid) # type: ignore[return-value] - - return super().__new__(cls, index=nvml_index, uuid=uuid) # type: ignore[return-value] + return super().__new__(CudaDevice, index=nvml_index, uuid=uuid) # type: ignore[return-value] def __init__( self, diff --git a/nvitop/api/libcuda.py b/nvitop/api/libcuda.py index ba5b9d2..c966916 100644 --- a/nvitop/api/libcuda.py +++ b/nvitop/api/libcuda.py @@ -33,6 +33,7 @@ from typing import ClassVar as _ClassVar if _TYPE_CHECKING: + from typing_extensions import Self as _Self # Python 3.11+ from typing_extensions import TypeAlias as _TypeAlias # Python 3.10+ @@ -41,7 +42,9 @@ class _struct_c_CUdevice_t(_ctypes.Structure): pass # opaque handle -_c_CUdevice_t: _TypeAlias = _ctypes.POINTER(_struct_c_CUdevice_t) # type: ignore[valid-type] +_c_CUdevice_t: _TypeAlias = _ctypes.POINTER( # type: ignore[valid-type] # noqa: PYI042 + _struct_c_CUdevice_t, +) _CUresult_t: _TypeAlias = _ctypes.c_uint @@ -237,11 +240,11 @@ class CUDAError(Exception): _errcode_to_name: _ClassVar[dict[int, str]] = {} value: int - def __new__(cls, value: int) -> CUDAError: + def __new__(cls, value: int) -> _Self: """Map value to a proper subclass of :class:`CUDAError`.""" if cls is CUDAError: # pylint: disable-next=self-cls-assignment - cls = CUDAError._value_class_mapping.get(value, cls) + cls = CUDAError._value_class_mapping.get(value, cls) # type: ignore[assignment] obj = Exception.__new__(cls) obj.value = value return obj diff --git a/nvitop/api/libcudart.py b/nvitop/api/libcudart.py index 995afdc..17ab5d1 100644 --- a/nvitop/api/libcudart.py +++ b/nvitop/api/libcudart.py @@ -26,11 +26,16 @@ import os as _os import platform as _platform import sys as _sys import threading as _threading +from typing import TYPE_CHECKING as _TYPE_CHECKING from typing import Any as _Any from typing import Callable as _Callable from typing import ClassVar as _ClassVar +if _TYPE_CHECKING: + from typing_extensions import Self as _Self # Python 3.11+ + + _cudaError_t = _ctypes.c_int # Error codes # @@ -283,11 +288,11 @@ class cudaError(Exception): _errcode_to_name: _ClassVar[dict[int, str]] = {} value: int - def __new__(cls, value: int) -> cudaError: + def __new__(cls, value: int) -> _Self: """Map value to a proper subclass of :class:`cudaError`.""" if cls is cudaError: # pylint: disable-next=self-cls-assignment - cls = cudaError._value_class_mapping.get(value, cls) + cls = cudaError._value_class_mapping.get(value, cls) # type: ignore[assignment] obj = Exception.__new__(cls) obj.value = value return obj diff --git a/nvitop/api/libnvml.py b/nvitop/api/libnvml.py index a7eff4d..b1618dd 100644 --- a/nvitop/api/libnvml.py +++ b/nvitop/api/libnvml.py @@ -28,6 +28,7 @@ import os as _os import re as _re import sys as _sys import threading as _threading +import time as _time from types import FunctionType as _FunctionType from types import ModuleType as _ModuleType from typing import TYPE_CHECKING as _TYPE_CHECKING @@ -41,11 +42,12 @@ import pynvml as _pynvml from pynvml import * # noqa: F403 # pylint: disable=wildcard-import,unused-wildcard-import from pynvml import nvmlDeviceGetPciInfo # appease mypy # noqa: F401 # pylint: disable=unused-import -from nvitop.api.utils import NA, UINT_MAX, ULONGLONG_MAX +from nvitop.api.utils import NA, UINT_MAX, ULONGLONG_MAX, NaType from nvitop.api.utils import colored as __colored if _TYPE_CHECKING: + from typing_extensions import Self as _Self # Python 3.11+ from typing_extensions import TypeAlias as _TypeAlias # Python 3.10+ @@ -55,6 +57,7 @@ __all__ = [ # will be updated in below 'ULONGLONG_MAX', 'nvmlCheckReturn', 'nvmlQuery', + 'nvmlQueryFieldValues', 'nvmlInit', 'nvmlInitWithFlags', 'nvmlShutdown', @@ -86,22 +89,22 @@ _errcode_to_string = NVMLError._errcode_to_string # pylint: disable=protected-a # 1. Put error classes in `__all__` first for _name, _attr in _vars_pynvml.items(): - if _name in ('nvmlInit', 'nvmlInitWithFlags', 'nvmlShutdown'): + if _name in {'nvmlInit', 'nvmlInitWithFlags', 'nvmlShutdown'}: continue if _name.startswith(('NVML_ERROR_', 'NVMLError_')): - __all__.append(_name) + __all__.append(_name) # noqa: PYI056 if _name.startswith('NVML_ERROR_'): _errcode_to_name[_attr] = _name _const_names.append(_name) # 2. Then the remaining members for _name, _attr in _vars_pynvml.items(): - if _name in ('nvmlInit', 'nvmlInitWithFlags', 'nvmlShutdown'): + if _name in {'nvmlInit', 'nvmlInitWithFlags', 'nvmlShutdown'}: continue if (_name.startswith('NVML_') and not _name.startswith('NVML_ERROR_')) or ( _name.startswith('nvml') and isinstance(_attr, _FunctionType) ): - __all__.append(_name) + __all__.append(_name) # noqa: PYI056 if _name.startswith('NVML_'): _const_names.append(_name) @@ -169,10 +172,10 @@ del ( _sphinx_doc, ) - # 5. Add explicit references to appease linters # pylint: disable=no-member -c_nvmlDevice_t: _TypeAlias = _pynvml.c_nvmlDevice_t +c_nvmlDevice_t: _TypeAlias = _pynvml.c_nvmlDevice_t # noqa: PYI042 +c_nvmlFieldValue_t: _TypeAlias = _pynvml.c_nvmlFieldValue_t # noqa: PYI042 NVML_SUCCESS: int = _pynvml.NVML_SUCCESS NVML_ERROR_INSUFFICIENT_SIZE: int = _pynvml.NVML_ERROR_INSUFFICIENT_SIZE NVMLError_FunctionNotFound: _TypeAlias = _pynvml.NVMLError_FunctionNotFound @@ -196,6 +199,20 @@ NVML_COMPUTEMODE_DEFAULT: int = _pynvml.NVML_COMPUTEMODE_DEFAULT NVML_COMPUTEMODE_EXCLUSIVE_THREAD: int = _pynvml.NVML_COMPUTEMODE_EXCLUSIVE_THREAD NVML_COMPUTEMODE_PROHIBITED: int = _pynvml.NVML_COMPUTEMODE_PROHIBITED NVML_COMPUTEMODE_EXCLUSIVE_PROCESS: int = _pynvml.NVML_COMPUTEMODE_EXCLUSIVE_PROCESS +NVML_PCIE_UTIL_TX_BYTES: int = _pynvml.NVML_PCIE_UTIL_TX_BYTES +NVML_PCIE_UTIL_RX_BYTES: int = _pynvml.NVML_PCIE_UTIL_RX_BYTES +NVML_NVLINK_MAX_LINKS: int = _pynvml.NVML_NVLINK_MAX_LINKS +NVML_FI_DEV_NVLINK_LINK_COUNT: int = _pynvml.NVML_FI_DEV_NVLINK_LINK_COUNT +NVML_FI_DEV_NVLINK_THROUGHPUT_DATA_TX: int = _pynvml.NVML_FI_DEV_NVLINK_THROUGHPUT_DATA_TX +NVML_FI_DEV_NVLINK_THROUGHPUT_DATA_RX: int = _pynvml.NVML_FI_DEV_NVLINK_THROUGHPUT_DATA_RX +NVML_FI_DEV_NVLINK_THROUGHPUT_RAW_TX: int = _pynvml.NVML_FI_DEV_NVLINK_THROUGHPUT_RAW_TX +NVML_FI_DEV_NVLINK_THROUGHPUT_RAW_RX: int = _pynvml.NVML_FI_DEV_NVLINK_THROUGHPUT_RAW_RX +NVML_VALUE_TYPE_DOUBLE: int = getattr(_pynvml, 'NVML_VALUE_TYPE_DOUBLE', 0) +NVML_VALUE_TYPE_UNSIGNED_INT: int = getattr(_pynvml, 'NVML_VALUE_TYPE_UNSIGNED_INT', 1) +NVML_VALUE_TYPE_UNSIGNED_LONG: int = getattr(_pynvml, 'NVML_VALUE_TYPE_UNSIGNED_LONG', 2) +NVML_VALUE_TYPE_UNSIGNED_LONG_LONG: int = getattr(_pynvml, 'NVML_VALUE_TYPE_UNSIGNED_LONG_LONG', 3) +NVML_VALUE_TYPE_SIGNED_LONG_LONG: int = getattr(_pynvml, 'NVML_VALUE_TYPE_SIGNED_LONG_LONG', 4) +NVML_VALUE_TYPE_SIGNED_INT: int = getattr(_pynvml, 'NVML_VALUE_TYPE_SIGNED_INT', 5) # pylint: enable=no-member # New members in `libnvml` ######################################################################### @@ -448,6 +465,51 @@ def nvmlQuery( return retval +def nvmlQueryFieldValues( + handle: c_nvmlDevice_t, + field_ids: list[int | tuple[int, int]], +) -> list[tuple[float | int | NaType, int]]: + """Query multiple field values from NVML. + + Request values for a list of fields for a device. This API allows multiple fields to be queried + at once. If any of the underlying fieldIds are populated by the same driver call, the results + for those field IDs will be populated from a single call rather than making a driver call for + each fieldId. + + Raises: + NVMLError_InvalidArgument: + If device or field_ids is invalid. + """ + field_values = nvmlQuery('nvmlDeviceGetFieldValues', handle, field_ids) + + if not nvmlCheckReturn(field_values): + timestamp = _time.time_ns() // 1000 + return [(NA, timestamp) for _ in range(len(field_ids))] + + values_with_timestamps: list[tuple[float | int | NaType, int]] = [] + for field_value in field_values: + timestamp = field_value.timestamp + if field_value.nvmlReturn != NVML_SUCCESS: + value = NA + timestamp = _time.time_ns() // 1000 + elif field_value.valueType == NVML_VALUE_TYPE_DOUBLE: + value = field_value.value.dVal + elif field_value.valueType == NVML_VALUE_TYPE_UNSIGNED_INT: + value = field_value.value.uiVal + elif field_value.valueType == NVML_VALUE_TYPE_UNSIGNED_LONG: + value = field_value.value.ulVal + elif field_value.valueType == NVML_VALUE_TYPE_UNSIGNED_LONG_LONG: + value = field_value.value.ullVal + elif field_value.valueType == NVML_VALUE_TYPE_SIGNED_LONG_LONG: + value = field_value.value.llVal + elif field_value.valueType == NVML_VALUE_TYPE_SIGNED_INT: + value = field_value.value.iVal + else: + value = NA + values_with_timestamps.append((value, timestamp)) + return values_with_timestamps + + def nvmlCheckReturn( retval: _Any, types: type | tuple[type, ...] | None = None, @@ -634,8 +696,6 @@ if not _pynvml_installation_corrupted: using specific MIG device handles. Raises: - NVMLError_InvalidArgument: - If the library has not been successfully initialized. NVMLError_Uninitialized: If NVML was not first initialized with :func:`nvmlInit`. NVMLError_NoPermission: @@ -663,8 +723,6 @@ if not _pynvml_installation_corrupted: using specific MIG device handles. Raises: - NVMLError_InvalidArgument: - If the library has not been successfully initialized. NVMLError_Uninitialized: If NVML was not first initialized with :func:`nvmlInit`. NVMLError_NoPermission: @@ -692,8 +750,6 @@ if not _pynvml_installation_corrupted: using specific MIG device handles. Raises: - NVMLError_InvalidArgument: - If the library has not been successfully initialized. NVMLError_Uninitialized: If NVML was not first initialized with :func:`nvmlInit`. NVMLError_NoPermission: @@ -724,12 +780,12 @@ if not _pynvml_installation_corrupted: class c_nvmlMemory_v1_t(_pynvml._PrintableStructure): # pylint: disable=protected-access _fields_: _ClassVar[list[tuple[str, type]]] = [ # Total physical device memory (in bytes). - ('total', _pynvml.c_ulonglong), + ('total', _ctypes.c_ulonglong), # Unallocated device memory (in bytes). - ('free', _pynvml.c_ulonglong), + ('free', _ctypes.c_ulonglong), # Allocated device memory (in bytes). # Note that the driver/GPU always sets aside a small amount of memory for bookkeeping. - ('used', _pynvml.c_ulonglong), + ('used', _ctypes.c_ulonglong), ] _fmt_: _ClassVar[dict[str, str]] = {'': '%d B'} @@ -737,16 +793,16 @@ if not _pynvml_installation_corrupted: class c_nvmlMemory_v2_t(_pynvml._PrintableStructure): # pylint: disable=protected-access _fields_: _ClassVar[list[tuple[str, type]]] = [ # Structure format version (must be 2). - ('version', _pynvml.c_uint), + ('version', _ctypes.c_uint), # Total physical device memory (in bytes). - ('total', _pynvml.c_ulonglong), + ('total', _ctypes.c_ulonglong), # Device memory (in bytes) reserved for system use (driver or firmware). - ('reserved', _pynvml.c_ulonglong), + ('reserved', _ctypes.c_ulonglong), # Unallocated device memory (in bytes). - ('free', _pynvml.c_ulonglong), + ('free', _ctypes.c_ulonglong), # Allocated device memory (in bytes). # Note that the driver/GPU always sets aside a small amount of memory for bookkeeping. - ('used', _pynvml.c_ulonglong), + ('used', _ctypes.c_ulonglong), ] _fmt_: _ClassVar[dict[str, str]] = {'': '%d B'} @@ -788,8 +844,6 @@ if not _pynvml_installation_corrupted: using specific MIG device handles. Raises: - NVMLError_InvalidArgument: - If the library has not been successfully initialized. NVMLError_Uninitialized: If NVML was not first initialized with :func:`nvmlInit`. NVMLError_NoPermission: @@ -852,12 +906,12 @@ class _CustomModule(_ModuleType): except AttributeError: return getattr(_pynvml, name) - def __enter__(self) -> _CustomModule: + def __enter__(self) -> _Self: """Entry of the context manager for ``with`` statement.""" _lazy_init() return self - def __exit__(self, *args: _Any, **kwargs: _Any) -> None: + def __exit__(self, *exc: object) -> None: """Shutdown the NVML context in the context manager for ``with`` statement.""" try: nvmlShutdown() diff --git a/nvitop/api/process.py b/nvitop/api/process.py index 74fccb0..004660d 100644 --- a/nvitop/api/process.py +++ b/nvitop/api/process.py @@ -43,6 +43,8 @@ from nvitop.api.utils import ( if TYPE_CHECKING: + from typing_extensions import Self # Python 3.11+ + from nvitop.api.device import Device @@ -191,7 +193,7 @@ class HostProcess(host.Process, metaclass=ABCMeta): _ident: tuple _lock: threading.RLock - def __new__(cls, pid: int | None = None) -> HostProcess: + def __new__(cls, pid: int | None = None) -> Self: """Return the cached instance of :class:`HostProcess`.""" if pid is None: pid = os.getpid() @@ -471,7 +473,7 @@ class GpuProcess: # pylint: disable=too-many-instance-attributes,too-many-publi gpu_cc_protected_memory: int | NaType | None = None, type: str | NaType | None = None, # pylint: disable=redefined-builtin # pylint: enable=unused-argument - ) -> GpuProcess: + ) -> Self: """Return the cached instance of :class:`GpuProcess`.""" if pid is None: pid = os.getpid() @@ -480,7 +482,7 @@ class GpuProcess: # pylint: disable=too-many-instance-attributes,too-many-publi try: instance = cls.INSTANCES[(pid, device)] if instance.is_running(): - return instance + return instance # type: ignore[return-value] except KeyError: pass diff --git a/nvitop/api/utils.py b/nvitop/api/utils.py index 3d9463d..8b5b909 100644 --- a/nvitop/api/utils.py +++ b/nvitop/api/utils.py @@ -29,7 +29,7 @@ import re import sys import time from collections.abc import KeysView -from typing import Any, Callable, Generator, Iterable, Iterator +from typing import Any, Callable, Generator, Iterable, Iterator, TypeVar from psutil import WINDOWS @@ -153,7 +153,8 @@ class NaType(str): nan """ - def __new__(cls) -> NaType: + # NOTE: Decorate this class with `@final` and remove `noqa` when we drop Python 3.7 support. + def __new__(cls) -> NaType: # noqa: PYI034 """Get the singleton instance (:const:`nvitop.NA`).""" if not hasattr(cls, '_instance'): cls._instance = super().__new__(cls, 'N/A') @@ -525,9 +526,9 @@ SIZE_PATTERN: re.Pattern = re.compile( """The regex pattern for human readable size.""" -# pylint: disable-next=too-many-return-statements +# pylint: disable-next=too-many-return-statements,too-many-branches def bytes2human( - b: int | float | NaType, + b: int | float | NaType, # noqa: PYI041 *, min_unit: int = 1, ) -> str: @@ -545,7 +546,11 @@ def bytes2human( return f'{b}B' if b < MiB and min_unit <= KiB: return f'{round(b / KiB)}KiB' - if b <= 20 * GiB and min_unit <= MiB: + if b < 100 * MiB and min_unit <= MiB: + return f'{round(b / MiB, 2):.2f}MiB' + if b < 1000 * MiB and min_unit <= MiB: + return f'{round(b / MiB, 1):.1f}MiB' + if b < 20 * GiB and min_unit <= MiB: return f'{round(b / MiB)}MiB' if b < 100 * GiB and min_unit <= GiB: return f'{round(b / GiB, 2):.2f}GiB' @@ -595,7 +600,7 @@ def human2bytes(s: int | str) -> int: def timedelta2human( - dt: int | float | datetime.timedelta | NaType, + dt: int | float | datetime.timedelta | NaType, # noqa: PYI041 *, round: bool = False, # pylint: disable=redefined-builtin ) -> str: @@ -615,7 +620,7 @@ def timedelta2human( return '{:d}:{:02d}'.format(*divmod(seconds, 60)) -def utilization2string(utilization: int | float | NaType) -> str: +def utilization2string(utilization: int | float | NaType) -> str: # noqa: PYI041 """Convert a utilization rate to string.""" if utilization != NA: if isinstance(utilization, int): @@ -713,8 +718,11 @@ class Snapshot: return KeysView(self) # type: ignore[arg-type] +Method = TypeVar('Method', bound=Callable[..., Any]) + + # Modified from psutil (https://github.com/giampaolo/psutil) -def memoize_when_activated(method: Callable[[Any], Any]) -> Callable[[Any], Any]: +def memoize_when_activated(method: Method) -> Method: """A memoize decorator which is disabled by default. It can be activated and deactivated on request. For efficiency reasons it can be used only @@ -722,17 +730,17 @@ def memoize_when_activated(method: Callable[[Any], Any]) -> Callable[[Any], Any] """ @functools.wraps(method) - def wrapped(self): # noqa: ANN001,ANN202 + def wrapped(self, *args, **kwargs): # noqa: ANN001,ANN002,ANN003,ANN202 try: # case 1: we previously entered oneshot() ctx ret = self._cache[method] # pylint: disable=protected-access except AttributeError: # case 2: we never entered oneshot() ctx - return method(self) + return method(self, *args, **kwargs) except KeyError: # case 3: we entered oneshot() ctx but there's no cache # for this entry yet - ret = method(self) + ret = method(self, *args, **kwargs) try: self._cache[method] = ret # pylint: disable=protected-access except AttributeError: @@ -758,4 +766,4 @@ def memoize_when_activated(method: Callable[[Any], Any]) -> Callable[[Any], Any] wrapped.cache_activate = cache_activate # type: ignore[attr-defined] wrapped.cache_deactivate = cache_deactivate # type: ignore[attr-defined] - return wrapped + return wrapped # type: ignore[return-value] diff --git a/nvitop/callbacks/pytorch_lightning.py b/nvitop/callbacks/pytorch_lightning.py index 0c49ce4..8af3f56 100644 --- a/nvitop/callbacks/pytorch_lightning.py +++ b/nvitop/callbacks/pytorch_lightning.py @@ -119,7 +119,11 @@ class GpuStatsLogger(Callback): # pylint: disable=too-many-instance-attributes f'The root device type is {trainer.strategy.root_device.type}.', ) - device_ids = trainer.data_parallel_device_ids + try: + device_ids = trainer.device_ids # pytorch-lightning >= 1.6.0 + except AttributeError: + device_ids = trainer.data_parallel_device_ids # pytorch-lightning < 1.6.0 + try: self._devices = get_devices_by_logical_ids(device_ids, unique=True) except (libnvml.NVMLError, RuntimeError) as ex: diff --git a/nvitop/gui/screens/main/process.py b/nvitop/gui/screens/main/process.py index f07d5e6..e548f9e 100644 --- a/nvitop/gui/screens/main/process.py +++ b/nvitop/gui/screens/main/process.py @@ -6,8 +6,8 @@ import itertools import threading import time -from collections import namedtuple from operator import attrgetter, xor +from typing import Any, Callable, NamedTuple from cachetools.func import ttl_cache @@ -29,7 +29,13 @@ from nvitop.gui.library import ( ) -Order = namedtuple('Order', ['key', 'reverse', 'offset', 'column', 'previous', 'next']) +class Order(NamedTuple): + key: Callable[[Any], Any] + reverse: bool + offset: int + column: str + previous: str + next: str class ProcessPanel(Displayable): # pylint: disable=too-many-instance-attributes