| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- #!/usr/bin/env python3
- import argparse
- import os
- import importlib
- from pathlib import Path
- from transformers import AutoTokenizer, AutoModelForCausalLM, AutoConfig
- import torch
- import numpy as np
- ### If you want to dump RoPE activations, apply this monkey patch to the model
- ### class from Transformers that you are running (replace apertus.modeling_apertus
- ### with the proper package and class for your model
- ### === START ROPE DEBUG ===
- # from transformers.models.apertus.modeling_apertus import apply_rotary_pos_emb
- # orig_rope = apply_rotary_pos_emb
- # torch.set_printoptions(threshold=float('inf'))
- # torch.set_printoptions(precision=6, sci_mode=False)
- # def debug_rope(q, k, cos, sin, position_ids=None, unsqueeze_dim=1):
- # # log inputs
- # summarize(q, "RoPE.q_in")
- # summarize(k, "RoPE.k_in")
- # # call original
- # q_out, k_out = orig_rope(q, k, cos, sin, position_ids, unsqueeze_dim)
- # # log outputs
- # summarize(q_out, "RoPE.q_out")
- # summarize(k_out, "RoPE.k_out")
- # return q_out, k_out
- # # Patch it
- # import transformers.models.apertus.modeling_apertus as apertus_mod # noqa: E402
- # apertus_mod.apply_rotary_pos_emb = debug_rope
- ### == END ROPE DEBUG ===
- def summarize(tensor: torch.Tensor, name: str, max_seq: int = 3, max_vals: int = 3):
- """
- Print a tensor in llama.cpp debug style.
- Supports:
- - 2D tensors (seq, hidden)
- - 3D tensors (batch, seq, hidden)
- - 4D tensors (batch, seq, heads, dim_per_head) via flattening heads × dim_per_head
- Shows first and last max_vals of each vector per sequence position.
- """
- t = tensor.detach().to(torch.float32).cpu()
- # Determine dimensions
- if t.ndim == 3:
- _, s, _ = t.shape
- elif t.ndim == 2:
- _, s = 1, t.shape[0]
- t = t.unsqueeze(0)
- elif t.ndim == 4:
- _, s, _, _ = t.shape
- else:
- print(f"Skipping tensor due to unsupported dimensions: {t.ndim}")
- return
- ten_shape = t.shape
- print(f"ggml_debug: {name} = (f32) ... = {{{ten_shape}}}")
- print(" [")
- print(" [")
- # Determine indices for first and last sequences
- first_indices = list(range(min(s, max_seq)))
- last_indices = list(range(max(0, s - max_seq), s))
- # Check if there's an overlap between first and last indices or if we're at the edge case of s = 2 * max_seq
- has_overlap = bool(set(first_indices) & set(last_indices)) or (max_seq * 2 == s)
- # Combine indices
- if has_overlap:
- # If there's overlap, just use the combined unique indices
- indices = sorted(list(set(first_indices + last_indices)))
- separator_index = None
- else:
- # If no overlap, we'll add a separator between first and last sequences
- indices = first_indices + last_indices
- separator_index = len(first_indices)
- for i, si in enumerate(indices):
- # Add separator if needed
- if separator_index is not None and i == separator_index:
- print(" ...")
- # Extract appropriate slice
- vec = t[0, si]
- if vec.ndim == 2: # 4D case: flatten heads × dim_per_head
- flat = vec.flatten().tolist()
- else: # 2D or 3D case
- flat = vec.tolist()
- # First and last slices
- first = flat[:max_vals]
- last = flat[-max_vals:] if len(flat) >= max_vals else flat
- first_str = ", ".join(f"{v:12.4f}" for v in first)
- last_str = ", ".join(f"{v:12.4f}" for v in last)
- print(f" [{first_str}, ..., {last_str}]")
- print(" ],")
- print(" ]")
- print(f" sum = {t.sum().item():.6f}\n")
- def debug_hook(name):
- def fn(_m, input, output):
- if isinstance(input, torch.Tensor):
- summarize(input, name + "_in")
- elif isinstance(input, (tuple, list)) and isinstance(input[0], torch.Tensor):
- summarize(input[0], name + "_in")
- if isinstance(output, torch.Tensor):
- summarize(output, name + "_out")
- elif isinstance(output, (tuple, list)) and isinstance(output[0], torch.Tensor):
- summarize(output[0], name + "_out")
- return fn
- unreleased_model_name = os.getenv("UNRELEASED_MODEL_NAME")
- parser = argparse.ArgumentParser(description="Process model with specified path")
- parser.add_argument("--model-path", "-m", help="Path to the model")
- args = parser.parse_args()
- model_path = os.environ.get("MODEL_PATH", args.model_path)
- if model_path is None:
- parser.error(
- "Model path must be specified either via --model-path argument or MODEL_PATH environment variable"
- )
- config = AutoConfig.from_pretrained(model_path)
- print("Model type: ", config.model_type)
- print("Vocab size: ", config.vocab_size)
- print("Hidden size: ", config.hidden_size)
- print("Number of layers: ", config.num_hidden_layers)
- print("BOS token id: ", config.bos_token_id)
- print("EOS token id: ", config.eos_token_id)
- print("Loading model and tokenizer using AutoTokenizer:", model_path)
- tokenizer = AutoTokenizer.from_pretrained(model_path)
- config = AutoConfig.from_pretrained(model_path)
- if unreleased_model_name:
- model_name_lower = unreleased_model_name.lower()
- unreleased_module_path = (
- f"transformers.models.{model_name_lower}.modular_{model_name_lower}"
- )
- class_name = f"{unreleased_model_name}ForCausalLM"
- print(f"Importing unreleased model module: {unreleased_module_path}")
- try:
- model_class = getattr(
- importlib.import_module(unreleased_module_path), class_name
- )
- model = model_class.from_pretrained(
- model_path
- ) # Note: from_pretrained, not fromPretrained
- except (ImportError, AttributeError) as e:
- print(f"Failed to import or load model: {e}")
- exit(1)
- else:
- model = AutoModelForCausalLM.from_pretrained(
- model_path, device_map="auto", offload_folder="offload"
- )
- for name, module in model.named_modules():
- if len(list(module.children())) == 0: # only leaf modules
- module.register_forward_hook(debug_hook(name))
- model_name = os.path.basename(model_path)
- # Printing the Model class to allow for easier debugging. This can be useful
- # when working with models that have not been publicly released yet and this
- # migth require that the concrete class is imported and used directly instead
- # of using AutoModelForCausalLM.
- print(f"Model class: {model.__class__.__name__}")
- prompt = "Hello, my name is"
- input_ids = tokenizer(prompt, return_tensors="pt").input_ids
- print(f"Input tokens: {input_ids}")
- print(f"Input text: {repr(prompt)}")
- print(f"Tokenized: {tokenizer.convert_ids_to_tokens(input_ids[0])}")
- with torch.no_grad():
- outputs = model(input_ids.to(model.device))
- logits = outputs.logits
- # Extract logits for the last token (next token prediction)
- last_logits = logits[0, -1, :].cpu().numpy()
- print(f"Logits shape: {logits.shape}")
- print(f"Last token logits shape: {last_logits.shape}")
- print(f"Vocab size: {len(last_logits)}")
- data_dir = Path("data")
- data_dir.mkdir(exist_ok=True)
- bin_filename = data_dir / f"pytorch-{model_name}.bin"
- txt_filename = data_dir / f"pytorch-{model_name}.txt"
- # Save to file for comparison
- last_logits.astype(np.float32).tofile(bin_filename)
- # Also save as text file for easy inspection
- with open(txt_filename, "w") as f:
- for i, logit in enumerate(last_logits):
- f.write(f"{i}: {logit:.6f}\n")
- # Print some sample logits for quick verification
- print(f"First 10 logits: {last_logits[:10]}")
- print(f"Last 10 logits: {last_logits[-10:]}")
- # Show top 5 predicted tokens
- top_indices = np.argsort(last_logits)[-5:][::-1]
- print("Top 5 predictions:")
- for idx in top_indices:
- token = tokenizer.decode([idx])
- print(f" Token {idx} ({repr(token)}): {last_logits[idx]:.6f}")
- print(f"Saved bin logits to: {bin_filename}")
- print(f"Saved txt logist to: {txt_filename}")
|