@@ -24,14 +24,16 @@ from abc import ABC, abstractmethod
 from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
 from dataclasses import dataclass
 from pathlib import Path
-from typing import TYPE_CHECKING, Any, Callable, ClassVar, IO, Iterable, Literal, Protocol, TypeVar, runtime_checkable, Optional
+from typing import TYPE_CHECKING, Any, Callable, IO, Iterable, Literal, TypeVar, Optional

 import numpy as np
-from sentencepiece import SentencePieceProcessor

 if 'NO_LOCAL_GGUF' not in os.environ:
-    sys.path.insert(1, str(Path(__file__).parent / 'gguf-py'))
+    # use .parent.parent since we are in "examples" directory
+    sys.path.insert(1, str(Path(__file__).parent.parent / 'gguf-py'))
+
 import gguf
+from gguf import BaseVocab, Vocab, NoVocab, BpeVocab, SentencePieceVocab, LlamaHfVocab

 if TYPE_CHECKING:
     from typing_extensions import Self, TypeAlias
@@ -380,306 +382,6 @@ class Metadata:
         return metadata


-#
-# vocab
-#
-
-
-@runtime_checkable
-class BaseVocab(Protocol):
-    tokenizer_model: ClassVar[str]
-    name: ClassVar[str]
-
-
-class NoVocab(BaseVocab):
-    tokenizer_model = "no_vocab"
-    name = "no_vocab"
-
-    def __repr__(self) -> str:
-        return "<NoVocab for a model without integrated vocabulary>"
-
-
-@runtime_checkable
-class Vocab(BaseVocab, Protocol):
-    vocab_size: int
-    added_tokens_dict: dict[str, int]
-    added_tokens_list: list[str]
-    fname_tokenizer: Path
-
-    def __init__(self, base_path: Path): ...
-    def all_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]: ...
-
-
-class BpeVocab(Vocab):
-    tokenizer_model = "gpt2"
-    name = "bpe"
-
-    def __init__(self, base_path: Path):
-        added_tokens: dict[str, int] = {}
-
-        if (fname_tokenizer := base_path / 'vocab.json').exists():
-            # "slow" tokenizer
-            with open(fname_tokenizer, encoding="utf-8") as f:
-                self.vocab = json.load(f)
-
-            try:
-                # FIXME: Verify that added tokens here _cannot_ overlap with the main vocab.
-                with open(base_path / ADDED_TOKENS_FILE, encoding="utf-8") as f:
-                    added_tokens = json.load(f)
-            except FileNotFoundError:
-                pass
-        else:
-            # "fast" tokenizer
-            fname_tokenizer = base_path / FAST_TOKENIZER_FILE
-
-            # if this fails, FileNotFoundError propagates to caller
-            with open(fname_tokenizer, encoding="utf-8") as f:
-                tokenizer_json = json.load(f)
-
-            tokenizer_model: dict[str, Any] = tokenizer_json['model']
-            if (
-                tokenizer_model['type'] != 'BPE' or tokenizer_model.get('byte_fallback', False)
-                or tokenizer_json['decoder']['type'] != 'ByteLevel'
-            ):
-                raise FileNotFoundError('Cannot find GPT-2 BPE tokenizer')
-
-            self.vocab = tokenizer_model["vocab"]
-
-            if (added := tokenizer_json.get('added_tokens')) is not None:
-                # Added tokens here can be duplicates of the main vocabulary.
-                added_tokens = {item['content']: item['id']
-                                for item in added
-                                if item['content'] not in self.vocab}
-
-        vocab_size = len(self.vocab)
-        expected_ids = list(range(vocab_size, vocab_size + len(added_tokens)))
-        actual_ids = sorted(added_tokens.values())
-        if expected_ids != actual_ids:
-            expected_end_id = vocab_size + len(actual_ids) - 1
-            raise ValueError(f"Expected the {len(actual_ids)} added token ID(s) to be sequential in the range "
-                             f"{vocab_size} - {expected_end_id}; got {actual_ids}")
-
-        items = sorted(added_tokens.items(), key=lambda text_idx: text_idx[1])
-        self.added_tokens_dict = added_tokens
-        self.added_tokens_list = [text for (text, idx) in items]
-        self.vocab_size_base = vocab_size
-        self.vocab_size = self.vocab_size_base + len(self.added_tokens_list)
-        self.fname_tokenizer = fname_tokenizer
-
-    def bpe_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
-        reverse_vocab = {id: encoded_tok for encoded_tok, id in self.vocab.items()}
-
-        for i, _ in enumerate(self.vocab):
-            yield reverse_vocab[i], 0.0, gguf.TokenType.NORMAL
-
-    def added_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
-        for text in self.added_tokens_list:
-            score = -1000.0
-            yield text.encode("utf-8"), score, gguf.TokenType.CONTROL
-
-    def all_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
-        yield from self.bpe_tokens()
-        yield from self.added_tokens()
-
-    def __repr__(self) -> str:
-        return f"<BpeVocab with {self.vocab_size_base} base tokens and {len(self.added_tokens_list)} added tokens>"
-
-
-class SentencePieceVocab(Vocab):
-    tokenizer_model = "llama"
-    name = "spm"
-
-    def __init__(self, base_path: Path):
-        added_tokens: dict[str, int] = {}
-        if (fname_tokenizer := base_path / 'tokenizer.model').exists():
-            # normal location
-            try:
-                with open(base_path / ADDED_TOKENS_FILE, encoding="utf-8") as f:
-                    added_tokens = json.load(f)
-            except FileNotFoundError:
-                pass
-        elif not (fname_tokenizer := base_path.parent / 'tokenizer.model').exists():
-            # not found in alternate location either
-            raise FileNotFoundError('Cannot find tokenizer.model')
-
-        self.sentencepiece_tokenizer = SentencePieceProcessor()
-        self.sentencepiece_tokenizer.LoadFromFile(str(fname_tokenizer))
-        vocab_size = self.sentencepiece_tokenizer.vocab_size()
-
-        new_tokens = {id: piece for piece, id in added_tokens.items() if id >= vocab_size}
-        expected_new_ids = list(range(vocab_size, vocab_size + len(new_tokens)))
-        actual_new_ids = sorted(new_tokens.keys())
-
-        if expected_new_ids != actual_new_ids:
-            raise ValueError(f"Expected new token IDs {expected_new_ids} to be sequential; got {actual_new_ids}")
-
-        # Token pieces that were added to the base vocabulary.
-        self.added_tokens_dict = added_tokens
-        self.added_tokens_list = [new_tokens[id] for id in actual_new_ids]
-        self.vocab_size_base = vocab_size
-        self.vocab_size = self.vocab_size_base + len(self.added_tokens_list)
-        self.fname_tokenizer = fname_tokenizer
-
-    def sentencepiece_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
-        tokenizer = self.sentencepiece_tokenizer
-        for i in range(tokenizer.vocab_size()):
-            piece = tokenizer.IdToPiece(i)
-            text = piece.encode("utf-8")
-            score: float = tokenizer.GetScore(i)
-
-            toktype = gguf.TokenType.NORMAL
-            if tokenizer.IsUnknown(i):
-                toktype = gguf.TokenType.UNKNOWN
-            if tokenizer.IsControl(i):
-                toktype = gguf.TokenType.CONTROL
-
-            # NOTE: I think added_tokens are user defined.
-            # ref: https://github.com/google/sentencepiece/blob/master/src/sentencepiece_model.proto
-            # if tokenizer.is_user_defined(i): toktype = gguf.TokenType.USER_DEFINED
-
-            if tokenizer.IsUnused(i):
-                toktype = gguf.TokenType.UNUSED
-            if tokenizer.IsByte(i):
-                toktype = gguf.TokenType.BYTE
-
-            yield text, score, toktype
-
-    def added_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
-        for text in self.added_tokens_list:
-            score = -1000.0
-            yield text.encode("utf-8"), score, gguf.TokenType.USER_DEFINED
-
-    def all_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
-        yield from self.sentencepiece_tokens()
-        yield from self.added_tokens()
-
-    def __repr__(self) -> str:
-        return f"<SentencePieceVocab with {self.vocab_size_base} base tokens and {len(self.added_tokens_list)} added tokens>"
-
-
-class LlamaHfVocab(Vocab):
-    tokenizer_model = "llama"
-    name = "hfft"
-
-    def __init__(self, base_path: Path):
-        fname_tokenizer = base_path / FAST_TOKENIZER_FILE
-        # if this fails, FileNotFoundError propagates to caller
-        with open(fname_tokenizer, encoding='utf-8') as f:
-            tokenizer_json = json.load(f)
-
-        # pre-check so we know if we need transformers
-        tokenizer_model: dict[str, Any] = tokenizer_json['model']
-        is_llama3 = (
-            tokenizer_model['type'] == 'BPE' and tokenizer_model.get('ignore_merges', False)
-            and not tokenizer_model.get('byte_fallback', True)
-        )
-        if is_llama3:
-            raise TypeError('Llama 3 must be converted with BpeVocab')
-
-        if not is_llama3 and (
-            tokenizer_model['type'] != 'BPE' or not tokenizer_model.get('byte_fallback', False)
-            or tokenizer_json['decoder']['type'] != 'Sequence'
-        ):
-            raise FileNotFoundError('Cannot find Llama BPE tokenizer')
-
-        try:
-            from transformers import AutoTokenizer
-        except ImportError as e:
-            raise ImportError(
-                "To use LlamaHfVocab, please install the `transformers` package. "
-                "You can install it with `pip install transformers`."
-            ) from e
-
-        # Allow the tokenizer to default to slow or fast versions.
-        # Explicitly set tokenizer to use local paths.
-        self.tokenizer = AutoTokenizer.from_pretrained(
-            base_path,
-            cache_dir=base_path,
-            local_files_only=True,
-        )
-        assert self.tokenizer.is_fast # assume tokenizer.json is used
-
-        # Initialize lists and dictionaries for added tokens
-        self.added_tokens_list = []
-        self.added_tokens_dict = dict()
-        self.added_tokens_ids = set()
-
-        # Process added tokens
-        for tok, tokidx in sorted(
-            self.tokenizer.get_added_vocab().items(), key=lambda x: x[1]
-        ):
-            # Only consider added tokens that are not in the base vocabulary
-            if tokidx >= self.tokenizer.vocab_size:
-                self.added_tokens_list.append(tok)
-                self.added_tokens_dict[tok] = tokidx
-                self.added_tokens_ids.add(tokidx)
-
-        # Store special tokens and their IDs
-        self.specials = {
-            tok: self.tokenizer.get_vocab()[tok]
-            for tok in self.tokenizer.all_special_tokens
-        }
-        self.special_ids = set(self.tokenizer.all_special_ids)
-
-        # Set vocabulary sizes
-        self.vocab_size_base = self.tokenizer.vocab_size
-        self.vocab_size = self.vocab_size_base + len(self.added_tokens_list)
-
-        self.fname_tokenizer = fname_tokenizer
-
-    def hf_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
-        reverse_vocab = {
-            id: encoded_tok for encoded_tok, id in self.tokenizer.get_vocab().items()
-        }
-
-        for token_id in range(self.vocab_size_base):
-            # Skip processing added tokens here
-            if token_id in self.added_tokens_ids:
-                continue
-
-            # Convert token text to bytes
-            token_text = reverse_vocab[token_id].encode("utf-8")
-
-            # Yield token text, score, and type
-            yield token_text, self.get_token_score(token_id), self.get_token_type(
-                token_id, token_text, self.special_ids # Reuse already stored special IDs
-            )
-
-    def get_token_type(self, token_id: int, token_text: bytes, special_ids: set[int]) -> gguf.TokenType:
-        # Special case for byte tokens
-        if re.fullmatch(br"<0x[0-9A-Fa-f]{2}>", token_text):
-            return gguf.TokenType.BYTE
-
-        # Determine token type based on whether it's a special token
-        return gguf.TokenType.CONTROL if token_id in special_ids else gguf.TokenType.NORMAL
-
-    def get_token_score(self, token_id: int) -> float:
-        # Placeholder for actual logic to determine the token's score
-        # This needs to be implemented based on specific requirements
-        return -1000.0 # Default score
-
-    def added_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
-        for text in self.added_tokens_list:
-            if text in self.specials:
-                toktype = self.get_token_type(self.specials[text], b'', self.special_ids)
-                score = self.get_token_score(self.specials[text])
-            else:
-                toktype = gguf.TokenType.USER_DEFINED
-                score = -1000.0
-
-            yield text.encode("utf-8"), score, toktype
-
-    def has_newline_token(self):
-        return "<0x0A>" in self.tokenizer.vocab or "\n" in self.tokenizer.vocab
-
-    def all_tokens(self) -> Iterable[tuple[bytes, float, gguf.TokenType]]:
-        yield from self.hf_tokens()
-        yield from self.added_tokens()
-
-    def __repr__(self) -> str:
-        return f"<LlamaHfVocab with {self.vocab_size_base} base tokens and {len(self.added_tokens_list)} added tokens>"
-
-
 #
 # data loading
 # TODO: reuse (probably move to gguf.py?)