Source code for bittensor.utils._register_cuda
import binascii
import hashlib
import math
from typing import Tuple
import numpy as np
from Crypto.Hash import keccak
from contextlib import redirect_stdout
import io
[docs]
def solve_cuda(
nonce_start: np.int64,
update_interval: np.int64,
tpb: int,
block_and_hotkey_hash_bytes: bytes,
difficulty: int,
limit: int,
dev_id: int = 0,
) -> Tuple[np.int64, bytes]:
"""
Solves the PoW problem using CUDA.
Args:
nonce_start: int64
Starting nonce.
update_interval: int64
Number of nonces to solve before updating block information.
tpb: int
Threads per block.
block_and_hotkey_hash_bytes: bytes
Keccak(Bytes of the block hash + bytes of the hotkey) 64 bytes.
difficulty: int256
Difficulty of the PoW problem.
limit: int256
Upper limit of the nonce.
dev_id: int (default=0)
The CUDA device ID
Returns:
Tuple[int64, bytes]
Tuple of the nonce and the seal corresponding to the solution.
Returns -1 for nonce if no solution is found.
"""
try:
import cubit
except ImportError:
raise ImportError("Please install cubit")
upper = int(limit // difficulty)
upper_bytes = upper.to_bytes(32, byteorder="little", signed=False)
def _hex_bytes_to_u8_list(hex_bytes: bytes):
hex_chunks = [
int(hex_bytes[i : i + 2], 16) for i in range(0, len(hex_bytes), 2)
]
return hex_chunks
def _create_seal_hash(block_and_hotkey_hash_hex: bytes, nonce: int) -> bytes:
nonce_bytes = binascii.hexlify(nonce.to_bytes(8, "little"))
pre_seal = nonce_bytes + block_and_hotkey_hash_hex
seal_sh256 = hashlib.sha256(bytearray(_hex_bytes_to_u8_list(pre_seal))).digest()
kec = keccak.new(digest_bits=256)
seal = kec.update(seal_sh256).digest()
return seal
def _seal_meets_difficulty(seal: bytes, difficulty: int):
seal_number = int.from_bytes(seal, "big")
product = seal_number * difficulty
limit = int(math.pow(2, 256)) - 1
return product < limit
# Call cython function
# int blockSize, uint64 nonce_start, uint64 update_interval, const unsigned char[:] limit,
# const unsigned char[:] block_bytes, int dev_id
block_and_hotkey_hash_hex = binascii.hexlify(block_and_hotkey_hash_bytes)[:64]
solution = cubit.solve_cuda(
tpb,
nonce_start,
update_interval,
upper_bytes,
block_and_hotkey_hash_hex,
dev_id,
) # 0 is first GPU
seal = None
if solution != -1:
seal = _create_seal_hash(block_and_hotkey_hash_hex, solution)
if _seal_meets_difficulty(seal, difficulty):
return solution, seal
else:
return -1, b"\x00" * 32
return solution, seal
[docs]
def reset_cuda():
"""
Resets the CUDA environment.
"""
try:
import cubit
except ImportError:
raise ImportError("Please install cubit")
cubit.reset_cuda()
[docs]
def log_cuda_errors() -> str:
"""
Logs any CUDA errors.
"""
try:
import cubit
except ImportError:
raise ImportError("Please install cubit")
f = io.StringIO()
with redirect_stdout(f):
cubit.log_cuda_errors()
s = f.getvalue()
return s