// Package device provides cross-device tensor operations and placement management.
// It serves as the central hub for device-aware computation in the makarna engine.
package device

import (
	"fmt"
	"sync"
	"unsafe"

	"makarna/pkg/backend/cpu"
	"makarna/pkg/backend/cuda"
	"makarna/pkg/tensor"
)

// WeightCache caches GPU copies of weights to avoid repeated H2D transfers.
// It is safe for concurrent use during parallel layer execution.
type WeightCache struct {
	mu    sync.RWMutex
	cache map[string]*cuda.Tensor // key: "layer_idx:weight_name"
	gpuID int
}

// NewWeightCache creates a new weight cache for a specific GPU.
func NewWeightCache(gpuID int) *WeightCache {
	return &WeightCache{
		cache: make(map[string]*cuda.Tensor),
		gpuID: gpuID,
	}
}

// Get retrieves a cached GPU tensor, returning nil if not cached.
func (wc *WeightCache) Get(key string) *cuda.Tensor {
	wc.mu.RLock()
	defer wc.mu.RUnlock()
	return wc.cache[key]
}

// Put adds a GPU tensor to the cache.
func (wc *WeightCache) Put(key string, t *cuda.Tensor) {
	wc.mu.Lock()
	defer wc.mu.Unlock()
	wc.cache[key] = t
}

// Clear frees all cached GPU tensors.
func (wc *WeightCache) Clear() {
	wc.mu.Lock()
	defer wc.mu.Unlock()
	// Release the device memory owned by the cached copies before dropping them.
	for _, t := range wc.cache {
		t.Free()
	}
	wc.cache = make(map[string]*cuda.Tensor)
}

// EnsureOn returns a tensor on the requested placement, copying if needed.
// For CPU tensors going to CUDA, this creates a NEW tensor each time.
// Use EnsureOnCached for weight tensors that should be cached.
func EnsureOn(t tensor.Tensor, target tensor.DevicePlacement) (tensor.Tensor, error) {
	if twp, ok := t.(tensor.TensorWithPlacement); ok {
		if twp.Placement() == target.Normalize() {
			return t, nil
		}
	}
	switch target.Type {
	case tensor.CPU:
		return toCPU(t)
	case tensor.CUDA:
		return toCUDA(t, target.GPU)
	default:
		return nil, fmt.Errorf("unsupported target device %v", target.Type)
	}
}

// EnsureOnCached is like EnsureOn but uses a cache for weight tensors.
// The key should uniquely identify the weight (e.g., "layer_0:wq").
func EnsureOnCached(t tensor.Tensor, target tensor.DevicePlacement, cache *WeightCache, key string) (tensor.Tensor, error) {
	if target.Type != tensor.CUDA {
		return EnsureOn(t, target)
	}
	if cache == nil {
		return EnsureOn(t, target)
	}
	// Check the cache first.
	if cached := cache.Get(key); cached != nil {
		return cached, nil
	}
	// Not cached: create the device copy and cache it.
	result, err := toCUDA(t, target.GPU)
	if err != nil {
		return nil, err
	}
	if cudaTensor, ok := result.(*cuda.Tensor); ok {
		cache.Put(key, cudaTensor)
	}
	return result, nil
}
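// Illustrative usage sketch (not part of the original API; the identifiers
// `activation` and `weight` are assumed to be tensor.Tensor values held by the
// caller). It shows the intended split between the two entry points:
// activations go through EnsureOn, which allocates and copies a fresh device
// tensor on every call, while weights go through EnsureOnCached, so later
// calls with the same key return the already resident *cuda.Tensor without
// another H2D transfer.
//
//	target := tensor.DevicePlacement{Type: tensor.CUDA, GPU: 0}
//	cache := NewWeightCache(0)
//
//	x, _ := EnsureOn(activation, target)                        // copied on every call
//	w, _ := EnsureOnCached(weight, target, cache, "layer_0:wq") // copied once, then cached
//	_, _ = x, w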
// CUDAAvailable reports whether CUDA is available.
func CUDAAvailable() bool {
	return cuda.Available()
}

func toCPU(t tensor.Tensor) (tensor.Tensor, error) {
	if c, ok := t.(*cpu.Tensor); ok {
		return c, nil
	}
	switch src := t.(type) {
	case *cuda.Tensor:
		if !cuda.Available() {
			return nil, fmt.Errorf("CUDA not available")
		}
		out := cpu.NewTensor(src.Shape(), nil)
		host := out.DataFloat32()
		if err := src.CopyToHost(host); err != nil {
			return nil, fmt.Errorf("copy to host failed: %w", err)
		}
		return out, nil
	default:
		return nil, fmt.Errorf("toCPU: unsupported tensor type %T", t)
	}
}

func toCUDA(t tensor.Tensor, gpu int) (tensor.Tensor, error) {
	if !cuda.Available() {
		return nil, fmt.Errorf("CUDA not available - build with -tags=cuda")
	}
	switch src := t.(type) {
	case *cuda.Tensor:
		if src.GPU() == gpu {
			return src, nil
		}
		if src.DType() != tensor.Float32 {
			return nil, fmt.Errorf("cross-GPU tensor copy only supports float32, got %v", src.DType())
		}
		out, err := cuda.NewTensor(src.Shape(), src.DType(), gpu)
		if err != nil {
			return nil, err
		}
		size := uintptr(src.Shape().NumElements() * src.DType().Size())
		if err := cuda.MemcpyD2D(out.Data().(unsafe.Pointer), src.Data().(unsafe.Pointer), size, gpu); err != nil {
			// Conservative fallback: stage the copy via host memory.
			host := make([]float32, src.Shape().NumElements())
			if err2 := src.CopyToHost(host); err2 != nil {
				out.Free()
				return nil, fmt.Errorf("cross-GPU copy D2H failed: %w", err2)
			}
			if err2 := out.CopyFrom(host); err2 != nil {
				out.Free()
				return nil, fmt.Errorf("cross-GPU copy H2D failed: %w", err2)
			}
		}
		return out, nil
	}

	// For quantized tensors, we would need dequantization first.
	if t.DType() != tensor.Float32 {
		return nil, fmt.Errorf("toCUDA: only float32 currently supported, got %v", t.DType())
	}
	out, err := cuda.NewTensor(t.Shape(), t.DType(), gpu)
	if err != nil {
		return nil, err
	}
	switch s := t.(type) {
	case *cpu.Tensor:
		if err := out.CopyFrom(s.DataFloat32()); err != nil {
			out.Free()
			return nil, err
		}
	default:
		out.Free()
		return nil, fmt.Errorf("toCUDA: unsupported source type %T", t)
	}
	return out, nil
}

// DeviceDispatcher manages per-device operations and caching.
type DeviceDispatcher struct {
	layerDevices []tensor.DevicePlacement
	weightCaches map[int]*WeightCache // gpuID -> cache
	mu           sync.RWMutex
}

// NewDeviceDispatcher creates a dispatcher with the given layer placements.
func NewDeviceDispatcher(layerDevices []tensor.DevicePlacement) *DeviceDispatcher {
	dd := &DeviceDispatcher{
		layerDevices: layerDevices,
		weightCaches: make(map[int]*WeightCache),
	}
	// Pre-create caches for each GPU mentioned in the placements.
	for _, p := range layerDevices {
		if p.Type == tensor.CUDA {
			if _, exists := dd.weightCaches[p.GPU]; !exists {
				dd.weightCaches[p.GPU] = NewWeightCache(p.GPU)
			}
		}
	}
	return dd
}

// LayerPlacement returns the device placement for a layer.
// Out-of-range indices fall back to CPU.
func (dd *DeviceDispatcher) LayerPlacement(layerIdx int) tensor.DevicePlacement {
	if layerIdx >= 0 && layerIdx < len(dd.layerDevices) {
		return dd.layerDevices[layerIdx]
	}
	return tensor.DevicePlacement{Type: tensor.CPU, GPU: -1}
}

// GetWeightCache returns the weight cache for a GPU, creating one if needed.
func (dd *DeviceDispatcher) GetWeightCache(gpuID int) *WeightCache {
	dd.mu.Lock()
	defer dd.mu.Unlock()
	if cache, exists := dd.weightCaches[gpuID]; exists {
		return cache
	}
	cache := NewWeightCache(gpuID)
	dd.weightCaches[gpuID] = cache
	return cache
}

// IsLayerOnGPU returns true if the layer should run on GPU.
func (dd *DeviceDispatcher) IsLayerOnGPU(layerIdx int) bool {
	p := dd.LayerPlacement(layerIdx)
	return p.Type == tensor.CUDA
}
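// ensureLayerWeight is an illustrative helper, not part of the original API:
// the function name and the weight-name convention (e.g. "wq") are
// assumptions. It sketches how the pieces above are meant to compose during a
// forward pass: look up the layer's placement, fetch the per-GPU weight cache
// from the dispatcher, and route GPU-resident layers through EnsureOnCached
// while CPU layers fall back to a plain EnsureOn.
func ensureLayerWeight(dd *DeviceDispatcher, layerIdx int, name string, w tensor.Tensor) (tensor.Tensor, error) {
	placement := dd.LayerPlacement(layerIdx)
	if placement.Type != tensor.CUDA {
		// CPU layer: no caching needed, just ensure the tensor is on host.
		return EnsureOn(w, placement)
	}
	// GPU layer: reuse the resident copy keyed by "layer_idx:weight_name".
	cache := dd.GetWeightCache(placement.GPU)
	key := fmt.Sprintf("layer_%d:%s", layerIdx, name)
	return EnsureOnCached(w, placement, cache, key)
}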
// NumGPULayers counts how many layers are placed on GPU.
func (dd *DeviceDispatcher) NumGPULayers() int {
	count := 0
	for _, p := range dd.layerDevices {
		if p.Type == tensor.CUDA {
			count++
		}
	}
	return count
}

// Clear frees all cached resources.
func (dd *DeviceDispatcher) Clear() {
	dd.mu.Lock()
	defer dd.mu.Unlock()
	for _, cache := range dd.weightCaches {
		cache.Clear()
	}
}
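// Lifecycle sketch (illustrative; the placement plan below is an assumption,
// not taken from the original): build the dispatcher once from the per-layer
// placement plan, reuse it for every forward pass, and call Clear when the
// model is unloaded so the cached GPU weight copies are released.
//
//	placements := []tensor.DevicePlacement{
//		{Type: tensor.CUDA, GPU: 0}, // layer 0
//		{Type: tensor.CUDA, GPU: 0}, // layer 1
//		{Type: tensor.CPU, GPU: -1}, // layer 2
//	}
//	dd := NewDeviceDispatcher(placements)
//	defer dd.Clear()
//	_ = dd.NumGPULayers() // 2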