biblical-guide.com/scripts/ingest_bible_pgvector.py
import os, re, asyncio
from typing import Dict, Iterable
from dataclasses import dataclass
from pathlib import Path
from dotenv import load_dotenv
import httpx
import psycopg
from psycopg.rows import dict_row
load_dotenv()
AZ_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT", "").rstrip("/")
AZ_API_KEY = os.getenv("AZURE_OPENAI_KEY")
AZ_API_VER = os.getenv("AZURE_OPENAI_API_VERSION", "2024-05-01-preview")
AZ_DEPLOYMENT = os.getenv("AZURE_OPENAI_EMBED_DEPLOYMENT", "embed-3")
EMBED_DIMS = int(os.getenv("EMBED_DIMS", "3072"))
DB_URL = os.getenv("DATABASE_URL")
BIBLE_MD_PATH = os.getenv("BIBLE_MD_PATH")
LANG_CODE = os.getenv("LANG_CODE", "ro")
TRANSLATION = os.getenv("TRANSLATION_CODE", "FIDELA")
VECTOR_SCHEMA = os.getenv("VECTOR_SCHEMA", "ai_bible")
assert AZ_ENDPOINT and AZ_API_KEY and DB_URL and BIBLE_MD_PATH, "Missing required env vars"
EMBED_URL = f"{AZ_ENDPOINT}/openai/deployments/{AZ_DEPLOYMENT}/embeddings?api-version={AZ_API_VER}"
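# A sample .env for this script (a sketch with illustrative placeholders, not
# real resources; adjust every value to your own deployment):
#
#   AZURE_OPENAI_ENDPOINT=https://my-resource.openai.azure.com
#   AZURE_OPENAI_KEY=<key>
#   AZURE_OPENAI_API_VERSION=2024-05-01-preview
#   AZURE_OPENAI_EMBED_DEPLOYMENT=embed-3
#   EMBED_DIMS=3072
#   DATABASE_URL=postgresql://user:pass@localhost:5432/db
#   BIBLE_MD_PATH=./data/bible.md
#   LANG_CODE=ro
#   TRANSLATION_CODE=FIDELA
#   VECTOR_SCHEMA=ai_bible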
BOOKS_OT = [
"Geneza","Exodul","Leviticul","Numeri","Deuteronom","Iosua","Judecători","Rut",
"1 Samuel","2 Samuel","1 Imparati","2 Imparati","1 Cronici","2 Cronici","Ezra","Neemia","Estera",
"Iov","Psalmii","Proverbe","Eclesiastul","Cântarea Cântărilor","Isaia","Ieremia","Plângerile",
"Ezechiel","Daniel","Osea","Ioel","Amos","Obadia","Iona","Mica","Naum","Habacuc","Țefania","Hagai","Zaharia","Maleahi"
]
BOOKS_NT = [
"Matei","Marcu","Luca","Ioan","Faptele Apostolilor","Romani","1 Corinteni","2 Corinteni",
"Galateni","Efeseni","Filipeni","Coloseni","1 Tesaloniceni","2 Tesaloniceni","1 Timotei","2 Timotei",
"Titus","Filimon","Evrei","Iacov","1 Petru","2 Petru","1 Ioan","2 Ioan","3 Ioan","Iuda","Revelaţia"
]
BOOK_CANON = {b:("OT" if b in BOOKS_OT else "NT") for b in BOOKS_OT + BOOKS_NT}
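# e.g. BOOK_CANON["Geneza"] == "OT", BOOK_CANON["Matei"] == "NT"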
@dataclass
class Verse:
    testament: str
    book: str
    chapter: int
    verse: int
    text_raw: str
    text_norm: str
def normalize_text(s: str) -> str:
    s = re.sub(r"\s+", " ", s.strip())
    s = s.replace("\u00a0", " ")  # assumed NBSP cleanup; \s+ above already covers most cases
    return s
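# Sketch of the normalizer's behavior:
#   normalize_text("  La\u00a0 început   era  Cuvântul ")
#   # -> 'La început era Cuvântul'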
BOOK_RE = re.compile(r"^(?P<book>[A-ZĂÂÎȘȚ][^\n]+?)\s*$")  # currently unused; book headers are matched inline below
CH_RE = re.compile(r"^(?i:Capitolul)\s+(?P<ch>\d+)\b")  # (?i:) covers the scan's mixed casing, e.g. "CApitoLuL"
VERSE_RE = re.compile(r"^(?P<v>\d+)\s+(?P<body>.+)$")
def parse_bible_md(md_text: str) -> Iterable[Dict]:
    cur_book, cur_ch = None, None
    testament = None
    is_in_bible_content = False
    for line in md_text.splitlines():
        line = line.rstrip()
        # Start processing after "VECHIUL TESTAMENT" or when we find book markers
        # (the '…'-wrapped headers matched below)
        if line == 'VECHIUL TESTAMENT' or line == 'TESTAMENT' or '…' in line:
            is_in_bible_content = True
        if not is_in_bible_content:
            continue
        # Book detection: … BookName …
        book_match = re.match(r'^…\s*(.+?)\s*…$', line)
        if book_match:
            bname = book_match.group(1).strip()
            if bname in BOOK_CANON:
                cur_book = bname
                testament = BOOK_CANON[bname]
                cur_ch = None
                print(f"Found book: {bname}")
            continue
        # Chapter detection: "Capitolul X" (any casing)
        m_ch = CH_RE.match(line)
        if m_ch and cur_book:
            cur_ch = int(m_ch.group("ch"))
            print(f" Chapter {cur_ch}")
            continue
        # Verse detection: line starts with the verse number
        m_v = VERSE_RE.match(line)
        if m_v and cur_book and cur_ch:
            vnum = int(m_v.group("v"))
            body = m_v.group("body").strip()
            # Remove paragraph markers ('¶' is an assumed placeholder here;
            # adjust the pattern to whatever the source markdown actually uses)
            body = re.sub(r'¶\s*', ' ', body).strip()
            raw = body
            norm = normalize_text(body)
            yield {
                "testament": testament, "book": cur_book, "chapter": cur_ch, "verse": vnum,
                "text_raw": raw, "text_norm": norm
            }
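# Sketch of the expected input shape and the parser's output, assuming the
# '…'-wrapped book headers described above:
#
#   sample = "VECHIUL TESTAMENT\n… Geneza …\nCapitolul 1\n1 La început...\n"
#   next(parse_bible_md(sample))
#   # -> {'testament': 'OT', 'book': 'Geneza', 'chapter': 1, 'verse': 1, ...}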
async def embed_batch(client, inputs):
    payload = {"input": inputs}
    headers = {"api-key": AZ_API_KEY, "Content-Type": "application/json"}
    for attempt in range(6):
        try:
            r = await client.post(EMBED_URL, headers=headers, json=payload, timeout=60)
            if r.status_code == 200:
                data = r.json()
                # The API may return items out of order; sort by input index
                ordered = sorted(data["data"], key=lambda x: x["index"])
                return [d["embedding"] for d in ordered]
            elif r.status_code in (429, 500, 503):
                backoff = 2 ** attempt + (0.1 * attempt)
                print(f"Retryable error {r.status_code}, waiting {backoff:.1f}s...")
                await asyncio.sleep(backoff)
            else:
                raise RuntimeError(f"Embedding error {r.status_code}: {r.text}")
        except httpx.HTTPError as e:  # transport errors only; non-retryable failures propagate
            backoff = 2 ** attempt + (0.1 * attempt)
            print(f"Error on attempt {attempt + 1}: {e}, waiting {backoff:.1f}s...")
            await asyncio.sleep(backoff)
    raise RuntimeError("Failed to embed after retries")
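# Example call (a sketch; run inside an event loop):
#   async with httpx.AsyncClient() as client:
#       vecs = await embed_batch(client, ["La început", "era Cuvântul"])
#       assert len(vecs) == 2 and len(vecs[0]) == EMBED_DIMS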
def safe_ident(s: str) -> str:
    # Lowercase and collapse anything outside [a-z0-9_] so the value is safe
    # to splice into SQL identifiers
    return re.sub(r"[^a-z0-9_]+", "_", s.lower()).strip("_")
TABLE_BASENAME = f"bv_{safe_ident(LANG_CODE)}_{safe_ident(TRANSLATION)}"
TABLE_FQN = f'"{VECTOR_SCHEMA}"."{TABLE_BASENAME}"'
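# With the defaults above these resolve to:
#   safe_ident("FIDELA") -> "fidela"
#   TABLE_BASENAME       -> "bv_ro_fidela"
#   TABLE_FQN            -> '"ai_bible"."bv_ro_fidela"'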
def create_table_sql() -> str:
    return f"""
    CREATE SCHEMA IF NOT EXISTS "{VECTOR_SCHEMA}";
    CREATE TABLE IF NOT EXISTS {TABLE_FQN} (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        testament TEXT NOT NULL,
        book TEXT NOT NULL,
        chapter INT NOT NULL,
        verse INT NOT NULL,
        ref TEXT GENERATED ALWAYS AS (book || ' ' || chapter || ':' || verse) STORED,
        text_raw TEXT NOT NULL,
        text_norm TEXT NOT NULL,
        tsv tsvector,
        embedding vector({EMBED_DIMS}),
        created_at TIMESTAMPTZ DEFAULT now(),
        updated_at TIMESTAMPTZ DEFAULT now()
    );
    """
def create_indexes_sql() -> str:
    return f"""
    CREATE UNIQUE INDEX IF NOT EXISTS ux_ref_{TABLE_BASENAME} ON {TABLE_FQN} (book, chapter, verse);
    CREATE INDEX IF NOT EXISTS idx_tsv_{TABLE_BASENAME} ON {TABLE_FQN} USING GIN (tsv);
    CREATE INDEX IF NOT EXISTS idx_book_ch_{TABLE_BASENAME} ON {TABLE_FQN} (book, chapter);
    CREATE INDEX IF NOT EXISTS idx_testament_{TABLE_BASENAME} ON {TABLE_FQN} (testament);
    """
def upsert_sql() -> str:
    # %(embedding)s arrives as a pgvector text literal ("[x,y,...]") and is cast
    # explicitly: psycopg has no built-in adapter for the vector type, and a raw
    # Python list would be sent as a Postgres array, which does not cast to vector
    return f"""
    INSERT INTO {TABLE_FQN} (testament, book, chapter, verse, text_raw, text_norm, tsv, embedding)
    VALUES (%(testament)s, %(book)s, %(chapter)s, %(verse)s, %(text_raw)s, %(text_norm)s,
            to_tsvector(COALESCE(%(ts_lang)s,'simple')::regconfig, %(text_norm)s), %(embedding)s::vector)
    ON CONFLICT (book, chapter, verse) DO UPDATE
    SET text_raw=EXCLUDED.text_raw,
        text_norm=EXCLUDED.text_norm,
        tsv=EXCLUDED.tsv,
        embedding=EXCLUDED.embedding,
        updated_at=now();
    """
async def main():
    print("Starting Bible embedding ingestion...")
    md_text = Path(BIBLE_MD_PATH).read_text(encoding="utf-8", errors="ignore")
    verses = list(parse_bible_md(md_text))
    print(f"Parsed verses: {len(verses)}")
    batch_size = 128
    # First create the schema + table structure for this language/version
    with psycopg.connect(DB_URL) as conn:
        with conn.cursor() as cur:
            print(f"Creating schema '{VECTOR_SCHEMA}' and table {TABLE_FQN} ...")
            cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
            cur.execute(create_table_sql())
            cur.execute(create_indexes_sql())
        conn.commit()
    print("Schema/table ready")
    # Now process embeddings
    ts_lang = ("romanian" if LANG_CODE.lower().startswith("ro")
               else "english" if LANG_CODE.lower().startswith("en")
               else "simple")
    n_batches = (len(verses) + batch_size - 1) // batch_size
    async with httpx.AsyncClient() as client:
        with psycopg.connect(DB_URL, autocommit=False) as conn:
            with conn.cursor() as cur:
                for i in range(0, len(verses), batch_size):
                    batch = verses[i:i+batch_size]
                    inputs = [v["text_norm"] for v in batch]
                    print(f"Generating embeddings for batch {i//batch_size + 1}/{n_batches}")
                    embs = await embed_batch(client, inputs)
                    rows = []
                    for v, e in zip(batch, embs):
                        rows.append({
                            **v,
                            "ts_lang": ts_lang,
                            # pgvector text literal ("[x,y,...]"); upsert_sql() casts it with ::vector
                            "embedding": "[" + ",".join(map(str, e)) + "]"
                        })
                    cur.executemany(upsert_sql(), rows)
                    conn.commit()
                    print(f"Upserted {len(rows)} verses... {i+len(rows)}/{len(verses)}")
    # Create the IVFFLAT index after the data is loaded (it samples existing rows).
    # NOTE: pgvector's ivfflat index supports at most 2000 dimensions, so with
    # EMBED_DIMS=3072 this CREATE INDEX will fail unless the deployment returns
    # <= 2000 dims (text-embedding-3 accepts a "dimensions" parameter) or the
    # column is switched to halfvec.
    print("Creating IVFFLAT index...")
    with psycopg.connect(DB_URL, autocommit=True) as conn:
        with conn.cursor() as cur:
            cur.execute(f"VACUUM ANALYZE {TABLE_FQN};")
            cur.execute(f"""
                CREATE INDEX IF NOT EXISTS idx_vec_ivfflat_{TABLE_BASENAME}
                ON {TABLE_FQN} USING ivfflat (embedding vector_cosine_ops)
                WITH (lists = 200);
            """)
    print("✅ Bible embedding ingestion completed successfully!")
    # Helpful pgAdmin queries:
    print("\nRun these sample queries in pgAdmin:")
    print(f"SELECT count(*) FROM {TABLE_FQN};")
    print(f"SELECT book, chapter, verse, left(text_raw, 80) AS preview FROM {TABLE_FQN} ORDER BY book, chapter, verse LIMIT 10;")
    print(f"SELECT * FROM {TABLE_FQN} WHERE book='Geneza' AND chapter=1 AND verse=1;")

if __name__ == "__main__":
    asyncio.run(main())
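# To run (a sketch, assuming a .env like the sample near the top is in place):
#   pip install httpx "psycopg[binary]" python-dotenv
#   python scripts/ingest_bible_pgvector.py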