llm-gguf-tools/helpers/services/filesystem.py
2025-08-07 18:29:12 +01:00

174 lines
5.7 KiB
Python

"""Filesystem operations service.
Provides unified filesystem operations including file discovery, size
calculation, and path management. Consolidates common filesystem patterns
used across quantisation and conversion workflows.
"""
from __future__ import annotations
import json
import subprocess
from pathlib import Path
from typing import Any
from helpers.logger import logger
# Binary (IEC) scale factor used by the pure-Python size-formatting fallback.
BYTES_PER_UNIT = 1024.0
class FilesystemService:
"""Handles filesystem operations with consistent error handling.
Provides methods for file discovery, size formatting, and JSON loading
with proper error handling and logging. Ensures consistent behaviour
across different tools and workflows.
"""
@staticmethod
def get_file_size(file_path: Path) -> str:
"""Get human-readable file size using system utilities.
Attempts to use `du -h` for human-readable output, falling back to
Python calculation if the system command fails. Provides consistent
size formatting across the toolset.
Returns:
Human-readable file size string (e.g., "1.5G", "750M").
"""
try:
result = subprocess.run(
["du", "-h", str(file_path)], capture_output=True, text=True, check=True
)
return result.stdout.split()[0]
except (subprocess.CalledProcessError, FileNotFoundError):
# Fallback to Python calculation
try:
size_bytes: float = float(file_path.stat().st_size)
for unit in ["B", "K", "M", "G", "T"]:
if size_bytes < BYTES_PER_UNIT:
return f"{size_bytes:.1f}{unit}"
size_bytes /= BYTES_PER_UNIT
except Exception:
return "Unknown"
else:
return f"{size_bytes:.1f}P"
@staticmethod
def load_json_config(config_path: Path) -> dict[str, Any]:
"""Load and parse JSON configuration file.
Provides consistent JSON loading with proper error handling and
encoding specification. Used for loading model configurations,
tokeniser settings, and other JSON-based metadata.
Returns:
Parsed JSON content as dictionary.
Raises:
FileNotFoundError: If config file doesn't exist.
"""
if not config_path.exists():
msg = f"Configuration file not found: {config_path}"
raise FileNotFoundError(msg)
with Path(config_path).open(encoding="utf-8") as f:
return json.load(f)
@staticmethod
def find_safetensor_files(model_path: Path) -> list[Path]:
"""Find all SafeTensor files in model directory using priority search.
Searches for tensor files in order of preference: single model.safetensors,
sharded model-*-of-*.safetensors files, then any *.safetensors files. This
approach handles both single-file and multi-shard model distributions whilst
ensuring predictable file ordering for conversion consistency.
Returns:
List of SafeTensor file paths in priority order.
Raises:
FileNotFoundError: If no SafeTensor files are found.
"""
# Check for single file
single_file = model_path / "model.safetensors"
if single_file.exists():
return [single_file]
# Check for sharded files
pattern = "model-*-of-*.safetensors"
sharded_files = sorted(model_path.glob(pattern))
if sharded_files:
return sharded_files
# Check for any safetensor files
any_files = sorted(model_path.glob("*.safetensors"))
if any_files:
return any_files
msg = f"No SafeTensor files found in {model_path}"
raise FileNotFoundError(msg)
@staticmethod
def find_gguf_files(model_path: Path, pattern: str | None = None) -> list[Path]:
"""Find GGUF files in directory, optionally filtered by pattern.
Searches for GGUF files with optional pattern matching. Prioritises
multi-part files (00001-of-*) over single files for proper handling
of large models split across multiple files.
Returns:
List of GGUF file paths, sorted with multi-part files first.
"""
if pattern:
gguf_files = list(model_path.glob(f"*{pattern}*.gguf"))
else:
gguf_files = list(model_path.glob("*.gguf"))
# Sort to prioritise 00001-of-* files
gguf_files.sort(
key=lambda x: (
"00001-of-" not in x.name, # False sorts before True
x.name,
)
)
return gguf_files
@staticmethod
def ensure_directory(path: Path) -> Path:
"""Ensure directory exists, creating if necessary.
Creates directory and all parent directories if they don't exist.
Returns the path for method chaining convenience.
Returns:
The directory path.
"""
path.mkdir(parents=True, exist_ok=True)
return path
@staticmethod
def cleanup_directory(path: Path, pattern: str = "*") -> int:
"""Remove files matching pattern from directory.
Safely removes files matching the specified glob pattern. Returns
count of files removed for logging purposes.
Returns:
Number of files removed.
"""
if not path.exists():
return 0
files_removed = 0
for file_path in path.glob(pattern):
if file_path.is_file():
try:
file_path.unlink()
files_removed += 1
except Exception as e:
logger.warning(f"Failed to remove {file_path}: {e}")
return files_removed