Files
biblical-guide.com/scripts/import_json_bibles.py
Andrei 95070e5369 Add comprehensive page management system to admin dashboard
Features added:
- Database schema for pages and media files with content types (Rich Text, HTML, Markdown)
- Admin API routes for full page CRUD operations
- Image upload functionality with file management
- Rich text editor using TinyMCE with image insertion
- Admin interface for creating/editing pages with SEO options
- Dynamic navigation and footer integration
- Public page display routes with proper SEO metadata
- Support for featured images and content excerpts

Admin features:
- Create/edit/delete pages with rich content editor
- Upload and manage images through media library
- Configure pages to appear in navigation or footer
- Set page status (Draft, Published, Archived)
- SEO title and description management
- Real-time preview of content changes

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-24 07:26:25 +00:00

353 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Import JSON Bible files into the database.
Skips files under 500KB and handles database constraints properly.
"""
import os
import json
import psycopg
from urllib.parse import urlparse
from dotenv import load_dotenv
from typing import Dict, List, Optional
import sys
from datetime import datetime
import uuid
# Load variables from a .env file into the process environment when this
# module is loaded; get_db_connection() later reads DATABASE_URL via os.getenv.
load_dotenv()
def get_db_connection():
    """Open a psycopg connection to the ``biblical-guide`` database.

    Host, port and credentials are taken from the ``DATABASE_URL``
    environment variable. The database name is always ``biblical-guide``;
    the path component of the URL is ignored.

    Returns:
        The connection object returned by ``psycopg.connect``.

    Raises:
        ValueError: If ``DATABASE_URL`` is unset or empty.
    """
    # Function-scope import so this block does not depend on a file-level
    # import change.
    from urllib.parse import unquote

    db_url = os.getenv("DATABASE_URL")
    if not db_url:
        raise ValueError("DATABASE_URL environment variable not found")

    parsed = urlparse(db_url)

    # urlparse() leaves the userinfo part percent-encoded, so credentials
    # such as "p%40ss" must be decoded before they are sent to the server.
    params = {
        "host": parsed.hostname,
        "port": parsed.port or 5432,
        "user": unquote(parsed.username) if parsed.username is not None else None,
        "password": unquote(parsed.password) if parsed.password is not None else None,
        "dbname": "biblical-guide",
    }

    # Pass keyword arguments rather than a hand-built conninfo string: values
    # containing spaces, quotes or backslashes would otherwise corrupt the
    # string. Components missing from the URL are omitted instead of being
    # sent as the literal text "None".
    return psycopg.connect(**{k: v for k, v in params.items() if v is not None})
def get_file_size_kb(file_path: str) -> float:
    """Return the on-disk size of ``file_path`` in kilobytes (bytes / 1024)."""
    size_in_bytes = os.stat(file_path).st_size
    return size_in_bytes / 1024
def load_json_file(file_path: str) -> Optional[Dict]:
    """Read ``file_path`` as UTF-8 and return the decoded JSON document.

    Any failure (missing file, unreadable bytes, malformed JSON, ...) is
    reported on stdout and signalled to the caller by returning ``None``.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as handle:
            payload = json.load(handle)
    except Exception as err:
        print(f"❌ Error loading {file_path}: {err}")
        payload = None
    return payload
def get_language_code(language: str) -> str:
    """Map a free-form language name to a short language code.

    The lower-cased, whitespace-trimmed input is searched for each known
    language name as a substring (so ``"Brazilian Portuguese"`` maps to
    ``"pt"``). The first match in table order wins.

    Args:
        language: Language name as found in the Bible JSON. ``None`` and
            other non-string values (e.g. a JSON ``null``) are tolerated.

    Returns:
        The mapped two-letter code. If no known name matches, the first two
        characters of the normalised input are returned; note this fallback
        is not guaranteed to be a valid ISO code. ``'xx'`` is returned when
        fewer than two characters are available.
    """
    lang_map = {
        'english': 'en',
        'spanish': 'es',
        'french': 'fr',
        'german': 'de',
        'portuguese': 'pt',
        'italian': 'it',
        'dutch': 'nl',
        'russian': 'ru',
        'chinese': 'zh',
        'japanese': 'ja',
        'korean': 'ko',
        'arabic': 'ar',
        'hindi': 'hi',
        'romanian': 'ro',
    }

    # A JSON null (or any non-string) has no usable name: use the placeholder
    # instead of raising AttributeError on .lower().
    if not isinstance(language, str):
        return 'xx'

    # Strip first so padded input cannot leak whitespace into the
    # two-character fallback below.
    lower_lang = language.strip().lower()

    for key, code in lang_map.items():
        if key in lower_lang:
            return code

    # Default to first 2 characters if no mapping found
    return lower_lang[:2] if len(lower_lang) >= 2 else 'xx'
def bible_version_exists(conn, abbreviation: str, language: str) -> bool:
    """Tell whether "BibleVersion" already holds this abbreviation/language pair.

    Runs a parameterised COUNT(*) query on ``conn`` and returns ``True``
    when at least one row matches.
    """
    lookup_values = (abbreviation, language)
    with conn.cursor() as cursor:
        cursor.execute('''
            SELECT COUNT(*) FROM "BibleVersion"
            WHERE abbreviation = %s AND language = %s
        ''', lookup_values)
        count_row = cursor.fetchone()
        match_count = count_row[0]
    return match_count > 0
def import_bible_version(conn, bible_data: Dict) -> Optional[str]:
    """Insert one "BibleVersion" row described by ``bible_data``.

    Args:
        conn: Open database connection. It is committed after a successful
            insert and rolled back if anything raises.
        bible_data: Parsed Bible JSON. The keys read are ``name``,
            ``abbreviation``, ``language``, ``description``, ``country``,
            ``englishTitle``, ``zipFileUrl``, ``flagImageUrl`` and
            ``isDefault``.

    Returns:
        The id of the new row (a UUID4 string), or ``None`` when the record
        has no name or abbreviation, when a version with the same
        abbreviation and language already exists, or when the insert failed.
    """
    def _text(key: str) -> str:
        # A key holding JSON null comes back from .get() as None even when a
        # default is supplied; treat it like a missing key instead of
        # crashing on .strip().
        value = bible_data.get(key)
        return '' if value is None else str(value).strip()

    try:
        # Validate required fields first so a malformed record is rejected
        # before any further work is done.
        name = _text('name')
        abbreviation = _text('abbreviation')
        if not name or not abbreviation:
            print(f"⚠️ Skipping Bible: missing name or abbreviation")
            return None

        # Extract and clean the remaining data
        language = get_language_code(_text('language'))
        description = _text('description')
        country = _text('country')
        english_title = _text('englishTitle')
        zip_file_url = _text('zipFileUrl')
        flag_image_url = _text('flagImageUrl')
        # "or False" maps an explicit JSON null to False.
        is_default = bible_data.get('isDefault') or False

        # Check for duplicates
        if bible_version_exists(conn, abbreviation, language):
            print(f"⚠️ Bible version {abbreviation} ({language}) already exists, skipping...")
            return None

        # Insert Bible version
        version_id = str(uuid.uuid4())
        with conn.cursor() as cur:
            cur.execute('''
                INSERT INTO "BibleVersion" (
                    id, name, abbreviation, language, description, country,
                    "englishTitle", "zipFileUrl", "flagImageUrl", "isDefault",
                    "createdAt", "updatedAt"
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
            ''', (
                version_id, name, abbreviation, language, description, country,
                english_title, zip_file_url, flag_image_url, is_default
            ))
        conn.commit()
        print(f"✅ Created Bible version: {name} ({abbreviation})")
        return version_id
    except Exception as e:
        conn.rollback()
        print(f"❌ Error importing Bible version: {e}")
        return None
def import_bible_books(conn, version_id: str, books_data: List[Dict]) -> int:
    """Insert the books of one Bible version, with their chapters and verses.

    Each book dict is read for ``bookKey``, ``name``, ``testament``,
    ``orderNum`` and ``chapters``. Books lacking any of the first three are
    skipped. All inserts are committed together once every book has been
    processed; if anything raises, the transaction is rolled back.

    Args:
        conn: Open database connection.
        version_id: Id of the owning "BibleVersion" row.
        books_data: List of book dicts; ``None`` is treated as empty.

    Returns:
        The number of books for which at least one chapter was imported,
        or 0 if an error forced a rollback. A book row inserted without any
        imported chapter is still committed but is not counted.
    """
    def _text(record: Dict, key: str) -> str:
        # A JSON null arrives as None; treat it as "missing" so the book is
        # skipped by the check below rather than raising on .strip() and
        # rolling back every other book of the version.
        value = record.get(key)
        return '' if value is None else str(value).strip()

    imported_count = 0
    try:
        for book_data in books_data or []:
            book_key = _text(book_data, 'bookKey')
            name = _text(book_data, 'name')
            testament = _text(book_data, 'testament')
            order_num = book_data.get('orderNum') or 0
            chapters_data = book_data.get('chapters') or []

            if not book_key or not name or not testament:
                print(f"⚠️ Skipping book: missing required fields")
                continue

            # Insert book
            book_id = str(uuid.uuid4())
            with conn.cursor() as cur:
                cur.execute('''
                    INSERT INTO "BibleBook" (
                        id, "versionId", name, testament, "orderNum", "bookKey"
                    ) VALUES (%s, %s, %s, %s, %s, %s)
                ''', (book_id, version_id, name, testament, order_num, book_key))

            # Import chapters for this book
            chapters_imported = import_bible_chapters(conn, book_id, chapters_data)
            if chapters_imported > 0:
                imported_count += 1
                print(f" 📖 {name}: {chapters_imported} chapters")

        conn.commit()
        return imported_count
    except Exception as e:
        conn.rollback()
        print(f"❌ Error importing books: {e}")
        return 0
def import_bible_chapters(conn, book_id: str, chapters_data: List[Dict]) -> int:
    """Insert the chapters of one book, with their verses.

    Each chapter dict is read for ``chapterNum`` and ``verses``. Chapters
    whose number is missing, not comparable to a number, or not positive
    are skipped. This function neither commits nor rolls back; that is
    left to the caller.

    Args:
        conn: Open database connection.
        book_id: Id of the owning "BibleBook" row.
        chapters_data: List of chapter dicts; ``None`` is treated as empty.

    Returns:
        The number of chapters for which at least one verse was imported,
        or 0 if an unexpected error occurred.
    """
    imported_count = 0
    try:
        for chapter_data in chapters_data or []:
            chapter_num = chapter_data.get('chapterNum', 0)
            verses_data = chapter_data.get('verses') or []

            # A null or non-numeric chapterNum cannot be compared with 0.
            # Treat it as one more invalid number instead of letting the
            # TypeError abort the remaining chapters of the book.
            try:
                has_valid_number = chapter_num > 0
            except TypeError:
                has_valid_number = False
            if not has_valid_number:
                print(f"⚠️ Skipping chapter: invalid chapter number")
                continue

            # Insert chapter
            chapter_id = str(uuid.uuid4())
            with conn.cursor() as cur:
                cur.execute('''
                    INSERT INTO "BibleChapter" (
                        id, "bookId", "chapterNum"
                    ) VALUES (%s, %s, %s)
                ''', (chapter_id, book_id, chapter_num))

            # Import verses for this chapter
            verses_imported = import_bible_verses(conn, chapter_id, verses_data)
            if verses_imported > 0:
                imported_count += 1

        return imported_count
    except Exception as e:
        # NOTE(review): on PostgreSQL a failed statement leaves the open
        # transaction aborted, so later inserts on this connection are
        # expected to fail until the caller rolls back - confirm that
        # swallowing the error here is intended.
        print(f"❌ Error importing chapters: {e}")
        return 0
def import_bible_verses(conn, chapter_id: str, verses_data: List[Dict]) -> int:
    """Insert the verses of one chapter with a single batched statement.

    Each verse dict is read for ``verseNum`` and ``text``. Verses with a
    missing, non-numeric or non-positive number, or with empty, null or
    non-string text, are skipped individually. This function neither
    commits nor rolls back; that is left to the caller.

    Args:
        conn: Open database connection.
        chapter_id: Id of the owning "BibleChapter" row.
        verses_data: List of verse dicts; ``None`` is treated as empty.

    Returns:
        The number of verse rows handed to the database, or 0 if nothing
        was valid or an unexpected error occurred.
    """
    imported_count = 0
    try:
        # Batch insert verses for better performance
        verses_to_insert = []
        for verse_data in verses_data or []:
            verse_num = verse_data.get('verseNum', 0)
            raw_text = verse_data.get('text')
            # A JSON null or non-string text is treated as empty so only this
            # verse is dropped, not the whole chapter's batch.
            text = raw_text.strip() if isinstance(raw_text, str) else ''

            # Same idea for a verse number that cannot be compared with 0.
            try:
                has_valid_number = verse_num > 0
            except TypeError:
                has_valid_number = False
            if not has_valid_number or not text:
                continue

            verse_id = str(uuid.uuid4())
            verses_to_insert.append((verse_id, chapter_id, verse_num, text))

        if verses_to_insert:
            with conn.cursor() as cur:
                cur.executemany('''
                    INSERT INTO "BibleVerse" (
                        id, "chapterId", "verseNum", text
                    ) VALUES (%s, %s, %s, %s)
                ''', verses_to_insert)
            imported_count = len(verses_to_insert)

        return imported_count
    except Exception as e:
        # NOTE(review): on PostgreSQL a failed statement leaves the open
        # transaction aborted until the caller rolls back - confirm that
        # swallowing the error here is intended.
        print(f"❌ Error importing verses: {e}")
        return 0
def main():
    """Import every sufficiently large ``*_bible.json`` file under ``./bibles/json``.

    Steps:
      1. List ``<cwd>/bibles/json`` and keep files ending in ``_bible.json``.
      2. Drop files smaller than 500 KB and process the rest largest-first.
      3. For each file, load the JSON, create the Bible version, then import
         its books (with chapters and verses).
      4. Print a summary.

    Exits the process with status 1 when the JSON directory is missing or
    the database connection cannot be opened.

    Notes on the statistics:
      * ``skipped`` is incremented whenever ``import_bible_version`` returns
        nothing, which covers duplicates as well as invalid or failed
        versions.
      * Chapter and verse totals are counted from the JSON payload of
        successfully imported files, not from the rows actually inserted.
    """
    print("🚀 Starting JSON Bible import...")

    json_dir = os.path.join(os.getcwd(), 'bibles', 'json')
    # isdir (not exists): a regular file at this path would otherwise pass
    # the check and make os.listdir() raise below.
    if not os.path.isdir(json_dir):
        print(f"❌ JSON directory not found: {json_dir}")
        sys.exit(1)

    # Get all JSON Bible files
    json_files = [f for f in os.listdir(json_dir) if f.endswith('_bible.json')]
    print(f"📁 Found {len(json_files)} JSON Bible files")

    # Filter by file size (skip files under 500KB)
    valid_files = []
    skipped_small = 0
    for file in json_files:
        file_path = os.path.join(json_dir, file)
        size_kb = get_file_size_kb(file_path)
        if size_kb >= 500:
            valid_files.append((file, file_path, size_kb))
        else:
            skipped_small += 1
    print(f"📏 Filtered files: {len(valid_files)} valid (≥500KB), {skipped_small} skipped (<500KB)")

    # Sort by file size (largest first for better progress visibility)
    valid_files.sort(key=lambda x: x[2], reverse=True)

    # Connect to database
    try:
        conn = get_db_connection()
        print("🔗 Connected to database")
    except Exception as e:
        print(f"❌ Database connection failed: {e}")
        sys.exit(1)

    # Import statistics
    stats = {
        'total_files': len(valid_files),
        'imported': 0,
        'skipped': 0,
        'errors': 0,
        'total_books': 0,
        'total_chapters': 0,
        'total_verses': 0
    }

    try:
        # Process each file
        for i, (filename, file_path, size_kb) in enumerate(valid_files, 1):
            print(f"\n📖 [{i}/{len(valid_files)}] Processing {filename} ({size_kb:.1f}KB)")
            try:
                # Load JSON data
                bible_data = load_json_file(file_path)
                if not bible_data:
                    stats['errors'] += 1
                    continue

                # Import Bible version
                version_id = import_bible_version(conn, bible_data)
                if not version_id:
                    stats['skipped'] += 1
                    continue

                # Import books
                books_data = bible_data.get('books') or []
                books_imported = import_bible_books(conn, version_id, books_data)
                if books_imported > 0:
                    stats['imported'] += 1
                    stats['total_books'] += books_imported
                    # Count chapters and verses. "or []" keeps a JSON null
                    # from turning a successful import into an error here.
                    for book in books_data:
                        chapters = book.get('chapters') or []
                        stats['total_chapters'] += len(chapters)
                        for chapter in chapters:
                            stats['total_verses'] += len(chapter.get('verses') or [])
                    print(f"✅ Successfully imported {books_imported} books")
                else:
                    stats['errors'] += 1

                # Progress update every 10 files
                if i % 10 == 0:
                    progress = (i / len(valid_files)) * 100
                    print(f"\n📈 Progress: {progress:.1f}% ({stats['imported']} imported, {stats['skipped']} skipped, {stats['errors']} errors)")
            except Exception as e:
                print(f"❌ Error processing {filename}: {e}")
                stats['errors'] += 1
    finally:
        # Close database connection, including when the run is interrupted
        # (Ctrl-C) or an unexpected error escapes the loop.
        conn.close()

    # Final summary
    print(f"\n🎉 JSON Bible import completed!")
    print(f"📊 Final Statistics:")
    print(f" - Total files processed: {stats['total_files']}")
    print(f" - Successfully imported: {stats['imported']}")
    print(f" - Skipped (duplicates): {stats['skipped']}")
    print(f" - Errors: {stats['errors']}")
    print(f" - Files skipped (<500KB): {skipped_small}")
    print(f" - Total books imported: {stats['total_books']}")
    print(f" - Total chapters imported: {stats['total_chapters']}")
    print(f" - Total verses imported: {stats['total_verses']}")
if __name__ == "__main__":
    # Script entry point: run the importer and turn an interrupt or any
    # uncaught error into exit status 1 (with a traceback for the latter).
    exit_status = 0
    try:
        main()
    except KeyboardInterrupt:
        print("\n⚠️ Import interrupted by user")
        exit_status = 1
    except Exception as fatal:
        print(f"❌ Fatal error: {fatal}")
        import traceback
        traceback.print_exc()
        exit_status = 1
    if exit_status != 0:
        sys.exit(exit_status)