Debugging
All checks were successful
Build Vast.ai Ollama Benchmark Image / Build and Push (push) Successful in 6m4s

This commit is contained in:
Tom Foster 2025-07-28 21:15:37 +01:00
parent 86e9de9e75
commit 430dc059d4
5 changed files with 409 additions and 115 deletions

View file

@ -7,14 +7,11 @@ using pexpect for real-time output streaming and interactive session management.
from __future__ import annotations
import logging
import select
import time
from typing import TYPE_CHECKING
import paramiko
if TYPE_CHECKING:
from paramiko import ChannelFile
logger = logging.getLogger(__name__)
@ -80,11 +77,11 @@ class SSHInteractiveSession:
msg = f"Failed to connect: {e}"
raise RuntimeError(msg) from e
def execute_command_interactive(self, command: str, timeout: int = 600) -> None:
"""Execute a command with real-time output.
def _ensure_connected(self) -> None:
"""Ensure SSH client is connected and reconnect if necessary.
Raises:
RuntimeError: If the command fails.
RuntimeError: If the SSH session is not properly initialised.
"""
if not self.client:
self.connect()
@ -99,13 +96,8 @@ class SSHInteractiveSession:
self.close()
self.connect()
logger.debug("Executing command: %s", command)
_stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout)
output = stdout.read().decode("utf-8")
error_output = stderr.read().decode("utf-8")
exit_status = stdout.channel.recv_exit_status()
# Debug log sanitised SSH response
def _log_debug_response(self, output: str, error_output: str, exit_status: int) -> None:
"""Log debug information about command response."""
sanitised_output = self._clean_output(output)
sanitised_error = self._clean_output(error_output)
logger.debug(
@ -115,6 +107,37 @@ class SSHInteractiveSession:
sanitised_error[:500],
)
def _log_command_output(self, output: str, error_output: str) -> None:
    """Log the output from a command execution.

    Non-blank stdout lines are emitted at INFO level and non-blank
    stderr lines at WARNING level; empty lines are skipped entirely.
    """
    for text, emit in ((output, logger.info), (error_output, logger.warning)):
        if not text.strip():
            continue
        for raw_line in text.strip().split("\n"):
            stripped = raw_line.strip()
            if stripped:
                emit(stripped)
def _check_command_success(self, command: str, exit_status: int) -> None:
"""Check if command succeeded and raise error if not.
Raises:
RuntimeError: If the command fails.
"""
if exit_status != 0:
error_msg = f"Command '{command}' failed with exit code {exit_status}."
logger.error(error_msg)
raise RuntimeError(error_msg)
def _check_capture_command_success(
self, command: str, exit_status: int, output: str, error_output: str
) -> None:
"""Check if capture command succeeded and raise error if not.
Raises:
RuntimeError: If the command fails.
"""
if exit_status != 0:
error_msg = f"Command '{command}' failed with exit code {exit_status}."
logger.error(error_msg)
@ -122,8 +145,26 @@ class SSHInteractiveSession:
logger.error("Error Output:\n%s", error_output)
raise RuntimeError(error_msg)
# For interactive commands, we don't return output, just ensure it ran successfully.
# The original _execute_command would have logged it.
def execute_command_interactive(self, command: str, timeout: int = 600) -> None:
    """Execute a command with real-time output.

    NOTE(review): despite the name, output is read in full only after the
    command completes, then logged — it is not streamed incrementally
    (see `execute_command_streaming` for that); confirm intent.

    Args:
        command: Shell command to run on the remote host.
        timeout: Per-channel timeout in seconds passed to `exec_command`.

    Raises:
        RuntimeError: If the SSH session is not established or command fails.
    """
    self._ensure_connected()
    if not self.client:
        msg = "SSH session is not properly initialised after connection attempt."
        raise RuntimeError(msg)
    logger.debug("Executing command: %s", command)
    _stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout)
    # Drain both streams fully before asking for the exit status so the
    # remote side cannot stall on a full channel buffer.
    output = stdout.read().decode("utf-8")
    error_output = stderr.read().decode("utf-8")
    exit_status = stdout.channel.recv_exit_status()
    self._log_debug_response(output, error_output, exit_status)
    self._log_command_output(output, error_output)
    self._check_command_success(command, exit_status)
def execute_command_capture(self, command: str, timeout: int = 30) -> str:
"""Execute a command and capture its output.
@ -134,19 +175,11 @@ class SSHInteractiveSession:
Raises:
RuntimeError: If the SSH session is not established or command fails.
"""
self._ensure_connected()
if not self.client:
self.connect()
if not self.client:
msg = "SSH session is not properly initialised."
msg = "SSH session is not properly initialised after connection attempt."
raise RuntimeError(msg)
# Check if connection is still alive
transport = self.client.get_transport()
if not transport or not transport.is_active():
logger.warning("SSH connection lost, reconnecting...")
self.close()
self.connect()
logger.debug("Capturing output for command: %s", command)
_stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout)
output = stdout.read().decode("utf-8")
@ -182,37 +215,131 @@ class SSHInteractiveSession:
Raises:
RuntimeError: If the SSH session is not established.
"""
self._ensure_connected()
if not self.client:
self.connect()
if not self.client:
msg = "SSH session is not properly initialised."
msg = "SSH session is not properly initialised after connection attempt."
raise RuntimeError(msg)
# Check if connection is still alive
transport = self.client.get_transport()
if not transport or not transport.is_active():
logger.warning("SSH connection lost, reconnecting...")
self.close()
self.connect()
logger.debug("Executing background command: %s", command)
# Execute command without waiting for output
self.client.exec_command(command)
# Add a small delay to allow the SSH client to process any immediate protocol messages
time.sleep(0.1)
# Return immediately without reading stdout/stderr or waiting for exit status
def _stream_output_chunk(self, stdout: ChannelFile, stderr: ChannelFile) -> None:
    """Process and log a chunk of streaming output.

    Drains up to 1 KiB from each of stdout/stderr when the underlying
    channel reports data ready; cleaned stdout goes to INFO, stderr to
    WARNING.

    NOTE(review): `ChannelFile.read(1024)` may block until 1024 bytes or
    EOF even after `recv_ready()` — confirm against paramiko docs.
    """
    if stdout.channel.recv_ready():
        chunk = stdout.read(1024).decode("utf-8", errors="replace")
        if chunk:
            # Log each chunk for real-time progress
            cleaned_chunk = self._clean_output(chunk)
            logger.info(cleaned_chunk.rstrip())
    if stderr.channel.recv_ready():
        chunk = stderr.read(1024).decode("utf-8", errors="replace")
        if chunk:
            cleaned_chunk = self._clean_output(chunk)
            logger.warning(cleaned_chunk.rstrip())
def _log_output_line(self, line: str, is_stderr: bool) -> None:
    """Log a single output line appropriately.

    Stderr lines are logged at WARNING, stdout lines at INFO; trailing
    whitespace is stripped either way.
    """
    emit = logger.warning if is_stderr else logger.info
    emit(line.rstrip())
def _process_chunk_data(
self, chunk: str, line_buffer: str, last_line: str, is_stderr: bool
) -> tuple[str, str]:
"""Process a chunk of data and return updated buffer and last line.
Returns:
A tuple containing the updated line_buffer and last_line.
"""
for char in chunk:
if char == "\r":
# Carriage return: clear buffer, next output will overwrite current line
if line_buffer.strip() and line_buffer.strip() != last_line:
self._log_output_line(line_buffer, is_stderr)
last_line = line_buffer.strip()
line_buffer = ""
elif char == "\n":
# Newline: process the accumulated line
if line_buffer.strip() and line_buffer.strip() != last_line:
self._log_output_line(line_buffer, is_stderr)
last_line = line_buffer.strip()
line_buffer = ""
else:
line_buffer += char
return line_buffer, last_line
def _process_stream_output(
self,
channel: paramiko.Channel,
is_stderr: bool,
last_stdout_line: str,
last_stderr_line: str,
line_buffer: str,
) -> tuple[str, str, str]:
"""Helper to process output from a channel, handling carriage returns and duplicates.
Returns:
A tuple containing the updated last_stdout_line, last_stderr_line, and line_buffer.
"""
if is_stderr:
chunk = channel.recv_stderr(1024).decode("utf-8", errors="replace")
last_line = last_stderr_line
else:
chunk = channel.recv(1024).decode("utf-8", errors="replace")
last_line = last_stdout_line
if chunk:
line_buffer, last_line = self._process_chunk_data(
chunk, line_buffer, last_line, is_stderr
)
if is_stderr:
return last_stdout_line, last_line, line_buffer
return last_line, last_stderr_line, line_buffer
def _process_channel_streams(self, channel: paramiko.Channel) -> tuple[str, str, str, str]:
    """Process stdout and stderr streams from channel.

    Polls the channel with ``select`` until the remote command has
    exited AND both streams report no pending data, delegating chunk
    handling (carriage-return collapsing, duplicate suppression) to
    ``_process_stream_output``.

    Returns:
        Tuple of (last_stdout_line, last_stderr_line, stdout_buffer, stderr_buffer)
    """
    last_stdout_line = ""
    last_stderr_line = ""
    # Buffers hold partial lines not yet terminated by \n or \r.
    stdout_buffer = ""
    stderr_buffer = ""
    # Keep looping while the command is still running OR data is queued.
    while (
        not channel.exit_status_ready() or channel.recv_ready() or channel.recv_stderr_ready()
    ):
        # 0.1s select timeout bounds latency without busy-waiting.
        rlist, _, _ = select.select([channel], [], [], 0.1)
        if rlist:
            if channel.recv_ready():
                last_stdout_line, last_stderr_line, stdout_buffer = self._process_stream_output(
                    channel, False, last_stdout_line, last_stderr_line, stdout_buffer
                )
            if channel.recv_stderr_ready():
                last_stdout_line, last_stderr_line, stderr_buffer = self._process_stream_output(
                    channel, True, last_stdout_line, last_stderr_line, stderr_buffer
                )
        elif not channel.exit_status_ready():
            # Nothing readable yet and command still running: back off briefly.
            time.sleep(0.1)
    return last_stdout_line, last_stderr_line, stdout_buffer, stderr_buffer
def _flush_remaining_output(
    self, channel: paramiko.Channel, buffers: tuple[str, str, str, str]
) -> None:
    """Flush any remaining output after command completion.

    Args:
        channel: The finished exec channel to drain.
        buffers: State carried over from ``_process_channel_streams`` as
            (last_stdout_line, last_stderr_line, stdout_buffer, stderr_buffer).
    """
    last_stdout_line, last_stderr_line, stdout_buffer, stderr_buffer = buffers
    # Read remaining output
    while channel.recv_ready() or channel.recv_stderr_ready():
        if channel.recv_ready():
            last_stdout_line, last_stderr_line, stdout_buffer = self._process_stream_output(
                channel, False, last_stdout_line, last_stderr_line, stdout_buffer
            )
        if channel.recv_stderr_ready():
            last_stdout_line, last_stderr_line, stderr_buffer = self._process_stream_output(
                channel, True, last_stdout_line, last_stderr_line, stderr_buffer
            )
    # Process any remaining buffer content
    # A final unterminated line would otherwise be lost; emit it unless it
    # merely repeats the last line already logged.
    if stdout_buffer.strip() and stdout_buffer.strip() != last_stdout_line:
        logger.info(stdout_buffer.rstrip())
    if stderr_buffer.strip() and stderr_buffer.strip() != last_stderr_line:
        logger.warning(stderr_buffer.rstrip())
def execute_command_streaming(self, command: str, timeout: int = 600) -> None:
"""Execute a command with real-time output streaming.
@ -223,43 +350,28 @@ class SSHInteractiveSession:
Raises:
RuntimeError: If the SSH session is not established or command fails.
"""
self._ensure_connected()
if not self.client:
self.connect()
if not self.client:
msg = "SSH session is not properly initialised."
msg = "SSH session is not properly initialised after connection attempt."
raise RuntimeError(msg)
# Check if connection is still alive
transport = self.client.get_transport()
if not transport or not transport.is_active():
logger.warning("SSH connection lost, reconnecting...")
self.close()
self.connect()
logger.debug("Executing streaming command: %s", command)
_stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout)
# Stream output in real-time
while not stdout.channel.exit_status_ready():
self._stream_output_chunk(stdout, stderr)
# Small delay to prevent excessive polling
time.sleep(0.1)
# Make channels non-blocking
stdout.channel.setblocking(0)
stderr.channel.setblocking(0)
# Get final exit status
exit_status = stdout.channel.recv_exit_status()
channel = stdout.channel
# Get any remaining output
final_stdout = stdout.read().decode("utf-8", errors="replace")
final_stderr = stderr.read().decode("utf-8", errors="replace")
# Process streams and get final buffers
buffers = self._process_channel_streams(channel)
if final_stdout:
logger.info(self._clean_output(final_stdout).rstrip())
if final_stderr:
logger.warning(self._clean_output(final_stderr).rstrip())
# Flush any remaining output
self._flush_remaining_output(channel, buffers)
if exit_status != 0:
error_msg = f"Command '{command}' failed with exit code {exit_status}."
raise RuntimeError(error_msg)
exit_status = channel.recv_exit_status()
self._check_command_success(command, exit_status)
def close(self) -> None:
"""Close the SSH and SFTP sessions."""

View file

@ -10,6 +10,8 @@ from typing import Any
import requests
from helpers.logger import logger
class OllamaClient:
"""Simple Ollama API client."""
@ -19,21 +21,50 @@ class OllamaClient:
self.base_url = base_url
self.session = requests.Session()
def generate(self, model: str, prompt: str, context_length: int) -> dict[str, Any]:
def generate(
self, model: str, prompt: str, context_length: int, timeout: int = 60
) -> dict[str, Any]:
"""Generate text using Ollama API.
Returns:
Ollama response as JSON.
Raises:
requests.Timeout: If request times out.
"""
response = self.session.post(
f"{self.base_url}/api/generate",
json={
"model": model,
"prompt": prompt,
"stream": False,
"options": {"num_ctx": context_length},
},
timeout=300,
payload = {
"model": model,
"prompt": prompt,
"stream": False,
"options": {"num_ctx": context_length},
}
url = f"{self.base_url}/api/generate"
logger.debug("📤 Sending POST request to %s", url)
logger.debug(
"📊 Payload: model=%s, prompt_length=%d, num_ctx=%d, timeout=%ds",
model,
len(prompt),
context_length,
timeout,
)
response.raise_for_status()
return response.json() or {}
try:
response = self.session.post(url, json=payload, timeout=timeout)
logger.debug("📥 Response status: %d", response.status_code)
response.raise_for_status()
result = response.json() or {}
logger.debug(
"📋 Response stats: eval_count=%d, total_duration=%dms",
result.get("eval_count", 0),
result.get("total_duration", 0) // 1_000_000,
)
except requests.Timeout:
logger.error("⏱️ Ollama request timed out after %d seconds", timeout)
logger.error(
" Model: %s, Context: %d, Prompt length: %d", model, context_length, len(prompt)
)
raise
else:
return result

View file

@ -44,8 +44,6 @@ class OllamaManager:
Returns:
Ollama version string for results folder naming.
"""
logger.info("🧠 Configuring Ollama service...")
# Start Ollama service with environment variables in background
logger.info("⚡ Starting Ollama server in background")
start_command = (
@ -62,7 +60,6 @@ class OllamaManager:
instance.execute_command_background(start_command)
# Wait for Ollama server to be ready and get version
logger.info("⏰ Waiting for server readiness")
version = self._get_ollama_version(instance)
# Pull the model with streaming output for real-time progress monitoring
@ -88,10 +85,14 @@ class OllamaManager:
actual_response = response_data.get("response", "No response field found")
logger.debug("Test generation response: %r", actual_response)
except (json.JSONDecodeError, AttributeError):
logger.debug(
"Test generation response (raw): %s",
response[:100] + "..." if len(response) > 100 else response,
# Truncate response for debug logging
max_debug_length = 100
truncated_response = (
response[:max_debug_length] + "..."
if len(response) > max_debug_length
else response
)
logger.debug("Test generation response (raw): %s", truncated_response)
logger.info("🎉 Test generation complete - model should now be in VRAM")
logger.info("🚀 Ollama ready for testing with model preloaded")
@ -109,7 +110,7 @@ class OllamaManager:
Raises:
RuntimeError: If the server doesn't become ready within the timeout.
"""
logger.info("⏳ Waiting for Ollama server to start...")
logger.info("⏳ Waiting for Ollama server...")
max_attempts = 12 # 60 seconds total (5 second intervals)
debug_threshold = 2 # After 10 seconds (2 attempts), check logs

View file

@ -105,6 +105,7 @@ class BenchmarkRunner:
model=self.config.model,
prompt="Hello",
context_length=self.config.context_start,
timeout=30, # Short timeout for warmup
)
warmup_tokens = warmup_response.get("eval_count", 0)
logger.info("✅ Ollama connection verified - generated %s tokens", warmup_tokens)
@ -116,6 +117,115 @@ class BenchmarkRunner:
# Save system info
self.results_manager.save_system_info()
def _diagnose_ollama_failure(self) -> None:
    """Diagnose Ollama failure with comprehensive system checks.

    Best-effort diagnostics after a failed generation: process list,
    API reachability via curl, system memory, server logs and GPU
    state. Never raises, so it is safe to call from an exception
    handler without masking the original error.
    """
    logger.info("🔍 Diagnosing Ollama status...")
    try:
        # Check if Ollama process is still running
        ps_result = subprocess.run(
            ["ps", "aux"], check=False, capture_output=True, text=True, timeout=5
        )
        ollama_processes = [
            line for line in ps_result.stdout.split("\n") if "ollama" in line.lower()
        ]
        if ollama_processes:
            logger.info("🔄 Ollama processes found:")
            for proc in ollama_processes:
                logger.info(" %s", proc.strip())
        else:
            logger.warning("⚠️ No Ollama processes found!")
        # Try to check Ollama API status
        logger.info("🌐 Checking Ollama API status...")
        status_result = subprocess.run(
            ["curl", "-s", "http://localhost:11434/api/tags"],
            check=False,
            capture_output=True,
            text=True,
            timeout=5,
        )
        if status_result.returncode == 0:
            logger.info("✅ Ollama API responded: %s", status_result.stdout[:200])
        else:
            logger.error("❌ Ollama API not responding")
        # Check system resource usage
        self._log_system_resources()
        # Try to get Ollama logs from multiple sources
        self._check_ollama_logs()
        # Check GPU status
        self._check_gpu_status()
    except Exception as diag_e:
        # Broad catch is deliberate: diagnostics must never raise past here.
        logger.warning("⚠️ Could not run diagnostics: %s", diag_e)
def _log_system_resources(self) -> None:
    """Log current system resource usage as reported by ``free -h``."""
    logger.info("💾 System resources:")
    free_result = subprocess.run(
        ["free", "-h"], check=False, capture_output=True, text=True, timeout=5
    )
    # Silently skip the table when the command is unavailable or fails.
    if free_result.returncode != 0:
        return
    for row in free_result.stdout.strip().split("\n"):
        logger.info(" %s", row)
def _check_ollama_logs(self) -> None:
    """Check Ollama logs from multiple sources.

    Tries journalctl first, then a list of well-known log file paths,
    logging the tail of the first readable file found. Best-effort:
    missing or unreadable files are skipped silently.
    """
    logger.info("📋 Searching for Ollama logs...")
    # Check systemd logs
    log_result = subprocess.run(
        ["journalctl", "-u", "ollama", "-n", "50", "--no-pager"],
        check=False,
        capture_output=True,
        text=True,
        timeout=5,
    )
    if log_result.returncode == 0 and log_result.stdout.strip():
        logger.info("📜 Systemd Ollama logs:")
        # Fetch 50 lines but only surface the last 10 to keep logs short.
        for line in log_result.stdout.strip().split("\n")[-10:]:
            logger.info(" %s", line)
    else:
        logger.info("📜 No systemd logs found")
    # Check for Ollama logs in common locations
    log_locations = [
        Path("/var/log/ollama.log"),
        Path("/root/.ollama/logs/server.log"),
        Path("/home/ollama/.ollama/logs/server.log"),
    ]
    for log_path in log_locations:
        try:
            with log_path.open(encoding="utf-8") as f:
                lines = f.readlines()
                if lines:
                    logger.info("📜 Ollama logs from %s:", log_path)
                    # Tail only: the last 5 lines are the most relevant.
                    for line in lines[-5:]:
                        logger.info(" %s", line.strip())
                    # Stop at the first log file that yields content.
                    break
        except (FileNotFoundError, PermissionError):
            continue
def _check_gpu_status(self) -> None:
    """Check current GPU status and utilization via ``nvidia-smi``."""
    query = subprocess.run(
        [
            "nvidia-smi",
            "--query-gpu=utilization.gpu,memory.used,memory.total",
            "--format=csv,noheader,nounits",
        ],
        check=False,
        capture_output=True,
        text=True,
        timeout=5,
    )
    # Only log when the query succeeded (no GPU / no driver is fine).
    if query.returncode != 0:
        return
    logger.info("🎮 GPU status: %s", query.stdout.strip())
def run_single_test(self, scenario: str, context_length: int, run_number: int) -> TestResult:
"""Run a single benchmark test with comprehensive monitoring.
@ -141,6 +251,9 @@ class BenchmarkRunner:
prompt = PromptGenerator.generate(scenario, context_length, self.config)
prompt_file = self.results_dir / f"prompt_{test_id}.txt"
prompt_file.write_text(prompt)
logger.debug(
"📝 Generated prompt with %d characters for %s scenario", len(prompt), scenario
)
# Setup GPU monitoring
monitor_file = self.results_dir / f"gpu_monitor_{test_id}.csv"
@ -148,13 +261,32 @@ class BenchmarkRunner:
# Run test with monitoring
start_time = time.time()
logger.debug(
"🚀 Sending request to Ollama API at %s", datetime.now(tz=UTC).strftime("%H:%M:%S.%f")
)
logger.debug(
"📡 Model: %s, Prompt length: %d chars, Context length: %d",
self.config.model,
len(prompt),
context_length,
)
with gpu_monitor.monitor():
ollama_response = self.ollama_client.generate(
model=self.config.model, prompt=prompt, context_length=context_length
)
try:
ollama_response = self.ollama_client.generate(
model=self.config.model, prompt=prompt, context_length=context_length
)
except Exception as e:
logger.error("❌ Ollama generation failed: %s", str(e))
self._diagnose_ollama_failure()
raise
end_time = time.time()
logger.debug(
"✅ Received response from Ollama at %s (took %.2fs)",
datetime.now(tz=UTC).strftime("%H:%M:%S.%f"),
end_time - start_time,
)
# Save full response
response_file = self.results_dir / f"ollama_response_{test_id}.json"

View file

@ -49,26 +49,44 @@ class BenchmarkExecutor:
Args:
instance: Target instance for benchmark execution.
"""
benchmark_command = (
# Export environment variables individually for clarity and debugging
# Commands to set up environment variables and change directory
setup_commands = (
f"set -e && " # Exit on any error
f"echo '🔍 Starting benchmark command execution...' && "
f"cd /app && "
f"TEST_ITERATIONS={self.config.test_iterations} "
f"CONTEXT_START={self.config.context_start} "
f"CONTEXT_END={self.config.context_end} "
f"CONTEXT_MULTIPLIER={self.config.context_multiplier} "
f"OLLAMA_MODEL={self.config.ollama_model} "
f"OLLAMA_FLASH_ATTENTION={self.config.ollama_flash_attention} "
f"OLLAMA_KEEP_ALIVE={self.config.ollama_keep_alive} "
f"OLLAMA_KV_CACHE_TYPE={self.config.ollama_kv_cache_type} "
f"OLLAMA_MAX_LOADED_MODELS={self.config.ollama_max_loaded_models} "
f"OLLAMA_NUM_GPU={self.config.ollama_num_gpu} "
f"OLLAMA_NUM_PARALLEL={self.config.ollama_num_parallel} "
f"OUTPUT_PATH={self.config.remote_results_path} "
f"uv run scripts/llm_benchmark.py"
f"echo '📁 Changed to /app directory' && "
f"export PYTHONUNBUFFERED=1 && "
f"export TEST_ITERATIONS={self.config.test_iterations} && "
f"export CONTEXT_START={self.config.context_start} && "
f"export CONTEXT_END={self.config.context_end} && "
f"export CONTEXT_MULTIPLIER={self.config.context_multiplier} && "
f"export OLLAMA_MODEL='{self.config.ollama_model}' && "
f"export OLLAMA_FLASH_ATTENTION={self.config.ollama_flash_attention} && "
f"export OLLAMA_KEEP_ALIVE={self.config.ollama_keep_alive} && "
f"export OLLAMA_KV_CACHE_TYPE={self.config.ollama_kv_cache_type} && "
f"export OLLAMA_MAX_LOADED_MODELS={self.config.ollama_max_loaded_models} && "
f"export OLLAMA_NUM_GPU={self.config.ollama_num_gpu} && "
f"export OLLAMA_NUM_PARALLEL={self.config.ollama_num_parallel} && "
f"export OUTPUT_PATH={self.config.remote_results_path} && "
f"echo '⚙️ Environment variables exported'"
)
logger.info("⚡ Starting benchmark execution...")
logger.debug("Benchmark command: %s", benchmark_command)
instance.execute_command_streaming(benchmark_command, timeout=1800) # 30 minutes
# Command to run the benchmark script
benchmark_script_command = (
"set -e && " # Exit on any error
"cd /app && "
"echo '🚀 Starting benchmark script...' && "
"uv run scripts/llm_benchmark.py"
)
logger.info("⚡ Starting setup commands execution...")
logger.debug("Setup command: %s", setup_commands)
instance.execute_command_streaming(setup_commands, timeout=60) # Short timeout for setup
logger.info("⚡ Starting benchmark script execution...")
logger.debug("Benchmark script command: %s", benchmark_script_command)
instance.execute_command_streaming(benchmark_script_command, timeout=1800) # 30 minutes
class RemoteBenchmarkOrchestrator: