Vector schema per version: add ai_bible schema with per-version tables (bv_<lang>_<abbr>) in Python ingest; dynamic table resolution in vector-search with fallback to legacy table; sample pgAdmin queries printed.

This commit is contained in:
andupetcu
2025-09-20 19:08:11 +03:00
parent 8b26d72c1c
commit 5ddf62e5cf
2 changed files with 168 additions and 90 deletions

View File

@@ -4,6 +4,44 @@ const pool = new Pool({
connectionString: process.env.DATABASE_URL, connectionString: process.env.DATABASE_URL,
}) })
// Postgres schema holding the per-version vector tables (bv_<lang>_<abbr>).
// Read from the VECTOR_SCHEMA environment variable; falls back to 'ai_bible' when unset or empty.
const VECTOR_SCHEMA = process.env.VECTOR_SCHEMA || 'ai_bible'
/**
 * Normalise arbitrary text into a lowercase identifier fragment.
 *
 * Every run of characters outside `a-z`, `0-9` and `_` becomes a single
 * underscore, and underscores at either end of the result are removed.
 * The result may be an empty string.
 *
 * @param s - Text to normalise.
 * @returns The normalised fragment.
 */
function safeIdent(s: string): string {
  const lowered = s.toLowerCase()
  // Collapse each run of disallowed characters into one underscore.
  const collapsed = lowered.replace(/[^a-z0-9_]+/g, '_')
  // Strip leading and trailing underscores.
  const trimmed = collapsed.replace(/^_+|_+$/g, '')
  return trimmed
}
/**
 * Resolve the per-version vector table for a language.
 *
 * Picks the language's preferred row from "BibleVersion" (default version
 * first, then the oldest), derives the table name `bv_<lang>_<abbr>` inside
 * VECTOR_SCHEMA, and reports whether that table exists so callers can fall
 * back to another table when it does not.
 *
 * @param language - Language code; an empty value is treated as 'ro'.
 * @returns `table` — a double-quoted, schema-qualified identifier that can be
 *   interpolated into SQL, e.g. `"ai_bible"."bv_ro_cornilescu"`;
 *   `exists` — whether that table is present in the database.
 */
async function resolveVectorTable(language: string): Promise<{ table: string; exists: boolean }> {
  // Use one effective language for both the version lookup and the table name,
  // so an empty input cannot query '' while naming the table after 'ro'.
  const effectiveLanguage = language || 'ro'
  const lang = safeIdent(effectiveLanguage)
  const client = await pool.connect()
  try {
    // Get default version abbreviation from "BibleVersion"
    const res = await client.query(
      `SELECT "abbreviation" FROM "BibleVersion"
       WHERE lower(language) = lower($1)
       ORDER BY "isDefault" DESC, "createdAt" ASC
       LIMIT 1`,
      [effectiveLanguage]
    )
    // No matching version row: fall back to the 'default' suffix.
    const abbr = res.rows?.[0]?.abbreviation || 'default'
    const tableName = `bv_${lang}_${safeIdent(abbr)}`

    // Quote both identifier parts (doubling any embedded quote) so the name
    // used in queries matches the exact, case-sensitive name checked below.
    const quotedSchema = `"${VECTOR_SCHEMA.replace(/"/g, '""')}"`
    const table = `${quotedSchema}."${tableName}"`

    // Check if table exists
    const check = await client.query(
      `SELECT EXISTS (
         SELECT 1 FROM information_schema.tables
         WHERE table_schema = $1 AND table_name = $2
       ) AS exists`,
      [VECTOR_SCHEMA, tableName]
    )
    return { table, exists: Boolean(check.rows?.[0]?.exists) }
  } finally {
    // Always return the connection to the pool, even when a query throws.
    client.release()
  }
}
export interface BibleVerse { export interface BibleVerse {
id: string id: string
ref: string ref: string
@@ -44,21 +82,29 @@ export async function searchBibleSemantic(
limit: number = 10 limit: number = 10
): Promise<BibleVerse[]> { ): Promise<BibleVerse[]> {
try { try {
const { table, exists } = await resolveVectorTable(language)
const queryEmbedding = await getEmbedding(query) const queryEmbedding = await getEmbedding(query)
const client = await pool.connect() const client = await pool.connect()
try { try {
const result = await client.query( const sql = exists
` ? `SELECT ref, book, chapter, verse, text_raw,
SELECT ref, book, chapter, verse, text_raw, 1 - (embedding <=> $1) AS similarity
FROM ${table}
WHERE embedding IS NOT NULL
ORDER BY embedding <=> $1
LIMIT $2`
: `SELECT ref, book, chapter, verse, text_raw,
1 - (embedding <=> $1) AS similarity 1 - (embedding <=> $1) AS similarity
FROM bible_passages FROM bible_passages
WHERE embedding IS NOT NULL AND lang = $3 WHERE embedding IS NOT NULL AND lang = $3
ORDER BY embedding <=> $1 ORDER BY embedding <=> $1
LIMIT $2 LIMIT $2`
`, const params = exists
[JSON.stringify(queryEmbedding), limit, language] ? [JSON.stringify(queryEmbedding), limit]
) : [JSON.stringify(queryEmbedding), limit, language]
const result = await client.query(sql, params)
return result.rows return result.rows
} finally { } finally {
@@ -76,6 +122,7 @@ export async function searchBibleHybrid(
limit: number = 10 limit: number = 10
): Promise<BibleVerse[]> { ): Promise<BibleVerse[]> {
try { try {
const { table, exists } = await resolveVectorTable(language)
const queryEmbedding = await getEmbedding(query) const queryEmbedding = await getEmbedding(query)
// Use appropriate text search configuration based on language // Use appropriate text search configuration based on language
@@ -83,9 +130,28 @@ export async function searchBibleHybrid(
const client = await pool.connect() const client = await pool.connect()
try { try {
const result = await client.query( const sql = exists
` ? `WITH vector_search AS (
WITH vector_search AS ( SELECT id, 1 - (embedding <=> $1) AS vector_sim
FROM ${table}
WHERE embedding IS NOT NULL
ORDER BY embedding <=> $1
LIMIT 100
),
text_search AS (
SELECT id, ts_rank(tsv, plainto_tsquery($4, $3)) AS text_rank
FROM ${table}
WHERE tsv @@ plainto_tsquery($4, $3)
)
SELECT bp.ref, bp.book, bp.chapter, bp.verse, bp.text_raw,
COALESCE(vs.vector_sim, 0) * 0.7 + COALESCE(ts.text_rank, 0) * 0.3 AS combined_score
FROM ${table} bp
LEFT JOIN vector_search vs ON vs.id = bp.id
LEFT JOIN text_search ts ON ts.id = bp.id
WHERE (vs.id IS NOT NULL OR ts.id IS NOT NULL)
ORDER BY combined_score DESC
LIMIT $2`
: `WITH vector_search AS (
SELECT id, 1 - (embedding <=> $1) AS vector_sim SELECT id, 1 - (embedding <=> $1) AS vector_sim
FROM bible_passages FROM bible_passages
WHERE embedding IS NOT NULL AND lang = $4 WHERE embedding IS NOT NULL AND lang = $4
@@ -104,10 +170,13 @@ export async function searchBibleHybrid(
LEFT JOIN text_search ts ON ts.id = bp.id LEFT JOIN text_search ts ON ts.id = bp.id
WHERE (vs.id IS NOT NULL OR ts.id IS NOT NULL) AND bp.lang = $4 WHERE (vs.id IS NOT NULL OR ts.id IS NOT NULL) AND bp.lang = $4
ORDER BY combined_score DESC ORDER BY combined_score DESC
LIMIT $2 LIMIT $2`
`,
[JSON.stringify(queryEmbedding), limit, query, language, textConfig] const params = exists
) ? [JSON.stringify(queryEmbedding), limit, query, textConfig]
: [JSON.stringify(queryEmbedding), limit, query, language, textConfig]
const result = await client.query(sql, params)
return result.rows return result.rows
} finally { } finally {
@@ -125,6 +194,8 @@ export async function getContextVerses(
verse: number, verse: number,
contextSize: number = 2 contextSize: number = 2
): Promise<BibleVerse[]> { ): Promise<BibleVerse[]> {
// For context, we can't infer language here; callers should use the main hybrid result to decide.
// For now, fallback to legacy table for context retrieval; can be extended to use per-language table.
const client = await pool.connect() const client = await pool.connect()
try { try {
const result = await client.query( const result = await client.query(

View File

@@ -18,6 +18,7 @@ DB_URL = os.getenv("DATABASE_URL")
BIBLE_MD_PATH = os.getenv("BIBLE_MD_PATH") BIBLE_MD_PATH = os.getenv("BIBLE_MD_PATH")
LANG_CODE = os.getenv("LANG_CODE", "ro") LANG_CODE = os.getenv("LANG_CODE", "ro")
TRANSLATION = os.getenv("TRANSLATION_CODE", "FIDELA") TRANSLATION = os.getenv("TRANSLATION_CODE", "FIDELA")
VECTOR_SCHEMA = os.getenv("VECTOR_SCHEMA", "ai_bible")
assert AZ_ENDPOINT and AZ_API_KEY and DB_URL and BIBLE_MD_PATH, "Missing required env vars" assert AZ_ENDPOINT and AZ_API_KEY and DB_URL and BIBLE_MD_PATH, "Missing required env vars"
@@ -126,49 +127,51 @@ async def embed_batch(client, inputs):
await asyncio.sleep(backoff) await asyncio.sleep(backoff)
raise RuntimeError("Failed to embed after retries") raise RuntimeError("Failed to embed after retries")
# First, we need to create the table with proper SQL def safe_ident(s: str) -> str:
CREATE_TABLE_SQL = """ return re.sub(r"[^a-z0-9_]+", "_", s.lower()).strip("_")
CREATE TABLE IF NOT EXISTS bible_passages (
TABLE_BASENAME = f"bv_{safe_ident(LANG_CODE)}_{safe_ident(TRANSLATION)}"
TABLE_FQN = f'"{VECTOR_SCHEMA}"."{TABLE_BASENAME}"'
def create_table_sql() -> str:
return f"""
CREATE SCHEMA IF NOT EXISTS "{VECTOR_SCHEMA}";
CREATE TABLE IF NOT EXISTS {TABLE_FQN} (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(), id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
testament TEXT NOT NULL, testament TEXT NOT NULL,
book TEXT NOT NULL, book TEXT NOT NULL,
chapter INT NOT NULL, chapter INT NOT NULL,
verse INT NOT NULL, verse INT NOT NULL,
ref TEXT GENERATED ALWAYS AS (book || ' ' || chapter || ':' || verse) STORED, ref TEXT GENERATED ALWAYS AS (book || ' ' || chapter || ':' || verse) STORED,
lang TEXT NOT NULL DEFAULT 'ro',
translation TEXT NOT NULL DEFAULT 'FIDELA',
text_raw TEXT NOT NULL, text_raw TEXT NOT NULL,
text_norm TEXT NOT NULL, text_norm TEXT NOT NULL,
tsv tsvector, tsv tsvector,
embedding vector(1536), embedding vector({EMBED_DIMS}),
created_at TIMESTAMPTZ DEFAULT now(), created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now() updated_at TIMESTAMPTZ DEFAULT now()
); );
""" """
CREATE_INDEXES_SQL = """ def create_indexes_sql() -> str:
-- Uniqueness by canonical reference within translation/language return f"""
CREATE UNIQUE INDEX IF NOT EXISTS ux_ref_lang ON bible_passages (translation, lang, book, chapter, verse); CREATE UNIQUE INDEX IF NOT EXISTS ux_ref_{TABLE_BASENAME} ON {TABLE_FQN} (book, chapter, verse);
CREATE INDEX IF NOT EXISTS idx_tsv_{TABLE_BASENAME} ON {TABLE_FQN} USING GIN (tsv);
CREATE INDEX IF NOT EXISTS idx_book_ch_{TABLE_BASENAME} ON {TABLE_FQN} (book, chapter);
CREATE INDEX IF NOT EXISTS idx_testament_{TABLE_BASENAME} ON {TABLE_FQN} (testament);
"""
-- Full-text index def upsert_sql() -> str:
CREATE INDEX IF NOT EXISTS idx_tsv ON bible_passages USING GIN (tsv); return f"""
INSERT INTO {TABLE_FQN} (testament, book, chapter, verse, text_raw, text_norm, tsv, embedding)
-- Other indexes VALUES (%(testament)s, %(book)s, %(chapter)s, %(verse)s, %(text_raw)s, %(text_norm)s,
CREATE INDEX IF NOT EXISTS idx_book_ch ON bible_passages (book, chapter);
CREATE INDEX IF NOT EXISTS idx_testament ON bible_passages (testament);
"""
UPSERT_SQL = """
INSERT INTO bible_passages (testament, book, chapter, verse, lang, translation, text_raw, text_norm, tsv, embedding)
VALUES (%(testament)s, %(book)s, %(chapter)s, %(verse)s, %(lang)s, %(translation)s, %(text_raw)s, %(text_norm)s,
to_tsvector(COALESCE(%(ts_lang)s,'simple')::regconfig, %(text_norm)s), %(embedding)s) to_tsvector(COALESCE(%(ts_lang)s,'simple')::regconfig, %(text_norm)s), %(embedding)s)
ON CONFLICT (translation, lang, book, chapter, verse) DO UPDATE ON CONFLICT (book, chapter, verse) DO UPDATE
SET text_raw=EXCLUDED.text_raw, SET text_raw=EXCLUDED.text_raw,
text_norm=EXCLUDED.text_norm, text_norm=EXCLUDED.text_norm,
tsv=EXCLUDED.tsv, tsv=EXCLUDED.tsv,
embedding=EXCLUDED.embedding, embedding=EXCLUDED.embedding,
updated_at=now(); updated_at=now();
""" """
async def main(): async def main():
print("Starting Bible embedding ingestion...") print("Starting Bible embedding ingestion...")
@@ -179,15 +182,15 @@ async def main():
batch_size = 128 batch_size = 128
# First create the table structure # First create the schema + table structure for this language/version
with psycopg.connect(DB_URL) as conn: with psycopg.connect(DB_URL) as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
print("Creating bible_passages table...") print(f"Creating schema '{VECTOR_SCHEMA}' and table {TABLE_FQN} ...")
cur.execute("CREATE EXTENSION IF NOT EXISTS vector;") cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
cur.execute(CREATE_TABLE_SQL) cur.execute(create_table_sql())
cur.execute(CREATE_INDEXES_SQL) cur.execute(create_indexes_sql())
conn.commit() conn.commit()
print("Table created successfully") print("Schema/table ready")
# Now process embeddings # Now process embeddings
async with httpx.AsyncClient() as client: async with httpx.AsyncClient() as client:
@@ -204,13 +207,11 @@ async def main():
for v, e in zip(batch, embs): for v, e in zip(batch, embs):
rows.append({ rows.append({
**v, **v,
"lang": LANG_CODE, "ts_lang": "romanian" if LANG_CODE.lower().startswith("ro") else ("english" if LANG_CODE.lower().startswith("en") else "simple"),
"translation": TRANSLATION,
"ts_lang": "romanian",
"embedding": e "embedding": e
}) })
cur.executemany(UPSERT_SQL, rows) cur.executemany(upsert_sql(), rows)
conn.commit() conn.commit()
print(f"Upserted {len(rows)} verses... {i+len(rows)}/{len(verses)}") print(f"Upserted {len(rows)} verses... {i+len(rows)}/{len(verses)}")
@@ -218,14 +219,20 @@ async def main():
print("Creating IVFFLAT index...") print("Creating IVFFLAT index...")
with psycopg.connect(DB_URL, autocommit=True) as conn: with psycopg.connect(DB_URL, autocommit=True) as conn:
with conn.cursor() as cur: with conn.cursor() as cur:
cur.execute("VACUUM ANALYZE bible_passages;") cur.execute(f"VACUUM ANALYZE {TABLE_FQN};")
cur.execute(""" cur.execute(f"""
CREATE INDEX IF NOT EXISTS idx_vec_ivfflat CREATE INDEX IF NOT EXISTS idx_vec_ivfflat_{TABLE_BASENAME}
ON bible_passages USING ivfflat (embedding vector_cosine_ops) ON {TABLE_FQN} USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 200); WITH (lists = 200);
""") """)
print("✅ Bible embedding ingestion completed successfully!") print("✅ Bible embedding ingestion completed successfully!")
# Helpful pgAdmin queries:
print("\nRun these sample queries in pgAdmin:")
print(f"SELECT count(*) FROM {TABLE_FQN};")
print(f"SELECT book, chapter, verse, left(text_raw, 80) AS preview FROM {TABLE_FQN} ORDER BY book, chapter, verse LIMIT 10;")
print(f"SELECT * FROM {TABLE_FQN} WHERE book='Geneza' AND chapter=1 AND verse=1;")
if __name__ == "__main__": if __name__ == "__main__":
asyncio.run(main()) asyncio.run(main())