feat(api/device): add methods to query NVLink throughput

This commit is contained in:
Xuehai Pan 2023-08-05 01:37:28 +08:00
parent 2d479deecc
commit 4073d7da75
3 changed files with 249 additions and 9 deletions

View file

@ -149,3 +149,4 @@ Unallocated
KiB
tx
rx
ThroughputInfo

View file

@ -655,6 +655,8 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me
self._bus_id: str = NA
self._memory_total: int | NaType = NA
self._memory_total_human: str = NA
self._nvlink_link_count: int | None = None
self._nvlink_throughput_counters: tuple[tuple[int | NaType, int]] | None = None
self._is_mig_device: bool | None = None
self._cuda_index: int | None = None
self._cuda_compute_capability: tuple[int, int] | NaType | None = None
@ -1505,6 +1507,189 @@ class Device: # pylint: disable=too-many-instance-attributes,too-many-public-me
return f'{bytes2human(rx_throughput << 10)}/s'
return NA
def nvlink_link_count(self) -> int:
    """The number of NVLinks that the GPU has.

    The value is queried from NVML once and then cached on the instance.

    Returns: int
        The number of NVLinks that the GPU has, or ``0`` when the query does not yield an
        integer.
    """
    cached = self._nvlink_link_count
    if cached is not None:
        return cached

    # A single field is requested, so exactly one (value, timestamp) pair comes back.
    ((link_count, _),) = libnvml.nvmlQueryFieldValues(
        self.handle,
        [libnvml.NVML_FI_DEV_NVLINK_LINK_COUNT],
    )
    self._nvlink_link_count = (
        link_count if libnvml.nvmlCheckReturn(link_count, int) else 0  # type: ignore[assignment]
    )
    return self._nvlink_link_count  # type: ignore[return-value]
@memoize_when_activated
def nvlink_throughput(self) -> list[ThroughputInfo]:  # in KiB/s
    """The current NVLink throughput for each NVLink in KiB/s.

    This function is querying data counters between methods calls and thus is the NVLink
    throughput over that interval. For the first call, the function is blocking for 100ms to
    get the first data counters. When the device reports no NVLinks, an empty list is returned
    immediately without querying the counters or blocking.

    Returns: List[ThroughputInfo(tx, rx)]
        A list of named tuples with current NVLink throughput for each NVLink in KiB/s, the item
        could be :const:`nvitop.NA` when not applicable. The list is empty when the device has
        no NVLinks.
    """
    nvlink_link_count = self.nvlink_link_count()
    if nvlink_link_count <= 0:
        # Nothing to measure: skip the empty driver query and the 100ms warm-up sleep.
        return []

    # All TX fields first, then all RX fields, so the result can be split in half below.
    field_ids = [
        (libnvml.NVML_FI_DEV_NVLINK_THROUGHPUT_DATA_TX, i) for i in range(nvlink_link_count)
    ] + [
        (libnvml.NVML_FI_DEV_NVLINK_THROUGHPUT_DATA_RX, i) for i in range(nvlink_link_count)
    ]

    def query_nvlink_throughput_counters() -> tuple[tuple[int | NaType, int]]:
        return tuple(  # type: ignore[return-value]
            libnvml.nvmlQueryFieldValues(self.handle, field_ids),  # type: ignore[arg-type]
        )

    if self._nvlink_throughput_counters is None:
        # First call: take a baseline sample and wait so there is an interval to measure.
        self._nvlink_throughput_counters = query_nvlink_throughput_counters()
        time.sleep(0.1)

    old_throughput_counters = self._nvlink_throughput_counters
    new_throughput_counters = query_nvlink_throughput_counters()

    throughputs: list[int | NaType] = []
    for (old_counter, old_timestamp), (new_counter, new_timestamp) in zip(
        old_throughput_counters,
        new_throughput_counters,
    ):
        if (
            libnvml.nvmlCheckReturn(old_counter, int)
            and libnvml.nvmlCheckReturn(new_counter, int)
            and new_timestamp > old_timestamp
        ):
            # Counter delta over elapsed time; the 1_000_000 factor presumably converts
            # microsecond timestamps to a per-second rate -- TODO confirm against NVML docs.
            throughputs.append(
                round(
                    1_000_000 * (new_counter - old_counter) / (new_timestamp - old_timestamp),
                ),
            )
        else:
            throughputs.append(NA)

    # Remember this sample as the baseline for the next call.
    self._nvlink_throughput_counters = new_throughput_counters
    return [
        ThroughputInfo(tx=tx, rx=rx)
        for tx, rx in zip(throughputs[:nvlink_link_count], throughputs[nvlink_link_count:])
    ]
def nvlink_mean_throughput(self) -> ThroughputInfo:  # in KiB/s
    """The mean NVLink throughput for all NVLinks in KiB/s.

    The per-link values come from a single :meth:`nvlink_throughput` call, so the same
    interval semantics apply: throughput is measured between method calls, and the first
    call blocks for 100ms to obtain the initial data counters.

    Returns: ThroughputInfo(tx, rx)
        A named tuple with the mean NVLink throughput for all NVLinks in KiB/s, the item could
        be :const:`nvitop.NA` when not applicable.
    """
    per_link = self.nvlink_throughput()

    def mean_of_valid(samples: list[int | NaType]) -> int | NaType:
        # Only integer samples take part in the average; N/A entries are dropped.
        valid = [sample for sample in samples if libnvml.nvmlCheckReturn(sample, int)]
        if not valid:
            return NA
        return round(sum(valid) / len(valid))

    return ThroughputInfo(
        tx=mean_of_valid([tx for tx, _ in per_link]),
        rx=mean_of_valid([rx for _, rx in per_link]),
    )
def nvlink_tx_throughput(self) -> list[int | NaType]:  # in KiB/s
    """The current NVLink transmit data throughput in KiB/s for each NVLink.

    Returns: List[Union[int, NaType]]
        The transmit side of every entry returned by :meth:`nvlink_throughput`, in KiB/s;
        an item is :const:`nvitop.NA` when not applicable.
    """
    per_link = self.nvlink_throughput()
    return [link.tx for link in per_link]
def nvlink_mean_tx_throughput(self) -> int | NaType:  # in KiB/s
    """The mean NVLink transmit data throughput for all NVLinks in KiB/s.

    Returns: Union[int, NaType]
        The transmit component of :meth:`nvlink_mean_throughput` in KiB/s, or
        :const:`nvitop.NA` when not applicable.
    """
    mean_tx, _ = self.nvlink_mean_throughput()
    return mean_tx
def nvlink_rx_throughput(self) -> list[int | NaType]:  # in KiB/s
    """The current NVLink receive data throughput for each NVLink in KiB/s.

    Returns: List[Union[int, NaType]]
        The receive side of every entry returned by :meth:`nvlink_throughput`, in KiB/s;
        an item is :const:`nvitop.NA` when not applicable.
    """
    per_link = self.nvlink_throughput()
    return [link.rx for link in per_link]
def nvlink_mean_rx_throughput(self) -> int | NaType:  # in KiB/s
    """The mean NVLink receive data throughput for all NVLinks in KiB/s.

    Returns: Union[int, NaType]
        The receive component of :meth:`nvlink_mean_throughput` in KiB/s, or
        :const:`nvitop.NA` when not applicable.
    """
    _, mean_rx = self.nvlink_mean_throughput()
    return mean_rx
def nvlink_tx_throughput_human(self) -> list[str | NaType]:  # in human readable
    """The current NVLink transmit data throughput for each NVLink in human readable format.

    Returns: List[Union[str, NaType]]
        One entry per NVLink: the transmit throughput formatted as a human readable rate, or
        :const:`nvitop.NA` when not applicable.
    """
    readable: list[str | NaType] = []
    for tx in self.nvlink_tx_throughput():
        if libnvml.nvmlCheckReturn(tx, int):
            # `<< 10` scales the KiB figure to bytes before formatting.
            readable.append(f'{bytes2human(tx << 10)}/s')  # type: ignore[operator]
        else:
            readable.append(NA)
    return readable
def nvlink_mean_tx_throughput_human(self) -> str | NaType:  # in human readable
    """The mean NVLink transmit data throughput for all NVLinks in human readable format.

    Returns: Union[str, NaType]
        The mean transmit throughput formatted as a human readable rate, or
        :const:`nvitop.NA` when not applicable.
    """
    mean_tx = self.nvlink_mean_tx_throughput()
    if not libnvml.nvmlCheckReturn(mean_tx, int):
        return NA
    # `<< 10` scales the KiB figure to bytes before formatting.
    return f'{bytes2human(mean_tx << 10)}/s'  # type: ignore[operator]
def nvlink_rx_throughput_human(self) -> list[str | NaType]:  # in human readable
    """The current NVLink receive data throughput for each NVLink in human readable format.

    Returns: List[Union[str, NaType]]
        One entry per NVLink: the receive throughput formatted as a human readable rate, or
        :const:`nvitop.NA` when not applicable.
    """
    readable: list[str | NaType] = []
    for rx in self.nvlink_rx_throughput():
        if libnvml.nvmlCheckReturn(rx, int):
            # `<< 10` scales the KiB figure to bytes before formatting.
            readable.append(f'{bytes2human(rx << 10)}/s')  # type: ignore[operator]
        else:
            readable.append(NA)
    return readable
def nvlink_mean_rx_throughput_human(self) -> str | NaType:  # in human readable
    """The mean NVLink receive data throughput for all NVLinks in human readable format.

    Returns: Union[str, NaType]
        The mean receive throughput formatted as a human readable rate, or
        :const:`nvitop.NA` when not applicable.
    """
    mean_rx = self.nvlink_mean_rx_throughput()
    if not libnvml.nvmlCheckReturn(mean_rx, int):
        return NA
    # `<< 10` scales the KiB figure to bytes before formatting.
    return f'{bytes2human(mean_rx << 10)}/s'  # type: ignore[operator]
def display_active(self) -> str | NaType:
"""A flag that indicates whether a display is initialized on the GPU's (e.g. memory is allocated on the device for display).
@ -2068,6 +2253,8 @@ class MigDevice(Device): # pylint: disable=too-many-instance-attributes
self._memory_total_human: str = NA
self._gpu_instance_id: int | NaType = NA
self._compute_instance_id: int | NaType = NA
self._nvlink_link_count: int | None = None
self._nvlink_throughput_counters: tuple[tuple[int | NaType, int]] | None = None
self._is_mig_device: bool = True
self._cuda_index: int | None = None
self._cuda_compute_capability: tuple[int, int] | NaType | None = None

View file

@ -28,6 +28,7 @@ import os as _os
import re as _re
import sys as _sys
import threading as _threading
import time as _time
from types import FunctionType as _FunctionType
from types import ModuleType as _ModuleType
from typing import TYPE_CHECKING as _TYPE_CHECKING
@ -41,7 +42,7 @@ import pynvml as _pynvml
from pynvml import * # noqa: F403 # pylint: disable=wildcard-import,unused-wildcard-import
from pynvml import nvmlDeviceGetPciInfo # appease mypy # noqa: F401 # pylint: disable=unused-import
from nvitop.api.utils import NA, UINT_MAX, ULONGLONG_MAX
from nvitop.api.utils import NA, UINT_MAX, ULONGLONG_MAX, NaType
from nvitop.api.utils import colored as __colored
@ -55,6 +56,7 @@ __all__ = [ # will be updated in below
'ULONGLONG_MAX',
'nvmlCheckReturn',
'nvmlQuery',
'nvmlQueryFieldValues',
'nvmlInit',
'nvmlInitWithFlags',
'nvmlShutdown',
@ -173,6 +175,7 @@ del (
# 5. Add explicit references to appease linters
# pylint: disable=no-member
c_nvmlDevice_t: _TypeAlias = _pynvml.c_nvmlDevice_t
c_nvmlFieldValue_t: _TypeAlias = _pynvml.c_nvmlFieldValue_t
NVML_SUCCESS: int = _pynvml.NVML_SUCCESS
NVML_ERROR_INSUFFICIENT_SIZE: int = _pynvml.NVML_ERROR_INSUFFICIENT_SIZE
NVMLError_FunctionNotFound: _TypeAlias = _pynvml.NVMLError_FunctionNotFound
@ -198,6 +201,18 @@ NVML_COMPUTEMODE_PROHIBITED: int = _pynvml.NVML_COMPUTEMODE_PROHIBITED
NVML_COMPUTEMODE_EXCLUSIVE_PROCESS: int = _pynvml.NVML_COMPUTEMODE_EXCLUSIVE_PROCESS
NVML_PCIE_UTIL_TX_BYTES: int = _pynvml.NVML_PCIE_UTIL_TX_BYTES
NVML_PCIE_UTIL_RX_BYTES: int = _pynvml.NVML_PCIE_UTIL_RX_BYTES
# NVLink-related constants re-exported from pynvml.
NVML_NVLINK_MAX_LINKS: int = _pynvml.NVML_NVLINK_MAX_LINKS
# Field IDs for use with `nvmlQueryFieldValues`. The THROUGHPUT_DATA_* IDs are queried per link
# as `(field_id, link_index)` pairs; the THROUGHPUT_RAW_* IDs presumably work the same way.
NVML_FI_DEV_NVLINK_LINK_COUNT: int = _pynvml.NVML_FI_DEV_NVLINK_LINK_COUNT
NVML_FI_DEV_NVLINK_THROUGHPUT_DATA_TX: int = _pynvml.NVML_FI_DEV_NVLINK_THROUGHPUT_DATA_TX
NVML_FI_DEV_NVLINK_THROUGHPUT_DATA_RX: int = _pynvml.NVML_FI_DEV_NVLINK_THROUGHPUT_DATA_RX
NVML_FI_DEV_NVLINK_THROUGHPUT_RAW_TX: int = _pynvml.NVML_FI_DEV_NVLINK_THROUGHPUT_RAW_TX
NVML_FI_DEV_NVLINK_THROUGHPUT_RAW_RX: int = _pynvml.NVML_FI_DEV_NVLINK_THROUGHPUT_RAW_RX
# Value-type tags: `nvmlQueryFieldValues` compares a field value's `valueType` against these to
# pick which member of its `value` union to read (dVal, uiVal, ulVal, ullVal, llVal, iVal).
NVML_VALUE_TYPE_DOUBLE: int = _pynvml.NVML_VALUE_TYPE_DOUBLE
NVML_VALUE_TYPE_UNSIGNED_INT: int = _pynvml.NVML_VALUE_TYPE_UNSIGNED_INT
NVML_VALUE_TYPE_UNSIGNED_LONG: int = _pynvml.NVML_VALUE_TYPE_UNSIGNED_LONG
NVML_VALUE_TYPE_UNSIGNED_LONG_LONG: int = _pynvml.NVML_VALUE_TYPE_UNSIGNED_LONG_LONG
NVML_VALUE_TYPE_SIGNED_LONG_LONG: int = _pynvml.NVML_VALUE_TYPE_SIGNED_LONG_LONG
NVML_VALUE_TYPE_SIGNED_INT: int = _pynvml.NVML_VALUE_TYPE_SIGNED_INT
# pylint: enable=no-member
# New members in `libnvml` #########################################################################
@ -450,6 +465,51 @@ def nvmlQuery(
return retval
def nvmlQueryFieldValues(
    handle: c_nvmlDevice_t,
    field_ids: list[int | tuple[int, int]],
) -> list[tuple[float | int | NaType, int]]:
    """Query multiple field values from NVML.

    Request values for a list of fields for a device. This API allows multiple fields to be queried
    at once. If any of the underlying fieldIds are populated by the same driver call, the results
    for those field IDs will be populated from a single call rather than making a driver call for
    each fieldId.

    Args:
        handle:
            The NVML device handle to query.
        field_ids:
            The fields to request. Each item is either a bare field ID or a
            ``(field ID, index)`` pair (e.g. a per-link index for NVLink fields).

    Returns: List[Tuple[Union[float, int, NaType], int]]
        One ``(value, timestamp)`` pair per requested field. When the whole query or a single
        field fails, the value is :const:`nvitop.NA` and the timestamp is the current wall-clock
        time in microseconds; otherwise the timestamp is the one carried by the field value. A
        field with an unrecognized value type yields :const:`nvitop.NA` with its own timestamp.

    Raises:
        NVMLError_InvalidArgument:
            If device or field_ids is invalid.
    """
    results = nvmlQuery('nvmlDeviceGetFieldValues', handle, field_ids)
    if not nvmlCheckReturn(results):
        now_us = _time.time_ns() // 1000
        return [(NA, now_us)] * len(field_ids)

    # Ordered (value type tag, union member name) pairs, checked in this order.
    union_members = (
        (NVML_VALUE_TYPE_DOUBLE, 'dVal'),
        (NVML_VALUE_TYPE_UNSIGNED_INT, 'uiVal'),
        (NVML_VALUE_TYPE_UNSIGNED_LONG, 'ulVal'),
        (NVML_VALUE_TYPE_UNSIGNED_LONG_LONG, 'ullVal'),
        (NVML_VALUE_TYPE_SIGNED_LONG_LONG, 'llVal'),
        (NVML_VALUE_TYPE_SIGNED_INT, 'iVal'),
    )

    pairs: list[tuple[float | int | NaType, int]] = []
    for entry in results:
        stamp = entry.timestamp
        reading: float | int | NaType = NA
        if entry.nvmlReturn != NVML_SUCCESS:
            # This field failed individually: stamp it with the current time instead.
            stamp = _time.time_ns() // 1000
        else:
            for value_type, member in union_members:
                if entry.valueType == value_type:
                    reading = getattr(entry.value, member)
                    break
        pairs.append((reading, stamp))
    return pairs
def nvmlCheckReturn(
retval: _Any,
types: type | tuple[type, ...] | None = None,
@ -636,8 +696,6 @@ if not _pynvml_installation_corrupted:
using specific MIG device handles.
Raises:
NVMLError_InvalidArgument:
If the library has not been successfully initialized.
NVMLError_Uninitialized:
If NVML was not first initialized with :func:`nvmlInit`.
NVMLError_NoPermission:
@ -665,8 +723,6 @@ if not _pynvml_installation_corrupted:
using specific MIG device handles.
Raises:
NVMLError_InvalidArgument:
If the library has not been successfully initialized.
NVMLError_Uninitialized:
If NVML was not first initialized with :func:`nvmlInit`.
NVMLError_NoPermission:
@ -694,8 +750,6 @@ if not _pynvml_installation_corrupted:
using specific MIG device handles.
Raises:
NVMLError_InvalidArgument:
If the library has not been successfully initialized.
NVMLError_Uninitialized:
If NVML was not first initialized with :func:`nvmlInit`.
NVMLError_NoPermission:
@ -790,8 +844,6 @@ if not _pynvml_installation_corrupted:
using specific MIG device handles.
Raises:
NVMLError_InvalidArgument:
If the library has not been successfully initialized.
NVMLError_Uninitialized:
If NVML was not first initialized with :func:`nvmlInit`.
NVMLError_NoPermission: