#!/usr/bin/env python3
"""
Document analysis script that uses an LLM to generate summaries and key insights.
Groups pages into documents (matching the .eleventy.js grouping logic) and analyzes each one.
"""
import os
import json
import re
from pathlib import Path
from typing import Dict, List, Optional
from collections import defaultdict

from openai import OpenAI
from tqdm import tqdm
from dotenv import load_dotenv
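
# Expected shape of each page JSON file under ./results (inferred from the fields this
# script reads below; the field values shown are illustrative placeholders, not real data):
#
#   {
#     "full_text": "text of the page",
#     "document_metadata": {"document_number": "DOC-001", "page_number": "1 of 66"},
#     "entities": {
#       "people": [], "organizations": [], "locations": [],
#       "dates": [], "reference_numbers": []
#     }
#   }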

class DocumentAnalyzer:
    """Analyze grouped documents using an LLM."""

    def __init__(self, api_url: str, api_key: str, model: str = "gpt-4o"):
        self.client = OpenAI(api_key=api_key, base_url=api_url)
        self.model = model
        self.results_dir = Path("./results")
        self.analyses_file = Path("./analyses.json")

    def normalize_doc_num(self, doc_num: Optional[str]) -> Optional[str]:
        """Normalize a document number to absorb LLM formatting variations."""
        if not doc_num:
            return None
        # str.replace() treats its argument literally, so regex patterns must go
        # through re.sub: replace disallowed characters, collapse runs of dashes,
        # then trim leading/trailing dashes.
        normalized = re.sub(r'[^a-z0-9-]', '-', str(doc_num).lower())
        normalized = re.sub(r'-+', '-', normalized)
        return normalized.strip('-')

    def load_and_group_documents(self) -> List[Dict]:
        """Load all JSON files and group them into documents (matching the .eleventy.js logic)."""
        pages = []

        # Recursively read all JSON files
        for json_file in self.results_dir.glob("**/*.json"):
            try:
                with open(json_file, 'r', encoding='utf-8') as f:
                    content = json.load(f)
                relative_path = json_file.relative_to(self.results_dir)
                pages.append({
                    'path': str(relative_path),
                    'filename': json_file.stem,
                    'folder': str(relative_path.parent) if relative_path.parent != Path('.') else 'root',
                    **content
                })
            except Exception as e:
                print(f"Warning: Could not load {json_file}: {e}")

        print(f"Loaded {len(pages)} pages")

        # Group by normalized document number
        document_map = defaultdict(list)
        for page in pages:
            doc_num = page.get('document_metadata', {}).get('document_number')
            if not doc_num:
                # Fall back to the filename when no document number is present
                normalized = self.normalize_doc_num(page['filename']) or page['filename']
            else:
                normalized = self.normalize_doc_num(doc_num)
            document_map[normalized].append(page)

        # Helper to extract a numeric page number for sorting
        def get_page_num(page):
            page_num = page.get('document_metadata', {}).get('page_number', 0) or 0
            if isinstance(page_num, int):
                return page_num
            # Handle formats like "24 of 66" or "24/66" by taking the first number
            if isinstance(page_num, str):
                match = re.search(r'(\d+)', page_num)
                if match:
                    return int(match.group(1))
            return 0

        # Convert the map into a list of sorted documents
        documents = []
        for normalized_num, doc_pages in document_map.items():
            # Sort pages by page number
            doc_pages.sort(key=get_page_num)

            # Take shared metadata from the first page
            first_page = doc_pages[0]
            raw_doc_nums = list(set(
                p.get('document_metadata', {}).get('document_number')
                for p in doc_pages
                if p.get('document_metadata', {}).get('document_number')
            ))

            # Combine the full text from all pages
            full_text = '\n\n--- PAGE BREAK ---\n\n'.join(
                p.get('full_text', '') for p in doc_pages
            )

            # Collect all entities across pages
            all_entities = {
                'people': set(),
                'organizations': set(),
                'locations': set(),
                'dates': set(),
                'reference_numbers': set()
            }
            for page in doc_pages:
                if page.get('entities'):
                    for key in all_entities:
                        if page['entities'].get(key):
                            all_entities[key].update(page['entities'][key])

            documents.append({
                'unique_id': normalized_num,
                'document_number': raw_doc_nums[0] if len(raw_doc_nums) == 1 else normalized_num,
                'page_count': len(doc_pages),
                'full_text': full_text,
                'document_metadata': first_page.get('document_metadata', {}),
                'entities': {k: sorted(v) for k, v in all_entities.items()}
            })

        print(f"Grouped into {len(documents)} documents")
        return sorted(documents, key=lambda d: d['document_number'])

    def get_analysis_prompt(self) -> str:
        """Return the system prompt for document analysis."""
        return """You are an expert legal document analyst specializing in court documents, depositions, and legal filings.

Analyze the provided document and return a concise summary with key insights.

Your analysis should include:
1. **Document Type**: What kind of document is this? (deposition, court filing, letter, email, affidavit, etc.)
2. **Key Topics**: What are the main subjects/topics discussed? (2-3 bullet points)
3. **Key People**: Who are the most important people mentioned and their roles?
4. **Significance**: Why is this document potentially important? What does it reveal or establish?
5. **Summary**: A 2-3 sentence summary of the document's content

Be factual, concise, and focus on what makes this document notable or significant.

Return ONLY valid JSON in this format:
{
    "document_type": "string",
    "key_topics": ["topic1", "topic2", "topic3"],
    "key_people": [
        {"name": "person name", "role": "their role or significance in this doc"}
    ],
    "significance": "Why this document matters (1-2 sentences)",
    "summary": "Brief summary (2-3 sentences)"
}"""

    def analyze_document(self, document: Dict) -> Optional[Dict]:
        """Analyze a single document using the LLM."""
        try:
            # Limit text length for the API (keep the first ~8000 chars if too long)
            full_text = document['full_text']
            if len(full_text) > 8000:
                full_text = full_text[:8000] + "\n\n[... document continues ...]"

            response = self.client.chat.completions.create(
                model=self.model,
                messages=[
                    {
                        "role": "system",
                        "content": self.get_analysis_prompt()
                    },
                    {
                        "role": "user",
                        "content": f"Analyze this document:\n\n{full_text}"
                    }
                ],
                temperature=0.2,
                max_tokens=1000
            )
            content = response.choices[0].message.content.strip()

            # Extract JSON, tolerating a ```json fenced block or bare braces
            json_match = re.search(r'```(?:json)?\s*\n(.*?)\n```', content, re.DOTALL)
            if json_match:
                content = json_match.group(1).strip()
            else:
                json_match = re.search(r'\{.*\}', content, re.DOTALL)
                if json_match:
                    content = json_match.group(0).strip()

            analysis = json.loads(content)
            return {
                'document_id': document['unique_id'],
                'document_number': document['document_number'],
                'page_count': document['page_count'],
                'analysis': analysis
            }
        except Exception as e:
            print(f"Error analyzing document {document['document_number']}: {e}")
            return None

    def analyze_all(self, limit: Optional[int] = None) -> List[Dict]:
        """Analyze all documents, resuming from any previously saved analyses."""
        print("=" * 60)
        print("DOCUMENT ANALYSIS")
        print("=" * 60)

        # Load existing analyses so interrupted runs can resume
        existing_analyses = {}
        if self.analyses_file.exists():
            try:
                with open(self.analyses_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                existing_analyses = {a['document_id']: a for a in data.get('analyses', [])}
                print(f"Found {len(existing_analyses)} existing analyses")
            except Exception as e:
                print(f"Could not load existing analyses: {e}")

        documents = self.load_and_group_documents()
        if limit:
            documents = documents[:limit]
            print(f"Limited to {limit} documents for this run")

        analyses = []
        skipped = 0
        for doc in tqdm(documents, desc="Analyzing documents"):
            # Skip documents that were already analyzed
            if doc['unique_id'] in existing_analyses:
                analyses.append(existing_analyses[doc['unique_id']])
                skipped += 1
                continue

            analysis = self.analyze_document(doc)
            if analysis:
                analyses.append(analysis)
                # Save incrementally so progress survives interruptions
                self.save_analyses(analyses)

        print(f"\n✅ Analyzed {len(analyses) - skipped} new documents")
        print(f"   Skipped {skipped} already-analyzed documents")
        print(f"   Total analyses: {len(analyses)}")
        return analyses

    def save_analyses(self, analyses: List[Dict]):
        """Save analyses to the JSON output file."""
        output = {
            'total': len(analyses),
            'analyses': analyses
        }
        with open(self.analyses_file, 'w', encoding='utf-8') as f:
            json.dump(output, f, indent=2, ensure_ascii=False)


def main():
    load_dotenv()

    import argparse
    parser = argparse.ArgumentParser(description="Analyze documents using an LLM")
    parser.add_argument("--api-url", help="OpenAI-compatible API base URL")
    parser.add_argument("--api-key", help="API key")
    parser.add_argument("--model", help="Model name")
    parser.add_argument("--limit", type=int, help="Limit the number of documents to analyze")
    parser.add_argument("--force", action="store_true", help="Re-analyze all documents (ignore existing analyses)")
    args = parser.parse_args()

    api_url = args.api_url or os.getenv("OPENAI_API_URL")
    api_key = args.api_key or os.getenv("OPENAI_API_KEY")
    model = args.model or os.getenv("OPENAI_MODEL", "gpt-4o")
    if not api_key:
        parser.error("An API key is required (pass --api-key or set OPENAI_API_KEY)")

    analyzer = DocumentAnalyzer(api_url, api_key, model)

    # Clear existing analyses when --force is given
    if args.force and analyzer.analyses_file.exists():
        analyzer.analyses_file.unlink()
        print("Removed existing analyses (--force mode)")

    analyses = analyzer.analyze_all(limit=args.limit)
    analyzer.save_analyses(analyses)
    print(f"\n✅ Saved analyses to {analyzer.analyses_file}")


if __name__ == "__main__":
    main()
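
# Example invocations (illustrative only; the script filename and endpoint URL below
# are assumptions, substitute your own):
#
#   export OPENAI_API_KEY=...
#   python analyze.py --limit 10      # analyze only the first 10 documents
#   python analyze.py --force         # delete analyses.json and re-analyze everything
#   python analyze.py --api-url https://my-llm.example/v1 --model gpt-4o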