import os, re, asyncio
from typing import Dict, Iterable, List
from dataclasses import dataclass
from pathlib import Path

from dotenv import load_dotenv
import httpx
import psycopg
from psycopg.rows import dict_row

load_dotenv()

AZ_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT", "").rstrip("/")
AZ_API_KEY = os.getenv("AZURE_OPENAI_KEY")
AZ_API_VER = os.getenv("AZURE_OPENAI_API_VERSION", "2024-05-01-preview")
AZ_DEPLOYMENT = os.getenv("AZURE_OPENAI_EMBED_DEPLOYMENT", "embed-3")
EMBED_DIMS = int(os.getenv("EMBED_DIMS", "3072"))
DB_URL = os.getenv("DATABASE_URL")
BIBLE_MD_PATH = os.getenv("BIBLE_MD_PATH")
LANG_CODE = os.getenv("LANG_CODE", "ro")
TRANSLATION = os.getenv("TRANSLATION_CODE", "FIDELA")
VECTOR_SCHEMA = os.getenv("VECTOR_SCHEMA", "ai_bible")

assert AZ_ENDPOINT and AZ_API_KEY and DB_URL and BIBLE_MD_PATH, "Missing required env vars"

EMBED_URL = f"{AZ_ENDPOINT}/openai/deployments/{AZ_DEPLOYMENT}/embeddings?api-version={AZ_API_VER}"

# Book names must match the markdown source exactly (including diacritics).
BOOKS_OT = [
    "Geneza", "Exodul", "Leviticul", "Numeri", "Deuteronom", "Iosua", "Judecători", "Rut",
    "1 Samuel", "2 Samuel", "1 Imparati", "2 Imparati", "1 Cronici", "2 Cronici", "Ezra", "Neemia", "Estera",
    "Iov", "Psalmii", "Proverbe", "Eclesiastul", "Cântarea Cântărilor", "Isaia", "Ieremia", "Plângerile",
    "Ezechiel", "Daniel", "Osea", "Ioel", "Amos", "Obadia", "Iona", "Mica", "Naum", "Habacuc", "Țefania",
    "Hagai", "Zaharia", "Maleahi",
]
BOOKS_NT = [
    "Matei", "Marcu", "Luca", "Ioan", "Faptele Apostolilor", "Romani", "1 Corinteni", "2 Corinteni",
    "Galateni", "Efeseni", "Filipeni", "Coloseni", "1 Tesaloniceni", "2 Tesaloniceni", "1 Timotei", "2 Timotei",
    "Titus", "Filimon", "Evrei", "Iacov", "1 Petru", "2 Petru", "1 Ioan", "2 Ioan", "3 Ioan", "Iuda", "Revelaţia",
]
BOOK_CANON = {b: ("OT" if b in BOOKS_OT else "NT") for b in BOOKS_OT + BOOKS_NT}


@dataclass
class Verse:
    """Typed view of a parsed verse (parse_bible_md currently yields plain dicts)."""
    testament: str
    book: str
    chapter: int
    verse: int
    text_raw: str
    text_norm: str


def normalize_text(s: str) -> str:
    s = re.sub(r"\s+", " ", s.strip())
    s = s.replace("\u00a0", " ")  # non-breaking space -> regular space
    return s


# The named groups were lost in transit (the "<name>" part of each group was
# stripped); the parser below relies on them, so they are restored here.
BOOK_RE = re.compile(r"^(?P<book>[A-ZĂÂÎȘȚ][^\n]+?)\s*$")  # unused; book detection uses the "…" pattern below
CH_RE = re.compile(r"^(?i:Capitolul)\s+(?P<ch>\d+)\b")  # case-insensitive: matches "Capitolul", "CApitoLuL", etc.
VERSE_RE = re.compile(r"^(?P<v>\d+)\s+(?P<body>.+)$")


def parse_bible_md(md_text: str) -> Iterable[Dict]:
    cur_book, cur_ch = None, None
    testament = None
    is_in_bible_content = False
    for line in md_text.splitlines():
        line = line.rstrip()
        # Start processing after "VECHIUL TESTAMENT" or when we find book markers
        if line == "VECHIUL TESTAMENT" or line == "TESTAMENT" or "…" in line:
            is_in_bible_content = True
        if not is_in_bible_content:
            continue
        # Book detection: … BookName …
        book_match = re.match(r"^…\s*(.+?)\s*…$", line)
        if book_match:
            bname = book_match.group(1).strip()
            if bname in BOOK_CANON:
                cur_book = bname
                testament = BOOK_CANON[bname]
                cur_ch = None
                print(f"Found book: {bname}")
            continue
        # Chapter detection: "Capitolul X" (any casing)
        m_ch = CH_RE.match(line)
        if m_ch and cur_book:
            cur_ch = int(m_ch.group("ch"))
            print(f"  Chapter {cur_ch}")
            continue
        # Verse detection: line starts with the verse number
        m_v = VERSE_RE.match(line)
        if m_v and cur_book and cur_ch:
            vnum = int(m_v.group("v"))
            body = m_v.group("body").strip()
            # Remove paragraph markers
            body = re.sub(r"^¶\s*", "", body)
            raw = body
            norm = normalize_text(body)
            yield {
                "testament": testament,
                "book": cur_book,
                "chapter": cur_ch,
                "verse": vnum,
                "text_raw": raw,
                "text_norm": norm,
            }
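
# --- Hedged parser smoke test -------------------------------------------------
# A minimal sketch of the input shape parse_bible_md() expects. The sample
# lines below are illustrative, not copied from the actual FIDELA export; call
# _smoke_test_parser() manually to sanity-check the regexes against it.
_SAMPLE_MD = """VECHIUL TESTAMENT
… Geneza …
Capitolul 1
1 ¶ La început Dumnezeu a creat cerul și pământul.
2 Și pământul era fără formă și gol.
"""


def _smoke_test_parser() -> None:
    for v in parse_bible_md(_SAMPLE_MD):
        print(v["book"], f'{v["chapter"]}:{v["verse"]}', v["text_norm"])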
async def embed_batch(client: httpx.AsyncClient, inputs: List[str]) -> List[List[float]]:
    payload = {"input": inputs}
    headers = {"api-key": AZ_API_KEY, "Content-Type": "application/json"}
    for attempt in range(6):
        try:
            r = await client.post(EMBED_URL, headers=headers, json=payload, timeout=60)
            if r.status_code == 200:
                data = r.json()
                ordered = sorted(data["data"], key=lambda x: x["index"])
                return [d["embedding"] for d in ordered]
            elif r.status_code in (429, 500, 503):
                backoff = 2 ** attempt + (0.1 * attempt)
                print(f"Rate limited ({r.status_code}), waiting {backoff:.1f}s...")
                await asyncio.sleep(backoff)
            else:
                # Non-retryable status: fail immediately (not caught below).
                raise RuntimeError(f"Embedding error {r.status_code}: {r.text}")
        except httpx.HTTPError as e:  # network-level errors only; the RuntimeError above propagates
            backoff = 2 ** attempt + (0.1 * attempt)
            print(f"Error on attempt {attempt + 1}: {e}, waiting {backoff:.1f}s...")
            await asyncio.sleep(backoff)
    raise RuntimeError("Failed to embed after retries")


def safe_ident(s: str) -> str:
    return re.sub(r"[^a-z0-9_]+", "_", s.lower()).strip("_")


TABLE_BASENAME = f"bv_{safe_ident(LANG_CODE)}_{safe_ident(TRANSLATION)}"
TABLE_FQN = f'"{VECTOR_SCHEMA}"."{TABLE_BASENAME}"'


def create_table_sql() -> str:
    # chapter/verse are cast to text explicitly: generated columns require an
    # IMMUTABLE expression, and text || integer is only STABLE.
    return f"""
    CREATE SCHEMA IF NOT EXISTS "{VECTOR_SCHEMA}";
    CREATE TABLE IF NOT EXISTS {TABLE_FQN} (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        testament TEXT NOT NULL,
        book TEXT NOT NULL,
        chapter INT NOT NULL,
        verse INT NOT NULL,
        ref TEXT GENERATED ALWAYS AS (book || ' ' || chapter::text || ':' || verse::text) STORED,
        text_raw TEXT NOT NULL,
        text_norm TEXT NOT NULL,
        tsv tsvector,
        embedding vector({EMBED_DIMS}),
        created_at TIMESTAMPTZ DEFAULT now(),
        updated_at TIMESTAMPTZ DEFAULT now()
    );
    """


def create_indexes_sql() -> str:
    return f"""
    CREATE UNIQUE INDEX IF NOT EXISTS ux_ref_{TABLE_BASENAME}
        ON {TABLE_FQN} (book, chapter, verse);
    CREATE INDEX IF NOT EXISTS idx_tsv_{TABLE_BASENAME}
        ON {TABLE_FQN} USING GIN (tsv);
    CREATE INDEX IF NOT EXISTS idx_book_ch_{TABLE_BASENAME}
        ON {TABLE_FQN} (book, chapter);
    CREATE INDEX IF NOT EXISTS idx_testament_{TABLE_BASENAME}
        ON {TABLE_FQN} (testament);
    """


def upsert_sql() -> str:
    # %(embedding)s is bound as a vector literal string and cast explicitly,
    # so no client-side pgvector adapter is required.
    return f"""
    INSERT INTO {TABLE_FQN} (testament, book, chapter, verse, text_raw, text_norm, tsv, embedding)
    VALUES (%(testament)s, %(book)s, %(chapter)s, %(verse)s, %(text_raw)s, %(text_norm)s,
            to_tsvector(COALESCE(%(ts_lang)s, 'simple')::regconfig, %(text_norm)s),
            %(embedding)s::vector)
    ON CONFLICT (book, chapter, verse) DO UPDATE SET
        text_raw = EXCLUDED.text_raw,
        text_norm = EXCLUDED.text_norm,
        tsv = EXCLUDED.tsv,
        embedding = EXCLUDED.embedding,
        updated_at = now();
    """
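
# --- Hedged usage sketch (not part of the ingestion flow) ----------------------
# A minimal example of how the populated table can be queried for semantic
# similarity. The probes value and k are illustrative tuning choices, not
# requirements of the schema; <=> is pgvector's cosine-distance operator.
async def semantic_search_example(query: str, k: int = 5) -> None:
    """Embed `query`, then print the k nearest verses by cosine distance."""
    async with httpx.AsyncClient() as client:
        emb = (await embed_batch(client, [query]))[0]
    vec_literal = "[" + ",".join(map(str, emb)) + "]"
    with psycopg.connect(DB_URL) as conn:
        with conn.cursor(row_factory=dict_row) as cur:
            cur.execute("SET ivfflat.probes = 10;")  # recall/speed trade-off for the ivfflat index
            cur.execute(
                f"""
                SELECT ref, text_raw, embedding <=> %(q)s::vector AS distance
                FROM {TABLE_FQN}
                ORDER BY embedding <=> %(q)s::vector
                LIMIT %(k)s;
                """,
                {"q": vec_literal, "k": k},
            )
            for row in cur.fetchall():
                print(f'{row["distance"]:.4f}  {row["ref"]}  {row["text_raw"][:80]}')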
async def main():
    print("Starting Bible embedding ingestion...")
    md_text = Path(BIBLE_MD_PATH).read_text(encoding="utf-8", errors="ignore")
    verses = list(parse_bible_md(md_text))
    print(f"Parsed verses: {len(verses)}")
    batch_size = 128

    # First create the schema + table structure for this language/version
    with psycopg.connect(DB_URL) as conn:
        with conn.cursor() as cur:
            print(f"Creating schema '{VECTOR_SCHEMA}' and table {TABLE_FQN} ...")
            cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
            cur.execute(create_table_sql())
            cur.execute(create_indexes_sql())
        conn.commit()
    print("Schema/table ready")

    # Pick the full-text search configuration once, outside the loop.
    ts_lang = (
        "romanian" if LANG_CODE.lower().startswith("ro")
        else "english" if LANG_CODE.lower().startswith("en")
        else "simple"
    )

    # Now process embeddings
    async with httpx.AsyncClient() as client:
        with psycopg.connect(DB_URL, autocommit=False) as conn:
            with conn.cursor() as cur:
                total_batches = (len(verses) + batch_size - 1) // batch_size
                for i in range(0, len(verses), batch_size):
                    batch = verses[i:i + batch_size]
                    inputs = [v["text_norm"] for v in batch]
                    print(f"Generating embeddings for batch {i // batch_size + 1}/{total_batches}")
                    embs = await embed_batch(client, inputs)
                    rows = []
                    for v, e in zip(batch, embs):
                        rows.append({
                            **v,
                            "ts_lang": ts_lang,
                            # Serialize as a pgvector literal, e.g. "[0.1,0.2,...]",
                            # matched by the ::vector cast in upsert_sql().
                            "embedding": "[" + ",".join(map(str, e)) + "]",
                        })
                    cur.executemany(upsert_sql(), rows)
                    conn.commit()
                    print(f"Upserted {len(rows)} verses... {i + len(rows)}/{len(verses)}")

    # Create IVFFLAT index after data is loaded.
    # Note: pgvector's ivfflat index supports at most 2000 dimensions, so with
    # the default EMBED_DIMS=3072 this CREATE INDEX will fail; either request
    # reduced-dimension embeddings or store halfvec instead.
    print("Creating IVFFLAT index...")
    with psycopg.connect(DB_URL, autocommit=True) as conn:
        with conn.cursor() as cur:
            cur.execute(f"VACUUM ANALYZE {TABLE_FQN};")  # needs autocommit: VACUUM cannot run in a transaction
            cur.execute(f"""
                CREATE INDEX IF NOT EXISTS idx_vec_ivfflat_{TABLE_BASENAME}
                ON {TABLE_FQN} USING ivfflat (embedding vector_cosine_ops)
                WITH (lists = 200);
            """)

    print("✅ Bible embedding ingestion completed successfully!")

    # Helpful pgAdmin queries:
    print("\nRun these sample queries in pgAdmin:")
    print(f"SELECT count(*) FROM {TABLE_FQN};")
    print(f"SELECT book, chapter, verse, left(text_raw, 80) AS preview FROM {TABLE_FQN} ORDER BY book, chapter, verse LIMIT 10;")
    print(f"SELECT * FROM {TABLE_FQN} WHERE book='Geneza' AND chapter=1 AND verse=1;")


if __name__ == "__main__":
    asyncio.run(main())
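
# Example .env for this script. All values are illustrative placeholders;
# only the variable names are taken from the code above.
#
#   AZURE_OPENAI_ENDPOINT=https://<your-resource>.openai.azure.com
#   AZURE_OPENAI_KEY=<key>
#   AZURE_OPENAI_API_VERSION=2024-05-01-preview
#   AZURE_OPENAI_EMBED_DEPLOYMENT=embed-3
#   EMBED_DIMS=3072
#   DATABASE_URL=postgresql://user:pass@localhost:5432/dbname
#   BIBLE_MD_PATH=./bible.md
#   LANG_CODE=ro
#   TRANSLATION_CODE=FIDELA
#   VECTOR_SCHEMA=ai_bible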