biblical-guide.com/scripts/ingest_bible_pgvector.py
Andrei e4b815cb40 Add Ollama embedding support and improve prayer system with public/private visibility
- Add Ollama fallback support in vector search with Azure OpenAI as primary
- Enhance prayer system with public/private visibility options and language filtering
- Update OG image to use new biblical-guide-og-image.png
- Improve prayer request management with better categorization
- Remove deprecated ingest_json_pgvector.py script

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-28 19:25:49 +00:00

358 lines
14 KiB
Python

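"""
Ingest Bible JSON files into per-translation pgvector tables.

For every *_bible.json file in BIBLE_JSON_DIR that is at least MIN_FILE_SIZE,
this script parses the verses, requests embeddings from an Azure OpenAI
embeddings deployment, and upserts verse text, tsvector and embedding into a
table named <VECTOR_SCHEMA>.bv_<language>_<translation>, then builds an
IVFFLAT index. Progress is written to scripts/ingest_status.json.
"""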
import os, re, json, math, time, asyncio, glob
from typing import List, Dict, Tuple, Iterable
from dataclasses import dataclass
from pathlib import Path
from dotenv import load_dotenv
import httpx
import psycopg
from psycopg.rows import dict_row
load_dotenv()

AZ_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT", "").rstrip("/")
AZ_API_KEY = os.getenv("AZURE_OPENAI_KEY")
AZ_API_VER = os.getenv("AZURE_OPENAI_API_VERSION", "2024-05-01-preview")
AZ_DEPLOYMENT = os.getenv("AZURE_OPENAI_EMBED_DEPLOYMENT", "embed-3")
EMBED_DIMS = int(os.getenv("EMBED_DIMS", "1536"))
DB_URL = os.getenv("DATABASE_URL")
BIBLE_JSON_DIR = os.getenv("BIBLE_JSON_DIR", "/root/biblical-guide/bibles/json")
VECTOR_SCHEMA = os.getenv("VECTOR_SCHEMA", "ai_bible")
MIN_FILE_SIZE = int(os.getenv("MIN_FILE_SIZE", "512000"))  # 500 KB in bytes

assert AZ_ENDPOINT and AZ_API_KEY and DB_URL and BIBLE_JSON_DIR, "Missing required env vars"

EMBED_URL = f"{AZ_ENDPOINT}/openai/deployments/{AZ_DEPLOYMENT}/embeddings?api-version={AZ_API_VER}"
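# EMBED_URL targets the deployment-scoped Azure OpenAI embeddings endpoint, i.e.
# https://<resource>.openai.azure.com/openai/deployments/<deployment>/embeddings?api-version=<version>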

def get_large_bible_files():
    """Get all bible JSON files larger than MIN_FILE_SIZE"""
    bible_files = []
    pattern = os.path.join(BIBLE_JSON_DIR, "*_bible.json")
    for filepath in glob.glob(pattern):
        file_size = os.path.getsize(filepath)
        if file_size >= MIN_FILE_SIZE:
            bible_files.append(filepath)
    bible_files.sort()
    return bible_files

@dataclass
class Verse:
    testament: str
    book: str
    chapter: int
    verse: int
    text_raw: str
    text_norm: str

def normalize_text(s: str) -> str:
    s = re.sub(r"\s+", " ", s.strip())
    s = s.replace("\u00a0", " ")  # collapse non-breaking spaces to regular spaces
    return s
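# e.g. normalize_text("  In the\u00a0beginning   God created  ") -> "In the beginning God created"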

def parse_bible_json(json_file_path: str):
    """Parse a Bible JSON file and yield verse data"""
    try:
        with open(json_file_path, 'r', encoding='utf-8') as f:
            bible_data = json.load(f)
        bible_name = bible_data.get('name', 'Unknown Bible')
        abbreviation = bible_data.get('abbreviation', 'UNKNOWN')
        language = bible_data.get('language', 'unknown')
        print(f"Processing: {bible_name} ({abbreviation}, {language})")
        for book in bible_data.get('books', []):
            book_name = book.get('name', 'Unknown Book')
            testament = book.get('testament', 'Unknown')
            # Convert testament to short form for consistency
            if 'Old' in testament:
                testament = 'OT'
            elif 'New' in testament:
                testament = 'NT'
            for chapter in book.get('chapters', []):
                chapter_num = chapter.get('chapterNum', 1)
                for verse in chapter.get('verses', []):
                    verse_num = verse.get('verseNum', 1)
                    text_raw = verse.get('text', '')
                    if text_raw:  # Only process non-empty verses
                        text_norm = normalize_text(text_raw)
                        yield {
                            "testament": testament,
                            "book": book_name,
                            "chapter": chapter_num,
                            "verse": verse_num,
                            "text_raw": text_raw,
                            "text_norm": text_norm,
                            "language": language,
                            "translation": abbreviation
                        }
    except Exception as e:
        print(f"Error processing {json_file_path}: {e}")
        return
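# Expected input shape (per the keys read above):
# { "name": ..., "abbreviation": ..., "language": ...,
#   "books": [ { "name": ..., "testament": "Old/New Testament",
#                "chapters": [ { "chapterNum": 1, "verses": [ { "verseNum": 1, "text": "..." } ] } ] } ] }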

async def embed_batch(client, inputs):
    payload = {"input": inputs}
    headers = {"api-key": AZ_API_KEY, "Content-Type": "application/json"}
    for attempt in range(6):
        try:
            r = await client.post(EMBED_URL, headers=headers, json=payload, timeout=60)
            if r.status_code == 200:
                data = r.json()
                ordered = sorted(data["data"], key=lambda x: x["index"])
                return [d["embedding"] for d in ordered]
            elif r.status_code in (429, 500, 503):
                backoff = 2 ** attempt + (0.1 * attempt)
                print(f"Rate limited, waiting {backoff:.1f}s...")
                await asyncio.sleep(backoff)
            else:
                raise RuntimeError(f"Embedding error {r.status_code}: {r.text}")
        except Exception as e:
            backoff = 2 ** attempt + (0.1 * attempt)
            print(f"Error on attempt {attempt + 1}: {e}, waiting {backoff:.1f}s...")
            await asyncio.sleep(backoff)
    raise RuntimeError("Failed to embed after retries")

def safe_ident(s: str) -> str:
    return re.sub(r"[^a-z0-9_]+", "_", s.lower()).strip("_")

def get_table_info(language: str, translation: str):
    """Get table name and fully qualified name for a specific bible version"""
    table_basename = f"bv_{safe_ident(language)}_{safe_ident(translation)}"
    table_fqn = f'"{VECTOR_SCHEMA}"."{table_basename}"'
    return table_basename, table_fqn
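# e.g. with the default schema: get_table_info("romanian", "VDC") -> ("bv_romanian_vdc", '"ai_bible"."bv_romanian_vdc"')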

def create_table_sql(table_fqn: str) -> str:
    return f"""
    CREATE SCHEMA IF NOT EXISTS "{VECTOR_SCHEMA}";
    CREATE TABLE IF NOT EXISTS {table_fqn} (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        testament TEXT NOT NULL,
        book TEXT NOT NULL,
        chapter INT NOT NULL,
        verse INT NOT NULL,
        language TEXT NOT NULL,
        translation TEXT NOT NULL,
        ref TEXT GENERATED ALWAYS AS (book || ' ' || chapter || ':' || verse) STORED,
        text_raw TEXT NOT NULL,
        text_norm TEXT NOT NULL,
        tsv tsvector,
        embedding vector({EMBED_DIMS}),
        created_at TIMESTAMPTZ DEFAULT now(),
        updated_at TIMESTAMPTZ DEFAULT now()
    );
    """

def create_indexes_sql(table_fqn: str, table_basename: str) -> str:
    return f"""
    CREATE UNIQUE INDEX IF NOT EXISTS ux_ref_{table_basename} ON {table_fqn} (translation, language, book, chapter, verse);
    CREATE INDEX IF NOT EXISTS idx_tsv_{table_basename} ON {table_fqn} USING GIN (tsv);
    CREATE INDEX IF NOT EXISTS idx_book_ch_{table_basename} ON {table_fqn} (book, chapter);
    CREATE INDEX IF NOT EXISTS idx_testament_{table_basename} ON {table_fqn} (testament);
    CREATE INDEX IF NOT EXISTS idx_lang_trans_{table_basename} ON {table_fqn} (language, translation);
    """

def upsert_sql(table_fqn: str) -> str:
    return f"""
    INSERT INTO {table_fqn} (testament, book, chapter, verse, language, translation, text_raw, text_norm, tsv, embedding)
    VALUES (%(testament)s, %(book)s, %(chapter)s, %(verse)s, %(language)s, %(translation)s, %(text_raw)s, %(text_norm)s,
            to_tsvector(COALESCE(%(ts_lang)s, 'simple')::regconfig, %(text_norm)s), %(embedding)s)
    ON CONFLICT (translation, language, book, chapter, verse) DO UPDATE
    SET text_raw = EXCLUDED.text_raw,
        text_norm = EXCLUDED.text_norm,
        tsv = EXCLUDED.tsv,
        embedding = EXCLUDED.embedding,
        updated_at = now();
    """

async def process_bible_file(bible_file_path: str, client):
    """Process a single Bible JSON file"""
    print(f"\n=== Processing {os.path.basename(bible_file_path)} ===")
    verses = list(parse_bible_json(bible_file_path))
    if not verses:
        print(f"No verses found in {bible_file_path}, skipping...")
        return
    print(f"Parsed {len(verses):,} verses")

    # Get language and translation from first verse
    first_verse = verses[0]
    language = first_verse["language"]
    translation = first_verse["translation"]
    table_basename, table_fqn = get_table_info(language, translation)

    # Create schema + table structure for this bible version
    with psycopg.connect(DB_URL) as conn:
        with conn.cursor() as cur:
            print(f"Creating table {table_fqn} ...")
            cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
            cur.execute(create_table_sql(table_fqn))
            cur.execute(create_indexes_sql(table_fqn, table_basename))
            conn.commit()
    print("Schema/table ready")

    # Process embeddings in batches
    batch_size = 128
    with psycopg.connect(DB_URL, autocommit=False) as conn:
        with conn.cursor() as cur:
            for i in range(0, len(verses), batch_size):
                batch = verses[i:i+batch_size]
                inputs = [v["text_norm"] for v in batch]
                print(f"Generating embeddings for batch {i//batch_size + 1}/{(len(verses) + batch_size - 1)//batch_size}")
                embs = await embed_batch(client, inputs)
                rows = []
                for v, e in zip(batch, embs):
                    # Determine text search language based on language code
                    ts_lang = "simple"  # default
                    if v["language"].lower().startswith("ro"):
                        ts_lang = "romanian"
                    elif v["language"].lower().startswith("en"):
                        ts_lang = "english"
                    elif v["language"].lower().startswith("es"):
                        ts_lang = "spanish"
                    elif v["language"].lower().startswith("fr"):
                        ts_lang = "french"
                    elif v["language"].lower().startswith("de"):
                        ts_lang = "german"
                    elif v["language"].lower().startswith("it"):
                        ts_lang = "italian"
                    rows.append({
                        **v,
                        "ts_lang": ts_lang,
                        "embedding": e
                    })
                cur.executemany(upsert_sql(table_fqn), rows)
                conn.commit()
                print(f"Upserted {len(rows)} verses... {i+len(rows)}/{len(verses)}")

    # Create IVFFLAT index after data is loaded
    print("Creating IVFFLAT index...")
    with psycopg.connect(DB_URL, autocommit=True) as conn:
        with conn.cursor() as cur:
            cur.execute(f"VACUUM ANALYZE {table_fqn};")
            cur.execute(f"""
                CREATE INDEX IF NOT EXISTS idx_vec_ivfflat_{table_basename}
                ON {table_fqn} USING ivfflat (embedding vector_cosine_ops)
                WITH (lists = 200);
            """)

    print(f"{translation} ({language}) completed successfully! Total verses: {len(verses):,}")

def update_status(status_data):
    """Update the status file for monitoring progress"""
    status_file = "/root/biblical-guide/scripts/ingest_status.json"
    try:
        from datetime import datetime
        status_data["last_update"] = datetime.now().isoformat()
        with open(status_file, 'w') as f:
            json.dump(status_data, f, indent=2)
    except Exception as e:
        print(f"Warning: Could not update status file: {e}")

async def main():
    start_time = time.time()
    print("Starting Bible embedding ingestion for all large Bible files...")
    print(f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}")

    # Get all Bible files larger than minimum size
    bible_files = get_large_bible_files()
    if not bible_files:
        print(f"No Bible files found larger than {MIN_FILE_SIZE/1024:.0f}KB in {BIBLE_JSON_DIR}")
        return
    print(f"Found {len(bible_files)} Bible files to process (>{MIN_FILE_SIZE/1024:.0f}KB each)")

    # Initialize status tracking
    status = {
        "status": "running",
        "start_time": time.strftime('%Y-%m-%d %H:%M:%S'),
        "total_files": len(bible_files),
        "processed": 0,
        "successful": 0,
        "failed": 0,
        "current_file": "",
        "errors": []
    }
    update_status(status)

    # Process files one by one to avoid memory issues
    async with httpx.AsyncClient(timeout=120.0) as client:
        successful = 0
        failed = 0
        failed_files = []
        for i, bible_file in enumerate(bible_files, 1):
            try:
                file_size_mb = os.path.getsize(bible_file) / (1024 * 1024)
                filename = os.path.basename(bible_file)
                print(f"\n[{i}/{len(bible_files)}] Processing {filename} ({file_size_mb:.1f}MB)")
                print(f"Progress: {(i-1)/len(bible_files)*100:.1f}% complete")

                # Update status
                status["current_file"] = filename
                status["processed"] = i - 1
                status["successful"] = successful
                status["failed"] = failed
                update_status(status)

                await process_bible_file(bible_file, client)
                successful += 1
                print(f"✅ Completed {filename}")
            except Exception as e:
                error_msg = f"Failed to process {os.path.basename(bible_file)}: {str(e)}"
                print(error_msg)
                failed += 1
                failed_files.append(os.path.basename(bible_file))
                status["errors"].append({"file": os.path.basename(bible_file), "error": str(e), "timestamp": time.strftime('%Y-%m-%d %H:%M:%S')})
                update_status(status)
                continue

    # Final summary
    elapsed_time = time.time() - start_time
    elapsed_hours = elapsed_time / 3600
    print("\n=== Final Summary ===")
    print(f"✅ Successfully processed: {successful} files")
    print(f"❌ Failed to process: {failed} files")
    print(f"📊 Total files: {len(bible_files)}")
    print(f"⏱️ Total time: {elapsed_hours:.2f} hours ({elapsed_time:.0f} seconds)")
    print(f"📈 Average: {elapsed_time/len(bible_files):.1f} seconds per file")
    if failed_files:
        print("\n❌ Failed files:")
        for filename in failed_files:
            print(f" - {filename}")

    # Final status update
    status.update({
        "status": "completed",
        "end_time": time.strftime('%Y-%m-%d %H:%M:%S'),
        "processed": len(bible_files),
        "successful": successful,
        "failed": failed,
        "duration_seconds": elapsed_time,
        "current_file": ""
    })
    update_status(status)

    print("\n🎉 All large Bible files have been processed!")
    print("📋 Status file: /root/biblical-guide/scripts/ingest_status.json")
if __name__ == "__main__":
asyncio.run(main())