import os
import json
import re
import asyncio
from pathlib import Path
from typing import List, Dict

from dotenv import load_dotenv
import httpx
import psycopg

load_dotenv()

AZ_ENDPOINT = os.getenv("AZURE_OPENAI_ENDPOINT", "").rstrip("/")
AZ_API_KEY = os.getenv("AZURE_OPENAI_KEY")
AZ_API_VER = os.getenv("AZURE_OPENAI_API_VERSION", "2024-05-01-preview")
AZ_DEPLOYMENT = os.getenv("AZURE_OPENAI_EMBED_DEPLOYMENT", "embed-3")
EMBED_DIMS = int(os.getenv("EMBED_DIMS", "3072"))
DB_URL = os.getenv("DATABASE_URL")
VECTOR_SCHEMA = os.getenv("VECTOR_SCHEMA", "ai_bible")
LANG_CODE = os.getenv("LANG_CODE", "en")
TRANSLATION = os.getenv("TRANSLATION_CODE", "WEB")
JSON_DIR = os.getenv("JSON_DIR", f"data/en_bible/{TRANSLATION}")

assert AZ_ENDPOINT and AZ_API_KEY and DB_URL, "Missing required env vars"

EMBED_URL = f"{AZ_ENDPOINT}/openai/deployments/{AZ_DEPLOYMENT}/embeddings?api-version={AZ_API_VER}"


def safe_ident(s: str) -> str:
    """Sanitize a string for safe use inside a SQL identifier."""
    return re.sub(r"[^a-z0-9_]+", "_", s.lower()).strip("_")


TABLE_BASENAME = f"bv_{safe_ident(LANG_CODE)}_{safe_ident(TRANSLATION)}"
TABLE_FQN = f'"{VECTOR_SCHEMA}"."{TABLE_BASENAME}"'


def create_table_sql() -> str:
    # The generated `ref` column needs explicit ::text casts: the bare
    # `text || int` operator is only STABLE, and generated columns require
    # IMMUTABLE expressions.
    return f"""
    CREATE SCHEMA IF NOT EXISTS "{VECTOR_SCHEMA}";
    CREATE TABLE IF NOT EXISTS {TABLE_FQN} (
        id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        testament   TEXT NOT NULL,
        book        TEXT NOT NULL,
        chapter     INT  NOT NULL,
        verse       INT  NOT NULL,
        ref         TEXT GENERATED ALWAYS AS
                        (book || ' ' || chapter::text || ':' || verse::text) STORED,
        text_raw    TEXT NOT NULL,
        text_norm   TEXT NOT NULL,
        tsv         tsvector,
        embedding   vector({EMBED_DIMS}),
        created_at  TIMESTAMPTZ DEFAULT now(),
        updated_at  TIMESTAMPTZ DEFAULT now()
    );
    """


def create_indexes_sql() -> str:
    return f"""
    CREATE UNIQUE INDEX IF NOT EXISTS ux_ref_{TABLE_BASENAME}
        ON {TABLE_FQN} (book, chapter, verse);
    CREATE INDEX IF NOT EXISTS idx_tsv_{TABLE_BASENAME}
        ON {TABLE_FQN} USING GIN (tsv);
    CREATE INDEX IF NOT EXISTS idx_book_ch_{TABLE_BASENAME}
        ON {TABLE_FQN} (book, chapter);
    CREATE INDEX IF NOT EXISTS idx_testament_{TABLE_BASENAME}
        ON {TABLE_FQN} (testament);
    """


def upsert_sql() -> str:
    # The embedding is bound as a pgvector text literal ("[0.1,0.2,...]") and
    # cast explicitly, so nothing depends on array-to-vector cast behavior.
    return f"""
    INSERT INTO {TABLE_FQN}
        (testament, book, chapter, verse, text_raw, text_norm, tsv, embedding)
    VALUES
        (%(testament)s, %(book)s, %(chapter)s, %(verse)s, %(text_raw)s, %(text_norm)s,
         to_tsvector(COALESCE(%(ts_lang)s, 'simple')::regconfig, %(text_norm)s),
         %(embedding)s::vector)
    ON CONFLICT (book, chapter, verse) DO UPDATE SET
        text_raw   = EXCLUDED.text_raw,
        text_norm  = EXCLUDED.text_norm,
        tsv        = EXCLUDED.tsv,
        embedding  = EXCLUDED.embedding,
        updated_at = now();
    """


def normalize(s: str) -> str:
    """Collapse runs of whitespace into single spaces."""
    return re.sub(r"\s+", " ", s.strip())


def to_vector_literal(embedding: List[float]) -> str:
    """Serialize a float list as a pgvector text literal, e.g. '[0.1,0.2]'."""
    return "[" + ",".join(map(str, embedding)) + "]"


async def embed_batch(client: httpx.AsyncClient, inputs: List[str]) -> List[List[float]]:
    payload = {"input": inputs}
    headers = {"api-key": AZ_API_KEY, "Content-Type": "application/json"}
    for attempt in range(6):
        try:
            r = await client.post(EMBED_URL, headers=headers, json=payload, timeout=60)
        except httpx.HTTPError as e:
            # Network-level failures are retryable; API errors are handled below.
            backoff = 2 ** attempt + (0.25 * attempt)
            print(f"Error on attempt {attempt + 1}: {e}, waiting {backoff:.1f}s...")
            await asyncio.sleep(backoff)
            continue
        if r.status_code == 200:
            data = r.json()
            # The API may return items out of order; restore the input order.
            ordered = sorted(data["data"], key=lambda x: x["index"])
            return [d["embedding"] for d in ordered]
        if r.status_code in (429, 500, 502, 503):
            backoff = 2 ** attempt + (0.25 * attempt)
            print(f"Rate/server limited ({r.status_code}), waiting {backoff:.1f}s...")
            await asyncio.sleep(backoff)
            continue
        # Non-retryable status (e.g. 401/404): fail fast instead of burning
        # through the retry budget.
        raise RuntimeError(f"Embedding error {r.status_code}: {r.text}")
    raise RuntimeError("Failed to embed after retries")
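
# The loader below expects two files in JSON_DIR, old_testament.json and
# new_testament.json, each shaped like this sketch (the field names are the
# ones load_json() actually reads; the concrete values are illustrative):
#
# {
#   "testament": "Old Testament",
#   "books": [
#     {
#       "name": "Genesis",
#       "chapters": [
#         {
#           "chapterNum": 1,
#           "verses": [
#             {"verseNum": 1, "text": "In the beginning, God created..."}
#           ]
#         }
#       ]
#     }
#   ]
# }
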
def load_json() -> List[Dict]:
    ot = json.loads((Path(JSON_DIR) / "old_testament.json").read_text(encoding="utf-8"))
    nt = json.loads((Path(JSON_DIR) / "new_testament.json").read_text(encoding="utf-8"))
    verses = []
    for test in (ot, nt):
        testament = test.get("testament")
        for book in test.get("books", []):
            bname = book.get("name")
            for ch in book.get("chapters", []):
                cnum = int(ch.get("chapterNum"))
                for v in ch.get("verses", []):
                    vnum = int(v.get("verseNum"))
                    text = str(v.get("text") or "").strip()
                    if text:
                        verses.append({
                            "testament": testament,
                            "book": bname,
                            "chapter": cnum,
                            "verse": vnum,
                            "text_raw": text,
                            "text_norm": normalize(text),
                        })
    return verses


async def main():
    print("Starting JSON embedding ingestion...", JSON_DIR)
    verses = load_json()
    print("Verses loaded:", len(verses))
    batch_size = int(os.getenv("BATCH_SIZE", "128"))

    # Prepare schema/table
    with psycopg.connect(DB_URL) as conn:
        with conn.cursor() as cur:
            print(f"Ensuring schema/table {TABLE_FQN} ...")
            cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
            cur.execute(create_table_sql())
            cur.execute(create_indexes_sql())
        conn.commit()

    ts_lang = "english" if LANG_CODE.lower().startswith("en") else "simple"

    # Embed and upsert in batches, committing after each batch so progress
    # survives a crash mid-run.
    async with httpx.AsyncClient() as client:
        with psycopg.connect(DB_URL, autocommit=False) as conn:
            with conn.cursor() as cur:
                for i in range(0, len(verses), batch_size):
                    batch = verses[i:i + batch_size]
                    inputs = [v["text_norm"] for v in batch]
                    embs = await embed_batch(client, inputs)
                    rows = [
                        {**v, "ts_lang": ts_lang, "embedding": to_vector_literal(e)}
                        for v, e in zip(batch, embs)
                    ]
                    cur.executemany(upsert_sql(), rows)
                    conn.commit()
                    print(f"Upserted {len(rows)} verses... {i + len(rows)}/{len(verses)}")

    print("Creating IVFFLAT index...")
    with psycopg.connect(DB_URL, autocommit=True) as conn:
        with conn.cursor() as cur:
            cur.execute(f"VACUUM ANALYZE {TABLE_FQN};")
            # Note: pgvector's ivfflat (and hnsw) indexes on the vector type
            # support at most 2,000 dimensions, so with the default
            # EMBED_DIMS=3072 this CREATE INDEX fails and queries fall back to
            # exact sequential scans. Consider halfvec or fewer dimensions if
            # an ANN index is required.
            try:
                cur.execute(f"""
                    CREATE INDEX IF NOT EXISTS idx_vec_ivfflat_{TABLE_BASENAME}
                        ON {TABLE_FQN}
                        USING ivfflat (embedding vector_cosine_ops)
                        WITH (lists = 200);
                """)
            except Exception as e:
                print("IVFFLAT creation skipped (tune maintenance_work_mem):", e)

    print("✅ JSON embedding ingestion completed successfully!")


if __name__ == "__main__":
    asyncio.run(main())
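
# A minimal sketch (not part of this script) of how the ingested table can be
# queried for semantic search, assuming a query embedding from the same
# deployment bound as a pgvector text literal like the inserts above. With the
# default env settings the table resolves to "ai_bible"."bv_en_web"; `<=>` is
# pgvector's cosine-distance operator, matching the vector_cosine_ops index:
#
#   SELECT ref, text_raw,
#          1 - (embedding <=> %(q)s::vector) AS cosine_similarity
#     FROM "ai_bible"."bv_en_web"
#    ORDER BY embedding <=> %(q)s::vector
#    LIMIT 10;
#
# If the ivfflat index was built, running `SET ivfflat.probes = 10;` first
# trades a little latency for better recall than the default single probe.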