#!/usr/bin/env python3
"""
Import JSON Bible files into the database.

Scans ``./bibles/json`` for ``*_bible.json`` files, skips files smaller than
``MIN_FILE_SIZE_KB``, and loads each remaining file as a Bible version with
its books, chapters and verses. An existing version with the same
abbreviation and language is replaced.
"""

import json
import os
import sys
import uuid
from datetime import datetime
from typing import Dict, List, Optional, Tuple
from urllib.parse import unquote, urlparse

import psycopg
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Files smaller than this are not imported.
MIN_FILE_SIZE_KB = 500

# The import always targets this database, whatever DATABASE_URL names.
TARGET_DB_NAME = "biblical-guide"

# Language name fragment -> two-letter code used in the "language" column.
LANGUAGE_CODES = {
    'english': 'en',
    'spanish': 'es',
    'french': 'fr',
    'german': 'de',
    'portuguese': 'pt',
    'italian': 'it',
    'dutch': 'nl',
    'russian': 'ru',
    'chinese': 'zh',
    'japanese': 'ja',
    'korean': 'ko',
    'arabic': 'ar',
    'hindi': 'hi',
    'romanian': 'ro',
}


def _clean_text(value) -> str:
    """Return *value* stripped when it is a string, otherwise ''."""
    return value.strip() if isinstance(value, str) else ''


def get_db_connection():
    """Open a connection to the biblical-guide database.

    Host, port, user and password are taken from the DATABASE_URL
    environment variable; the database name is always ``TARGET_DB_NAME``.

    Raises:
        ValueError: if DATABASE_URL is unset or empty.
    """
    db_url = os.getenv("DATABASE_URL")
    if not db_url:
        raise ValueError("DATABASE_URL environment variable not found")

    parsed = urlparse(db_url)
    params = {
        "host": parsed.hostname,
        "port": parsed.port or 5432,
        # Credentials inside a URL are percent-encoded; decode them.
        "user": unquote(parsed.username) if parsed.username else None,
        "password": unquote(parsed.password) if parsed.password else None,
        "dbname": TARGET_DB_NAME,
    }
    # Keyword parameters are escaped by psycopg, so passwords containing
    # spaces, quotes or backslashes are safe. Absent parts are omitted
    # instead of being sent as the literal string "None".
    return psycopg.connect(
        **{key: value for key, value in params.items() if value is not None}
    )


def get_file_size_kb(file_path: str) -> float:
    """Return the size of *file_path* in kilobytes (bytes / 1024)."""
    return os.path.getsize(file_path) / 1024


def load_json_file(file_path: str) -> Optional[Dict]:
    """Load *file_path* and return its top-level JSON object.

    Returns None (after printing the reason) when the file cannot be read,
    is not valid UTF-8 JSON, or its top-level value is not an object.
    """
    try:
        # utf-8-sig accepts files both with and without a UTF-8 BOM.
        with open(file_path, 'r', encoding='utf-8-sig') as f:
            data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        print(f"❌ Error loading {file_path}: {e}")
        return None

    if not isinstance(data, dict):
        print(f"❌ Error loading {file_path}: top-level JSON value is not an object")
        return None
    return data


def get_language_code(language: str) -> str:
    """Convert a language name to a two-letter code.

    The name is matched case-insensitively against the fragments in
    ``LANGUAGE_CODES`` (substring match, so "English (US)" gives "en").
    Unmapped names fall back to their first two characters; names shorter
    than two characters and non-string values give 'xx'.
    """
    if not isinstance(language, str):
        return 'xx'

    lower_lang = language.strip().lower()
    for key, code in LANGUAGE_CODES.items():
        if key in lower_lang:
            return code

    # Default to first 2 characters if no mapping found
    return lower_lang[:2] if len(lower_lang) >= 2 else 'xx'


def _purge_bible_version(conn, abbreviation: str,
                         language: str) -> Optional[Tuple[int, int, int]]:
    """Delete a Bible version and its dependent rows WITHOUT committing.

    Rows are removed in foreign-key order: verses, chapters, books, version.

    Returns:
        ``(books_deleted, chapters_deleted, verses_deleted)``, or None when
        no version matches *abbreviation* and *language*.

    Database errors propagate; the caller owns commit and rollback.
    """
    with conn.cursor() as cur:
        cur.execute('''
            SELECT id FROM "BibleVersion"
            WHERE abbreviation = %s AND language = %s
        ''', (abbreviation, language))
        row = cur.fetchone()
        if row is None:
            return None
        version_id = row[0]

        # Subqueries keep the id lists on the server instead of fetching
        # every book/chapter id only to send it straight back.
        cur.execute('''
            DELETE FROM "BibleVerse"
            WHERE "chapterId" IN (
                SELECT c.id
                FROM "BibleChapter" c
                JOIN "BibleBook" b ON b.id = c."bookId"
                WHERE b."versionId" = %s
            )
        ''', (version_id,))
        verses_deleted = cur.rowcount

        cur.execute('''
            DELETE FROM "BibleChapter"
            WHERE "bookId" IN (
                SELECT id FROM "BibleBook" WHERE "versionId" = %s
            )
        ''', (version_id,))
        chapters_deleted = cur.rowcount

        cur.execute('DELETE FROM "BibleBook" WHERE "versionId" = %s', (version_id,))
        books_deleted = cur.rowcount

        cur.execute('DELETE FROM "BibleVersion" WHERE id = %s', (version_id,))

    return books_deleted, chapters_deleted, verses_deleted


def _report_replacement(abbreviation: str, language: str,
                        counts: Tuple[int, int, int]) -> None:
    """Print how many rows were removed for a replaced version."""
    books_deleted, chapters_deleted, verses_deleted = counts
    print(f"🔄 Replaced existing version {abbreviation} ({language}): "
          f"{books_deleted} books, {chapters_deleted} chapters, {verses_deleted} verses")


def delete_existing_bible_version(conn, abbreviation: str, language: str) -> bool:
    """Delete an existing Bible version and commit.

    Returns:
        True when a version was found, deleted and committed. False when no
        such version exists, or when the deletion failed (the transaction
        is rolled back and the error is printed).
    """
    try:
        counts = _purge_bible_version(conn, abbreviation, language)
        if counts is None:
            return False  # No existing version
        conn.commit()
    except Exception as e:
        conn.rollback()
        print(f"❌ Error deleting existing version {abbreviation}: {e}")
        return False

    _report_replacement(abbreviation, language, counts)
    return True


def bible_version_exists(conn, abbreviation: str, language: str) -> bool:
    """Return True when a version with this abbreviation and language exists."""
    with conn.cursor() as cur:
        cur.execute('''
            SELECT 1 FROM "BibleVersion"
            WHERE abbreviation = %s AND language = %s
            LIMIT 1
        ''', (abbreviation, language))
        return cur.fetchone() is not None


def import_bible_version(conn, bible_data: Dict) -> Optional[str]:
    """Insert a Bible version row and return its new id.

    Any existing version with the same abbreviation and language is removed
    in the same transaction as the insert, so a failure at either step
    leaves the previous version in place.

    Returns:
        The new version id, or None when name/abbreviation is missing or a
        database error occurred (rolled back and printed).
    """
    try:
        # Extract and clean data; JSON nulls are treated as empty strings.
        name = _clean_text(bible_data.get('name'))
        abbreviation = _clean_text(bible_data.get('abbreviation'))
        language = get_language_code(bible_data.get('language', ''))
        description = _clean_text(bible_data.get('description'))
        country = _clean_text(bible_data.get('country'))
        english_title = _clean_text(bible_data.get('englishTitle'))
        zip_file_url = _clean_text(bible_data.get('zipFileUrl'))
        flag_image_url = _clean_text(bible_data.get('flagImageUrl'))
        is_default = bible_data.get('isDefault', False)

        # Validate required fields
        if not name or not abbreviation:
            print("⚠️ Skipping Bible: missing name or abbreviation")
            return None

        # Replace existing version if it exists (committed with the insert).
        replaced = _purge_bible_version(conn, abbreviation, language)

        version_id = str(uuid.uuid4())
        with conn.cursor() as cur:
            cur.execute('''
                INSERT INTO "BibleVersion" (
                    id, name, abbreviation, language, description, country,
                    "englishTitle", "zipFileUrl", "flagImageUrl", "isDefault",
                    "createdAt", "updatedAt"
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
            ''', (
                version_id, name, abbreviation, language, description, country,
                english_title, zip_file_url, flag_image_url, is_default
            ))
        conn.commit()

        if replaced is not None:
            _report_replacement(abbreviation, language, replaced)
        print(f"✅ Created Bible version: {name} ({abbreviation})")
        return version_id

    except Exception as e:
        conn.rollback()
        print(f"❌ Error importing Bible version: {e}")
        return None


def import_bible_books(conn, version_id: str, books_data: List[Dict]) -> int:
    """Insert the books of a version, with their chapters and verses.

    Everything is committed once at the end. If inserting a book fails, the
    whole batch is rolled back and 0 is returned.

    Returns:
        The number of books for which at least one chapter was imported.
    """
    imported_count = 0

    try:
        for book_data in books_data:
            if not isinstance(book_data, dict):
                print("⚠️ Skipping book: missing required fields")
                continue

            book_key = _clean_text(book_data.get('bookKey'))
            name = _clean_text(book_data.get('name'))
            testament = _clean_text(book_data.get('testament'))
            order_num = book_data.get('orderNum', 0)
            chapters_data = book_data.get('chapters') or []

            if not book_key or not name or not testament:
                print("⚠️ Skipping book: missing required fields")
                continue

            # Insert book
            book_id = str(uuid.uuid4())
            with conn.cursor() as cur:
                cur.execute('''
                    INSERT INTO "BibleBook" (
                        id, "versionId", name, testament, "orderNum", "bookKey"
                    ) VALUES (%s, %s, %s, %s, %s, %s)
                ''', (book_id, version_id, name, testament, order_num, book_key))

            # Import chapters for this book
            chapters_imported = import_bible_chapters(conn, book_id, chapters_data)

            if chapters_imported > 0:
                imported_count += 1
                print(f" 📖 {name}: {chapters_imported} chapters")

        conn.commit()
        return imported_count

    except Exception as e:
        conn.rollback()
        print(f"❌ Error importing books: {e}")
        return 0


def import_bible_chapters(conn, book_id: str, chapters_data: List[Dict]) -> int:
    """Insert the chapters of a book, with their verses. Does not commit.

    Each chapter insert runs inside ``conn.transaction()``. When the
    connection is already in a transaction (as it is when called from
    import_bible_books) that is a savepoint, so a failed chapter is undone
    on its own and the surrounding transaction stays usable. Failed or
    invalid chapters are reported and skipped.

    Returns:
        The number of chapters for which at least one verse was imported.
    """
    imported_count = 0

    for chapter_data in chapters_data:
        if not isinstance(chapter_data, dict):
            print("⚠️ Skipping chapter: invalid chapter number")
            continue

        chapter_num = chapter_data.get('chapterNum', 0)
        verses_data = chapter_data.get('verses') or []

        if not isinstance(chapter_num, int) or chapter_num <= 0:
            print("⚠️ Skipping chapter: invalid chapter number")
            continue

        # Insert chapter
        chapter_id = str(uuid.uuid4())
        try:
            with conn.transaction():
                with conn.cursor() as cur:
                    cur.execute('''
                        INSERT INTO "BibleChapter" (
                            id, "bookId", "chapterNum"
                        ) VALUES (%s, %s, %s)
                    ''', (chapter_id, book_id, chapter_num))
        except Exception as e:
            # Without the savepoint, PostgreSQL would reject every later
            # statement in this transaction after the first error.
            print(f"❌ Error importing chapters: {e}")
            continue

        # Import verses for this chapter
        verses_imported = import_bible_verses(conn, chapter_id, verses_data)

        if verses_imported > 0:
            imported_count += 1

    return imported_count


def import_bible_verses(conn, chapter_id: str, verses_data: List[Dict]) -> int:
    """Batch-insert the verses of a chapter. Does not commit.

    Verses with a non-positive or non-integer number, or with empty text,
    are dropped. The batch runs inside ``conn.transaction()`` (a savepoint
    when a transaction is already open), so a failure undoes only this
    batch.

    Returns:
        The number of verses inserted, or 0 when the batch failed.
    """
    verses_to_insert = []
    for verse_data in verses_data:
        if not isinstance(verse_data, dict):
            continue

        verse_num = verse_data.get('verseNum', 0)
        text = _clean_text(verse_data.get('text'))

        if not isinstance(verse_num, int) or verse_num <= 0 or not text:
            continue

        verses_to_insert.append((str(uuid.uuid4()), chapter_id, verse_num, text))

    if not verses_to_insert:
        return 0

    try:
        with conn.transaction():
            with conn.cursor() as cur:
                cur.executemany('''
                    INSERT INTO "BibleVerse" (
                        id, "chapterId", "verseNum", text
                    ) VALUES (%s, %s, %s, %s)
                ''', verses_to_insert)
    except Exception as e:
        print(f"❌ Error importing verses: {e}")
        return 0

    return len(verses_to_insert)


def main():
    """Import every sufficiently large ``*_bible.json`` file and print a summary."""
    print("🚀 Starting JSON Bible import...")

    json_dir = os.path.join(os.getcwd(), 'bibles', 'json')
    if not os.path.exists(json_dir):
        print(f"❌ JSON directory not found: {json_dir}")
        sys.exit(1)

    # Get all JSON Bible files
    json_files = [f for f in os.listdir(json_dir) if f.endswith('_bible.json')]
    print(f"📁 Found {len(json_files)} JSON Bible files")

    # Filter by file size (skip files under MIN_FILE_SIZE_KB)
    valid_files = []
    skipped_small = 0

    for filename in json_files:
        file_path = os.path.join(json_dir, filename)
        size_kb = get_file_size_kb(file_path)

        if size_kb >= MIN_FILE_SIZE_KB:
            valid_files.append((filename, file_path, size_kb))
        else:
            skipped_small += 1

    print(f"📏 Filtered files: {len(valid_files)} valid (≥{MIN_FILE_SIZE_KB}KB), "
          f"{skipped_small} skipped (<{MIN_FILE_SIZE_KB}KB)")

    # Sort by file size (largest first for better progress visibility)
    valid_files.sort(key=lambda entry: entry[2], reverse=True)

    # Connect to database
    try:
        conn = get_db_connection()
        print("🔗 Connected to database")
    except Exception as e:
        print(f"❌ Database connection failed: {e}")
        sys.exit(1)

    # Import statistics
    stats = {
        'total_files': len(valid_files),
        'imported': 0,
        'skipped': 0,
        'errors': 0,
        'total_books': 0,
        'total_chapters': 0,
        'total_verses': 0
    }

    try:
        # Process each file
        for i, (filename, file_path, size_kb) in enumerate(valid_files, 1):
            print(f"\n📖 [{i}/{len(valid_files)}] Processing {filename} ({size_kb:.1f}KB)")

            try:
                # Load JSON data
                bible_data = load_json_file(file_path)
                if not bible_data:
                    stats['errors'] += 1
                    continue

                # Import Bible version
                version_id = import_bible_version(conn, bible_data)
                if not version_id:
                    stats['skipped'] += 1
                    continue

                # Import books
                books_data = bible_data.get('books') or []
                books_imported = import_bible_books(conn, version_id, books_data)

                if books_imported > 0:
                    stats['imported'] += 1
                    stats['total_books'] += books_imported

                    # Count chapters and verses. NOTE: these totals come
                    # from the source JSON, so entries that were skipped as
                    # invalid during import are still counted here.
                    for book in books_data:
                        chapters = book.get('chapters', [])
                        stats['total_chapters'] += len(chapters)
                        for chapter in chapters:
                            stats['total_verses'] += len(chapter.get('verses', []))

                    print(f"✅ Successfully imported {books_imported} books")
                else:
                    stats['errors'] += 1

                # Progress update every 10 files
                if i % 10 == 0:
                    progress = (i / len(valid_files)) * 100
                    print(f"\n📈 Progress: {progress:.1f}% ({stats['imported']} imported, "
                          f"{stats['skipped']} skipped, {stats['errors']} errors)")

            except Exception as e:
                print(f"❌ Error processing {filename}: {e}")
                stats['errors'] += 1
    finally:
        # Close the connection even when the loop is interrupted.
        conn.close()

    # Final summary
    print("\n🎉 JSON Bible import completed!")
    print("📊 Final Statistics:")
    print(f" - Total files processed: {stats['total_files']}")
    print(f" - Successfully imported: {stats['imported']}")
    # Existing versions are replaced, never skipped; this counts files whose
    # version row could not be created.
    print(f" - Skipped (version not created): {stats['skipped']}")
    print(f" - Errors: {stats['errors']}")
    print(f" - Files skipped (<{MIN_FILE_SIZE_KB}KB): {skipped_small}")
    print(f" - Total books imported: {stats['total_books']}")
    print(f" - Total chapters imported: {stats['total_chapters']}")
    print(f" - Total verses imported: {stats['total_verses']}")


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("\n⚠️ Import interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"❌ Fatal error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)