- Add Ollama fallback support in vector search with Azure OpenAI as primary
- Enhance prayer system with public/private visibility options and language filtering
- Update OG image to use new biblical-guide-og-image.png
- Improve prayer request management with better categorization
- Remove deprecated ingest_json_pgvector.py script

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
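The Ollama fallback mentioned in the first bullet lives in the app's vector-search layer, not in the ingestion script below. A minimal sketch of the pattern, assuming a local Ollama server exposing its /api/embeddings endpoint and reusing the script's embed_batch (defined below); the embed_with_fallback helper and the nomic-embed-text model name are illustrative, not taken from the repo:

import httpx

async def embed_with_fallback(client: httpx.AsyncClient, inputs: list[str]) -> list[list[float]]:
    # Try Azure OpenAI first; fall back to a local Ollama server on failure.
    try:
        return await embed_batch(client, inputs)  # Azure primary (see below)
    except Exception:
        embeddings = []
        for text in inputs:  # Ollama's /api/embeddings takes one prompt per call
            r = await client.post(
                "http://localhost:11434/api/embeddings",
                json={"model": "nomic-embed-text", "prompt": text},
                timeout=60,
            )
            r.raise_for_status()
            embeddings.append(r.json()["embedding"])
        return embeddings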
import os, re, json, time, asyncio, glob
from dataclasses import dataclass

from dotenv import load_dotenv
import httpx
import psycopg

load_dotenv()

AZ_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT", "").rstrip("/")
AZ_API_KEY = os.getenv("AZURE_OPENAI_KEY")
AZ_API_VER = os.getenv("AZURE_OPENAI_API_VERSION", "2024-05-01-preview")
AZ_DEPLOYMENT = os.getenv("AZURE_OPENAI_EMBED_DEPLOYMENT", "embed-3")
EMBED_DIMS = int(os.getenv("EMBED_DIMS", "1536"))
DB_URL = os.getenv("DATABASE_URL")
BIBLE_JSON_DIR = os.getenv("BIBLE_JSON_DIR", "/root/biblical-guide/bibles/json")
VECTOR_SCHEMA = os.getenv("VECTOR_SCHEMA", "ai_bible")
MIN_FILE_SIZE = int(os.getenv("MIN_FILE_SIZE", "512000"))  # 500KB in bytes

assert AZ_ENDPOINT and AZ_API_KEY and DB_URL and BIBLE_JSON_DIR, "Missing required env vars"

EMBED_URL = f"{AZ_ENDPOINT}/openai/deployments/{AZ_DEPLOYMENT}/embeddings?api-version={AZ_API_VER}"
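# Note: EMBED_DIMS must match the embedding width of the Azure deployment.
# 1536 is the default for text-embedding-3-small (an assumption about the
# "embed-3" deployment -- adjust if the deployment uses a different model).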

def get_large_bible_files():
    """Get all Bible JSON files larger than MIN_FILE_SIZE, sorted by path."""
    bible_files = []
    pattern = os.path.join(BIBLE_JSON_DIR, "*_bible.json")

    for filepath in glob.glob(pattern):
        file_size = os.path.getsize(filepath)
        if file_size >= MIN_FILE_SIZE:
            bible_files.append(filepath)

    bible_files.sort()
    return bible_files

@dataclass
class Verse:
    testament: str
    book: str
    chapter: int
    verse: int
    text_raw: str
    text_norm: str


def normalize_text(s: str) -> str:
    # Collapse runs of whitespace to single spaces, then make sure any
    # remaining non-breaking spaces (U+00A0) become plain spaces.
    s = re.sub(r"\s+", " ", s.strip())
    s = s.replace("\u00a0", " ")
    return s
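# For illustration:
#   normalize_text("  In   the\u00a0beginning ")  ->  "In the beginning"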

def parse_bible_json(json_file_path: str):
    """Parse a Bible JSON file and yield verse data."""
    try:
        with open(json_file_path, 'r', encoding='utf-8') as f:
            bible_data = json.load(f)

        bible_name = bible_data.get('name', 'Unknown Bible')
        abbreviation = bible_data.get('abbreviation', 'UNKNOWN')
        language = bible_data.get('language', 'unknown')

        print(f"Processing: {bible_name} ({abbreviation}, {language})")

        for book in bible_data.get('books', []):
            book_name = book.get('name', 'Unknown Book')
            testament = book.get('testament', 'Unknown')

            # Convert testament to short form for consistency
            if 'Old' in testament:
                testament = 'OT'
            elif 'New' in testament:
                testament = 'NT'

            for chapter in book.get('chapters', []):
                chapter_num = chapter.get('chapterNum', 1)

                for verse in chapter.get('verses', []):
                    verse_num = verse.get('verseNum', 1)
                    text_raw = verse.get('text', '')

                    if text_raw:  # Only process non-empty verses
                        text_norm = normalize_text(text_raw)
                        yield {
                            "testament": testament,
                            "book": book_name,
                            "chapter": chapter_num,
                            "verse": verse_num,
                            "text_raw": text_raw,
                            "text_norm": text_norm,
                            "language": language,
                            "translation": abbreviation,
                        }

    except Exception as e:
        print(f"Error processing {json_file_path}: {e}")
        return
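# The parser above implies this input shape (field names taken from the code,
# values illustrative):
# {
#   "name": "...", "abbreviation": "...", "language": "...",
#   "books": [{"name": "...", "testament": "Old Testament" | "New Testament",
#              "chapters": [{"chapterNum": 1,
#                            "verses": [{"verseNum": 1, "text": "..."}]}]}]
# }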

async def embed_batch(client, inputs):
    payload = {"input": inputs}
    headers = {"api-key": AZ_API_KEY, "Content-Type": "application/json"}
    for attempt in range(6):
        try:
            r = await client.post(EMBED_URL, headers=headers, json=payload, timeout=60)
            if r.status_code == 200:
                data = r.json()
                # Restore input order; the API reports each item's original index.
                ordered = sorted(data["data"], key=lambda x: x["index"])
                return [d["embedding"] for d in ordered]
            elif r.status_code in (429, 500, 503):
                backoff = 2 ** attempt + (0.1 * attempt)
                print(f"Transient error {r.status_code}, waiting {backoff:.1f}s...")
                await asyncio.sleep(backoff)
            else:
                raise RuntimeError(f"Embedding error {r.status_code}: {r.text}")
        except RuntimeError:
            # Non-retryable API errors must propagate instead of being swallowed
            # by the generic handler below and retried pointlessly.
            raise
        except Exception as e:
            backoff = 2 ** attempt + (0.1 * attempt)
            print(f"Error on attempt {attempt + 1}: {e}, waiting {backoff:.1f}s...")
            await asyncio.sleep(backoff)
    raise RuntimeError("Failed to embed after retries")
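# With this schedule the six attempts wait roughly 1.0s, 2.1s, 4.2s, 8.3s,
# 16.4s and 32.5s between retries -- about 65s worst case per batch.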

def safe_ident(s: str) -> str:
    return re.sub(r"[^a-z0-9_]+", "_", s.lower()).strip("_")


def get_table_info(language: str, translation: str):
    """Get table name and fully qualified name for a specific Bible version."""
    table_basename = f"bv_{safe_ident(language)}_{safe_ident(translation)}"
    table_fqn = f'"{VECTOR_SCHEMA}"."{table_basename}"'
    return table_basename, table_fqn
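# For illustration: with the default schema, a Romanian "VDC" bible maps to
#   get_table_info("ro", "VDC") == ("bv_ro_vdc", '"ai_bible"."bv_ro_vdc"')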

def create_table_sql(table_fqn: str) -> str:
    return f"""
    CREATE SCHEMA IF NOT EXISTS "{VECTOR_SCHEMA}";
    CREATE TABLE IF NOT EXISTS {table_fqn} (
      id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
      testament TEXT NOT NULL,
      book TEXT NOT NULL,
      chapter INT NOT NULL,
      verse INT NOT NULL,
      language TEXT NOT NULL,
      translation TEXT NOT NULL,
      -- Explicit ::text casts keep the expression immutable, which generated
      -- columns require (text || int resolves to a merely stable operator).
      ref TEXT GENERATED ALWAYS AS (book || ' ' || chapter::text || ':' || verse::text) STORED,
      text_raw TEXT NOT NULL,
      text_norm TEXT NOT NULL,
      tsv tsvector,
      embedding vector({EMBED_DIMS}),
      created_at TIMESTAMPTZ DEFAULT now(),
      updated_at TIMESTAMPTZ DEFAULT now()
    );
    """


def create_indexes_sql(table_fqn: str, table_basename: str) -> str:
    return f"""
    CREATE UNIQUE INDEX IF NOT EXISTS ux_ref_{table_basename} ON {table_fqn} (translation, language, book, chapter, verse);
    CREATE INDEX IF NOT EXISTS idx_tsv_{table_basename} ON {table_fqn} USING GIN (tsv);
    CREATE INDEX IF NOT EXISTS idx_book_ch_{table_basename} ON {table_fqn} (book, chapter);
    CREATE INDEX IF NOT EXISTS idx_testament_{table_basename} ON {table_fqn} (testament);
    CREATE INDEX IF NOT EXISTS idx_lang_trans_{table_basename} ON {table_fqn} (language, translation);
    """
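# Design note: each row carries both a tsvector (GIN-indexed, for keyword
# search) and a pgvector embedding (ivfflat-indexed after load, for semantic
# search), so the same table can serve hybrid retrieval.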

def upsert_sql(table_fqn: str) -> str:
    return f"""
    INSERT INTO {table_fqn} (testament, book, chapter, verse, language, translation, text_raw, text_norm, tsv, embedding)
    VALUES (%(testament)s, %(book)s, %(chapter)s, %(verse)s, %(language)s, %(translation)s, %(text_raw)s, %(text_norm)s,
            to_tsvector(COALESCE(%(ts_lang)s, 'simple')::regconfig, %(text_norm)s), %(embedding)s)
    ON CONFLICT (translation, language, book, chapter, verse) DO UPDATE
    SET text_raw = EXCLUDED.text_raw,
        text_norm = EXCLUDED.text_norm,
        tsv = EXCLUDED.tsv,
        embedding = EXCLUDED.embedding,
        updated_at = now();
    """
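# Note: %(embedding)s is bound as a plain Python list; psycopg adapts it to a
# float8[] parameter and pgvector's assignment cast (double precision[] ->
# vector) converts it on insert, so no explicit ::vector cast is needed here.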

async def process_bible_file(bible_file_path: str, client):
    """Process a single Bible JSON file."""
    print(f"\n=== Processing {os.path.basename(bible_file_path)} ===")

    verses = list(parse_bible_json(bible_file_path))
    if not verses:
        print(f"No verses found in {bible_file_path}, skipping...")
        return

    print(f"Parsed {len(verses):,} verses")

    # Get language and translation from first verse
    first_verse = verses[0]
    language = first_verse["language"]
    translation = first_verse["translation"]

    table_basename, table_fqn = get_table_info(language, translation)

    # Create schema + table structure for this bible version
    with psycopg.connect(DB_URL) as conn:
        with conn.cursor() as cur:
            print(f"Creating table {table_fqn} ...")
            cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
            cur.execute(create_table_sql(table_fqn))
            cur.execute(create_indexes_sql(table_fqn, table_basename))
            conn.commit()
            print("Schema/table ready")

    # Process embeddings in batches
    batch_size = 128
    with psycopg.connect(DB_URL, autocommit=False) as conn:
        with conn.cursor() as cur:
            for i in range(0, len(verses), batch_size):
                batch = verses[i:i+batch_size]
                inputs = [v["text_norm"] for v in batch]

                print(f"Generating embeddings for batch {i//batch_size + 1}/{(len(verses) + batch_size - 1)//batch_size}")
                embs = await embed_batch(client, inputs)

                rows = []
                for v, e in zip(batch, embs):
                    # Determine text search language based on language code
                    ts_lang = "simple"  # default
                    if v["language"].lower().startswith("ro"):
                        ts_lang = "romanian"
                    elif v["language"].lower().startswith("en"):
                        ts_lang = "english"
                    elif v["language"].lower().startswith("es"):
                        ts_lang = "spanish"
                    elif v["language"].lower().startswith("fr"):
                        ts_lang = "french"
                    elif v["language"].lower().startswith("de"):
                        ts_lang = "german"
                    elif v["language"].lower().startswith("it"):
                        ts_lang = "italian"

                    rows.append({
                        **v,
                        "ts_lang": ts_lang,
                        "embedding": e
                    })

                cur.executemany(upsert_sql(table_fqn), rows)
                conn.commit()
                print(f"Upserted {len(rows)} verses... {i+len(rows)}/{len(verses)}")

    # Create IVFFLAT index after data is loaded
    print("Creating IVFFLAT index...")
    with psycopg.connect(DB_URL, autocommit=True) as conn:
        with conn.cursor() as cur:
            cur.execute(f"VACUUM ANALYZE {table_fqn};")
            cur.execute(f"""
                CREATE INDEX IF NOT EXISTS idx_vec_ivfflat_{table_basename}
                ON {table_fqn} USING ivfflat (embedding vector_cosine_ops)
                WITH (lists = 200);
            """)

    print(f"✅ {translation} ({language}) completed successfully! Total verses: {len(verses):,}")
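# pgvector's rule of thumb is lists ~ rows/1000 for tables up to about 1M
# rows; a full Bible is roughly 31k verses per translation, so lists = 200 is
# on the high side of that guidance but workable.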

def update_status(status_data):
    """Update the status file for monitoring progress."""
    status_file = "/root/biblical-guide/scripts/ingest_status.json"
    try:
        from datetime import datetime
        status_data["last_update"] = datetime.now().isoformat()
        with open(status_file, 'w') as f:
            json.dump(status_data, f, indent=2)
    except Exception as e:
        print(f"Warning: Could not update status file: {e}")
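# The status file written above looks like this (values illustrative):
# {"status": "running", "start_time": "2025-01-01 12:00:00", "total_files": 12,
#  "processed": 3, "successful": 3, "failed": 0,
#  "current_file": "ro_vdc_bible.json", "errors": [],
#  "last_update": "2025-01-01T12:34:56.789012"}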

async def main():
    start_time = time.time()
    print("Starting Bible embedding ingestion for all large Bible files...")
    print(f"Timestamp: {time.strftime('%Y-%m-%d %H:%M:%S')}")

    # Get all Bible files larger than minimum size
    bible_files = get_large_bible_files()

    if not bible_files:
        print(f"No Bible files found larger than {MIN_FILE_SIZE/1024:.0f}KB in {BIBLE_JSON_DIR}")
        return

    print(f"Found {len(bible_files)} Bible files to process (>{MIN_FILE_SIZE/1024:.0f}KB each)")

    # Initialize status tracking
    status = {
        "status": "running",
        "start_time": time.strftime('%Y-%m-%d %H:%M:%S'),
        "total_files": len(bible_files),
        "processed": 0,
        "successful": 0,
        "failed": 0,
        "current_file": "",
        "errors": []
    }
    update_status(status)

    # Process files one by one to avoid memory issues
    async with httpx.AsyncClient(timeout=120.0) as client:
        successful = 0
        failed = 0
        failed_files = []

        for i, bible_file in enumerate(bible_files, 1):
            try:
                file_size_mb = os.path.getsize(bible_file) / (1024 * 1024)
                filename = os.path.basename(bible_file)

                print(f"\n[{i}/{len(bible_files)}] Processing {filename} ({file_size_mb:.1f}MB)")
                print(f"Progress: {(i-1)/len(bible_files)*100:.1f}% complete")

                # Update status
                status["current_file"] = filename
                status["processed"] = i - 1
                status["successful"] = successful
                status["failed"] = failed
                update_status(status)

                await process_bible_file(bible_file, client)
                successful += 1
                print(f"✅ Completed {filename}")

            except Exception as e:
                error_msg = f"Failed to process {os.path.basename(bible_file)}: {str(e)}"
                print(f"❌ {error_msg}")
                failed += 1
                failed_files.append(os.path.basename(bible_file))
                status["errors"].append({
                    "file": os.path.basename(bible_file),
                    "error": str(e),
                    "timestamp": time.strftime('%Y-%m-%d %H:%M:%S')
                })
                update_status(status)
                continue

    # Final summary
    elapsed_time = time.time() - start_time
    elapsed_hours = elapsed_time / 3600

    print(f"\n=== Final Summary ===")
    print(f"✅ Successfully processed: {successful} files")
    print(f"❌ Failed to process: {failed} files")
    print(f"📊 Total files: {len(bible_files)}")
    print(f"⏱️ Total time: {elapsed_hours:.2f} hours ({elapsed_time:.0f} seconds)")
    print(f"📈 Average: {elapsed_time/len(bible_files):.1f} seconds per file")

    if failed_files:
        print(f"\n❌ Failed files:")
        for filename in failed_files:
            print(f"  - {filename}")

    # Final status update
    status.update({
        "status": "completed",
        "end_time": time.strftime('%Y-%m-%d %H:%M:%S'),
        "processed": len(bible_files),
        "successful": successful,
        "failed": failed,
        "duration_seconds": elapsed_time,
        "current_file": ""
    })
    update_status(status)

    print("\n🎉 All large Bible files have been processed!")
    print(f"📋 Status file: /root/biblical-guide/scripts/ingest_status.json")


if __name__ == "__main__":
    asyncio.run(main())