# NOTE(review): the copy of this file that was reviewed had been whitespace-
# collapsed and every "<...>" sequence inside string literals had been
# stripped.  Markup and addresses marked "reconstructed" below are a best
# reading of the surrounding code; confirm them against version control.
import json
import os

# NOTE(review): SMTP_HOST/SMTP_PORT/... are read from this module below, so it
# presumably resolves to a project-local settings module that shadows the
# standard-library ``secrets`` -- confirm.
import secrets
import smtplib
from datetime import datetime, timedelta
from email import policy
from email.message import EmailMessage
from html.parser import HTMLParser
from io import StringIO

import css_inline
from PyQt6.QtCore import QResource, QSize, Qt, QUrl, pyqtSlot
from PyQt6.QtGui import QStandardItem, QStandardItemModel
from PyQt6.QtMultimedia import QMediaDevices, QSoundEffect
from PyQt6.QtSql import QSqlQuery, QSqlQueryModel
from PyQt6.QtWidgets import QDialog, QDialogButtonBox, QStyledItemDelegate

from main import query_error
from ui.PeopleDialog import Ui_Dialog


class blockHandler(HTMLParser):
    """Split an HTML fragment into top-level "blocks".

    Only the tags listed in ``tags`` are tracked.  Text is accumulated while
    at least one tracked tag is open; whenever the nesting depth of tracked
    tags drops back to zero the accumulated text is appended to ``blocks``.
    Tag attributes are discarded: only ``<tag>`` / ``</tag>`` are re-emitted.
    """

    # Class-level defaults; every instance replaces them in __init__, so the
    # list below is never shared between instances.
    text = ""
    blocks = []
    active = 0
    # NOTE(review): "st" is not an HTML element; possibly "strong" was meant.
    # Left unchanged because adding a tag can alter how blocks are split.
    tags = [
        "h1",
        "h2",
        "h3",
        "h4",
        "h5",
        "h6",
        "p",
        "b",
        "i",
        "em",
        "st",
        "span",
    ]
    # Inline tags that get a space before the opening and after the closing tag.
    space = ["b", "i", "em", "st", "span"]

    def __init__(self):
        # HTMLParser.__init__ already calls reset(); no second call is needed.
        super().__init__(convert_charrefs=True)
        self.strict = False
        self.text = ""
        self.blocks = []
        self.active = 0

    def handle_starttag(self, tag, attrs):
        """Open a tracked tag and append ``<tag>`` to the current block."""
        if tag not in self.tags:
            return
        self.active += 1
        if tag in self.space:
            self.text += " "
        self.text += f"<{tag}>"

    def handle_endtag(self, tag):
        """Close a tracked tag; finish the block when the depth reaches zero."""
        if tag not in self.tags:
            return
        self.active -= 1
        # Reconstructed: the reviewed copy showed an empty f-string here; the
        # closing tag mirrors what handle_starttag() emits.
        self.text += f"</{tag}>"
        if tag in self.space:
            self.text += " "
        if self.active <= 0:
            self.blocks.append(self.text)
            self.text = ""
            # Clamp so a stray closing tag cannot leave the depth negative.
            self.active = 0

    def handle_data(self, data):
        """Append character data to the block being built."""
        self.text += data

    def get_block(self, block):
        """Return the block at index ``block`` (IndexError if out of range)."""
        return self.blocks[block]


class MLStripper(HTMLParser):
    """Capture only the first chunk of character data fed to the parser."""

    def __init__(self):
        # HTMLParser.__init__ calls reset(), which creates ``text``/``first``.
        super().__init__()

    def reset(self):
        """Reset the parser and discard any captured text."""
        super().reset()
        self.strict = False
        self.convert_charrefs = True
        self.text = StringIO()
        self.first = True

    def handle_data(self, d):
        """Store ``d`` if it is the first data seen since the last reset."""
        if self.first:
            self.text.write(d)
            self.first = False

    def get_data(self):
        """Return the captured text ("" if no data has been seen)."""
        return self.text.getvalue()


class PersonDialog(QDialog, Ui_Dialog):
    """Dialog that edits a ``people`` row and prints/e-mails a session.

    Pass ``person_id=<id>`` as a keyword argument to load an existing person;
    with the default of 0 the dialog inserts a new row on accept.
    """

    SectionIdRole = Qt.ItemDataRole.UserRole
    SectionSequenceRole = Qt.ItemDataRole.UserRole + 1
    BookIdRole = Qt.ItemDataRole.UserRole + 2
    person_id = 0
    inliner = css_inline.CSSInliner(keep_style_tags=True, keep_link_tags=True)

    def __init__(self, *args, **kwargs):
        self.person_id = kwargs.pop("person_id", 0)
        super(PersonDialog, self).__init__(*args, **kwargs)
        self.setupUi(self)
        self.show()
        QResource.registerResource(
            os.path.join(os.path.dirname(__file__), "../ui/resources.rcc"), "/"
        )

        model = QSqlQueryModel()
        query = QSqlQuery()
        query.prepare("SELECT book_id, title " "FROM books " "ORDER BY title")
        if not query.exec():
            query_error(query)
        model.setQuery(query)
        self.bookCombo.setPlaceholderText("Select A Book")
        self.bookCombo.setModel(model)
        self.bookCombo.setModelColumn(1)
        self.bookCombo.setCurrentIndex(-1)

        model = QStandardItemModel()
        self.sectionCombo.setPlaceholderText("Select A Section")
        self.sectionCombo.setModel(model)
        self.sectionCombo.setEnabled(False)
        self.sectionCombo.setCurrentIndex(-1)

        self.printBtn.setEnabled(False)
        self.emailBtn.setEnabled(False)
        button = self.buttonBox.button(QDialogButtonBox.StandardButton.Ok)
        button.setEnabled(False)
        #
        # Connections
        #
        self.bookCombo.currentIndexChanged.connect(self.bookSelected)
        self.sectionCombo.currentIndexChanged.connect(self.sectionSelected)
        self.nameEdit.editingFinished.connect(self.checkLineEdits)
        self.orgEdit.editingFinished.connect(self.checkLineEdits)
        self.printBtn.clicked.connect(self.senditAction)
        self.emailBtn.clicked.connect(self.senditAction)
        if self.person_id > 0:
            self.setPerson(self.person_id)

    def setPerson(self, person_id: int) -> None:
        """Load ``person_id`` into the form and list that person's sessions.

        Raises:
            Exception: if there is no such person, or the person's book is
                not found exactly once in the book combo's model.
        """
        query = QSqlQuery()
        query.prepare(
            "SELECT p.name, p.organization, p.book_id, p.email, s.sequence "
            "FROM people p "
            "LEFT JOIN person_book pb "
            "ON (p.book_id = pb.book_id "
            "AND p.person_id = pb.person_id) "
            "LEFT JOIN sections s "
            "ON (s.section_id = pb.section_id) "
            "WHERE p.person_id = :person_id"
        )
        query.bindValue(":person_id", person_id)
        if not query.exec():
            query_error(query)
        if not query.next():
            raise Exception(f"No person record for {person_id}")
        self.person_id = person_id
        self.nameEdit.setText(query.value("name"))
        self.orgEdit.setText(query.value("organization"))
        self.emailEdit.setText(query.value("email"))

        model = self.bookCombo.model()
        matches = model.match(
            model.index(0, 0),
            Qt.ItemDataRole.DisplayRole,
            query.value("book_id"),
            1,
            Qt.MatchFlag.MatchExactly,
        )
        if len(matches) != 1:
            raise Exception(
                f"Match failed looking for book_id: {query.value('book_id')}"
            )
        # Selecting the book fires bookSelected(), which fills sectionCombo.
        self.bookCombo.setCurrentIndex(int(matches[0].row()))
        # The LEFT JOINs yield no sequence when the person has no person_book
        # row yet; only move the selection when a real integer came back.
        sequence = query.value("sequence")
        if isinstance(sequence, int):
            self.sectionCombo.setCurrentIndex(sequence)

        query.prepare(
            "SELECT * FROM sessions "
            "WHERE person_id = :person_id "
            "ORDER BY start DESC"
        )
        query.bindValue(":person_id", person_id)
        if not query.exec():
            query_error(query)
        model = QSqlQueryModel()
        model.setQuery(query)
        self.sessionCombo.setModel(model)
        self.sessionCombo.setModelColumn(2)
        self.printBtn.setEnabled(True)
        self.emailBtn.setEnabled(True)

    @pyqtSlot()
    def senditAction(self) -> None:
        """Render the selected session; print it or e-mail it.

        Triggered by both ``printBtn`` and ``emailBtn``; ``self.sender()``
        decides which path runs.
        """
        html = self._render_document(self.sessionCombo.currentText())
        if self.sender() == self.printBtn:
            self._play_alert()
            print(html)
            return
        self._send_email(html)

    def _render_document(self, title: str) -> str:
        """Build the complete HTML document for the selected session."""
        # Reconstructed markup (see NOTE at top of file).
        html = f"<html><head>\n<title>{title}</title>\n"
        # NOTE(review): the original emitted one more <head> element here whose
        # text was lost (possibly a stylesheet <link>); restore it from version
        # control.  A charset declaration is used as a harmless stand-in.
        html += '<meta charset="utf-8">\n'
        html += "</head><body>\n"
        html += f"<h1>{title}</h1>\n"
        html += self.makeDefinitions()
        html += self.makeText()
        html += "</body>\n</html>\n"
        return html

    def _play_alert(self) -> None:
        """Play ui/beep.wav, on the "virt-input" audio output if present."""
        dev = None
        for output in QMediaDevices.audioOutputs():
            if output.id().data().decode("UTF-8") == "virt-input":
                dev = output
                break
        # Stored on self so the effect is not garbage-collected mid-playback.
        self.alert = QSoundEffect()
        if dev:
            self.alert.setAudioDevice(dev)
        self.alert.setSource(QUrl.fromLocalFile("ui/beep.wav"))
        self.alert.setLoopCount(1)
        self.alert.play()

    def _send_email(self, html: str) -> None:
        """Mail ``html`` (CSS inlined) to the address in ``emailEdit``."""
        msg = EmailMessage(policy=policy.default)
        start = datetime.fromisoformat(self.sessionCombo.currentText())
        msg["Subject"] = f"TT English, Session: {start.date().isoformat()}"
        # NOTE(review): the sender address was stripped from the reviewed copy
        # (only the display name survived).  SMTP_FROM is an assumed optional
        # setting; the login user is the fallback.  Restore the real address.
        sender = getattr(secrets, "SMTP_FROM", secrets.SMTP_USER)
        msg["From"] = f"Christopher T. Johnson <{sender}>"
        msg["To"] = self.emailEdit.text().strip()
        msg.set_content("There is a html message you should read")
        msg.add_alternative(self.inliner.inline(html), subtype="html")
        # The context manager issues QUIT and closes the socket even when
        # starttls/login/send raises.
        with smtplib.SMTP(secrets.SMTP_HOST, secrets.SMTP_PORT) as server:
            server.set_debuglevel(1)
            if secrets.SMTP_STARTTLS:
                server.starttls()
            server.login(secrets.SMTP_USER, secrets.SMTP_PASSWORD)
            server.send_message(msg)

    @pyqtSlot(int)
    def bookSelected(self, index: int) -> None:
        """Refill the section combo with the sections of the chosen book."""
        book_id = self.bookCombo.model().index(index, 0).data()
        model = self.sectionCombo.model()
        query = QSqlQuery()
        query.prepare(
            "SELECT section_id, sequence, content "
            "FROM sections "
            "WHERE book_id = :book_id "
            "ORDER BY sequence"
        )
        query.bindValue(":book_id", book_id)
        if not query.exec():
            query_error(query)
        model.clear()
        stripper = MLStripper()
        while query.next():
            # Label each entry with the first 40 characters of the first text
            # chunk found in the section's HTML.
            stripper.feed(query.value("content"))
            content = stripper.get_data()
            stripper.reset()
            item = QStandardItem()
            item.setData(content[:40], Qt.ItemDataRole.DisplayRole)
            item.setData(
                query.value("sequence"), PersonDialog.SectionSequenceRole
            )
            item.setData(query.value("section_id"), PersonDialog.SectionIdRole)
            model.appendRow(item)
        self.sectionCombo.setEnabled(True)

    @pyqtSlot(int)
    def sectionSelected(self, row: int) -> None:
        """Re-evaluate whether the Ok button may be enabled."""
        self.checkLineEdits()

    @pyqtSlot()
    def accept(self) -> None:
        """Save the person and their book/section, then close the dialog.

        Raises:
            Exception: if no section is selected.
        """
        query = QSqlQuery()
        if self.person_id > 0:
            query.prepare(
                "UPDATE people SET "
                "name = :name, "
                "organization = :org, "
                "email = :email, "
                "book_id = :book_id "
                "WHERE person_id = :person_id"
            )
            query.bindValue(":person_id", self.person_id)
        else:
            query.prepare(
                "INSERT INTO people "
                "(name, organization, email, book_id) "
                "VALUES (:name, :org, :email, :book_id)"
            )
        query.bindValue(":name", self.nameEdit.text().strip())
        # Fix: this used to bind the *name* field, overwriting organization.
        query.bindValue(":org", self.orgEdit.text().strip())
        query.bindValue(":email", self.emailEdit.text().strip())
        book_id = (
            self.bookCombo.model()
            .index(self.bookCombo.currentIndex(), 0)
            .data()
        )
        query.bindValue(":book_id", book_id)
        section_id = self.sectionCombo.currentData(PersonDialog.SectionIdRole)
        print(f"section_id: {section_id}")
        if not section_id:
            # Checked before exec() so nothing is written without a section.
            raise Exception("Section id is null")
        if not query.exec():
            query_error(query)
        if self.person_id <= 0:
            self.person_id = query.lastInsertId()

        # Update the person_book row if one exists, otherwise insert it.
        query.prepare(
            "SELECT * FROM person_book pb "
            "WHERE pb.person_id = :person_id "
            "AND pb.book_id = :book_id"
        )
        query.bindValue(":person_id", self.person_id)
        query.bindValue(":book_id", book_id)
        if not query.exec():
            query_error(query)
        if query.next():
            query.prepare(
                "UPDATE person_book SET "
                "section_id = :section_id "
                "WHERE person_id = :person_id "
                "AND book_id = :book_id"
            )
        else:
            query.prepare(
                "INSERT INTO person_book "
                "(person_id, book_id, section_id, block) "
                "VALUES (:person_id, :book_id, :section_id, 0 )"
            )
        query.bindValue(":person_id", self.person_id)
        query.bindValue(":book_id", book_id)
        query.bindValue(":section_id", section_id)
        if not query.exec():
            query_error(query)
        super().accept()

    @pyqtSlot()
    def checkLineEdits(self):
        """Enable Ok only when both name and organization are non-blank."""
        name = self.nameEdit.text().strip()
        org = self.orgEdit.text().strip()
        button = self.buttonBox.button(QDialogButtonBox.StandardButton.Ok)
        button.setEnabled(bool(name and org))

    def makeDefinitions(self) -> str:
        """Return HTML listing the words (and definitions) of the session.

        Each ``words.definition`` value is parsed as JSON; ``phonetics`` is
        optional, ``meanings`` (with ``partOfSpeech`` and ``definitions``) is
        required.
        """
        query = QSqlQuery()
        query.prepare(
            "SELECT w.word, w.definition "
            "FROM session_word sw "
            "LEFT JOIN words w "
            "ON (sw.word_id = w.word_id) "
            "WHERE sw.session_id = :session_id "
            "ORDER BY w.word"
        )
        session_id = (
            self.sessionCombo.model()
            .index(self.sessionCombo.currentIndex(), 0)
            .data()
        )
        query.bindValue(":session_id", session_id)
        if not query.exec():
            query_error(query)

        # NOTE(review): every tag and class name below is reconstructed; only
        # the text between tags and the nesting survived in the reviewed copy.
        parts = ['<div class="definitions">\n<dl>\n']
        while query.next():
            parts.append("<dt>" + query.value("word") + "</dt>\n<dd>\n")
            data = json.loads(query.value("definition"))
            for p in data.get("phonetics", []):
                if "text" in p:
                    parts.append(
                        '<p class="phonetic">' + p["text"] + "</p>\n"
                    )
            parts.append('<div class="meanings">\n')
            for meaning in data["meanings"]:
                parts.append(
                    "<div>" + meaning["partOfSpeech"] + "</div>\n<ul>\n"
                )
                for definition in meaning["definitions"]:
                    parts.append("<li>" + definition["definition"] + "</li>\n")
                parts.append("</ul>\n")
            parts.append("</div>\n</dd>\n")
        parts.append("</dl>\n</div>\n")
        return "".join(parts)

    def makeText(self) -> str:
        """Return HTML with the section blocks recorded for the session.

        Raises:
            Exception: if a ``session_block`` row references a missing section.
        """
        query = QSqlQuery()
        section_query = QSqlQuery()
        # Reconstructed wrapper element (see NOTE at top of file).
        html = '<div class="text">'
        session_id = (
            self.sessionCombo.model()
            .index(self.sessionCombo.currentIndex(), 0)
            .data()
        )
        query.prepare(
            "SELECT * FROM session_block sb "
            "WHERE sb.session_id = :session_id "
            "ORDER BY sb.section_id, sb.block"
        )
        query.bindValue(":session_id", session_id)
        if not query.exec():
            query_error(query)
        section_query.prepare(
            "SELECT * FROM sections " "WHERE section_id = :section_id"
        )
        section_id = 0
        while query.next():
            # Rows are ordered by section, so a section's HTML is parsed only
            # once, when the section id changes.
            if section_id != query.value("section_id"):
                section_id = query.value("section_id")
                section_query.bindValue(":section_id", section_id)
                if not section_query.exec():
                    query_error(section_query)
                if not section_query.next():
                    raise Exception(f"Missing section {section_id}")
                section = blockHandler()
                section.feed(section_query.value("content"))
            html += section.get_block(query.value("block")) + "\n"
        html += "</div>\n"
        return html

    def makeStats(self) -> str:
        """Return an (currently empty) HTML container for statistics."""
        # Reconstructed wrapper element (see NOTE at top of file).
        html = '<div class="stats">'
        html += "</div>\n"
        return html