Files
scotus-watch/scotus-pull.py
Christopher T. Johnson bc4f778aba Lint
2025-02-10 14:31:59 -05:00

349 lines
11 KiB
Python
Executable File

#!venv/bin/python3
import datetime
import re
import sys
from typing import Any, cast
from PySide6.QtCore import (
QCoreApplication,
QModelIndex,
QPersistentModelIndex,
QRect,
QSize,
Signal,
Slot,
)
from PySide6.QtGui import QColor, QPainter, QTextDocument
from PySide6.QtSql import (
QSqlDatabase,
QSqlQuery,
QSqlTableModel,
)
from PySide6.QtWidgets import (
QAbstractItemView,
QApplication,
QHeaderView,
QMainWindow,
QStyle,
QStyledItemDelegate,
QStyleOptionViewItem,
QWidget,
)
from docketModel import docketModel
from lib.utils import query_error
from ui.MainWindow import Ui_MainWindow
from workers import loadCases, updateThread
# Module-level shorthand for QCoreApplication.translate, used when building
# user-visible strings (called as translate(context, source_text)).
translate = QCoreApplication.translate
class dateDelegate(QStyledItemDelegate):
    """Item delegate that displays a numeric timestamp as a long-form date."""

    def displayText(self, value: int, _: Any) -> str:
        """Return *value* formatted like ``February 10, 2025``.

        Args:
            value: Timestamp handed to ``datetime.date.fromtimestamp`` (so it
                is interpreted in the local time zone).
            _: Second argument supplied by the view; unused here.

        Returns:
            Full month name, day of month without zero padding, and the year.
        """
        day = datetime.date.fromtimestamp(value)
        # "%-d" (day without padding) is a platform-specific strftime
        # extension and is rejected by e.g. the Windows C runtime, so the day
        # number is taken from the date object instead of a format directive.
        return f"{day:%B} {day.day}, {day:%Y}"
class QStyleOptionViewItemInit(QStyleOptionViewItem):
    """Typing-only view of ``QStyleOptionViewItem``.

    The delegates in this module ``cast`` their style option to this class so
    that the attributes listed below are visible to static type checkers.
    The class adds no behaviour of its own; it only declares annotations.
    """

    # Text of the item; the rich-text delegate reads and then blanks it.
    text: str
    # Rectangle of the item being drawn or measured.
    rect: QRect
    # Widget the item belongs to; its style is used for drawing.
    widget: QWidget
    # Background colour assigned by the delegates' initStyleOption overrides.
    backgroundBrush: QColor
class activeDelegate(QStyledItemDelegate):
    """Delegate that paints a dark background on rows whose column 6 is 0."""

    def initStyleOption(
        self,
        option: QStyleOptionViewItem,
        index: QModelIndex | QPersistentModelIndex,
        /,
    ) -> None:
        """Fill in *option* for *index*, darkening the row when flagged.

        The base-class initialisation runs first; afterwards the value in
        column 6 of the same row is inspected and, when it equals 0, the
        background brush is set to colour 0x444444.
        """
        styled = cast(QStyleOptionViewItemInit, option)
        super().initStyleOption(styled, index)
        # siblingAtColumn is only used on QModelIndex here; the assert also
        # narrows the union type for static checkers.
        assert isinstance(index, QModelIndex)
        row_flag = index.siblingAtColumn(6).data()
        if row_flag != 0:
            return
        styled.backgroundBrush = QColor(0x444444)
class documentDelegate(QStyledItemDelegate):
    """Delegate that renders a cell's text as HTML via ``QTextDocument``.

    Rows whose column 6 holds 0 additionally get a dark background, matching
    ``activeDelegate``.
    """

    def initStyleOption(
        self,
        option: QStyleOptionViewItem,
        index: QModelIndex | QPersistentModelIndex,
        /,
    ) -> None:
        """Initialise *option* for *index* and darken flagged rows."""
        options = cast(QStyleOptionViewItemInit, option)
        super().initStyleOption(options, index)
        assert index.isValid() and isinstance(index, QModelIndex)
        if index.siblingAtColumn(6).data() == 0:
            options.backgroundBrush = QColor(0x444444)

    def _styled_copy(
        self,
        option: QStyleOptionViewItem,
        index: QModelIndex | QPersistentModelIndex,
    ) -> QStyleOptionViewItemInit:
        """Return an initialised copy of *option*, leaving the caller's intact."""
        # paint() blanks the option text; doing that on a private copy keeps
        # the option object owned by the view unmodified.
        styled = cast(QStyleOptionViewItemInit, QStyleOptionViewItem(option))
        self.initStyleOption(styled, index)
        return styled

    @staticmethod
    def _document(options: QStyleOptionViewItemInit) -> QTextDocument:
        """Build the rich-text document for *options*, wrapped to its width."""
        doc = QTextDocument()
        doc.setTextWidth(options.rect.width())
        doc.setHtml(options.text)
        return doc

    def paint(
        self,
        painter: QPainter,
        option: QStyleOptionViewItem,
        index: QModelIndex | QPersistentModelIndex,
    ) -> None:
        """Draw the item chrome through the style, then the HTML on top.

        Args:
            painter: Painter to draw with; its state is saved and restored.
            option: Style option describing the cell (not modified).
            index: Model index of the cell being painted.
        """
        options = self._styled_copy(option, index)
        doc = self._document(options)
        # The style must draw background/selection only; the text itself is
        # rendered by the QTextDocument below.
        options.text = ""
        widget = options.widget
        style = widget.style() if widget is not None else QApplication.style()
        painter.save()
        try:
            style.drawControl(
                QStyle.ControlElement.CE_ItemViewItem,
                options,
                painter,
            )
            # Draw the document in cell-local coordinates, clipped to the cell.
            painter.translate(options.rect.left(), options.rect.top())
            clip = QRect(0, 0, options.rect.width(), options.rect.height())
            doc.drawContents(painter, clip)
        finally:
            # Always undo the translate, even if drawing raised.
            painter.restore()

    def sizeHint(
        self,
        option: QStyleOptionViewItem,
        index: QModelIndex | QPersistentModelIndex,
    ) -> QSize:
        """Return the size the HTML needs when wrapped to the option's width."""
        options = self._styled_copy(option, index)
        doc = self._document(options)
        return QSize(int(doc.idealWidth()), int(doc.size().height()))
class MainWindow(QMainWindow, Ui_MainWindow):
    """Main window: a table of cases and the docket of the selected case.

    The widgets used here (``casesView``, ``docketView``, ``docketLabel``,
    ``docketInput``) are not defined in this class; presumably they are
    created by ``Ui_MainWindow.setupUi``.
    """

    # Emitted by rowClicked with the column-0 value of the clicked row.
    show_entries = Signal(int)
    # Worker started from __init__; kept referenced for the window's lifetime.
    loadThread = None
    # Worker started by the update button; None when no update is tracked.
    updateThread = None

    def __init__(self) -> None:
        """Build the UI, start the initial load and populate the cases table."""
        super().__init__()
        self.setupUi(self)
        self.loadThread = loadCases()
        self.loadThread.finished.connect(self.updateDone)
        self.loadThread.start()

        # model = QSqlQueryModel()
        model = QSqlTableModel()
        # QSqlQuery(sql) executes the statement in its constructor, so the
        # explicit exec() that followed ran the SELECT a second time. Prepare
        # and execute exactly once instead; updateDone() can still re-run the
        # prepared statement with a plain exec().
        query = QSqlQuery()
        query.prepare(
            "SELECT * FROM cases ORDER BY SUBSTR(docket_id,1,3), "
            "CAST(SUBSTR(docket_id,4) AS INT)"
        )
        if not query.exec():
            query_error(query)
        model.setQuery(query)

        self.casesView.setModel(model)
        self.casesView.setSelectionMode(
            QAbstractItemView.SelectionMode.SingleSelection
        )
        self.casesView.setSelectionBehavior(
            QAbstractItemView.SelectionBehavior.SelectRows
        )
        self.casesView.hideColumn(0)
        self.casesView.hideColumn(2)
        self.casesView.setItemDelegateForColumn(5, dateDelegate())
        self.casesView.setItemDelegate(activeDelegate())
        self.casesView.resizeColumnToContents(1)
        self.casesView.resizeColumnToContents(5)
        header = self.casesView.horizontalHeader()
        header.setSectionResizeMode(3, QHeaderView.ResizeMode.Fixed)
        header.setSectionResizeMode(4, QHeaderView.ResizeMode.Fixed)
        # The window is shown before measuring: the width read below is only
        # meaningful once the view has been laid out.
        self.show()
        # Split what is left after columns 1 and 5 (minus a 5px margin)
        # evenly between columns 3 and 4.
        remaining = (
            self.casesView.width()
            - header.sectionSize(1)
            - header.sectionSize(5)
            - 5
        )
        self.casesView.setColumnWidth(3, int(remaining * 0.5))
        self.casesView.setColumnWidth(4, int(remaining * 0.5))
        self.casesView.verticalHeader().hide()
        self.casesView.resizeRowsToContents()
        self.casesView.doubleClicked.connect(self.rowClicked)
        self.casesView.clicked.connect(self.rowClicked)

        self.docketView.setModel(docketModel())
        self.docketView.horizontalHeader().setSectionResizeMode(
            1, QHeaderView.ResizeMode.Stretch
        )
        self.docketView.resizeRowsToContents()
        self.docketView.setItemDelegateForColumn(1, documentDelegate())

    @Slot(QModelIndex)  # type: ignore
    def rowClicked(self, index: QModelIndex) -> None:
        """Show the docket entries for the case in the clicked row.

        Args:
            index: Index of the clicked cell in ``casesView``.

        Raises:
            Exception: If *index* is not valid.
        """
        if not index.isValid():
            raise Exception("Bad index")
        case_id = index.siblingAtColumn(0).data()
        self.docketLabel.setText(index.siblingAtColumn(1).data())
        self.show_entries.emit(case_id)
        self.docketView.setModel(docketModel(case_id))
        self.docketView.resizeColumnToContents(0)
        self.docketView.resizeRowsToContents()

    @Slot()
    def on_updateButton_clicked(self) -> None:
        """Start an update worker for the docket id typed into ``docketInput``.

        NOTE(review): presumably auto-connected to the update button through
        Qt's ``on_<object>_<signal>`` naming convention - confirm in setupUi.
        """
        text = self.docketInput.toPlainText()
        print(f"on_updateButton_clicked(): {text}")
        current = self.updateThread
        # Assumes updateThread is a QThread subclass (it exposes `finished`
        # and `start`) - TODO confirm against workers.py.
        if current is not None and current.isRunning():
            # An update is already in flight. Connecting `finished` again
            # would run updateDone() twice, and changing the docket id under
            # a running worker is unsafe, so this click is ignored.
            return
        worker = updateThread()
        worker.finished.connect(self.updateDone)
        worker.setDocketId(text)
        self.updateThread = worker
        worker.start()

    @Slot()
    def updateDone(self) -> None:
        """Refresh the cases table after a worker thread has finished.

        Connected to the ``finished`` signal of both the load worker and the
        update worker.
        """
        worker = self.updateThread
        # This slot also fires when loadThread finishes. Only drop the update
        # worker once it is no longer running; releasing the last reference
        # to a live thread object would destroy it mid-run.
        if worker is None or not worker.isRunning():
            self.updateThread = None
        print("Done updating")
        model: QSqlTableModel = cast(QSqlTableModel, self.casesView.model())
        # Re-execute the model's current query so new rows become visible.
        query = model.query()
        query.exec()
        model.setQuery(query)
# Schema statements, in execution order. Each entry is one complete SQL
# statement assembled from adjacent string literals (one column or table
# constraint per literal).
SQL_CMDS = [
    # "PRAGMA foreign_keys=ON",
    # cases: one row per docket; `linked` refers to another row of cases.
    (
        "CREATE TABLE IF NOT EXISTS cases ("
        "case_id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "docket_id TEXT, "
        "linked INTEGER, "
        "petitioners TEXT, "
        "respondents TEXT, "
        "date INTEGER, "
        "active INTEGER, "
        "FOREIGN KEY(linked) REFERENCES cases(case_id))"
    ),
    # entries: dated text rows referencing a case.
    (
        "CREATE TABLE IF NOT EXISTS entries ("
        "entry_id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "case_id INTEGER, "
        "date INTEGER, "
        "text TEXT, "
        "FOREIGN KEY(case_id) REFERENCES cases(case_id))"
    ),
    # documents: named URLs referencing an entry.
    (
        "CREATE TABLE IF NOT EXISTS documents ("
        "document_id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "entry_id INTEGER, "
        "name TEXT, "
        "url TEXT, "
        "FOREIGN KEY(entry_id) REFERENCES entries(entry_id))"
    ),
    # history: year / edocket / number rows with no foreign keys.
    (
        "CREATE TABLE IF NOT EXISTS history ("
        "history_id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "year TEXT, "
        "edocket INTEGER, "
        "number INTEGER)"
    ),
]
def _normalize_ddl(ddl: str) -> str:
    """Return *ddl* lower-cased, whitespace-collapsed, table name unquoted."""
    collapsed = re.sub(r"\s+", " ", ddl.strip())
    unquoted = re.sub(
        r'^(create table )"([^"]+)"', r"\1\2", collapsed, flags=re.IGNORECASE
    )
    return unquoted.lower()


def _split_top_level(text: str) -> list[str]:
    """Split *text* on commas that are not nested inside parentheses."""
    parts: list[str] = []
    depth = 0
    start = 0
    for pos, char in enumerate(text):
        if char == "(":
            depth += 1
        elif char == ")":
            depth -= 1
        elif char == "," and depth == 0:
            parts.append(text[start:pos])
            start = pos + 1
    parts.append(text[start:])
    return parts


def _column_names(ddl: str) -> list[str]:
    """Return the column names declared by a CREATE TABLE statement."""
    body = re.search(r"\((.+)\)", ddl, flags=re.DOTALL)
    if body is None:
        return []
    names = []
    for part in _split_top_level(body.group(1)):
        definition = part.strip()
        # Table-level constraints are not columns and must not be copied.
        if not definition or re.match(
            r"(foreign|primary|unique|check|constraint)\b",
            definition,
            flags=re.IGNORECASE,
        ):
            continue
        names.append(definition.split()[0])
    return names


def schema_update(db: QSqlDatabase) -> None:
    """Create missing tables and rebuild tables whose definition changed.

    Every statement in ``SQL_CMDS`` that is not a ``CREATE TABLE`` is executed
    as-is. For each ``CREATE TABLE`` the stored definition is read from
    ``sqlite_schema``: a missing table is created, an identical one is left
    alone, and a differing one is rebuilt by creating ``<name>_new``, copying
    the columns both definitions share, dropping the old table and renaming
    the new one into place, all inside one transaction.

    Args:
        db: Open database connection; used for transaction control.

    Raises:
        AttributeError: If a ``CREATE TABLE`` entry cannot be parsed.
    """
    query = QSqlQuery()
    for sql in SQL_CMDS:
        if not sql.lower().strip().startswith("create table "):
            if not query.exec(sql):
                query_error(query)
            continue

        create_cmd = re.sub(r"IF NOT EXISTS ", "", sql.strip())
        create_cmd = re.sub(r"\s+", " ", create_cmd)
        matches = re.search(r"^(CREATE TABLE )([^ ]+)( \(.+)$", create_cmd)
        if matches is None:
            raise AttributeError(f"No match found: {create_cmd}")
        prefix, table_name, definition = matches.groups()
        create_cmd = f'{prefix}"{table_name}"{definition}'
        print(f"Table name = {table_name}")

        # Restrict to the table row: rows of other types (e.g. indexes) can
        # carry the same tbl_name.
        query.prepare(
            "SELECT sql FROM sqlite_schema "
            "WHERE type = 'table' AND tbl_name = :tbl"
        )
        query.bindValue(":tbl", table_name)
        if not query.exec():
            query_error(query)
        if not query.next():
            # Table does not exist yet: create it from the original statement.
            print(sql)
            if not query.exec(sql):
                query_error(query)
            continue

        old = str(query.value(0))
        # A freshly created table is stored with a bare name while a renamed
        # one is stored quoted; ignore that difference so an unchanged table
        # is not rebuilt.
        if _normalize_ddl(old) == _normalize_ddl(create_cmd):
            continue
        print(old.lower())
        print(create_cmd.lower())
        print(translate("MainWindow", "Updating: ") + f"{table_name}")

        new_table_name = table_name + "_new"
        # Only columns present in both definitions can be carried over;
        # copying a column that was removed would make the INSERT fail.
        wanted = {name.lower() for name in _column_names(create_cmd)}
        shared = [name for name in _column_names(old) if name.lower() in wanted]

        # Step 4: create the new table.
        steps = [prefix + new_table_name + definition]
        if shared:
            # Step 5: transfer content.
            cols_str = ", ".join(shared)
            steps.append(
                f"INSERT INTO {new_table_name} ({cols_str}) "
                f"SELECT {cols_str} FROM {table_name}"
            )
        # Step 6: drop the old table.
        steps.append("DROP TABLE " + table_name)
        # Step 7: rename the new table to the old name.
        steps.append(
            "ALTER TABLE " + new_table_name + " RENAME TO " + table_name
        )

        # Step 1: turn off foreign key constraints. They are not switched
        # back on afterwards, matching the disabled PRAGMA in SQL_CMDS.
        if not query.exec("PRAGMA foreign_keys=OFF"):
            query_error(query)
        # Step 2: start a transaction.
        db.transaction()
        # Step 3 (remember old indexes, triggers, and views) and steps 8-9
        # (recreate them, rebuild affected views) are not implemented.
        for step in steps:
            if not query.exec(step):
                query_error(query)
                # Never continue towards DROP TABLE after a failed step:
                # undo the partial rebuild and leave the old table untouched.
                db.rollback()
                break
        else:
            # Step 11: commit the changes.
            db.commit()
def main() -> int:
    """Open the database, bring the schema up to date and run the GUI.

    Returns:
        The exit code of the Qt event loop, or 1 if the database could not
        be opened.
    """
    app = QApplication(sys.argv)
    db = QSqlDatabase.addDatabase("QSQLITE")
    # db.setConnectOptions("PRAGMA foreign_keys = ON")
    db.setDatabaseName("scotus.db")
    if not db.open():
        # Without an open connection every later query would fail.
        print(
            f"Unable to open scotus.db: {db.lastError().text()}",
            file=sys.stderr,
        )
        return 1
    schema_update(db)
    # Hold a reference for the lifetime of the event loop; an unreferenced
    # window object may be garbage-collected while the application runs.
    _window = MainWindow()
    return app.exec()
# Script entry point: run main() and use its return value as the process
# exit status.
if __name__ == "__main__":
    raise SystemExit(main())