330 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			330 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python3
 | |
| import os
 | |
| import re
 | |
| import sys
 | |
| from typing import cast
 | |
| 
 | |
| from PyQt6.QtCore import QModelIndex, Qt, pyqtSlot
 | |
| from PyQt6.QtGui import (
 | |
|     QAction,
 | |
|     QFont,
 | |
|     QTextCharFormat,
 | |
|     QTextCursor,
 | |
|     QTextDocument,
 | |
|     QTextListFormat,
 | |
| )
 | |
| from PyQt6.QtSql import QSqlDatabase, QSqlQuery, QSqlQueryModel
 | |
| from PyQt6.QtWidgets import QApplication, QFileDialog, QMainWindow, QMessageBox
 | |
| 
 | |
| from lib import *
 | |
| from ui.MainWindow import Ui_MainWindow
 | |
| 
 | |
| 
 | |
def query_error(query: QSqlQuery) -> None:
    """Print diagnostics for a failed query, then raise.

    The executed SQL text, the bound values, and the error type and text
    reported by ``query.lastError()`` are written to standard output.

    Raises:
        Exception: Always, with the message ``"SQL Error"``.
    """
    error = query.lastError()
    report_lines = (
        "SQL Error:",
        f"{query.executedQuery()}",
        f"{query.boundValues()}",
        f"{error.type()}:{error.text()}",
    )
    print("\n".join(report_lines))
    raise Exception("SQL Error")
 | |
| 
 | |
| 
 | |
class ModelOverride(QSqlQueryModel):
    """Query model that can disable rows based on the value in column 3.

    While ``enableFlag`` is false, a row whose column-3 value is empty or
    below 1 loses its selectable and enabled flags.  While it is true, every
    row keeps the flags reported by ``QSqlQueryModel``.
    """

    # When True, flags() returns the base-class flags untouched.
    enableFlag = False

    def flags(self, index: QModelIndex) -> Qt.ItemFlag:
        """Return the item flags for *index*, masking rows with no column-3 value."""
        base_flags = super().flags(index)
        if not self.enableFlag:
            marker = index.siblingAtColumn(3).data()
            if not marker or marker < 1:
                blocked = Qt.ItemFlag.ItemIsSelectable | Qt.ItemFlag.ItemIsEnabled
                return base_flags & ~blocked
        return base_flags

    @pyqtSlot()
    def toggle(self) -> None:
        """Flip ``enableFlag``."""
        self.enableFlag = not self.enableFlag
 | |
| 
 | |
| 
 | |
class MainWindow(QMainWindow, Ui_MainWindow):
    """Main application window: a list of people plus a menu of books.

    The people list is backed by a ``ModelOverride`` over the ``people``
    table.  Each row of the ``books`` table becomes an action in
    ``menuBooks`` that assigns that book to the selected person.
    """

    def __init__(self) -> None:
        """Build the UI, attach the people model, wire up signals and show."""
        super().__init__()
        self.setupUi(self)
        model = ModelOverride()
        query = QSqlQuery("SELECT * FROM people ORDER BY name")
        model.setQuery(query)
        self.peopleView.setModel(model)
        # Column 1 of ``people`` is the name column (see the table definition).
        self.peopleView.setModelColumn(1)
        self.ReadButton.clicked.connect(model.toggle)
        self.bookBtn.clicked.connect(self.bookAction)
        self.peopleView.doubleClicked.connect(self.readAction)
        self.actionQuit.triggered.connect(self.close)
        self.createActions()
        self.show()

    def createActions(self) -> None:
        """Add one ``menuBooks`` action per row of ``books``, ordered by title.

        Each action carries the row's ``book_id`` as its data and triggers
        ``setBookAction``.
        """
        query = QSqlQuery()
        query.prepare("SELECT * FROM books ORDER BY title")
        if not query.exec():
            query_error(query)
        while query.next():
            action = QAction(query.value("title"), self)
            action.setData(query.value("book_id"))
            action.triggered.connect(self.setBookAction)
            self.menuBooks.addAction(action)

    @pyqtSlot()
    def setBookAction(self) -> None:
        """Assign the book of the triggering action to the selected person.

        Does nothing when no person is selected.  If the person has no
        ``person_book`` row for the book yet, one is inserted that points at
        the book's section with ``sequence = 0`` and block 0.  The person's
        ``book_id`` is then updated, a confirmation box is shown and the
        people model is re-queried.

        Raises:
            Exception: If any query fails (via ``query_error``) or the book
                has no section with ``sequence = 0``.
        """
        action = cast(QAction, self.sender())
        book_id = action.data()
        indexes = self.peopleView.selectedIndexes()
        if len(indexes) < 1:
            return
        person_id = indexes[0].siblingAtColumn(0).data()

        query = QSqlQuery()
        query.prepare(
            "SELECT * FROM person_book "
            "WHERE person_id = :person_id "
            "AND book_id = :book_id"
        )
        query.bindValue(":person_id", person_id)
        query.bindValue(":book_id", book_id)
        if not query.exec():
            query_error(query)
        if not query.next():
            # First time this person gets this book: start at section 0.
            query.prepare(
                "SELECT * FROM sections WHERE sequence = 0 " "AND book_id = :book_id"
            )
            query.bindValue(":book_id", book_id)
            if not query.exec():
                query_error(query)
            if not query.next():
                raise Exception(f"book_id: {book_id} has no section 0!")
            section_id = query.value("section_id")
            query.prepare(
                "INSERT INTO person_book "
                "VALUES (:person_id, :book_id, :section_id, 0)"
            )
            query.bindValue(":person_id", person_id)
            query.bindValue(":book_id", book_id)
            query.bindValue(":section_id", section_id)
            if not query.exec():
                query_error(query)

        query.prepare(
            "UPDATE people SET book_id = :book_id " "WHERE person_id = :person_id"
        )
        query.bindValue(":book_id", book_id)
        query.bindValue(":person_id", person_id)
        if not query.exec():
            query_error(query)

        # Read the names back for the confirmation message.
        query.prepare(
            "SELECT p.name,b.title FROM people p "
            "LEFT JOIN books b "
            "ON (p.book_id = b.book_id) "
            "WHERE p.person_id = :person_id"
        )
        query.bindValue(":person_id", person_id)
        if not query.exec():
            query_error(query)
        query.next()
        title = query.value("title")
        name = query.value("name")
        QMessageBox.information(
            self, "Book Assignment", f"{title} was assigned to {name}"
        )
        self.resetPeopleModel()

    def resetPeopleModel(self) -> None:
        """Re-run the people query so the view reflects current table contents."""
        query = QSqlQuery("SELECT * FROM people ORDER BY name")
        self.peopleView.model().setQuery(query)

    @pyqtSlot()
    def bookAction(self) -> None:
        """Ask for a directory and load it as ``self.book``.

        Nothing happens when the dialog is cancelled.
        """
        directory = QFileDialog.getExistingDirectory()
        if not directory:
            # The dialog returns an empty string when the user cancels;
            # do not try to build a Book from it.
            return
        self.book = Book(directory)

    def load_definition(self, word: str, definition: dict) -> None:
        """Render *word* and its definitions through a text cursor.

        The word is inserted in a 48-point bold format.  Each key of
        *definition* then starts a disc-style list entry, followed by a
        circle-style nested list with one block per item of the key's value.

        NOTE(review): ``document`` is currently ``None`` (the
        ``self.textEdit.document()`` call is commented out), so the cursor is
        not attached to any document.  This looks unfinished -- confirm
        before relying on this method.
        """
        document = None  # self.textEdit.document()
        myCursor = QTextCursor(document)
        # Select everything and clear it before writing the new content.
        myCursor.movePosition(QTextCursor.MoveOperation.Start)
        myCursor.movePosition(
            QTextCursor.MoveOperation.End, QTextCursor.MoveMode.KeepAnchor
        )
        myCursor.removeSelectedText()
        word_format = QTextCharFormat()
        # word_format.setFontFamily("Caveat")
        word_format.setFontPointSize(48)
        word_format.setFontWeight(QFont.Weight.Bold)
        myCursor.insertText(word, word_format)
        # word_format.setFont(document.defaultFont())
        typeFormat = QTextListFormat()
        typeFormat.setStyle(QTextListFormat.Style.ListDisc)
        typeFormat.setIndent(1)
        defFormat = QTextListFormat()
        defFormat.setStyle(QTextListFormat.Style.ListCircle)
        defFormat.setIndent(2)
        myCursor.setCharFormat(word_format)
        for key, defs in definition.items():
            myCursor.insertList(typeFormat)
            myCursor.insertText(key)
            myCursor.insertList(defFormat)
            first = True
            for a_def in defs:
                # insertList already opened the first block of the list.
                if not first:
                    myCursor.insertBlock()
                myCursor.insertText(a_def)
                first = False

    @pyqtSlot()
    @pyqtSlot(QModelIndex)
    def readAction(self, index: QModelIndex | None = None) -> None:
        """Open an ``EditDialog`` for a person.

        Args:
            index: Index of the person's row.  When omitted, the first
                selected index of the people view is used; if nothing is
                selected the method returns without opening a dialog.
        """
        if index:
            person_id = index.siblingAtColumn(0).data()
        else:
            indexes = self.peopleView.selectedIndexes()
            if len(indexes) < 1:
                return
            person_id = indexes[0].siblingAtColumn(0).data()
        dlg = EditDialog(person_id)
        dlg.exec()
 | |
| 
 | |
| 
 | |
# Statements that schema_update() runs, in order.  Entries that start with
# "CREATE TABLE" are compared against the definition stored in the database;
# every other entry is executed as-is.
SQL_CMDS = [
    # Turn on foreign key enforcement.
    "PRAGMA foreign_keys=ON",
    # words: a word and its definition text.
    "CREATE TABLE IF NOT EXISTS words "
    "(word_id INTEGER PRIMARY KEY AUTOINCREMENT, word TEXT, definition TEXT)",
    # books: title, author, uuid and level of each book.
    "CREATE TABLE IF NOT EXISTS books "
    "(book_id INTEGER PRIMARY KEY AUTOINCREMENT, title TEXT, author TEXT, "
    "uuid TEXT, level INTEGER)",
    # sections: content of a book, ordered by ``sequence``.
    "CREATE TABLE IF NOT EXISTS sections "
    "(section_id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "sequence INTEGER, content TEXT, "
    "book_id INTEGER REFERENCES books ON DELETE CASCADE)",
    # people: a person and the book currently assigned to them.
    "CREATE TABLE IF NOT EXISTS people "
    "(person_id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, "
    "organization TEXT, book_id INTEGER REFERENCES books ON DELETE CASCADE)",
    # person_book: a section and block number per (person, book) pair.
    "CREATE TABLE IF NOT EXISTS person_book "
    "(person_id INTEGER REFERENCES people ON DELETE CASCADE, "
    "book_id INTEGER REFERENCES books ON DELETE CASCADE, "
    "section_id INTEGER REFERENCES sections, "
    "block INTEGER)",
    # word_block: ties a word to a block of a section with start/end values.
    "CREATE TABLE IF NOT EXISTS word_block "
    "(word_id INTEGER REFERENCES words ON DELETE CASCADE, "
    "section_id INTEGER REFERENCES sections ON DELETE CASCADE, "
    "block INTEGER NOT NULL, start INTEGER NOT NULL, end INTEGER NOT NULL)",
]
 | |
| 
 | |
| 
 | |
def schema_update(db: QSqlDatabase) -> None:
    """Bring the database schema in line with ``SQL_CMDS``.

    Statements that are not ``CREATE TABLE`` are executed as-is.  For each
    ``CREATE TABLE`` statement the stored definition is read from
    ``sqlite_schema``:

    * no stored definition -- the table is created;
    * stored definition equals the normalised statement (case-insensitive)
      -- nothing is done;
    * otherwise the table is rebuilt: a ``<name>_new`` table is created, the
      old table's columns are copied across, the old table is dropped and
      the new one is renamed into place.

    Args:
        db: The open database connection; used for transaction control.

    Raises:
        AttributeError: If a ``CREATE TABLE`` statement or a stored
            definition cannot be parsed.
        Exception: If any statement fails (via ``query_error``) or the
            rebuild transaction cannot be committed.
    """
    query = QSqlQuery()

    for sql in SQL_CMDS:
        if not sql.lower().strip().startswith("create table "):
            # Not a table definition (e.g. a PRAGMA): just run it.
            if not query.exec(sql):
                query_error(query)
            continue

        # Normalise the statement into the form it is compared against:
        # no IF NOT EXISTS, single spaces, table name in double quotes.
        create_cmd = re.sub(r"IF NOT EXISTS ", "", sql.strip())
        create_cmd = re.sub(r"\s+", " ", create_cmd)
        matches = re.search(r"^(CREATE TABLE )([^ ]+)( \(.+)$", create_cmd)
        if not matches:
            raise AttributeError(f"No match found: {create_cmd}")
        create_kw, table_name, table_body = matches.groups()
        create_cmd = f'{create_kw}"{table_name}"{table_body}'

        # Restrict the lookup to the table's own row: tbl_name also matches
        # indexes and triggers that belong to the table.
        query.prepare(
            "SELECT sql FROM sqlite_schema "
            "WHERE type = 'table' AND tbl_name = :tbl"
        )
        query.bindValue(":tbl", table_name)
        if not query.exec():
            query_error(query)
        old = query.value(0) if query.next() else None
        if not old:
            # Table does not exist yet: create it from the original statement.
            if not query.exec(sql):
                query_error(query)
            continue
        if old.lower() == create_cmd.lower():
            continue
        print(old.lower())
        print(create_cmd.lower())
        print(f"Updating: {table_name}")

        # Step 1: turn off foreign key constraints.  This has to happen
        # outside a transaction; the pragma is ignored inside one.
        if not query.exec("PRAGMA foreign_keys=OFF"):
            query_error(query)
        # Step 2: start a transaction.
        db.transaction()
        try:
            # Step 3: remember old indexes, triggers, and views (not done).
            # Step 4: create the new table.
            new_table_name = table_name + "_new"
            if not query.exec(create_kw + new_table_name + table_body):
                query_error(query)
            # Step 5: transfer content, using the column names of the old
            # definition (first word of each comma-separated column spec).
            column_list = re.search(r"\((.+)\)", old)
            if column_list is None:
                raise AttributeError(f"No column list found: {old}")
            cols = ", ".join(
                coldef.split(" ")[0] for coldef in column_list.group(1).split(", ")
            )
            if not query.exec(
                f"INSERT INTO {new_table_name}({cols}) "
                f"SELECT {cols} FROM {table_name}"
            ):
                query_error(query)
            # Step 6: drop the old table.
            if not query.exec("DROP TABLE " + table_name):
                query_error(query)
            # Step 7: rename the new table to the old name.
            if not query.exec(
                "ALTER TABLE " + new_table_name + " RENAME TO " + table_name
            ):
                query_error(query)
            # Steps 8-9: recreate indexes, triggers, and views (not done).
            # Step 10: commit the changes.
            if not db.commit():
                raise Exception(
                    f"Could not commit schema update for {table_name}: "
                    f"{db.lastError().text()}"
                )
        except Exception:
            # Leave the database as it was and restore foreign key
            # enforcement (best effort) before reporting the failure.
            db.rollback()
            query.exec("PRAGMA foreign_keys=ON")
            raise
        # Step 11: turn foreign key constraints back on.  This must come
        # after the commit, otherwise the pragma is a no-op and enforcement
        # would stay off for the rest of the session.
        if not query.exec("PRAGMA foreign_keys=ON"):
            query_error(query)
 | |
| 
 | |
| 
 | |
def main() -> int:
    """Open ``twel.db``, update its schema and run the application.

    Returns:
        The exit code of the Qt event loop, or 1 if the database could not
        be opened.
    """
    # Create the application object before touching the SQL layer; Qt
    # expects it to exist when database drivers are loaded.
    app = QApplication(sys.argv)
    # addDatabase is a static factory, so no throwaway instance is needed.
    db = QSqlDatabase.addDatabase("QSQLITE")
    db.setDatabaseName("twel.db")
    if not db.open():
        print(f"Could not open twel.db: {db.lastError().text()}", file=sys.stderr)
        return 1
    schema_update(db)
    # Bound to a name so the window object stays referenced while the event
    # loop runs.
    window: QMainWindow = MainWindow()
    return app.exec()
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
    # Refuse to start when any ui/X.py is older than its ui/X.ui counterpart
    # (presumably the Designer file the module was generated from).
    outOfDate = []
    for fileName in os.listdir("ui"):
        if not fileName.endswith(".py"):
            continue
        pyPath = os.path.join("ui", fileName)
        uiPath = os.path.join("ui", fileName[:-3] + ".ui")
        if not os.path.exists(uiPath):
            # No matching .ui file (e.g. __init__.py): nothing to compare.
            continue
        if os.path.getmtime(uiPath) > os.path.getmtime(pyPath):
            outOfDate.append(fileName)
    if outOfDate:
        print(f"UI out of date: {', '.join(outOfDate)}")
        sys.exit(1)
    sys.exit(main())
 |