// Package device provides cross-device tensor operations and placement management.
// It serves as the central hub for device-aware computation in the makarna engine.
package device

import (
	"fmt"
	"sync"
	"unsafe"

	"makarna/pkg/backend/cpu"
	"makarna/pkg/backend/cuda"
	"makarna/pkg/tensor"
)

// WeightCache caches GPU copies of weights to avoid repeated H2D transfers.
// Thread-safe for concurrent layer execution.
type WeightCache struct {
	mu    sync.RWMutex
	cache map[string]*cuda.Tensor // key: "layer_idx:weight_name"
	gpuID int
}

// NewWeightCache creates a new weight cache for a specific GPU.
func NewWeightCache(gpuID int) *WeightCache {
	return &WeightCache{
		cache: make(map[string]*cuda.Tensor),
		gpuID: gpuID,
	}
}

// Get retrieves a cached GPU tensor, returning nil if not cached.
func (wc *WeightCache) Get(key string) *cuda.Tensor {
	wc.mu.RLock()
	defer wc.mu.RUnlock()
	return wc.cache[key]
}

// Put adds a GPU tensor to the cache.
func (wc *WeightCache) Put(key string, t *cuda.Tensor) {
	wc.mu.Lock()
	defer wc.mu.Unlock()
	wc.cache[key] = t
}

// Clear frees all cached GPU tensors.
func (wc *WeightCache) Clear() {
	wc.mu.Lock()
	defer wc.mu.Unlock()
	// Release device memory for every cached copy before dropping the map,
	// so Clear actually frees GPU memory as documented.
	for _, t := range wc.cache {
		t.Free()
	}
	wc.cache = make(map[string]*cuda.Tensor)
}
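
// cacheWeightManually is an illustrative sketch, not part of the original API:
// the check-then-fill pattern a caller would use if it uploads weights itself
// instead of going through EnsureOnCached below. The upload callback is a
// hypothetical placeholder for whatever H2D path the caller already has.
func cacheWeightManually(cache *WeightCache, key string, upload func() (*cuda.Tensor, error)) (*cuda.Tensor, error) {
	if cached := cache.Get(key); cached != nil {
		return cached, nil
	}
	t, err := upload()
	if err != nil {
		return nil, err
	}
	cache.Put(key, t)
	return t, nil
}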

// EnsureOn returns a tensor on the requested placement, copying if needed.
// For CPU tensors going to CUDA, this creates a new tensor on each call.
// Use EnsureOnCached for weight tensors that should be cached.
func EnsureOn(t tensor.Tensor, target tensor.DevicePlacement) (tensor.Tensor, error) {
	if twp, ok := t.(tensor.TensorWithPlacement); ok {
		if twp.Placement() == target.Normalize() {
			return t, nil
		}
	}
	switch target.Type {
	case tensor.CPU:
		return toCPU(t)
	case tensor.CUDA:
		return toCUDA(t, target.GPU)
	default:
		return nil, fmt.Errorf("unsupported target device %v", target.Type)
	}
}
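
// runOnGPU is an illustrative sketch, not part of the original API: move a
// tensor to a GPU, run a caller-supplied kernel on it, and bring the result
// back to host memory. The gpuCompute callback is a hypothetical placeholder.
func runOnGPU(x tensor.Tensor, gpu int, gpuCompute func(tensor.Tensor) (tensor.Tensor, error)) (tensor.Tensor, error) {
	onGPU, err := EnsureOn(x, tensor.DevicePlacement{Type: tensor.CUDA, GPU: gpu})
	if err != nil {
		return nil, err
	}
	y, err := gpuCompute(onGPU)
	if err != nil {
		return nil, err
	}
	// Bring the result back for downstream CPU ops.
	return EnsureOn(y, tensor.DevicePlacement{Type: tensor.CPU, GPU: -1})
}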

// EnsureOnCached is like EnsureOn but uses a cache for weight tensors.
// The key should uniquely identify the weight (e.g., "layer_0:wq").
func EnsureOnCached(t tensor.Tensor, target tensor.DevicePlacement, cache *WeightCache, key string) (tensor.Tensor, error) {
	if target.Type != tensor.CUDA || cache == nil {
		return EnsureOn(t, target)
	}
	// Serve from the cache if this weight has already been uploaded.
	if cached := cache.Get(key); cached != nil {
		return cached, nil
	}
	// Not cached: upload and remember the device copy.
	result, err := toCUDA(t, target.GPU)
	if err != nil {
		return nil, err
	}
	if cudaTensor, ok := result.(*cuda.Tensor); ok {
		cache.Put(key, cudaTensor)
	}
	return result, nil
}
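
// uploadLayerWeight is an illustrative sketch, not part of the original API:
// how a layer forward pass might route its weights through the cache so the
// H2D copy happens only once per weight. The key follows the
// "layer_idx:weight_name" convention used by WeightCache.
func uploadLayerWeight(w tensor.Tensor, layerIdx int, name string, gpu int, cache *WeightCache) (tensor.Tensor, error) {
	target := tensor.DevicePlacement{Type: tensor.CUDA, GPU: gpu}
	key := fmt.Sprintf("%d:%s", layerIdx, name)
	return EnsureOnCached(w, target, cache, key)
}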

// CUDAAvailable reports whether CUDA is available.
func CUDAAvailable() bool {
	return cuda.Available()
}

func toCPU(t tensor.Tensor) (tensor.Tensor, error) {
	if c, ok := t.(*cpu.Tensor); ok {
		return c, nil
	}
	switch src := t.(type) {
	case *cuda.Tensor:
		if !cuda.Available() {
			return nil, fmt.Errorf("CUDA not available")
		}
		// Allocate a host tensor of the same shape and copy the device data into it.
		out := cpu.NewTensor(src.Shape(), nil)
		host := out.DataFloat32()
		if err := src.CopyToHost(host); err != nil {
			return nil, fmt.Errorf("copy to host failed: %w", err)
		}
		return out, nil
	default:
		return nil, fmt.Errorf("toCPU: unsupported tensor type %T", t)
	}
}

func toCUDA(t tensor.Tensor, gpu int) (tensor.Tensor, error) {
	if !cuda.Available() {
		return nil, fmt.Errorf("CUDA not available; build with -tags=cuda")
	}
	switch src := t.(type) {
	case *cuda.Tensor:
		// Already on a GPU: either reuse it or copy across devices.
		if src.GPU() == gpu {
			return src, nil
		}
		if src.DType() != tensor.Float32 {
			return nil, fmt.Errorf("cross-GPU tensor copy only supports float32, got %v", src.DType())
		}
		out, err := cuda.NewTensor(src.Shape(), src.DType(), gpu)
		if err != nil {
			return nil, err
		}
		size := uintptr(src.Shape().NumElements() * src.DType().Size())
		if err := cuda.MemcpyD2D(out.Data().(unsafe.Pointer), src.Data().(unsafe.Pointer), size, gpu); err != nil {
			// Conservative fallback: stage the copy through host memory.
			host := make([]float32, src.Shape().NumElements())
			if err2 := src.CopyToHost(host); err2 != nil {
				out.Free()
				return nil, fmt.Errorf("cross-GPU copy D2H failed: %w", err2)
			}
			if err2 := out.CopyFrom(host); err2 != nil {
				out.Free()
				return nil, fmt.Errorf("cross-GPU copy H2D failed: %w", err2)
			}
		}
		return out, nil
	}
	// Quantized tensors would need to be dequantized before upload; only
	// float32 host tensors are handled here.
	if t.DType() != tensor.Float32 {
		return nil, fmt.Errorf("toCUDA: only float32 currently supported, got %v", t.DType())
	}
	out, err := cuda.NewTensor(t.Shape(), t.DType(), gpu)
	if err != nil {
		return nil, err
	}
	switch s := t.(type) {
	case *cpu.Tensor:
		if err := out.CopyFrom(s.DataFloat32()); err != nil {
			out.Free()
			return nil, err
		}
	default:
		out.Free()
		return nil, fmt.Errorf("toCUDA: unsupported source type %T", t)
	}
	return out, nil
}

// DeviceDispatcher manages per-device operations and caching.
type DeviceDispatcher struct {
	layerDevices []tensor.DevicePlacement
	weightCaches map[int]*WeightCache // gpuID -> cache
	mu           sync.RWMutex
}

// NewDeviceDispatcher creates a dispatcher with the given layer placements.
func NewDeviceDispatcher(layerDevices []tensor.DevicePlacement) *DeviceDispatcher {
	dd := &DeviceDispatcher{
		layerDevices: layerDevices,
		weightCaches: make(map[int]*WeightCache),
	}
	// Pre-create a cache for each GPU that appears in the placement list.
	for _, p := range layerDevices {
		if p.Type == tensor.CUDA {
			if _, exists := dd.weightCaches[p.GPU]; !exists {
				dd.weightCaches[p.GPU] = NewWeightCache(p.GPU)
			}
		}
	}
	return dd
}

// LayerPlacement returns the device placement for a layer, defaulting to CPU
// for out-of-range indices.
func (dd *DeviceDispatcher) LayerPlacement(layerIdx int) tensor.DevicePlacement {
	if layerIdx >= 0 && layerIdx < len(dd.layerDevices) {
		return dd.layerDevices[layerIdx]
	}
	return tensor.DevicePlacement{Type: tensor.CPU, GPU: -1}
}

// GetWeightCache returns the weight cache for a GPU, creating one if needed.
func (dd *DeviceDispatcher) GetWeightCache(gpuID int) *WeightCache {
	dd.mu.Lock()
	defer dd.mu.Unlock()
	if cache, exists := dd.weightCaches[gpuID]; exists {
		return cache
	}
	cache := NewWeightCache(gpuID)
	dd.weightCaches[gpuID] = cache
	return cache
}

// IsLayerOnGPU returns true if the layer should run on GPU.
func (dd *DeviceDispatcher) IsLayerOnGPU(layerIdx int) bool {
	return dd.LayerPlacement(layerIdx).Type == tensor.CUDA
}

// NumGPULayers counts how many layers are placed on GPU.
func (dd *DeviceDispatcher) NumGPULayers() int {
	count := 0
	for _, p := range dd.layerDevices {
		if p.Type == tensor.CUDA {
			count++
		}
	}
	return count
}

// Clear frees all cached resources.
func (dd *DeviceDispatcher) Clear() {
	dd.mu.Lock()
	defer dd.mu.Unlock()
	for _, cache := range dd.weightCaches {
		cache.Clear()
	}
}
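
// exampleDispatch is an illustrative sketch, not part of the original API:
// how an engine loop might consult the dispatcher per layer. Placing the
// first half of the layers on GPU 0 is an assumption made for the example.
func exampleDispatch(numLayers int, hidden tensor.Tensor) error {
	placements := make([]tensor.DevicePlacement, numLayers)
	for i := range placements {
		if i < numLayers/2 {
			placements[i] = tensor.DevicePlacement{Type: tensor.CUDA, GPU: 0}
		} else {
			placements[i] = tensor.DevicePlacement{Type: tensor.CPU, GPU: -1}
		}
	}
	dd := NewDeviceDispatcher(placements)
	defer dd.Clear()

	for i := 0; i < numLayers; i++ {
		p := dd.LayerPlacement(i)
		// Move the running activation to this layer's device.
		moved, err := EnsureOn(hidden, p)
		if err != nil {
			return err
		}
		hidden = moved
		if dd.IsLayerOnGPU(i) {
			// Weights for this layer would be uploaded through this cache
			// (e.g., via EnsureOnCached) before running the GPU kernels.
			_ = dd.GetWeightCache(p.GPU)
		}
	}
	return nil
}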