llm-gguf-tools/helpers/gguf/writer.py

"""GGUF file writing operations.
Provides high-level interface for creating GGUF files with metadata,
tensors, and tokeniser information.
"""

from __future__ import annotations

import json
import operator
import traceback
from pathlib import Path
from typing import TYPE_CHECKING, Any, Protocol

import gguf

from helpers.logger import logger

if TYPE_CHECKING:
    import numpy as np

    from helpers.models.conversion import ModelConfig


class VisionConfig(Protocol):
    """Protocol for vision model configuration."""

    hidden_size: int
    num_hidden_layers: int
    num_attention_heads: int
    intermediate_size: int
    patch_size: int
    spatial_merge_size: int


class GGUFWriter:
    """Manages GGUF file creation and metadata writing.

    Provides a high-level interface for GGUF file operations including
    metadata configuration, tensor addition, and tokeniser integration.
    Encapsulates low-level GGUF library interactions for consistent error
    handling.
    """

    def __init__(self, output_path: Path, architecture: str) -> None:
        """Initialise GGUF writer with output path and architecture.

        Creates the underlying GGUF writer instance and prepares for metadata
        and tensor addition. Sets up the file structure for the specified
        model architecture.
        """
        self.output_path = output_path
        self.architecture = architecture
        self.writer = gguf.GGUFWriter(str(output_path), architecture)
        logger.info(f"Created GGUF writer for {architecture} architecture")

    def add_metadata(self, model_config: ModelConfig, model_name: str) -> None:
        """Add comprehensive metadata from model configuration.

        Writes general model information, architectural parameters, and
        quantisation settings to the GGUF file header. Handles both standard
        and vision model configurations with appropriate parameter mapping.
        """
        # General metadata
        self.writer.add_name(model_name)
        self.writer.add_description(f"Converted from {model_config.architectures[0]}")
        self.writer.add_file_type(gguf.LlamaFileType.ALL_F32)

        # Log architecture being used
        logger.info(f"Setting GGUF architecture: {self.architecture}")
        if self.architecture not in {"llama", "qwen2", "gemma", "phi3", "falcon", "gpt2"}:
            logger.warning(f"Architecture '{self.architecture}' may not be supported by llama.cpp")

        # Model parameters from config
        params = model_config.to_gguf_params()
        self.writer.add_context_length(params.context_length)
        self.writer.add_embedding_length(params.embedding_length)
        self.writer.add_block_count(params.block_count)
        self.writer.add_feed_forward_length(params.feed_forward_length)
        self.writer.add_head_count(params.attention_head_count)
        self.writer.add_head_count_kv(params.attention_head_count_kv)
        self.writer.add_layer_norm_rms_eps(params.attention_layer_norm_rms_epsilon)
        self.writer.add_rope_freq_base(params.rope_freq_base)
        self.writer.add_rope_dimension_count(params.rope_dimension_count)
        logger.info(f"Added metadata: {params.block_count} layers, {params.context_length} context")

    def add_vision_metadata(self, vision_config: VisionConfig | None) -> None:
        """Add vision model parameters to GGUF metadata.

        Configures vision-specific parameters for multimodal models including
        embedding dimensions, attention heads, and spatial processing settings.
        """
        if not vision_config:
            return

        logger.info("Adding vision model parameters...")
        self.writer.add_vision_embedding_length(vision_config.hidden_size)
        self.writer.add_vision_block_count(vision_config.num_hidden_layers)
        self.writer.add_vision_head_count(vision_config.num_attention_heads)
        self.writer.add_vision_feed_forward_length(vision_config.intermediate_size)
        self.writer.add_vision_patch_size(vision_config.patch_size)
        self.writer.add_vision_spatial_merge_size(vision_config.spatial_merge_size)
        if getattr(vision_config, "rms_norm_eps", None):
            self.writer.add_vision_attention_layernorm_eps(vision_config.rms_norm_eps)

    def add_tokeniser(self, tokeniser_config: dict[str, Any]) -> None:
        """Add tokeniser metadata to GGUF file.

        Writes special token IDs and tokeniser model type to enable proper
        text processing during inference. Uses sensible defaults for missing
        configuration values.
        """
        self.writer.add_bos_token_id(tokeniser_config.get("bos_token_id", 1))
        self.writer.add_eos_token_id(tokeniser_config.get("eos_token_id", 2))
        self.writer.add_unk_token_id(tokeniser_config.get("unk_token_id", 0))
        self.writer.add_pad_token_id(tokeniser_config.get("pad_token_id", 0))

        # Add BOS/EOS token addition flags if available
        if "add_bos_token" in tokeniser_config:
            self.writer.add_add_bos_token(tokeniser_config["add_bos_token"])
        if "add_eos_token" in tokeniser_config:
            self.writer.add_add_eos_token(tokeniser_config["add_eos_token"])

        # Note: tokenizer_model is set by add_tokeniser_vocabulary based on
        # actual tokenizer type
        logger.info("Added tokeniser configuration")

    def add_tokeniser_vocabulary(self, model_path: Path) -> None:
        """Add full tokeniser vocabulary to GGUF file.

        Loads and embeds the complete tokeniser vocabulary including tokens,
        merges, and scores to enable standalone model usage without external
        tokeniser files. Supports BPE, Unigram, and WordPiece tokenisers.
        """
        tokenizer_path = model_path / "tokenizer.json"
        if not tokenizer_path.exists():
            logger.warning("tokenizer.json not found, skipping vocabulary embedding")
            return

        try:
            with tokenizer_path.open(encoding="utf-8") as f:
                tokenizer_data = json.load(f)

            model_data = tokenizer_data.get("model", {})
            model_type = model_data.get("type", "")

            # Get pre-tokenizer information
            pre_tokenizer = tokenizer_data.get("pre_tokenizer", {})
            pre_tokenizer_type = self._get_pre_tokenizer_type(pre_tokenizer)

            # Get added tokens
            added_tokens = tokenizer_data.get("added_tokens", [])

            if model_type == "BPE":
                self._add_bpe_tokenizer(model_data, added_tokens, pre_tokenizer_type)
            elif model_type == "Unigram":
                self._add_unigram_tokenizer(model_data, added_tokens)
            elif model_type == "WordPiece":
                self._add_wordpiece_tokenizer(model_data, added_tokens)
            else:
                logger.warning(f"Unsupported tokenizer type: {model_type}")
                # Try to add as generic tokenizer
                self._add_generic_tokenizer(model_data, tokenizer_data)
        except Exception as e:
            logger.error(f"Failed to load tokeniser vocabulary: {e}")
            logger.error(traceback.format_exc())
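
    # Abridged tokenizer.json layout this method consumes. Field names follow
    # the Hugging Face `tokenizers` serialisation; values are illustrative:
    #
    #     {
    #         "model": {"type": "BPE", "vocab": {"hello": 0}, "merges": ["h e"]},
    #         "pre_tokenizer": {"type": "ByteLevel"},
    #         "added_tokens": [{"id": 32000, "content": "<pad>"}]
    #     }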

    def _get_pre_tokenizer_type(self, pre_tokenizer: dict[str, Any]) -> str:
        """Determine pre-tokenizer type from configuration.

        Returns:
            Pre-tokenizer type identifier for GGUF metadata.
        """
        if not pre_tokenizer:
            return "default"

        # Search the full serialised structure so that ByteLevel is detected
        # even when nested inside a Sequence pre-tokenizer; Metaspace and
        # everything else fall through to the default
        if "ByteLevel" in json.dumps(pre_tokenizer):
            return "llama3"
        return "default"

    def _add_bpe_tokenizer(
        self,
        model_data: dict[str, Any],
        added_tokens: list[dict[str, Any]],
        pre_tokenizer_type: str,
    ) -> None:
        """Add BPE tokeniser to GGUF file."""
        vocab = model_data.get("vocab", {})
        merges = model_data.get("merges", [])

        # BPE vocabularies are written as GPT-2 style; the pre-tokenizer
        # hint distinguishes llama3-style byte-level handling
        self.writer.add_tokenizer_model("gpt2")
        if pre_tokenizer_type == "llama3":
            self.writer.add_tokenizer_pre("llama3")

        # Create token list with scores
        tokens = []
        scores = []
        toktypes = []

        # Added-token contents, collected once for fast membership checks
        added_contents = {t.get("content") for t in added_tokens}

        # Add vocabulary tokens in token-id order
        for token_str, _token_id in sorted(vocab.items(), key=operator.itemgetter(1)):
            tokens.append(token_str)
            scores.append(0.0)  # BPE doesn't use scores
            # Added tokens are user-defined; everything else is normal
            if token_str in added_contents:
                toktypes.append(gguf.TokenType.USER_DEFINED)
            else:
                toktypes.append(gguf.TokenType.NORMAL)

        # Add to writer
        self.writer.add_token_list(tokens)
        self.writer.add_token_scores(scores)
        self.writer.add_token_types(toktypes)

        # Add merges (newer tokenizer.json files store each merge as a pair
        # of strings rather than a single space-joined string)
        if merges:
            merges = [" ".join(m) if isinstance(m, list) else m for m in merges]
            self.writer.add_token_merges(merges)

        logger.info(f"Added BPE tokenizer: {len(tokens)} tokens, {len(merges)} merges")

    def _add_unigram_tokenizer(
        self,
        model_data: dict[str, Any],
        added_tokens: list[dict[str, Any]],
    ) -> None:
        """Add Unigram tokeniser to GGUF file."""
        vocab = model_data.get("vocab", [])
        self.writer.add_tokenizer_model("unigram")

        # Create token list with scores
        tokens = []
        scores = []
        toktypes = []

        # Added-token contents, collected once for fast membership checks
        added_contents = {t.get("content") for t in added_tokens}

        # Add vocabulary tokens; Unigram stores [token, score] pairs
        for token_data in vocab:
            if not (isinstance(token_data, list) and len(token_data) >= 2):
                continue
            token_str, score = token_data[0], token_data[1]
            tokens.append(token_str)
            scores.append(float(score))
            if token_str in added_contents:
                toktypes.append(gguf.TokenType.USER_DEFINED)
            else:
                toktypes.append(gguf.TokenType.NORMAL)

        # Add to writer
        self.writer.add_token_list(tokens)
        self.writer.add_token_scores(scores)
        self.writer.add_token_types(toktypes)
        logger.info(f"Added Unigram tokenizer: {len(tokens)} tokens")

    def _add_wordpiece_tokenizer(
        self,
        model_data: dict[str, Any],
        added_tokens: list[dict[str, Any]],
    ) -> None:
        """Add WordPiece tokeniser to GGUF file."""
        vocab = model_data.get("vocab", {})
        self.writer.add_tokenizer_model("bert")

        # Create token list
        tokens = []
        scores = []
        toktypes = []

        # Added-token contents, collected once for fast membership checks
        added_contents = {t.get("content") for t in added_tokens}

        # Add vocabulary tokens in token-id order
        for token_str, _token_id in sorted(vocab.items(), key=operator.itemgetter(1)):
            tokens.append(token_str)
            scores.append(0.0)  # WordPiece doesn't use scores
            if token_str in added_contents:
                toktypes.append(gguf.TokenType.USER_DEFINED)
            else:
                toktypes.append(gguf.TokenType.NORMAL)

        # Add to writer
        self.writer.add_token_list(tokens)
        self.writer.add_token_scores(scores)
        self.writer.add_token_types(toktypes)
        logger.info(f"Added WordPiece tokenizer: {len(tokens)} tokens")

    def _add_generic_tokenizer(
        self,
        model_data: dict[str, Any],
        tokenizer_data: dict[str, Any],
    ) -> None:
        """Add generic tokeniser as fallback."""
        logger.warning("Using generic tokenizer fallback")

        # Try to extract vocabulary from various possible locations
        vocab = model_data.get("vocab", tokenizer_data.get("vocab", {}))
        if not vocab:
            logger.error("No vocabulary found in tokenizer")
            return

        self.writer.add_tokenizer_model("gpt2")  # Default to GPT-2 style

        # Create basic token list
        tokens = []
        scores = []
        toktypes = []

        if isinstance(vocab, dict):
            # Dict-style vocab
            for token_str, _token_id in sorted(vocab.items(), key=operator.itemgetter(1)):
                tokens.append(token_str)
                scores.append(0.0)
                toktypes.append(gguf.TokenType.NORMAL)
        elif isinstance(vocab, list):
            # List-style vocab
            for item in vocab:
                if isinstance(item, str):
                    tokens.append(item)
                    scores.append(0.0)
                    toktypes.append(gguf.TokenType.NORMAL)
                elif isinstance(item, list) and len(item) >= 1:
                    tokens.append(str(item[0]))
                    scores.append(float(item[1]) if len(item) > 1 else 0.0)
                    toktypes.append(gguf.TokenType.NORMAL)

        if tokens:
            self.writer.add_token_list(tokens)
            self.writer.add_token_scores(scores)
            self.writer.add_token_types(toktypes)
            logger.info(f"Added generic tokenizer: {len(tokens)} tokens")
        else:
            logger.error("Failed to extract tokens from vocabulary")

    def add_tensor(self, name: str, data: np.ndarray) -> None:
        """Add tensor to GGUF file.

        Accepts a tensor name following GGUF naming conventions and its
        corresponding numpy array data. The tensor is stored for writing
        when the file is finalised.
        """
        self.writer.add_tensor(name, data)
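
    # GGUF tensor names follow llama.cpp conventions; a few examples
    # (the array variables are placeholders):
    #
    #     writer.add_tensor("token_embd.weight", embeddings)
    #     writer.add_tensor("blk.0.attn_q.weight", q_proj)
    #     writer.add_tensor("output_norm.weight", final_norm)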

    def write(self) -> None:
        """Finalise and write GGUF file to disk.

        Writes header, key-value data, and tensors to the output file,
        completing the GGUF creation process.
        """
        logger.info(f"Writing GGUF file to {self.output_path}...")
        self.writer.write_header_to_file()
        self.writer.write_kv_data_to_file()
        self.writer.write_tensors_to_file()
        self.writer.close()
        logger.info("✅ GGUF file written successfully")