| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- #!/usr/bin/env python3
- import json
- import base64
- import time
- import urllib.request
- import os
- import sys
- import subprocess
- from pathlib import Path
- from typing import List, Dict, Any, Optional
- from rich.console import Console
- from rich.panel import Panel
- from rich.live import Live
- from rich.table import Table
- from rich.progress import Progress, SpinnerColumn, TextColumn
# --- EMOJIS ---
# Glyphs used as visual prefixes in the settings panel, the menu entries
# and the status column of the captioning log.
EMOJI_CONFIG: str = "⚙️"
EMOJI_IMAGE: str = "🖼️"
EMOJI_PROMPT: str = "💬"
EMOJI_START: str = "🚀"
EMOJI_EXIT: str = "🚪"
EMOJI_SUCCESS: str = "✅"
EMOJI_FAIL: str = "❌"
EMOJI_SKIP: str = "⏭️"
EMOJI_LOG: str = "📝"

# --- CONFIGURATION ---
# Settings file, resolved relative to the current working directory.
CONFIG_FILE: str = "config.json"

# Values used for any setting that the settings file does not provide.
DEFAULT_CONFIG: Dict[str, Any] = {
    # Where images come from: 'directory' or 'specific_files'
    "image_source": "directory",
    # Directory that is searched when image_source is 'directory'
    "image_dir": ".",
    # Explicit file paths used when image_source is 'specific_files'
    "specific_files": [],
    # File-name suffix matched inside image_dir
    "image_ext": ".jpg",
    # Model name sent in the request payload
    "model": "moondream",
    # Endpoint that receives the generate request
    "api_url": "http://localhost:11434/api/generate",
    # Instruction sent together with each image
    "prompt": "Describe this image in a single, descriptive sentence.",
}
# Module-wide rich Console instance that the functions below print through.
console = Console()
def run_gum_command(command: List[str]) -> Optional[str]:
    """Run a ``gum`` sub-command and return what it printed on stdout.

    Args:
        command: Arguments placed after the ``gum`` executable name, for
            example ``["choose", "first", "second"]``.

    Returns:
        The command's stdout with surrounding whitespace stripped, or
        ``None`` when ``gum`` could not be started or exited with a
        non-zero status (an error message is printed in that case).
    """
    try:
        result = subprocess.run(
            ["gum", *command],
            # Capture stdout only. gum draws its interactive UI on stderr so
            # that the selected value can be read from stdout; piping stderr
            # as well would keep the prompt from reaching the terminal.
            stdout=subprocess.PIPE,
            text=True,
            check=True,
        )
    except (subprocess.CalledProcessError, FileNotFoundError) as e:
        console.print(
            f"[bold red]Error running 'gum'. Is it installed and in your PATH? ({e})[/bold red]"
        )
        return None
    return result.stdout.strip()
def load_config() -> Dict[str, Any]:
    """Load the settings from ``CONFIG_FILE``, falling back to the defaults.

    Keys missing from the file are filled in from ``DEFAULT_CONFIG``. The
    defaults are always handed out as a deep copy: callers modify the
    returned dict in place (for example when a new prompt is entered), and
    that must never alter ``DEFAULT_CONFIG`` or its nested list.

    Returns:
        The settings read from the file completed with default values, or a
        copy of the defaults when the file is absent, is not valid UTF-8
        JSON, or does not contain a JSON object.
    """
    import copy  # function-scope import: only this function needs it

    defaults = copy.deepcopy(DEFAULT_CONFIG)
    config_path = Path(CONFIG_FILE)
    if not config_path.exists():
        return defaults

    try:
        with open(config_path, "r", encoding="utf-8") as f:
            config = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return defaults

    # A file holding valid JSON that is not an object (e.g. a list) cannot be
    # used as a settings mapping.
    if not isinstance(config, dict):
        return defaults

    # Ensure all keys from the defaults are present.
    for key, value in defaults.items():
        config.setdefault(key, value)
    return config
def save_config(config: Dict[str, Any]):
    """Write *config* to ``CONFIG_FILE`` as JSON indented by four spaces.

    Any existing file content is replaced.
    """
    with Path(CONFIG_FILE).open("w") as handle:
        json.dump(config, handle, indent=4)
def get_image_files(config: Dict[str, Any]) -> List[Path]:
    """Return the image paths selected by the configuration.

    Args:
        config: Settings mapping. ``image_source`` selects the mode:
            ``"directory"`` searches ``image_dir`` for names ending in
            ``image_ext``; ``"specific_files"`` uses the paths listed under
            ``specific_files`` as they are.

    Returns:
        For ``"directory"``: the matching regular files, sorted. For
        ``"specific_files"``: one ``Path`` per configured entry, in the
        configured order (existence is not checked here). An empty list when
        the directory does not exist (an error is printed) or when
        ``image_source`` has any other value.
    """
    source = config["image_source"]

    if source == "directory":
        source_dir = Path(config["image_dir"])
        if not source_dir.is_dir():
            console.print(
                f"[bold red]Error: Image directory '{source_dir}' not found.[/bold red]"
            )
            return []
        pattern = f"*{config['image_ext']}"
        # The glob also matches sub-directories whose name ends with the
        # extension; only regular files can be read as images.
        return sorted(p for p in source_dir.glob(pattern) if p.is_file())

    if source == "specific_files":
        return [Path(f) for f in config["specific_files"]]

    return []
def get_caption(
    image_path: Path,
    config: Dict[str, Any]
) -> Optional[str]:
    """Request a caption for one image from the configured API endpoint.

    The image is read, base64-encoded and POSTed as JSON together with the
    ``model`` and ``prompt`` settings (streaming disabled) to ``api_url``.

    Returns:
        The stripped ``response`` field of the JSON reply (an empty string
        when the field is absent). Any exception raised along the way is
        not propagated; it is reported as a string of the form
        ``"ERROR: <exception text>"``.
    """
    try:
        with open(image_path, "rb") as image_file:
            raw_bytes = image_file.read()
        encoded_image = base64.b64encode(raw_bytes).decode("utf-8")

        request_fields = {
            "model": config["model"],
            "prompt": config["prompt"],
            "images": [encoded_image],
            "stream": False,
        }
        endpoint = config["api_url"]
        request_body = json.dumps(request_fields).encode("utf-8")
        request = urllib.request.Request(
            endpoint,
            data=request_body,
            headers={"Content-Type": "application/json"},
        )

        with urllib.request.urlopen(request) as reply:
            reply_text = reply.read().decode("utf-8")
        parsed_reply = json.loads(reply_text)
        return parsed_reply.get("response", "").strip()
    except Exception as exc:
        # Failures are returned as text so the caller can log them per image.
        return f"ERROR: {exc}"
def show_main_menu(config: Dict[str, Any]):
    """Show the settings panel and menu in a loop until the user exits.

    Each pass prints a panel with the current settings, asks ``gum choose``
    for an action and runs it: change the image source, edit the prompt
    (saved immediately), or caption the configured images. Choosing "Exit",
    or a failed/cancelled menu prompt, terminates the process via
    ``sys.exit(0)``.

    Args:
        config: Settings mapping; modified in place when the user edits a
            setting.
    """
    while True:
        # Build the source description outside the f-string below: nesting
        # an f-string that reuses the same quote character only parses on
        # Python 3.12 and newer.
        if config["image_source"] == "directory":
            source_detail = config["image_dir"]
        else:
            source_detail = f"{len(config['specific_files'])} files"

        summary = (
            f"[bold cyan]Ollama Image Captionizer[/bold cyan]\n\n"
            f"{EMOJI_CONFIG} [bold]Current Settings:[/bold]\n"
            f" - [yellow]Model[/yellow]: {config['model']}\n"
            f" - [yellow]Image Source[/yellow]: {config['image_source']}\n"
            f" - [yellow]Image Ext[/yellow]: {config['image_ext']}\n"
            f" - [yellow]Image Dir/Files[/yellow]: {source_detail}\n\n"
            f"{EMOJI_PROMPT} [bold]Prompt:[/bold] \"{config['prompt']}\""
        )
        console.print(
            Panel(
                summary,
                title="Main Menu",
                border_style="green",
                expand=False,
            )
        )

        choice = run_gum_command(
            [
                "choose",
                f"{EMOJI_IMAGE} Set Image Source",
                f"{EMOJI_PROMPT} Edit Prompt",
                f"{EMOJI_START} Start Captioning",
                f"{EMOJI_EXIT} Exit",
            ]
        )

        if choice and choice.startswith(EMOJI_IMAGE):
            set_image_source(config)
        elif choice and choice.startswith(EMOJI_PROMPT):
            new_prompt = run_gum_command(
                [
                    "input",
                    "--value",
                    config["prompt"],
                    "--header",
                    "Enter the new prompt",
                ]
            )
            if new_prompt is not None:
                config["prompt"] = new_prompt
                save_config(config)
        elif choice and choice.startswith(EMOJI_START):
            files_to_process = get_image_files(config)
            if files_to_process:
                process_images(files_to_process, config)
            else:
                console.print("[bold yellow]No image files found to process.[/bold yellow]")
                # Leave the message visible before the menu is drawn again.
                time.sleep(2)
        elif choice and choice.startswith(EMOJI_EXIT):
            console.print("[bold magenta]Goodbye![/bold magenta]")
            sys.exit(0)
        elif choice is None:  # Gum was cancelled
            console.print("[bold magenta]Goodbye![/bold magenta]")
            sys.exit(0)
def set_image_source(config: Dict[str, Any]):
    """Ask where images should come from and store the answer.

    The user picks between a whole directory and individually selected
    files. When a value is entered, ``config`` is updated in place and
    saved; when a prompt yields nothing, ``config`` is left as it is.
    """
    selection = run_gum_command(
        [
            "choose",
            "Process a directory of images",
            "Select specific image files",
        ]
    )
    if not selection:
        return

    if selection.startswith("Process"):
        directory = run_gum_command(
            [
                "input",
                "--value",
                config["image_dir"],
                "--header",
                "Enter the directory path",
            ]
        )
        # An empty answer is still stored; only None leaves the settings alone.
        if directory is None:
            return
        config["image_source"] = "directory"
        config["image_dir"] = directory
        save_config(config)
        return

    if selection.startswith("Select"):
        picked = run_gum_command(["file", "--multiple", "--file", config["image_dir"]])
        if not picked:
            return
        # The picker output is treated as one path per line.
        chosen_files = picked.split("\n")
        config["image_source"] = "specific_files"
        config["specific_files"] = chosen_files
        save_config(config)
def process_images(image_files: List[Path], config: Dict[str, Any]):
    """Caption every image, write the captions to disk and show a live log.

    For each path a caption is requested through ``get_caption``. A
    non-empty caption that is not an ``ERROR:`` string is written as UTF-8
    to a ``.txt`` file next to the image (same name, different suffix); a
    caption of at most one word is still written but logged as a warning.
    Missing images, empty captions and ``ERROR:`` results are only logged.
    After the last image the function waits for Enter.

    Args:
        image_files: Paths of the images to caption, processed in order.
        config: Settings mapping passed through to ``get_caption``.
    """
    # Function-scope import: file names and model output are arbitrary text
    # and must be escaped before being embedded in rich markup.
    from rich.markup import escape

    preview_limit = 60

    log_table = Table(
        title=f"{EMOJI_LOG} Captioning Log",
        expand=True,
        border_style="blue",
    )
    log_table.add_column("File", style="cyan", no_wrap=True)
    log_table.add_column("Status", style="magenta")
    log_table.add_column("Caption/Error", style="green")

    with Live(log_table, refresh_per_second=4, console=console) as live:
        for image_path in image_files:
            display_name = escape(image_path.name)
            output_file = image_path.with_suffix(".txt")

            if not image_path.exists():
                log_table.add_row(
                    display_name,
                    f"{EMOJI_SKIP} Skipped",
                    "[yellow]Image file not found.",
                )
                continue

            # NOTE(review): Progress starts its own live display on rich's
            # default console while the outer Live is active on `console`;
            # confirm the two do not fight over the terminal.
            with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                transient=True,
            ) as progress:
                progress.add_task(f"Processing {display_name}", total=None)
                caption = get_caption(image_path, config)

            if caption and not caption.startswith("ERROR:"):
                if len(caption.split()) <= 1:
                    status = f"{EMOJI_FAIL} Warning"
                    details = f"[yellow]Single-word caption: '{escape(caption)}'"
                else:
                    status = f"{EMOJI_SUCCESS} Success"
                    # Flatten whatever line breaks the model produced (not
                    # just the platform's os.linesep) so the preview stays
                    # on one line, and mark it only when text was cut off.
                    preview = " ".join(caption[:preview_limit].splitlines())
                    suffix = "..." if len(caption) > preview_limit else ""
                    details = f'"{escape(preview)}"{suffix}'

                # Explicit UTF-8 so non-ASCII captions do not depend on the
                # platform's default encoding.
                with open(output_file, "w", encoding="utf-8") as out_f:
                    out_f.write(caption)
                log_table.add_row(display_name, status, details)
            elif not caption:
                log_table.add_row(
                    display_name,
                    f"{EMOJI_FAIL} Failed",
                    "[red]Model returned an empty string.",
                )
            else:  # Error case
                log_table.add_row(
                    display_name,
                    f"{EMOJI_FAIL} Error",
                    f"[bold red]{escape(caption)}",
                )

            live.update(log_table)
            time.sleep(0.5)  # Rate limit

    console.print("[bold green]\nAll processing complete.[/bold green]")
    console.print("Press Enter to return to the main menu.")
    input()
def main():
    """Entry point: load the settings and hand control to the main menu.

    Ctrl-C and any other exception raised by the menu are reported on the
    console and not propagated.
    """
    try:
        settings = load_config()
        show_main_menu(settings)
    except KeyboardInterrupt:
        console.print("\n[bold magenta]Exiting gracefully. Goodbye![/bold magenta]")
    except Exception as exc:
        # Top-level boundary: report the failure without a traceback.
        console.print(f"[bold red]An unexpected error occurred: {exc}[/bold red]")
# Start the interactive tool only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|