"""HuggingFace operations service.

Handles all interactions with HuggingFace including model downloads, uploads,
README generation, and repository management. Uses UK English spelling
conventions throughout.
"""

from __future__ import annotations

import re
import shutil
import subprocess
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING

from helpers.config.quantisation_configs import QUANTISATION_CONFIGS
from helpers.logger import logger
from helpers.models.quantisation import QuantisationType

if TYPE_CHECKING:
    from helpers.models.quantisation import ModelSource, QuantisationResult

# Constants for file size formatting
GIBIBYTE = 1024**3


def _default_readme_metadata() -> dict[str, str]:
    """Return a fresh README metadata dictionary populated with defaults.

    Returns:
        Dictionary with empty readme, tags and frontmatter, and the fallback
        licence used when the source model does not declare one.
    """
    return {"readme": "", "licence": "apache-2.0", "tags": "", "frontmatter": ""}


class HuggingFaceService:
    """Manages HuggingFace repository operations.

    Provides static methods for querying the logged-in user, downloading
    models and uploading files. All operations shell out to the
    `huggingface-cli` or `git` executables.
    """

    @staticmethod
    def get_username() -> str:
        """Get authenticated HuggingFace username.

        Runs `huggingface-cli whoami` and returns the first non-empty line of
        its output. Requires prior authentication via `huggingface-cli login`.

        Returns:
            HuggingFace username.

        Raises:
            RuntimeError: If not authenticated or CLI not available.
        """
        msg = "Please log in to HuggingFace first: huggingface-cli login"
        try:
            result = subprocess.run(
                ["huggingface-cli", "whoami"],
                capture_output=True,
                text=True,
                check=True,
            )
        except (subprocess.CalledProcessError, FileNotFoundError) as err:
            raise RuntimeError(msg) from err

        # The CLI may print extra lines (such as an "orgs:" listing) after the
        # username, so only the first non-empty line is the username itself.
        output_lines = [line.strip() for line in result.stdout.splitlines() if line.strip()]
        username = output_lines[0] if output_lines else ""

        # An unauthenticated CLI can exit successfully while printing
        # "Not logged in"; never hand that back as though it were a username.
        if not username or username.lower() == "not logged in":
            raise RuntimeError(msg)
        return username

    @staticmethod
    def download_model(
        model_name: str, output_dir: Path, include_pattern: str | None = None
    ) -> None:
        """Download model from HuggingFace.

        Invokes `huggingface-cli download` with `--local-dir` pointing at
        `output_dir`. When `include_pattern` is given it is forwarded via
        `--include` so that only matching files are fetched.

        Args:
            model_name: Repository identifier to download.
            output_dir: Local directory passed to `--local-dir`.
            include_pattern: Optional pattern passed to `--include`.

        Raises:
            CalledProcessError: If the download command exits non-zero.
        """
        logger.info(f"Downloading {model_name} to {output_dir}")

        cmd = [
            "huggingface-cli",
            "download",
            model_name,
            "--local-dir",
            str(output_dir),
        ]
        if include_pattern:
            cmd.extend(["--include", include_pattern])

        subprocess.run(cmd, check=True, capture_output=True, text=True)
        logger.info("Download complete")

    @staticmethod
    def upload_file(
        repo_id: str,
        local_path: Path,
        repo_path: str | None = None,
        create_repo: bool = False,
    ) -> None:
        """Upload a file to HuggingFace repository.

        Tries a direct git clone/commit/push first (to avoid automatic PR
        creation) and falls back to `huggingface-cli upload` on the main
        revision if that does not succeed.

        Args:
            repo_id: Target repository identifier.
            local_path: File on disk to upload.
            repo_path: Destination path inside the repository; defaults to the
                local file name.
            create_repo: Whether the repository may need creating. When set,
                the CLI is first run with `--create` and retried without it on
                failure.

        Raises:
            CalledProcessError: If upload fails.
        """
        repo_path = repo_path or local_path.name
        logger.info(f"Uploading {local_path.name} to {repo_id}/{repo_path}")

        # Try git-based upload first to avoid PR creation
        if HuggingFaceService._try_git_upload(
            repo_id, local_path, repo_path, create_repo=create_repo
        ):
            logger.info(f"Uploaded {repo_path} via git")
            return

        # Fallback to huggingface-cli
        logger.info("Git upload failed, trying huggingface-cli...")
        base_cmd = [
            "huggingface-cli",
            "upload",
            repo_id,
            str(local_path),
            repo_path,
            "--revision",
            "main",  # Explicitly push to main branch
            "--commit-message",
            f"Add {repo_path}",
        ]
        # NOTE(review): confirm `--create` is accepted by the installed
        # huggingface-cli; an argparse-based CLI could resolve it as an
        # abbreviation of a longer option such as `--create-pr`.
        cmd = [*base_cmd, "--create"] if create_repo else base_cmd

        try:
            subprocess.run(cmd, check=True, capture_output=True, text=True)
            logger.info(f"Uploaded {repo_path}")
        except subprocess.CalledProcessError:
            if not create_repo:
                raise
            # Repository might already exist, retry without --create
            subprocess.run(base_cmd, check=True, capture_output=True, text=True)
            logger.info(f"Updated {repo_path}")

    @staticmethod
    def _try_git_upload(
        repo_id: str,
        local_path: Path,
        repo_path: str,
        *,
        create_repo: bool = False,
    ) -> bool:
        """Try to upload file using git directly to avoid PR creation.

        Clones the repository into a temporary directory, copies the file into
        place, and commits and pushes it if `git status` reports a change.
        Every failure is logged and reported as `False` rather than raised, so
        the caller can fall back to the CLI.

        Args:
            repo_id: Repository identifier appended to https://huggingface.co/.
            local_path: File on disk to copy into the clone.
            repo_path: Destination path relative to the repository root.
            create_repo: When set, a failed clone is treated as "repository
                does not exist yet" and is not logged as a warning.

        Returns:
            bool: True if upload successful, False if should fallback to CLI.
        """
        try:
            with tempfile.TemporaryDirectory() as temp_dir:
                repo_dir = Path(temp_dir) / "repo"
                repo_url = f"https://huggingface.co/{repo_id}"

                # Clone repository
                logger.info(f"Cloning {repo_url}...")
                clone = subprocess.run(
                    ["git", "clone", repo_url, str(repo_dir)],
                    check=False,
                    capture_output=True,
                    text=True,
                )
                if clone.returncode != 0:
                    # With create_repo the repository probably does not exist
                    # yet; stay quiet and let huggingface-cli handle creation.
                    if not create_repo:
                        logger.warning(f"Clone failed: {clone.stderr}")
                    return False

                # Ensure target directory exists, then copy the file in
                target_file = repo_dir / repo_path
                target_file.parent.mkdir(parents=True, exist_ok=True)
                shutil.copy2(local_path, target_file)

                # Check if there are any changes
                status_result = subprocess.run(
                    ["git", "status", "--porcelain"],
                    cwd=repo_dir,
                    capture_output=True,
                    text=True,
                    check=True,
                )
                if not status_result.stdout.strip():
                    logger.info(f"No changes detected for {repo_path}, file already up-to-date")
                    return True  # File is already up-to-date, no need to push

                # Git add, commit, push
                git_steps = (
                    ["git", "add", repo_path],
                    ["git", "commit", "-m", f"Update {repo_path}"],
                    ["git", "push"],
                )
                for step in git_steps:
                    subprocess.run(
                        step,
                        cwd=repo_dir,
                        check=True,
                        capture_output=True,
                        text=True,
                    )

                return True

        except subprocess.CalledProcessError as e:
            logger.warning(f"Git upload failed: {e}")
            return False
        except Exception as e:
            # Deliberately broad: this path is best-effort and the caller
            # falls back to huggingface-cli whenever False is returned.
            logger.warning(f"Git upload error: {e}")
            return False


class ReadmeGenerator:
    """Generates README files for quantised models.

    Builds a README consisting of YAML frontmatter, a per-variant quantisation
    status table, and the original model card (when one can be found locally
    or downloaded).
    """

    def generate(
        self,
        model_source: ModelSource,
        results: dict[QuantisationType, QuantisationResult],
        models_dir: Path,
        output_repo: str | None = None,
    ) -> Path:
        """Generate README file for quantised model repository.

        Writes `README.md` inside `models_dir / model_source.model_name`.

        Args:
            model_source: Source model description.
            results: Quantisation results keyed by quantisation type.
            models_dir: Parent directory containing the model directory.
            output_repo: Optional output repository used for download links.

        Returns:
            Path to generated README file.
        """
        logger.info("Creating model card...")

        model_dir = models_dir / model_source.model_name
        readme_path = model_dir / "README.md"

        # Get original README content
        original_content = self._get_original_readme(model_source, model_dir)

        # Generate new README
        readme_content = self._generate_readme_content(
            model_source, results, original_content, output_repo
        )

        # The README contains emoji status markers, so the encoding must be
        # explicit rather than left to the platform default.
        readme_path.write_text(readme_content, encoding="utf-8")
        return readme_path

    def _get_original_readme(self, model_source: ModelSource, model_dir: Path) -> dict[str, str]:
        """Extract original README and metadata.

        Resolution order: a previously preserved `README.original.md`; then a
        local `README.md` that does not contain this tool's generated title
        (which is preserved as `README.original.md`); otherwise a download
        from the source repository (also preserved when non-empty). YAML
        frontmatter is parsed when the text starts with a `---` line.

        Args:
            model_source: Source model description.
            model_dir: Directory holding the model's README files.

        Returns:
            Dictionary with readme content, licence, tags, and frontmatter.
        """
        content = _default_readme_metadata()

        # Check for preserved original README first
        original_readme_path = model_dir / "README.original.md"
        readme_path = model_dir / "README.md"

        if original_readme_path.exists():
            # Use the preserved original
            content["readme"] = original_readme_path.read_text(encoding="utf-8")
            logger.info(f"Found preserved original README ({len(content['readme'])} characters)")
        else:
            local_readme = (
                readme_path.read_text(encoding="utf-8") if readme_path.exists() else None
            )
            # The generated README's title doubles as the marker identifying
            # our own output.
            generated_marker = f"{model_source.original_author}-{model_source.model_name}-GGUF"

            if local_readme is not None and generated_marker not in local_readme:
                # First time - this is the original, preserve it and use it
                original_readme_path.write_text(local_readme, encoding="utf-8")
                content["readme"] = local_readme
                readme_len = len(content["readme"])
                logger.info(
                    f"Preserved original README as README.original.md ({readme_len} characters)"
                )
            else:
                if local_readme is not None:
                    # This is our generated README, need to download the original
                    logger.info("Found generated README, downloading original from source")
                content = self._download_readme(model_source)
                # Save the downloaded original for future use
                if content["readme"]:
                    original_readme_path.write_text(content["readme"], encoding="utf-8")
                    logger.info("Preserved downloaded original README as README.original.md")

        # Parse frontmatter if present
        if content["readme"].startswith("---\n"):
            content = self._parse_frontmatter(content["readme"])

        return content

    def _download_readme(self, model_source: ModelSource) -> dict[str, str]:
        """Download README from HuggingFace repository.

        Downloads only `README.md` from the source repository into a temporary
        directory. A failing download command is logged and leaves the readme
        empty.

        Args:
            model_source: Source model description.

        Returns:
            Dictionary with readme content and default metadata.
        """
        content = _default_readme_metadata()

        with tempfile.TemporaryDirectory() as temp_dir:
            try:
                logger.info(f"Downloading README from {model_source.source_model}...")
                subprocess.run(
                    [
                        "huggingface-cli",
                        "download",
                        model_source.source_model,
                        "--include",
                        "README.md",
                        "--local-dir",
                        temp_dir,
                    ],
                    check=True,
                    capture_output=True,
                )

                readme_path = Path(temp_dir) / "README.md"
                if readme_path.exists():
                    content["readme"] = readme_path.read_text(encoding="utf-8")
                    logger.info(f"Downloaded README ({len(content['readme'])} characters)")
            except subprocess.CalledProcessError as e:
                logger.warning(f"Failed to download README: {e}")

        return content

    def _parse_frontmatter(self, readme_text: str) -> dict[str, str]:
        """Parse YAML frontmatter from README.

        Splits the text at the opening and closing `---` lines and extracts
        the `license:` value and the `tags:` list with simple line matching
        (no YAML parser). Tags are recognised as block list items, indented or
        not, and as an inline `[a, b]` list.

        Args:
            readme_text: Full README text.

        Returns:
            Dictionary with separated content and metadata. When there is no
            complete frontmatter block, the text is returned unchanged with
            default metadata.
        """
        lines = readme_text.split("\n")
        unparsed = {**_default_readme_metadata(), "readme": readme_text}

        if lines[0] != "---":
            return unparsed

        # Locate the closing delimiter; index 0 is the opening one.
        frontmatter_end = next(
            (i for i, line in enumerate(lines[1:], 1) if line == "---"),
            -1,
        )
        if frontmatter_end == -1:
            return unparsed

        frontmatter = "\n".join(lines[1:frontmatter_end])
        content = "\n".join(lines[frontmatter_end + 1 :])

        # Extract licence (the YAML key uses the US spelling); drop either
        # style of surrounding quotes.
        licence_match = re.search(r"^license:\s*(.+)$", frontmatter, re.MULTILINE)
        licence_val = licence_match.group(1).strip().strip("\"'") if licence_match else "apache-2.0"

        # Extract tags
        tags: list[str] = []
        in_tags = False
        for line in frontmatter.split("\n"):
            if line.startswith("tags:"):
                in_tags = True
                inline = line[len("tags:") :].strip()
                if inline.startswith("[") and inline.endswith("]"):
                    # Inline flow list: tags: [a, b]
                    tags.extend(item.strip() for item in inline[1:-1].split(",") if item.strip())
                    break
                continue
            if not in_tags:
                continue
            stripped = line.strip()
            if stripped.startswith("- "):
                # Accept both "- tag" and indented "  - tag" list items
                tags.append(stripped[2:].strip())
            elif line and not line.startswith(" "):
                # A new top-level key ends the tags list
                break

        return {
            "readme": content,
            "licence": licence_val,
            "tags": ",".join(tags),
            "frontmatter": frontmatter,
        }

    def _generate_readme_content(
        self,
        model_source: ModelSource,
        results: dict[QuantisationType, QuantisationResult],
        original_content: dict[str, str],
        output_repo: str | None = None,
    ) -> str:
        """Generate complete README content with quantisation details.

        Builds YAML frontmatter (licence, base model, merged tags), a title
        and introduction, one table row per supported quantisation type, and
        finally the original model card or a short link back to the source
        model.

        Args:
            model_source: Source model description.
            results: Quantisation results keyed by quantisation type; missing
                types are rendered as planned.
            original_content: Dictionary with `readme`, `licence` and `tags`.
            output_repo: Optional output repository used for download links.

        Returns:
            Complete README markdown content.
        """
        # Build tags
        our_tags = [
            "quantised",
            "gguf",
            "q3_k_m",
            "q3_k_l",
            "q3_k_xl",
            "q4_k_m",
            "q4_k_l",
            "q5_k_m",
            "q5_k_l",
            "q6_k",
            "q6_k_l",
            "q8_0",
            "bartowski-method",
        ]
        original_tags = original_content["tags"].split(",") if original_content["tags"] else []
        # Strip before de-duplicating so whitespace variants collapse to one tag
        all_tags = sorted({tag.strip() for tag in our_tags + original_tags if tag.strip()})

        # Build frontmatter
        frontmatter = f"""---
license: {original_content["licence"]}
library_name: gguf
base_model: {model_source.source_model}
tags:
"""
        frontmatter += "".join(f"- {tag}\n" for tag in all_tags)
        frontmatter += "---\n\n"

        # Build main content
        hf_url = f"https://huggingface.co/{model_source.source_model}"
        content = f"""# {model_source.original_author}-{model_source.model_name}-GGUF

GGUF quantisations of [{model_source.source_model}]({hf_url}) using
[Bartowski](https://huggingface.co/bartowski)'s method. Created with [llm-gguf-tools](https://git.tomfos.tr/tom/llm-gguf-tools)
which replicates Bartowski's quantisation profiles.

| Variant | Configuration | File Size | Status |
|---|---|---|---|
"""

        # Add results table - group by layer config patterns
        supported_types = [
            QuantisationType.Q3_K_M,
            QuantisationType.Q3_K_L,
            QuantisationType.Q3_K_XL,
            QuantisationType.Q4_K_M,
            QuantisationType.Q4_K_L,
            QuantisationType.Q5_K_M,
            QuantisationType.Q5_K_L,
            QuantisationType.Q6_K,
            QuantisationType.Q6_K_L,
            QuantisationType.Q8_0,
        ]

        rows = []
        for quant_type in supported_types:
            result = results.get(quant_type)
            if not result:
                # Placeholder for types with no result yet: rendered as queued
                result = type("Result", (), {"status": "planned", "success": False})()

            config = QUANTISATION_CONFIGS.get(quant_type)
            file_size = self._format_file_size(result)
            status = self._format_status(result, model_source, quant_type, output_repo)

            # Get configuration description from the config itself
            config_desc = (
                config.get_compact_config(QUANTISATION_CONFIGS)
                if config
                else f"{quant_type} all layers"
            )

            rows.append(f"| **{quant_type.value}** | {config_desc} | {file_size} | {status} |\n")
        content += "".join(rows)

        content += """
**Key:** `E` = Embeddings, `O` = Output, `A` = Attention, `F` = FFN

See [Bartowski Analysis](https://git.tomfos.tr/tom/llm-gguf-tools/src/branch/main/docs/bartowski_analysis.md)
for detailed quantisation strategies and [Documentation](https://git.tomfos.tr/tom/llm-gguf-tools/src/branch/main/docs/)
for more on the tools and methods I use.

"""

        # Add original content
        if original_content["readme"]:
            content += "## Original Model Card\n\n---\n\n" + original_content["readme"]
        else:
            content += f"## Original Model\n\nQuantisation of [{model_source.source_model}](https://huggingface.co/{model_source.source_model})."

        return frontmatter + content

    def _format_file_size(self, result: QuantisationResult) -> str:
        """Format file size for README table.

        Prefers the result's own `file_size` value; otherwise, for successful
        results, measures the file at `file_path` on disk.

        Args:
            result: Quantisation result (or placeholder) to describe.

        Returns:
            Formatted file size string or dash if not available.
        """
        recorded_size = getattr(result, "file_size", None)
        if recorded_size:
            return recorded_size

        file_path = getattr(result, "file_path", None)
        if getattr(result, "success", False) and file_path:
            # Try to get file size from path if available; best-effort, so
            # filesystem or path-conversion problems fall through to the dash.
            try:
                path = Path(file_path)
                if path.exists():
                    size_gb = path.stat().st_size / GIBIBYTE
                    return f"{size_gb:.1f}GB"
            except (OSError, TypeError, ValueError):
                pass

        return "-"

    def _format_status(
        self,
        result: QuantisationResult,
        model_source: ModelSource,
        quant_type: QuantisationType,
        output_repo: str | None,
    ) -> str:
        """Format status indicator for README table.

        An uploading result with a known file size shows the size alongside
        the indicator. A result whose status is "completed", or whose
        `success` flag is set, is rendered via `_format_success_status`. Any
        other known status maps to its indicator, and anything else is shown
        as failed.

        Args:
            result: Quantisation result (or placeholder) to describe.
            model_source: Source model description, used for download links.
            quant_type: Quantisation type of this table row.
            output_repo: Optional output repository used for download links.

        Returns:
            Formatted status string for table cell.
        """
        status_map = {
            "planned": "⏳ Queued",
            "processing": "🔄 Processing...",
            "uploading": "⬆️ Uploading...",
            "failed": "❌ Failed",
        }

        status = getattr(result, "status", None)
        succeeded = bool(getattr(result, "success", False))
        file_size = getattr(result, "file_size", None)

        if status == "uploading" and file_size:
            return f"{status_map['uploading']} ({file_size})"

        # "completed" is not a key of status_map, so it must be handled here
        # rather than inside a status_map membership check. The success flag
        # covers legacy results that carry no status.
        if status == "completed" or succeeded:
            return self._format_success_status(result, model_source, quant_type, output_repo)

        return status_map.get(status, "❌ Failed")

    def _format_success_status(
        self,
        result: QuantisationResult,
        model_source: ModelSource,
        quant_type: QuantisationType,
        output_repo: str | None,
    ) -> str:
        """Format successful quantisation status with download link.

        The label is the tick plus the result's file size when known,
        otherwise "Available". With an `output_repo` the label is wrapped in a
        markdown link to that repository's file-info view.

        Args:
            result: Quantisation result to describe.
            model_source: Source model description, used to build the filename.
            quant_type: Quantisation type, used to build the filename.
            output_repo: Optional repository the link should point at.

        Returns:
            Formatted success status string.
        """
        file_size = getattr(result, "file_size", None)
        label = f"✅ {file_size}" if file_size else "✅ Available"

        if not output_repo:
            return label

        filename = (
            f"{model_source.original_author}-{model_source.model_name}-{quant_type.value}.gguf"
        )
        url = f"https://huggingface.co/{output_repo}?show_file_info={filename}"
        return f"[{label}]({url})"