Files
biblical-guide.com/scripts/import_json_bibles.py
Andrei 4303e48fac Fix Next.js 15 compatibility and TypeScript errors
- Update API route handlers to use async params for Next.js 15 compatibility
- Fix MUI DataGrid deprecated props (pageSize -> initialState.pagination)
- Replace Material-UI Grid components with Box for better compatibility
- Fix admin authentication system with proper request parameters
- Update permission constants to match available AdminPermission enum values
- Add missing properties to Page interface for type safety
- Update .gitignore to exclude venv/, import logs, and large data directories
- Optimize Next.js config to reduce memory usage during builds

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-24 09:54:13 +00:00

408 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Import JSON Bible files into the database.
Skips files under 500KB and handles database constraints properly.
"""
import os
import json
import psycopg
from urllib.parse import urlparse
from dotenv import load_dotenv
from typing import Dict, List, Optional
import sys
from datetime import datetime
import uuid
# Load environment variables
load_dotenv()
def get_db_connection():
"""Get connection to biblical-guide database"""
db_url = os.getenv("DATABASE_URL")
if not db_url:
raise ValueError("DATABASE_URL environment variable not found")
parsed = urlparse(db_url)
conn_str = f"host={parsed.hostname} port={parsed.port or 5432} user={parsed.username} password={parsed.password} dbname=biblical-guide"
return psycopg.connect(conn_str)
def get_file_size_kb(file_path: str) -> float:
"""Get file size in KB"""
return os.path.getsize(file_path) / 1024
def load_json_file(file_path: str) -> Optional[Dict]:
"""Load and parse JSON file"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
print(f"❌ Error loading {file_path}: {e}")
return None
def get_language_code(language: str) -> str:
"""Convert language to proper ISO code"""
lang_map = {
'english': 'en',
'spanish': 'es',
'french': 'fr',
'german': 'de',
'portuguese': 'pt',
'italian': 'it',
'dutch': 'nl',
'russian': 'ru',
'chinese': 'zh',
'japanese': 'ja',
'korean': 'ko',
'arabic': 'ar',
'hindi': 'hi',
'romanian': 'ro'
}
lower_lang = language.lower()
for key, code in lang_map.items():
if key in lower_lang:
return code
# Default to first 2 characters if no mapping found
return lower_lang[:2] if len(lower_lang) >= 2 else 'xx'
def delete_existing_bible_version(conn, abbreviation: str, language: str) -> bool:
"""Delete existing Bible version if it exists"""
try:
with conn.cursor() as cur:
# Get the version ID first
cur.execute('''
SELECT id FROM "BibleVersion"
WHERE abbreviation = %s AND language = %s
''', (abbreviation, language))
result = cur.fetchone()
if not result:
return False # No existing version
version_id = result[0]
# Delete in order of foreign key dependencies
# First get book IDs for this version
cur.execute('SELECT id FROM "BibleBook" WHERE "versionId" = %s', (version_id,))
book_ids = [row[0] for row in cur.fetchall()]
if book_ids:
# Get chapter IDs for these books
cur.execute('SELECT id FROM "BibleChapter" WHERE "bookId" = ANY(%s)', (book_ids,))
chapter_ids = [row[0] for row in cur.fetchall()]
if chapter_ids:
# Delete verses for these chapters
cur.execute('DELETE FROM "BibleVerse" WHERE "chapterId" = ANY(%s)', (chapter_ids,))
verses_deleted = cur.rowcount
# Delete chapters
cur.execute('DELETE FROM "BibleChapter" WHERE "bookId" = ANY(%s)', (book_ids,))
chapters_deleted = cur.rowcount
else:
verses_deleted = chapters_deleted = 0
# Delete books
cur.execute('DELETE FROM "BibleBook" WHERE "versionId" = %s', (version_id,))
books_deleted = cur.rowcount
else:
verses_deleted = chapters_deleted = books_deleted = 0
# Finally delete the version
cur.execute('DELETE FROM "BibleVersion" WHERE id = %s', (version_id,))
conn.commit()
print(f"🔄 Replaced existing version {abbreviation} ({language}): {books_deleted} books, {chapters_deleted} chapters, {verses_deleted} verses")
return True
except Exception as e:
conn.rollback()
print(f"❌ Error deleting existing version {abbreviation}: {e}")
return False
def bible_version_exists(conn, abbreviation: str, language: str) -> bool:
"""Check if Bible version already exists"""
with conn.cursor() as cur:
cur.execute('''
SELECT COUNT(*) FROM "BibleVersion"
WHERE abbreviation = %s AND language = %s
''', (abbreviation, language))
return cur.fetchone()[0] > 0
def import_bible_version(conn, bible_data: Dict) -> Optional[str]:
"""Import a Bible version and return its ID"""
try:
# Extract and clean data
name = bible_data.get('name', '').strip()
abbreviation = bible_data.get('abbreviation', '').strip()
language = get_language_code(bible_data.get('language', ''))
description = bible_data.get('description', '').strip()
country = bible_data.get('country', '').strip()
english_title = bible_data.get('englishTitle', '').strip()
zip_file_url = bible_data.get('zipFileUrl', '').strip()
flag_image_url = bible_data.get('flagImageUrl', '').strip()
is_default = bible_data.get('isDefault', False)
# Validate required fields
if not name or not abbreviation:
print(f"⚠️ Skipping Bible: missing name or abbreviation")
return None
# Replace existing version if it exists
delete_existing_bible_version(conn, abbreviation, language)
# Insert Bible version
version_id = str(uuid.uuid4())
with conn.cursor() as cur:
cur.execute('''
INSERT INTO "BibleVersion" (
id, name, abbreviation, language, description, country,
"englishTitle", "zipFileUrl", "flagImageUrl", "isDefault",
"createdAt", "updatedAt"
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
''', (
version_id, name, abbreviation, language, description, country,
english_title, zip_file_url, flag_image_url, is_default
))
conn.commit()
print(f"✅ Created Bible version: {name} ({abbreviation})")
return version_id
except Exception as e:
conn.rollback()
print(f"❌ Error importing Bible version: {e}")
return None
def import_bible_books(conn, version_id: str, books_data: List[Dict]) -> int:
"""Import Bible books for a version"""
imported_count = 0
try:
for book_data in books_data:
book_key = book_data.get('bookKey', '').strip()
name = book_data.get('name', '').strip()
testament = book_data.get('testament', '').strip()
order_num = book_data.get('orderNum', 0)
chapters_data = book_data.get('chapters', [])
if not book_key or not name or not testament:
print(f"⚠️ Skipping book: missing required fields")
continue
# Insert book
book_id = str(uuid.uuid4())
with conn.cursor() as cur:
cur.execute('''
INSERT INTO "BibleBook" (
id, "versionId", name, testament, "orderNum", "bookKey"
) VALUES (%s, %s, %s, %s, %s, %s)
''', (book_id, version_id, name, testament, order_num, book_key))
# Import chapters for this book
chapters_imported = import_bible_chapters(conn, book_id, chapters_data)
if chapters_imported > 0:
imported_count += 1
print(f" 📖 {name}: {chapters_imported} chapters")
conn.commit()
return imported_count
except Exception as e:
conn.rollback()
print(f"❌ Error importing books: {e}")
return 0
def import_bible_chapters(conn, book_id: str, chapters_data: List[Dict]) -> int:
"""Import Bible chapters for a book"""
imported_count = 0
try:
for chapter_data in chapters_data:
chapter_num = chapter_data.get('chapterNum', 0)
verses_data = chapter_data.get('verses', [])
if chapter_num <= 0:
print(f"⚠️ Skipping chapter: invalid chapter number")
continue
# Insert chapter
chapter_id = str(uuid.uuid4())
with conn.cursor() as cur:
cur.execute('''
INSERT INTO "BibleChapter" (
id, "bookId", "chapterNum"
) VALUES (%s, %s, %s)
''', (chapter_id, book_id, chapter_num))
# Import verses for this chapter
verses_imported = import_bible_verses(conn, chapter_id, verses_data)
if verses_imported > 0:
imported_count += 1
return imported_count
except Exception as e:
print(f"❌ Error importing chapters: {e}")
return 0
def import_bible_verses(conn, chapter_id: str, verses_data: List[Dict]) -> int:
"""Import Bible verses for a chapter"""
imported_count = 0
try:
# Batch insert verses for better performance
verses_to_insert = []
for verse_data in verses_data:
verse_num = verse_data.get('verseNum', 0)
text = verse_data.get('text', '').strip()
if verse_num <= 0 or not text:
continue
verse_id = str(uuid.uuid4())
verses_to_insert.append((verse_id, chapter_id, verse_num, text))
if verses_to_insert:
with conn.cursor() as cur:
cur.executemany('''
INSERT INTO "BibleVerse" (
id, "chapterId", "verseNum", text
) VALUES (%s, %s, %s, %s)
''', verses_to_insert)
imported_count = len(verses_to_insert)
return imported_count
except Exception as e:
print(f"❌ Error importing verses: {e}")
return 0
def main():
"""Main import function"""
print("🚀 Starting JSON Bible import...")
json_dir = os.path.join(os.getcwd(), 'bibles', 'json')
if not os.path.exists(json_dir):
print(f"❌ JSON directory not found: {json_dir}")
sys.exit(1)
# Get all JSON Bible files
json_files = [f for f in os.listdir(json_dir) if f.endswith('_bible.json')]
print(f"📁 Found {len(json_files)} JSON Bible files")
# Filter by file size (skip files under 500KB)
valid_files = []
skipped_small = 0
for file in json_files:
file_path = os.path.join(json_dir, file)
size_kb = get_file_size_kb(file_path)
if size_kb >= 500:
valid_files.append((file, file_path, size_kb))
else:
skipped_small += 1
print(f"📏 Filtered files: {len(valid_files)} valid (≥500KB), {skipped_small} skipped (<500KB)")
# Sort by file size (largest first for better progress visibility)
valid_files.sort(key=lambda x: x[2], reverse=True)
# Connect to database
try:
conn = get_db_connection()
print("🔗 Connected to database")
except Exception as e:
print(f"❌ Database connection failed: {e}")
sys.exit(1)
# Import statistics
stats = {
'total_files': len(valid_files),
'imported': 0,
'skipped': 0,
'errors': 0,
'total_books': 0,
'total_chapters': 0,
'total_verses': 0
}
# Process each file
for i, (filename, file_path, size_kb) in enumerate(valid_files, 1):
print(f"\n📖 [{i}/{len(valid_files)}] Processing {filename} ({size_kb:.1f}KB)")
try:
# Load JSON data
bible_data = load_json_file(file_path)
if not bible_data:
stats['errors'] += 1
continue
# Import Bible version
version_id = import_bible_version(conn, bible_data)
if not version_id:
stats['skipped'] += 1
continue
# Import books
books_data = bible_data.get('books', [])
books_imported = import_bible_books(conn, version_id, books_data)
if books_imported > 0:
stats['imported'] += 1
stats['total_books'] += books_imported
# Count chapters and verses
for book in books_data:
chapters = book.get('chapters', [])
stats['total_chapters'] += len(chapters)
for chapter in chapters:
stats['total_verses'] += len(chapter.get('verses', []))
print(f"✅ Successfully imported {books_imported} books")
else:
stats['errors'] += 1
# Progress update every 10 files
if i % 10 == 0:
progress = (i / len(valid_files)) * 100
print(f"\n📈 Progress: {progress:.1f}% ({stats['imported']} imported, {stats['skipped']} skipped, {stats['errors']} errors)")
except Exception as e:
print(f"❌ Error processing {filename}: {e}")
stats['errors'] += 1
# Close database connection
conn.close()
# Final summary
print(f"\n🎉 JSON Bible import completed!")
print(f"📊 Final Statistics:")
print(f" - Total files processed: {stats['total_files']}")
print(f" - Successfully imported: {stats['imported']}")
print(f" - Skipped (duplicates): {stats['skipped']}")
print(f" - Errors: {stats['errors']}")
print(f" - Files skipped (<500KB): {skipped_small}")
print(f" - Total books imported: {stats['total_books']}")
print(f" - Total chapters imported: {stats['total_chapters']}")
print(f" - Total verses imported: {stats['total_verses']}")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n⚠️ Import interrupted by user")
sys.exit(1)
except Exception as e:
print(f"❌ Fatal error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)