import os, re, json, math, time, asyncio, glob from typing import List, Dict, Tuple, Iterable from dataclasses import dataclass from pathlib import Path from dotenv import load_dotenv import httpx import psycopg from psycopg.rows import dict_row load_dotenv() AZ_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT", "").rstrip("/") AZ_API_KEY = os.getenv("AZURE_OPENAI_KEY") AZ_API_VER = os.getenv("AZURE_OPENAI_API_VERSION", "2024-05-01-preview") AZ_DEPLOYMENT = os.getenv("AZURE_OPENAI_EMBED_DEPLOYMENT", "embed-3") EMBED_DIMS = int(os.getenv("EMBED_DIMS", "1536")) DB_URL = os.getenv("DATABASE_URL") BIBLE_JSON_DIR = os.getenv("BIBLE_JSON_DIR", "/root/biblical-guide/bibles/json") VECTOR_SCHEMA = os.getenv("VECTOR_SCHEMA", "ai_bible") MIN_FILE_SIZE = int(os.getenv("MIN_FILE_SIZE", "512000")) # 500KB in bytes assert AZ_ENDPOINT and AZ_API_KEY and DB_URL and BIBLE_JSON_DIR, "Missing required env vars" EMBED_URL = f"{AZ_ENDPOINT}/openai/deployments/{AZ_DEPLOYMENT}/embeddings?api-version={AZ_API_VER}" def get_large_bible_files(): """Get all bible JSON files larger than MIN_FILE_SIZE""" bible_files = [] pattern = os.path.join(BIBLE_JSON_DIR, "*_bible.json") for filepath in glob.glob(pattern): file_size = os.path.getsize(filepath) if file_size >= MIN_FILE_SIZE: bible_files.append(filepath) bible_files.sort() return bible_files @dataclass class Verse: testament: str book: str chapter: int verse: int text_raw: str text_norm: str def normalize_text(s: str) -> str: s = re.sub(r"\s+", " ", s.strip()) s = s.replace(" ", " ") return s def parse_bible_json(json_file_path: str): """Parse a Bible JSON file and yield verse data""" try: with open(json_file_path, 'r', encoding='utf-8') as f: bible_data = json.load(f) bible_name = bible_data.get('name', 'Unknown Bible') abbreviation = bible_data.get('abbreviation', 'UNKNOWN') language = bible_data.get('language', 'unknown') print(f"Processing: {bible_name} ({abbreviation}, {language})") for book in bible_data.get('books', []): book_name = book.get('name', 'Unknown Book') testament = book.get('testament', 'Unknown') # Convert testament to short form for consistency if 'Old' in testament: testament = 'OT' elif 'New' in testament: testament = 'NT' for chapter in book.get('chapters', []): chapter_num = chapter.get('chapterNum', 1) for verse in chapter.get('verses', []): verse_num = verse.get('verseNum', 1) text_raw = verse.get('text', '') if text_raw: # Only process non-empty verses text_norm = normalize_text(text_raw) yield { "testament": testament, "book": book_name, "chapter": chapter_num, "verse": verse_num, "text_raw": text_raw, "text_norm": text_norm, "language": language, "translation": abbreviation } except Exception as e: print(f"Error processing {json_file_path}: {e}") return async def embed_batch(client, inputs): payload = {"input": inputs} headers = {"api-key": AZ_API_KEY, "Content-Type": "application/json"} for attempt in range(6): try: r = await client.post(EMBED_URL, headers=headers, json=payload, timeout=60) if r.status_code == 200: data = r.json() ordered = sorted(data["data"], key=lambda x: x["index"]) return [d["embedding"] for d in ordered] elif r.status_code in (429, 500, 503): backoff = 2 ** attempt + (0.1 * attempt) print(f"Rate limited, waiting {backoff:.1f}s...") await asyncio.sleep(backoff) else: raise RuntimeError(f"Embedding error {r.status_code}: {r.text}") except Exception as e: backoff = 2 ** attempt + (0.1 * attempt) print(f"Error on attempt {attempt + 1}: {e}, waiting {backoff:.1f}s...") await asyncio.sleep(backoff) raise RuntimeError("Failed to embed after retries") def safe_ident(s: str) -> str: return re.sub(r"[^a-z0-9_]+", "_", s.lower()).strip("_") def get_table_info(language: str, translation: str): """Get table name and fully qualified name for a specific bible version""" table_basename = f"bv_{safe_ident(language)}_{safe_ident(translation)}" table_fqn = f'"{VECTOR_SCHEMA}"."{table_basename}"' return table_basename, table_fqn def create_table_sql(table_fqn: str) -> str: return f""" CREATE SCHEMA IF NOT EXISTS "{VECTOR_SCHEMA}"; CREATE TABLE IF NOT EXISTS {table_fqn} ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), testament TEXT NOT NULL, book TEXT NOT NULL, chapter INT NOT NULL, verse INT NOT NULL, language TEXT NOT NULL, translation TEXT NOT NULL, ref TEXT GENERATED ALWAYS AS (book || ' ' || chapter || ':' || verse) STORED, text_raw TEXT NOT NULL, text_norm TEXT NOT NULL, tsv tsvector, embedding vector({EMBED_DIMS}), created_at TIMESTAMPTZ DEFAULT now(), updated_at TIMESTAMPTZ DEFAULT now() ); """ def create_indexes_sql(table_fqn: str, table_basename: str) -> str: return f""" CREATE UNIQUE INDEX IF NOT EXISTS ux_ref_{table_basename} ON {table_fqn} (translation, language, book, chapter, verse); CREATE INDEX IF NOT EXISTS idx_tsv_{table_basename} ON {table_fqn} USING GIN (tsv); CREATE INDEX IF NOT EXISTS idx_book_ch_{table_basename} ON {table_fqn} (book, chapter); CREATE INDEX IF NOT EXISTS idx_testament_{table_basename} ON {table_fqn} (testament); CREATE INDEX IF NOT EXISTS idx_lang_trans_{table_basename} ON {table_fqn} (language, translation); """ def upsert_sql(table_fqn: str) -> str: return f""" INSERT INTO {table_fqn} (testament, book, chapter, verse, language, translation, text_raw, text_norm, tsv, embedding) VALUES (%(testament)s, %(book)s, %(chapter)s, %(verse)s, %(language)s, %(translation)s, %(text_raw)s, %(text_norm)s, to_tsvector(COALESCE(%(ts_lang)s,'simple')::regconfig, %(text_norm)s), %(embedding)s) ON CONFLICT (translation, language, book, chapter, verse) DO UPDATE SET text_raw=EXCLUDED.text_raw, text_norm=EXCLUDED.text_norm, tsv=EXCLUDED.tsv, embedding=EXCLUDED.embedding, updated_at=now(); """ async def process_bible_file(bible_file_path: str, client): """Process a single Bible JSON file""" print(f"\n=== Processing {os.path.basename(bible_file_path)} ===") verses = list(parse_bible_json(bible_file_path)) if not verses: print(f"No verses found in {bible_file_path}, skipping...") return print(f"Parsed {len(verses):,} verses") # Get language and translation from first verse first_verse = verses[0] language = first_verse["language"] translation = first_verse["translation"] table_basename, table_fqn = get_table_info(language, translation) # Create schema + table structure for this bible version with psycopg.connect(DB_URL) as conn: with conn.cursor() as cur: print(f"Creating table {table_fqn} ...") cur.execute("CREATE EXTENSION IF NOT EXISTS vector;") cur.execute(create_table_sql(table_fqn)) cur.execute(create_indexes_sql(table_fqn, table_basename)) conn.commit() print("Schema/table ready") # Process embeddings in batches batch_size = 128 with psycopg.connect(DB_URL, autocommit=False) as conn: with conn.cursor() as cur: for i in range(0, len(verses), batch_size): batch = verses[i:i+batch_size] inputs = [v["text_norm"] for v in batch] print(f"Generating embeddings for batch {i//batch_size + 1}/{(len(verses) + batch_size - 1)//batch_size}") embs = await embed_batch(client, inputs) rows = [] for v, e in zip(batch, embs): # Determine text search language based on language code ts_lang = "simple" # default if v["language"].lower().startswith("ro"): ts_lang = "romanian" elif v["language"].lower().startswith("en"): ts_lang = "english" elif v["language"].lower().startswith("es"): ts_lang = "spanish" elif v["language"].lower().startswith("fr"): ts_lang = "french" elif v["language"].lower().startswith("de"): ts_lang = "german" elif v["language"].lower().startswith("it"): ts_lang = "italian" rows.append({ **v, "ts_lang": ts_lang, "embedding": e }) cur.executemany(upsert_sql(table_fqn), rows) conn.commit() print(f"Upserted {len(rows)} verses... {i+len(rows)}/{len(verses)}") # Create IVFFLAT index after data is loaded print("Creating IVFFLAT index...") with psycopg.connect(DB_URL, autocommit=True) as conn: with conn.cursor() as cur: cur.execute(f"VACUUM ANALYZE {table_fqn};") cur.execute(f""" CREATE INDEX IF NOT EXISTS idx_vec_ivfflat_{table_basename} ON {table_fqn} USING ivfflat (embedding vector_cosine_ops) WITH (lists = 200); """) print(f"✅ {translation} ({language}) completed successfully! Total verses: {len(verses):,}") def update_status(status_data): """Update the status file for monitoring progress""" status_file = "/root/biblical-guide/scripts/ingest_status.json" try: import json from datetime import datetime status_data["last_update"] = datetime.now().isoformat() with open(status_file, 'w') as f: json.dump(status_data, f, indent=2) except Exception as e: print(f"Warning: Could not update status file: {e}") async def main(): start_time = time.time() print("Starting Bible embedding ingestion for all large Bible files...") print(f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}") # Get all Bible files larger than minimum size bible_files = get_large_bible_files() if not bible_files: print(f"No Bible files found larger than {MIN_FILE_SIZE/1024:.0f}KB in {BIBLE_JSON_DIR}") return print(f"Found {len(bible_files)} Bible files to process (>{MIN_FILE_SIZE/1024:.0f}KB each)") # Initialize status tracking status = { "status": "running", "start_time": time.strftime('%Y-%m-%d %H:%M:%S'), "total_files": len(bible_files), "processed": 0, "successful": 0, "failed": 0, "current_file": "", "errors": [] } update_status(status) # Process files one by one to avoid memory issues async with httpx.AsyncClient(timeout=120.0) as client: successful = 0 failed = 0 failed_files = [] for i, bible_file in enumerate(bible_files, 1): try: file_size_mb = os.path.getsize(bible_file) / (1024 * 1024) filename = os.path.basename(bible_file) print(f"\n[{i}/{len(bible_files)}] Processing {filename} ({file_size_mb:.1f}MB)") print(f"Progress: {(i-1)/len(bible_files)*100:.1f}% complete") # Update status status["current_file"] = filename status["processed"] = i - 1 status["successful"] = successful status["failed"] = failed update_status(status) await process_bible_file(bible_file, client) successful += 1 print(f"✅ Completed {filename}") except Exception as e: error_msg = f"Failed to process {os.path.basename(bible_file)}: {str(e)}" print(f"❌ {error_msg}") failed += 1 failed_files.append(os.path.basename(bible_file)) status["errors"].append({"file": os.path.basename(bible_file), "error": str(e), "timestamp": time.strftime('%Y-%m-%d %H:%M:%S')}) update_status(status) continue # Final summary elapsed_time = time.time() - start_time elapsed_hours = elapsed_time / 3600 print(f"\n=== Final Summary ===") print(f"✅ Successfully processed: {successful} files") print(f"❌ Failed to process: {failed} files") print(f"📊 Total files: {len(bible_files)}") print(f"⏱️ Total time: {elapsed_hours:.2f} hours ({elapsed_time:.0f} seconds)") print(f"📈 Average: {elapsed_time/len(bible_files):.1f} seconds per file") if failed_files: print(f"\n❌ Failed files:") for filename in failed_files: print(f" - {filename}") # Final status update status.update({ "status": "completed", "end_time": time.strftime('%Y-%m-%d %H:%M:%S'), "processed": len(bible_files), "successful": successful, "failed": failed, "duration_seconds": elapsed_time, "current_file": "" }) update_status(status) print("\n🎉 All large Bible files have been processed!") print(f"📋 Status file: /root/biblical-guide/scripts/ingest_status.json") if __name__ == "__main__": asyncio.run(main())