"""Ingest a Markdown Bible into Postgres with Azure OpenAI embeddings.

Pipeline: parse the Markdown file into verses, create the ``bible_passages``
table, embed verses in batches through the Azure OpenAI embeddings endpoint,
upsert them, and finally build an IVFFlat index over the embedding column.
"""
import os, re, json, math, time, asyncio
from typing import List, Dict, Tuple, Iterable
from dataclasses import dataclass
from pathlib import Path

from dotenv import load_dotenv
import httpx
import psycopg
from psycopg.rows import dict_row

load_dotenv()

AZ_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT", "").rstrip("/")
AZ_API_KEY = os.getenv("AZURE_OPENAI_KEY")
AZ_API_VER = os.getenv("AZURE_OPENAI_API_VERSION", "2024-05-01-preview")
AZ_DEPLOYMENT = os.getenv("AZURE_OPENAI_EMBED_DEPLOYMENT", "embed-3")
EMBED_DIMS = int(os.getenv("EMBED_DIMS", "3072"))
DB_URL = os.getenv("DATABASE_URL")
BIBLE_MD_PATH = os.getenv("BIBLE_MD_PATH")
LANG_CODE = os.getenv("LANG_CODE", "ro")
TRANSLATION = os.getenv("TRANSLATION_CODE", "FIDELA")

# Explicit check instead of `assert`, which is stripped when running with -O.
_missing_env = [
    name
    for name, value in (
        ("AZURE_OPENAI_ENDPOINT", AZ_ENDPOINT),
        ("AZURE_OPENAI_KEY", AZ_API_KEY),
        ("DATABASE_URL", DB_URL),
        ("BIBLE_MD_PATH", BIBLE_MD_PATH),
    )
    if not value
]
if _missing_env:
    raise RuntimeError("Missing required env vars: " + ", ".join(_missing_env))

EMBED_URL = f"{AZ_ENDPOINT}/openai/deployments/{AZ_DEPLOYMENT}/embeddings?api-version={AZ_API_VER}"

# HTTP statuses treated as transient by embed_batch.
RETRYABLE_STATUS = frozenset({429, 500, 503})
MAX_EMBED_ATTEMPTS = 6

# pgvector cannot build an IVFFlat index on vectors wider than this.
IVFFLAT_MAX_DIMS = 2000

BOOKS_OT = [
    "Geneza", "Exodul", "Leviticul", "Numeri", "Deuteronom", "Iosua", "Judecători", "Rut",
    "1 Samuel", "2 Samuel", "1 Imparati", "2 Imparati", "1 Cronici", "2 Cronici", "Ezra", "Neemia", "Estera",
    "Iov", "Psalmii", "Proverbe", "Eclesiastul", "Cântarea Cântărilor", "Isaia", "Ieremia", "Plângerile",
    "Ezechiel", "Daniel", "Osea", "Ioel", "Amos", "Obadia", "Iona", "Mica", "Naum", "Habacuc", "Țefania", "Hagai", "Zaharia", "Maleahi",
]
BOOKS_NT = [
    "Matei", "Marcu", "Luca", "Ioan", "Faptele Apostolilor", "Romani", "1 Corinteni", "2 Corinteni",
    "Galateni", "Efeseni", "Filipeni", "Coloseni", "1 Tesaloniceni", "2 Tesaloniceni", "1 Timotei", "2 Timotei",
    "Titus", "Filimon", "Evrei", "Iacov", "1 Petru", "2 Petru", "1 Ioan", "2 Ioan", "3 Ioan", "Iuda", "Revelaţia",
]
# Maps every known book name to its testament code ("OT" or "NT").
BOOK_CANON = {b: ("OT" if b in BOOKS_OT else "NT") for b in BOOKS_OT + BOOKS_NT}


@dataclass
class Verse:
    """Record shape of a parsed verse (parse_bible_md yields dicts with the same keys)."""

    testament: str
    book: str
    chapter: int
    verse: int
    text_raw: str
    text_norm: str


def normalize_text(s: str) -> str:
    """Return *s* stripped, with every whitespace run collapsed to one space."""
    s = re.sub(r"\s+", " ", s.strip())
    # NOTE(review): the original replaced an unidentifiable space-like character
    # with a plain space; presumably a non-breaking space -- confirm. `\s+` above
    # already folds Unicode whitespace, so this line is effectively a no-op.
    s = s.replace("\u00a0", " ")
    return s


# Named groups are required: the parser reads group("ch"), group("v"), group("body").
BOOK_RE = re.compile(r"^(?P<book>[A-ZĂÂÎȘȚ][^\n]+?)\s*$")
CH_RE = re.compile(r"^(?i:Capitolul|CApitoLuL)\s+(?P<ch>\d+)\b")
VERSE_RE = re.compile(r"^(?P<v>\d+)\s+(?P<body>.+)$")
# Book heading line: marker, book name, marker. Compiled once, not per line.
_BOOK_MARKER_RE = re.compile(r'^…\s*(.+?)\s*…$')
_PARAGRAPH_MARK_RE = re.compile(r'^¶\s*')


def parse_bible_md(md_text: str) -> Iterable[Dict[str, object]]:
    """Yield one dict per verse found in *md_text*.

    Lines are skipped until a testament header or a line containing the book
    marker is seen. After that, a book heading whose name is in BOOK_CANON sets
    the current book, a chapter heading sets the current chapter, and every
    line starting with a number is emitted as a verse of that book/chapter.

    Each yielded dict has the keys ``testament``, ``book``, ``chapter``,
    ``verse``, ``text_raw`` and ``text_norm``.
    """
    cur_book, cur_ch = None, None
    testament = None
    is_in_bible_content = False

    for line in md_text.splitlines():
        line = line.rstrip()

        # Start processing after "VECHIUL TESTAMENT" or when we find book markers
        if line == 'VECHIUL TESTAMENT' or line == 'TESTAMENT' or '…' in line:
            is_in_bible_content = True
        if not is_in_bible_content:
            continue

        # Book detection: … BookName …
        book_match = _BOOK_MARKER_RE.match(line)
        if book_match:
            bname = book_match.group(1).strip()
            if bname in BOOK_CANON:
                cur_book = bname
                testament = BOOK_CANON[bname]
                cur_ch = None  # a chapter heading must follow before verses count
                print(f"Found book: {bname}")
            continue

        # Chapter detection: Capitolul X or CApitoLuL X
        m_ch = CH_RE.match(line)
        if m_ch and cur_book:
            cur_ch = int(m_ch.group("ch"))
            print(f"  Chapter {cur_ch}")
            continue

        # Verse detection: starts with number
        m_v = VERSE_RE.match(line)
        if m_v and cur_book and cur_ch:
            body = m_v.group("body").strip()
            # Remove paragraph markers
            body = _PARAGRAPH_MARK_RE.sub('', body)
            yield {
                "testament": testament,
                "book": cur_book,
                "chapter": cur_ch,
                "verse": int(m_v.group("v")),
                "text_raw": body,
                "text_norm": normalize_text(body),
            }


async def embed_batch(client: httpx.AsyncClient, inputs: List[str]) -> List[List[float]]:
    """Embed *inputs* via the Azure OpenAI endpoint, in input order.

    Transport errors and HTTP 429/500/503 are retried with exponential backoff
    for up to MAX_EMBED_ATTEMPTS attempts.

    Raises:
        RuntimeError: on a non-retryable HTTP status (raised immediately, not
            retried), when the response holds a different number of embeddings
            than inputs, or when all attempts are exhausted.
    """
    if not inputs:
        return []

    payload = {"input": inputs}
    headers = {"api-key": AZ_API_KEY, "Content-Type": "application/json"}

    for attempt in range(MAX_EMBED_ATTEMPTS):
        try:
            r = await client.post(EMBED_URL, headers=headers, json=payload, timeout=60)
        except httpx.HTTPError as e:
            # Only transport-level failures are retried; anything else propagates.
            problem = f"Error on attempt {attempt + 1}: {e}"
        else:
            if r.status_code == 200:
                ordered = sorted(r.json()["data"], key=lambda x: x["index"])
                embeddings = [d["embedding"] for d in ordered]
                if len(embeddings) != len(inputs):
                    raise RuntimeError(
                        f"Embedding count mismatch: sent {len(inputs)} inputs, "
                        f"got {len(embeddings)} embeddings"
                    )
                return embeddings
            if r.status_code not in RETRYABLE_STATUS:
                # Kept outside the try so it is never swallowed and retried.
                raise RuntimeError(f"Embedding error {r.status_code}: {r.text}")
            problem = f"Retryable HTTP {r.status_code} on attempt {attempt + 1}"

        if attempt + 1 < MAX_EMBED_ATTEMPTS:
            backoff = 2 ** attempt + (0.1 * attempt)
            print(f"{problem}, waiting {backoff:.1f}s...")
            await asyncio.sleep(backoff)
        else:
            print(problem)

    raise RuntimeError("Failed to embed after retries")


# First, we need to create the table with proper SQL.
# The vector width follows EMBED_DIMS so the column matches the embeddings
# that are actually inserted (EMBED_DIMS is an int, so interpolation is safe).
CREATE_TABLE_SQL = f"""
CREATE TABLE IF NOT EXISTS bible_passages (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    testament TEXT NOT NULL,
    book TEXT NOT NULL,
    chapter INT NOT NULL,
    verse INT NOT NULL,
    ref TEXT GENERATED ALWAYS AS (book || ' ' || chapter || ':' || verse) STORED,
    lang TEXT NOT NULL DEFAULT 'ro',
    translation TEXT NOT NULL DEFAULT 'FIDELA',
    text_raw TEXT NOT NULL,
    text_norm TEXT NOT NULL,
    tsv tsvector,
    embedding vector({EMBED_DIMS}),
    created_at TIMESTAMPTZ DEFAULT now(),
    updated_at TIMESTAMPTZ DEFAULT now()
);
"""

CREATE_INDEXES_SQL = """
-- Uniqueness by canonical reference within translation/language
CREATE UNIQUE INDEX IF NOT EXISTS ux_ref_lang
    ON bible_passages (translation, lang, book, chapter, verse);
-- Full-text index
CREATE INDEX IF NOT EXISTS idx_tsv ON bible_passages USING GIN (tsv);
-- Other indexes
CREATE INDEX IF NOT EXISTS idx_book_ch ON bible_passages (book, chapter);
CREATE INDEX IF NOT EXISTS idx_testament ON bible_passages (testament);
"""

UPSERT_SQL = """
INSERT INTO bible_passages
    (testament, book, chapter, verse, lang, translation, text_raw, text_norm, tsv, embedding)
VALUES
    (%(testament)s, %(book)s, %(chapter)s, %(verse)s, %(lang)s, %(translation)s,
     %(text_raw)s, %(text_norm)s,
     to_tsvector(COALESCE(%(ts_lang)s,'simple')::regconfig, %(text_norm)s),
     %(embedding)s)
ON CONFLICT (translation, lang, book, chapter, verse) DO UPDATE SET
    text_raw=EXCLUDED.text_raw,
    text_norm=EXCLUDED.text_norm,
    tsv=EXCLUDED.tsv,
    embedding=EXCLUDED.embedding,
    updated_at=now();
"""


async def main() -> None:
    """Parse the Bible file, embed every verse, and load it into Postgres.

    Raises:
        RuntimeError: if embedding fails, or if a returned embedding does not
            have EMBED_DIMS dimensions (it would not fit the table column).
    """
    print("Starting Bible embedding ingestion...")
    md_text = Path(BIBLE_MD_PATH).read_text(encoding="utf-8", errors="ignore")
    verses = list(parse_bible_md(md_text))
    print(f"Parsed verses: {len(verses)}")

    batch_size = 128
    total_batches = (len(verses) + batch_size - 1) // batch_size

    # First create the table structure
    with psycopg.connect(DB_URL) as conn:
        with conn.cursor() as cur:
            print("Creating bible_passages table...")
            cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
            cur.execute(CREATE_TABLE_SQL)
            cur.execute(CREATE_INDEXES_SQL)
            conn.commit()
            print("Table created successfully")

    # Now process embeddings
    async with httpx.AsyncClient() as client:
        with psycopg.connect(DB_URL, autocommit=False) as conn:
            with conn.cursor() as cur:
                for i in range(0, len(verses), batch_size):
                    batch = verses[i:i + batch_size]
                    inputs = [v["text_norm"] for v in batch]
                    print(f"Generating embeddings for batch {i // batch_size + 1}/{total_batches}")
                    embs = await embed_batch(client, inputs)

                    # Fail with a clear message rather than a database error.
                    wrong = next((len(e) for e in embs if len(e) != EMBED_DIMS), None)
                    if wrong is not None:
                        raise RuntimeError(
                            f"Embedding has {wrong} dimensions but EMBED_DIMS is {EMBED_DIMS}"
                        )

                    rows = [
                        {
                            **v,
                            "lang": LANG_CODE,
                            "translation": TRANSLATION,
                            "ts_lang": "romanian",
                            "embedding": e,
                        }
                        for v, e in zip(batch, embs)
                    ]
                    cur.executemany(UPSERT_SQL, rows)
                    # Commit per batch so finished batches survive a later failure.
                    conn.commit()
                    print(f"Upserted {len(rows)} verses... {i + len(rows)}/{len(verses)}")

    # Create IVFFLAT index after data is loaded
    with psycopg.connect(DB_URL, autocommit=True) as conn:
        with conn.cursor() as cur:
            # VACUUM cannot run inside a transaction block, hence autocommit.
            cur.execute("VACUUM ANALYZE bible_passages;")
            if EMBED_DIMS > IVFFLAT_MAX_DIMS:
                print(
                    f"Skipping IVFFLAT index: EMBED_DIMS={EMBED_DIMS} exceeds "
                    f"the {IVFFLAT_MAX_DIMS}-dimension limit"
                )
            else:
                print("Creating IVFFLAT index...")
                cur.execute("""
                    CREATE INDEX IF NOT EXISTS idx_vec_ivfflat
                    ON bible_passages USING ivfflat (embedding vector_cosine_ops)
                    WITH (lists = 200);
                """)

    print("✅ Bible embedding ingestion completed successfully!")


if __name__ == "__main__":
    asyncio.run(main())