|
|
@@ -0,0 +1,289 @@
|
|
|
+#!/usr/bin/env python3
|
|
|
+
|
|
|
+import json
|
|
|
+import base64
|
|
|
+import time
|
|
|
+import urllib.request
|
|
|
+import os
|
|
|
+import sys
|
|
|
+import subprocess
|
|
|
+from pathlib import Path
|
|
|
+from typing import List, Dict, Any, Optional
|
|
|
+
|
|
|
+from rich.console import Console
|
|
|
+from rich.panel import Panel
|
|
|
+from rich.live import Live
|
|
|
+from rich.table import Table
|
|
|
+from rich.progress import Progress, SpinnerColumn, TextColumn
|
|
|
+
|
|
|
# --- EMOJIS ---
# Prefixes for menu entries and log rows. show_main_menu() dispatches on the
# emoji a chosen menu entry starts with, so the four menu emojis must stay
# distinct from one another.
EMOJI_CONFIG = "⚙️"
EMOJI_IMAGE = "🖼️"
EMOJI_PROMPT = "💬"
EMOJI_START = "🚀"
EMOJI_EXIT = "🚪"
EMOJI_SUCCESS = "✅"
EMOJI_FAIL = "❌"
EMOJI_SKIP = "⏭️"
EMOJI_LOG = "📝"

# --- CONFIGURATION ---
# Path of the JSON settings file; relative, so it resolves against the
# current working directory.
CONFIG_FILE = "config.json"
# Fallback settings. load_config() fills any key missing from CONFIG_FILE
# with the value given here.
DEFAULT_CONFIG = {
    "image_source": "directory",  # 'directory' or 'specific_files'
    "image_dir": ".",  # directory globbed when image_source is 'directory'
    "specific_files": [],  # explicit paths used when image_source is 'specific_files'
    "image_ext": ".jpg",  # suffix appended to '*' to build the glob pattern
    "model": "moondream",  # sent as the "model" field of the request payload
    "api_url": "http://localhost:11434/api/generate",  # endpoint get_caption() posts to
    "prompt": "Describe this image in a single, descriptive sentence.",
}

# Shared Rich console used for all terminal output in this module.
console = Console()
|
|
|
+
|
|
|
+
|
|
|
def run_gum_command(command: List[str]) -> Optional[str]:
    """Run a ``gum`` subcommand and return its stripped standard output.

    Args:
        command: Arguments for ``gum`` (subcommand first), without the
            leading ``"gum"`` itself.

    Returns:
        What gum wrote to stdout with surrounding whitespace removed, or
        ``None`` when gum is missing or exits with a non-zero status.
        Callers treat ``None`` as "cancelled".
    """
    try:
        result = subprocess.run(
            ["gum"] + command,
            # Pipe stdout only. gum is understood to draw its interactive UI
            # on stderr so that stdout can carry the answer; capturing stderr
            # too would hide the prompt from the user.
            stdout=subprocess.PIPE,
            text=True,
            check=True,
        )
    except FileNotFoundError as e:
        console.print(
            f"[bold red]Error running 'gum'. Is it installed and in your PATH? ({e})[/bold red]"
        )
        return None
    except subprocess.CalledProcessError as e:
        # 130 (128 + SIGINT) is the conventional status of an interrupted
        # process: the user backed out, which is not an error worth reporting.
        if e.returncode != 130:
            console.print(
                f"[bold red]'gum' exited with status {e.returncode}.[/bold red]"
            )
        return None
    return result.stdout.strip()
|
|
|
+
|
|
|
+
|
|
|
def load_config() -> Dict[str, Any]:
    """Load settings from ``CONFIG_FILE``, falling back to the defaults.

    Returns:
        A dict the caller may freely mutate. Keys missing from the file are
        filled in from ``DEFAULT_CONFIG``. If the file does not exist, is not
        valid JSON, or does not contain a JSON object, a fresh copy of the
        defaults is returned.
    """
    # Function-scope import keeps this block self-contained.
    import copy

    # Work on a deep copy: the menus mutate the returned dict in place, and
    # the defaults contain a list ("specific_files") that must not be shared.
    defaults = copy.deepcopy(DEFAULT_CONFIG)

    try:
        with open(CONFIG_FILE, "r", encoding="utf-8") as f:
            config = json.load(f)
    except FileNotFoundError:
        return defaults
    except (json.JSONDecodeError, UnicodeDecodeError):
        return defaults

    # Valid JSON that is not an object (e.g. a list) cannot act as a config.
    if not isinstance(config, dict):
        return defaults

    # Ensure all keys from the defaults are present.
    for key, value in defaults.items():
        config.setdefault(key, value)
    return config
|
|
|
+
|
|
|
def save_config(config: Dict[str, Any]):
    """Write *config* to ``CONFIG_FILE`` as JSON indented by four spaces.

    Any existing file at that path is overwritten.
    """
    target = Path(CONFIG_FILE)
    with target.open("w") as handle:
        json.dump(config, handle, indent=4)
|
|
|
+
|
|
|
def get_image_files(config: Dict[str, Any]) -> List[Path]:
    """Resolve the images selected by *config* into a list of paths.

    With ``image_source == "specific_files"`` the configured paths are
    returned in their stored order. With ``"directory"`` the files in
    ``image_dir`` whose names match ``*<image_ext>`` are returned sorted.
    A missing directory is reported on the console and yields an empty list,
    as does any other ``image_source`` value.
    """
    source_kind = config["image_source"]

    if source_kind == "specific_files":
        return [Path(entry) for entry in config["specific_files"]]

    if source_kind != "directory":
        return []

    directory = Path(config["image_dir"])
    if not directory.is_dir():
        console.print(
            f"[bold red]Error: Image directory '{directory}' not found.[/bold red]"
        )
        return []

    pattern = f"*{config['image_ext']}"
    matches = list(directory.glob(pattern))
    matches.sort()
    return matches
|
|
|
+
|
|
|
def get_caption(
    image_path: Path,
    config: Dict[str, Any]
) -> Optional[str]:
    """Send one image to the configured API endpoint and return its caption.

    Args:
        image_path: Image file to read; it is sent base64-encoded.
        config: Settings providing ``"model"``, ``"prompt"`` and
            ``"api_url"``. An optional ``"timeout"`` (seconds) bounds the
            request and defaults to 300.

    Returns:
        The stripped ``"response"`` text from the JSON reply (an empty string
        when that field is missing or null). On any failure, a string of the
        form ``"ERROR: <reason>"``; this function does not raise.
    """
    try:
        with open(image_path, "rb") as img_f:
            image_data = base64.b64encode(img_f.read()).decode("utf-8")

        payload = {
            "model": config["model"],
            "prompt": config["prompt"],
            "images": [image_data],
            "stream": False,
        }

        req = urllib.request.Request(
            config["api_url"],
            data=json.dumps(payload).encode("utf-8"),
            headers={"Content-Type": "application/json"},
        )

        # Without a timeout a stalled server would block the batch forever.
        # The default is generous because generation can take a while.
        timeout = config.get("timeout", 300)
        with urllib.request.urlopen(req, timeout=timeout) as response:
            result = json.loads(response.read().decode("utf-8"))

        # Treat a missing or null "response" as an empty caption so the caller
        # reports "empty" rather than an opaque AttributeError message.
        return (result.get("response") or "").strip()

    except Exception as e:
        # Deliberately broad: the caller relies on the "ERROR:" prefix to tell
        # failures from captions, so every failure is folded into that form.
        return f"ERROR: {e}"
|
|
|
+
|
|
|
def show_main_menu(config: Dict[str, Any]):
    """Run the interactive main menu until the user exits.

    Each pass prints a panel with the current settings, then asks ``gum`` for
    an action: change the image source, edit the prompt, start captioning, or
    exit. Setting changes are written to disk immediately.

    Args:
        config: Live settings dict; mutated in place when the user edits it.

    Raises:
        SystemExit: With status 0 when the user picks "Exit" or the menu
            prompt returns no choice.
    """
    # Function-scope import keeps this block self-contained.
    from rich.markup import escape

    while True:
        # Built outside the f-string: nesting an f-string that reuses the same
        # quote character is a syntax error before Python 3.12.
        if config["image_source"] == "directory":
            source_detail = str(config["image_dir"])
        else:
            source_detail = f"{len(config['specific_files'])} files"

        # Escape user-controlled values so that brackets in, say, the prompt
        # are shown literally instead of being parsed as Rich markup.
        console.print(
            Panel(
                "[bold cyan]Ollama Image Captionizer[/bold cyan]\n\n"
                f"{EMOJI_CONFIG} [bold]Current Settings:[/bold]\n"
                f" - [yellow]Model[/yellow]: {escape(str(config['model']))}\n"
                f" - [yellow]Image Source[/yellow]: {escape(str(config['image_source']))}\n"
                f" - [yellow]Image Ext[/yellow]: {escape(str(config['image_ext']))}\n"
                f" - [yellow]Image Dir/Files[/yellow]: {escape(source_detail)}\n\n"
                f"{EMOJI_PROMPT} [bold]Prompt:[/bold] \"{escape(str(config['prompt']))}\"",
                title="Main Menu",
                border_style="green",
                expand=False,
            )
        )

        choice = run_gum_command(
            [
                "choose",
                f"{EMOJI_IMAGE} Set Image Source",
                f"{EMOJI_PROMPT} Edit Prompt",
                f"{EMOJI_START} Start Captioning",
                f"{EMOJI_EXIT} Exit",
            ]
        )

        # None means gum was cancelled or failed; treat it like "Exit".
        if choice is None or choice.startswith(EMOJI_EXIT):
            console.print("[bold magenta]Goodbye![/bold magenta]")
            sys.exit(0)

        if choice.startswith(EMOJI_IMAGE):
            set_image_source(config)
        elif choice.startswith(EMOJI_PROMPT):
            new_prompt = run_gum_command(
                [
                    "input",
                    "--value",
                    config["prompt"],
                    "--header",
                    "Enter the new prompt",
                ]
            )
            if new_prompt is not None:
                config["prompt"] = new_prompt
                save_config(config)
        elif choice.startswith(EMOJI_START):
            files_to_process = get_image_files(config)
            if files_to_process:
                process_images(files_to_process, config)
            else:
                console.print("[bold yellow]No image files found to process.[/bold yellow]")
                # Leave the notice visible briefly before the menu is redrawn.
                time.sleep(2)
|
|
|
+
|
|
|
def set_image_source(config: Dict[str, Any]):
    """Ask where images come from and store the answer in *config*.

    Offers two modes through ``gum choose``: a directory (path entered via
    ``gum input``) or an explicit set of files (picked via ``gum file``).
    The config is updated and saved only when the follow-up prompt returns a
    usable answer; otherwise it is left untouched.
    """
    selection = run_gum_command(
        ["choose", "Process a directory of images", "Select specific image files"]
    )
    if not selection:
        return

    if selection.startswith("Process"):
        entered_dir = run_gum_command(
            [
                "input",
                "--value",
                config["image_dir"],
                "--header",
                "Enter the directory path",
            ]
        )
        # An empty string is accepted here; only None aborts the change.
        if entered_dir is None:
            return
        config["image_source"] = "directory"
        config["image_dir"] = entered_dir
        save_config(config)
        return

    if selection.startswith("Select"):
        picked = run_gum_command(
            ["file", "--multiple", "--file", config["image_dir"]]
        )
        if not picked:
            return
        # One selected path per output line.
        config["image_source"] = "specific_files"
        config["specific_files"] = picked.split("\n")
        save_config(config)
|
|
|
+
|
|
|
def _caption_preview(caption: str, limit: int = 60) -> str:
    """Return a quoted one-line preview of *caption*, truncated to *limit* characters."""
    flat = " ".join(caption.split())
    if len(flat) <= limit:
        return f'"{flat}"'
    return f'"{flat[:limit]}"...'


def process_images(image_files: List[Path], config: Dict[str, Any]):
    """Caption each image, write the caption beside it, and show a live log.

    For every path in *image_files* the caption from ``get_caption`` is
    written to a sibling file with the ``.txt`` suffix. One row per image is
    added to a live table: skipped (file missing), success, warning
    (single-word caption, still written), failed (empty caption) or error.
    A failure on one image never stops the rest of the batch. When done, the
    function waits for Enter before returning.

    Args:
        image_files: Images to process, in order.
        config: Settings forwarded to ``get_caption``.
    """
    # Function-scope import keeps this block self-contained.
    from rich.markup import escape

    log_table = Table(
        title=f"{EMOJI_LOG} Captioning Log",
        expand=True,
        border_style="blue",
    )
    log_table.add_column("File", style="cyan", no_wrap=True)
    log_table.add_column("Status", style="magenta")
    log_table.add_column("Caption/Error", style="green")

    with Live(log_table, refresh_per_second=4, console=console) as live:
        for image_path in image_files:
            output_file = image_path.with_suffix(".txt")
            # Table cells are parsed as Rich markup, so any text that comes
            # from the filesystem or the model is escaped before display.
            shown_name = escape(str(image_path.name))

            if not image_path.exists():
                log_table.add_row(
                    shown_name,
                    f"{EMOJI_SKIP} Skipped",
                    "[yellow]Image file not found.",
                )
                continue

            # NOTE(review): this spinner is a second live display running
            # inside the Live table above; confirm the two render cleanly
            # together on the Rich version in use.
            with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                transient=True,
            ) as progress:
                progress.add_task(f"Processing {shown_name}", total=None)
                caption = get_caption(image_path, config)

            if not caption:
                log_table.add_row(
                    shown_name,
                    f"{EMOJI_FAIL} Failed",
                    "[red]Model returned an empty string.",
                )
            elif caption.startswith("ERROR:"):
                log_table.add_row(
                    shown_name,
                    f"{EMOJI_FAIL} Error",
                    f"[bold red]{escape(caption)}",
                )
            else:
                if len(caption.split()) <= 1:
                    status = f"{EMOJI_FAIL} Warning"
                    details = f"[yellow]Single-word caption: '{escape(caption)}'"
                else:
                    status = f"{EMOJI_SUCCESS} Success"
                    details = escape(_caption_preview(caption))

                try:
                    # Explicit UTF-8: captions may contain characters the
                    # platform's default encoding cannot represent.
                    output_file.write_text(caption, encoding="utf-8")
                except OSError as e:
                    # Report and keep going; one unwritable file should not
                    # abort the whole batch.
                    status = f"{EMOJI_FAIL} Error"
                    details = (
                        f"[bold red]Could not write "
                        f"{escape(str(output_file.name))}: {escape(str(e))}"
                    )

                log_table.add_row(shown_name, status, details)

            live.update(log_table)
            time.sleep(0.5)  # Rate limit between requests

    console.print("[bold green]\nAll processing complete.[/bold green]")
    console.print("Press Enter to return to the main menu.")
    input()
|
|
|
+
|
|
|
def main():
    """Load the configuration and run the interactive menu.

    Ctrl+C ends the program with a farewell message. Any other unexpected
    exception is reported on the console and the process exits with status 1
    so that shells and scripts can detect the failure.

    Raises:
        SystemExit: Status 1 after an unexpected error; the menu itself
            raises it with status 0 on a normal exit.
    """
    try:
        config = load_config()
        show_main_menu(config)
    except KeyboardInterrupt:
        console.print("\n[bold magenta]Exiting gracefully. Goodbye![/bold magenta]")
    except Exception as e:
        # markup=False: exception text is arbitrary and must not be parsed as
        # Rich markup, which could itself raise while reporting the error.
        console.print(
            f"An unexpected error occurred: {e}",
            style="bold red",
            markup=False,
        )
        sys.exit(1)


if __name__ == "__main__":
    main()
|