Files
scotus-watch/scotus-pull.py
Christopher T. Johnson c7a9ca89f1 Initial Commit
2025-02-07 09:37:14 -05:00

386 lines
14 KiB
Python
Executable File

#!venv/bin/python3
import datetime
import re
import sys
import dateparser
import requests
from typing import NoReturn
from PySide6.QtCore import QCoreApplication, QModelIndex, Signal, Qt
from PySide6.QtSql import QSqlDatabase, QSqlQuery, QSqlQueryModel
from PySide6.QtWidgets import QAbstractItemView, QApplication, QHeaderView, QMainWindow, QStyledItemDelegate, QTableWidgetItem
from bs4 import BeautifulSoup, Tag
from ui.MainWindow import Ui_MainWindow
# Module-level shorthand for Qt's translation lookup; called below as
# translate(context, source_text) for user-visible strings.
translate = QCoreApplication.translate
def query_error(query: QSqlQuery) -> NoReturn:
    """Report a failed SQL query on stdout and abort by raising.

    The report contains the executed statement, its bound values, and the
    type and text of the query's last error.

    Args:
        query: The query whose execution just failed.

    Raises:
        Exception: Always, after the report has been printed.
    """
    error = query.lastError()
    details = "{}\n{}\n{}:{}".format(
        query.executedQuery(),
        query.boundValues(),
        error.type(),
        error.text(),
    )
    print(translate("MainWindow", "SQL Error:\n") + details)
    raise Exception(translate("MainWindow", "SQL Error"))
class dateDelegate(QStyledItemDelegate):
    """Item delegate that renders a POSIX timestamp as a long-form date."""

    def displayText(self, value, locale) -> str:
        """Return *value* formatted as ``<Month name> <day>, <year>``.

        Args:
            value: Seconds since the epoch; converted with
                ``datetime.date.fromtimestamp`` (local time).
            locale: Supplied by Qt; not used.

        Returns:
            The formatted date, with the day of month not zero-padded.
        """
        date = datetime.date.fromtimestamp(value)
        # "%-d" (unpadded day) is a platform-specific strftime extension and
        # is rejected by some C runtimes; date.day gives the same text
        # everywhere.
        return f"{date:%B} {date.day}, {date:%Y}"
class MainWindow(QMainWindow, Ui_MainWindow):
    """Main window: a table of cases plus the docket of the selected case.

    ``casesView`` and ``docketWidget`` are expected to be created by
    ``setupUi`` (inherited from ``Ui_MainWindow``).
    """

    # Emitted with the case_id of the row the user clicked.
    show_entries = Signal(int)

    def __init__(self) -> None:
        """Build the UI, load all cases into the view and show the window."""
        super(MainWindow, self).__init__()
        self.setupUi(self)

        model = QSqlQueryModel()
        query = QSqlQuery("SELECT * FROM cases ORDER BY docket_id")
        if not query.exec():
            query_error(query)
        model.setQuery(query)

        # Column numbers below follow the result of "SELECT * FROM cases";
        # assuming the column order of the CREATE TABLE in SQL_CMDS that is
        # 0 case_id, 1 docket_id, 2 linked, 3 petitioners, 4 respondents,
        # 5 date.
        self.casesView.setModel(model)
        self.casesView.setSelectionMode(QAbstractItemView.SelectionMode.SingleSelection)
        self.casesView.setSelectionBehavior(QAbstractItemView.SelectionBehavior.SelectRows)
        self.casesView.hideColumn(0)
        self.casesView.hideColumn(2)
        # Qt views do not take ownership of their delegates, so parent the
        # delegate to the window and keep a reference to it.
        self._date_delegate = dateDelegate(self)
        self.casesView.setItemDelegateForColumn(5, self._date_delegate)
        self.casesView.resizeColumnToContents(1)
        self.casesView.resizeColumnToContents(5)
        header = self.casesView.horizontalHeader()
        header.setSectionResizeMode(3, QHeaderView.ResizeMode.Fixed)
        header.setSectionResizeMode(4, QHeaderView.ResizeMode.Fixed)

        # Show first, then read the view width to split the space left over
        # by columns 1 and 5 evenly between columns 3 and 4.
        self.show()
        remaining = self.casesView.width() - header.sectionSize(1) - header.sectionSize(5) - 5
        self.casesView.setColumnWidth(3, int(remaining * 0.5))
        self.casesView.setColumnWidth(4, int(remaining * 0.5))
        self.casesView.verticalHeader().hide()
        self.casesView.resizeRowsToContents()
        self.casesView.doubleClicked.connect(self.rowClicked)
        self.casesView.clicked.connect(self.rowClicked)

        self.docketWidget.setColumnCount(2)
        self.docketWidget.setHorizontalHeaderLabels([
            'Date', 'Proceedings and Orders',
        ])
        self.docketWidget.resizeColumnToContents(0)
        self.docketWidget.horizontalHeader().setSectionResizeMode(1, QHeaderView.ResizeMode.Stretch)

    def populateDocket(self, case_id: int) -> None:
        """Fill ``docketWidget`` with the entries stored for *case_id*.

        Args:
            case_id: Value matched against ``entries.case_id``.

        Raises:
            Exception: Via ``query_error`` when the query fails.
        """
        query = QSqlQuery()
        query.prepare("SELECT * FROM entries WHERE case_id=:cid ORDER BY entry_id")
        query.bindValue(":cid", case_id)
        if not query.exec():
            query_error(query)

        # Remove the previous docket completely; rows are then added one at
        # a time so every setItem() targets a row that already exists
        # (items set beyond the current row count are not stored).
        self.docketWidget.setRowCount(0)
        row = 0
        while query.next():
            print(query.value(0), query.value(1), query.value(2), query.value(3))
            self.docketWidget.insertRow(row)
            # Result columns 2 and 3 go into table columns 0 and 1.
            for column, value in enumerate((query.value(2), query.value(3))):
                item = QTableWidgetItem()
                item.setData(Qt.ItemDataRole.DisplayRole, value)
                self.docketWidget.setItem(row, column, item)
            row += 1

    def rowClicked(self, index: QModelIndex) -> None:
        """Handle a click on a case row.

        Prints the value in column 1, emits ``show_entries`` with the value
        in column 0 and loads that case's docket.

        Args:
            index: Model index of the clicked cell.
        """
        docket = index.siblingAtColumn(1).data()
        print(docket)
        case_id = index.siblingAtColumn(0).data()
        self.show_entries.emit(case_id)
        self.populateDocket(case_id)
# Schema statements, executed in order by schema_update().
#
# The CREATE TABLE texts are also compared (whitespace-collapsed, without
# "IF NOT EXISTS", case-insensitively) with the SQL recorded in
# sqlite_schema; any difference makes schema_update() rebuild the table, so
# edit these strings deliberately.
SQL_CMDS = [
    # Disabled: foreign key enforcement is currently not switched on.
    #"PRAGMA foreign_keys=ON",
    "CREATE TABLE IF NOT EXISTS cases "
    "(case_id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "docket_id TEXT, "
    "linked INTEGER, "
    "petitioners TEXT, respondents TEXT, date INTEGER, "
    "FOREIGN KEY(linked) REFERENCES cases(case_id))",
    #
    # entries: one row per docket entry; case_id references cases.
    "CREATE TABLE IF NOT EXISTS entries ("
    "entry_id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "case_id INTEGER, "
    "date INTEGER, "
    "text TEXT, "
    "FOREIGN KEY(case_id) REFERENCES cases(case_id))",
    #
    # documents: named URLs; entry_id references entries.
    "CREATE TABLE IF NOT EXISTS documents ("
    "document_id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "entry_id INTEGER, "
    "name TEXT, "
    "url TEXT, "
    "FOREIGN KEY(entry_id) REFERENCES entries(entry_id))",
]
def _declared_columns(create_sql: str) -> list:
    """Return the column names declared in a CREATE TABLE statement.

    The statement is expected in the layout used by SQL_CMDS: definitions
    separated by ", " with table-level FOREIGN KEY clauses mixed in.

    Raises:
        AttributeError: If no parenthesised definition list is found.
    """
    body = re.search(r"\((.+)\)", create_sql)
    if body is None:
        raise AttributeError(f"No column list found: {create_sql}")
    return [
        coldef.split(" ")[0]
        for coldef in body.group(1).split(", ")
        if not coldef.startswith("FOREIGN ")
    ]


def schema_update(db: QSqlDatabase) -> None:
    """Create missing tables and rebuild tables whose definition changed.

    Statements in SQL_CMDS that are not CREATE TABLE are simply executed.
    For each CREATE TABLE the stored definition is read from sqlite_schema:
    a missing table is created, an identical one is left alone, and a
    differing one is rebuilt (create ``<name>_new``, copy the rows, drop the
    old table, rename the new one) inside a transaction.

    Args:
        db: The open database; used for transaction control.

    Raises:
        AttributeError: If a CREATE TABLE statement cannot be parsed.
        Exception: Via ``query_error`` when a statement fails; a rebuild in
            progress is rolled back first.
    """
    query = QSqlQuery()
    for sql in SQL_CMDS:
        if not sql.lower().strip().startswith("create table "):
            if not query.exec(sql):
                query_error(query)
            continue

        # Normalise the wanted definition: no IF NOT EXISTS, single spaces.
        create_cmd = re.sub(r"IF NOT EXISTS ", "", sql.strip())
        create_cmd = re.sub(r"\s+", " ", create_cmd)
        matches = re.search(r"^(CREATE TABLE )([^ ]+)( \(.+)$", create_cmd)
        if matches is None:
            raise AttributeError(f"No match found: {create_cmd}")
        table_name = matches.group(2)
        print("Table name = {}".format(table_name))

        query.prepare(
            "SELECT sql FROM sqlite_schema "
            "WHERE tbl_name = :tbl AND type = 'table'"
        )
        query.bindValue(":tbl", table_name)
        if not query.exec():
            query_error(query)
        if not query.next():
            # Table does not exist yet: create it as written.
            print(sql)
            if not query.exec(sql):
                query_error(query)
            continue

        # Normalise the stored definition the same way. After a rename
        # SQLite records the table name in double quotes; strip them so a
        # table rebuilt on an earlier run is not rebuilt again.
        old = re.sub(r"\s+", " ", query.value(0).strip())
        old = re.sub(r'^(CREATE TABLE )"([^"]+)"', r"\1\2", old)
        if old.lower() == create_cmd.lower():
            continue

        print(old.lower())
        print(create_cmd.lower())
        print(translate("MainWindow", "Updating: ") + f"{table_name}")
        # Step 1 turn off foreign key constraints (must precede the
        # transaction).
        if not query.exec("PRAGMA foreign_keys=OFF"):
            query_error(query)
        # Step 2 start a transaction
        db.transaction()
        try:
            # Step 3 remember old indexes, triggers, and views (none yet)
            # Step 4 create new table
            new_table_name = table_name + "_new"
            create_new = matches.group(1) + new_table_name + matches.group(3)
            print(create_new)
            if not query.exec(create_new):
                query_error(query)
            # Step 5 transfer content: only columns present in both the old
            # and the new definition can be copied.
            wanted = set(_declared_columns(create_cmd))
            cols = [name for name in _declared_columns(old) if name in wanted]
            if cols:
                cols_str = ", ".join(cols)
                query.prepare(
                    f"INSERT INTO {new_table_name} ({cols_str}) "
                    f"SELECT {cols_str} FROM {table_name}"
                )
                if not query.exec():
                    query_error(query)
            # Step 6 drop old table
            query.prepare("DROP TABLE " + table_name)
            if not query.exec():
                query_error(query)
            # Step 7 rename new table to old table
            query.prepare("ALTER TABLE " + new_table_name + " RENAME TO " + table_name)
            if not query.exec():
                query_error(query)
            # Steps 8-9 (recreate indexes, triggers, views) do not apply yet.
            # Step 10 (re-enabling foreign keys) is intentionally skipped:
            # enforcement is not enabled elsewhere in this file either.
        except Exception:
            # Leave the database as it was instead of half-migrated.
            db.rollback()
            raise
        # Step 11 commit the changes
        db.commit()
def update_proceedings(case_id: int, bs: BeautifulSoup) -> None:
    """Store the rows of the page's ``proceedings`` table for one case.

    After the first row (skipped), rows are consumed in pairs: an entry row
    (date cell, text cell) followed by a row whose second cell holds the
    links to the entry's documents. Entries and documents that are already
    stored are not inserted again.

    Args:
        case_id: ``cases.case_id`` the entries belong to.
        bs: Parsed docket page.

    Raises:
        ValueError: If an entry's date cannot be parsed.
        AssertionError: If the table does not have the expected structure.
        Exception: Via ``query_error`` when a statement fails.
    """
    table = bs.find('table', id="proceedings")
    assert isinstance(table, Tag)
    rows = table.find_all('tr')[1:]
    query = QSqlQuery()
    # NOTE(review): the reviewed copy ended a loop here with an
    # unconditional `break` whose nesting level could not be determined;
    # every entry and every link is processed now -- confirm this is wanted.
    for position in range(0, len(rows), 2):
        entry_row = rows[position]
        assert isinstance(entry_row, Tag)
        date_cell = entry_row.contents[0]
        assert isinstance(date_cell, Tag) and isinstance(date_cell.string, str)
        date = dateparser.parse(date_cell.string)
        if date is None:
            raise ValueError(f"Unparseable proceeding date: {date_cell.string!r}")
        # Computed once so lookup and insert bind the very same value.
        stamp = date.timestamp()
        text_cell = entry_row.contents[1]
        assert isinstance(text_cell, Tag) and isinstance(text_cell.string, str)
        text = text_cell.string.strip()

        query.prepare("SELECT * FROM entries WHERE case_id = :cid AND date = :date AND text=:text")
        query.bindValue(':cid', case_id)
        query.bindValue(':text', text)
        query.bindValue(':date', stamp)
        if not query.exec():
            query_error(query)
        if not query.next():
            query.prepare("INSERT INTO entries (case_id, date, text) VALUES (:cid,:date,:text)")
            query.bindValue(':cid', case_id)
            query.bindValue(':date', stamp)
            query.bindValue(':text', text)
            if not query.exec():
                query_error(query)
            entry_id = query.lastInsertId()
        else:
            entry_id = query.value(0)

        if position + 1 >= len(rows):
            # Last entry has no accompanying document row.
            break
        document_row = rows[position + 1]
        assert isinstance(document_row, Tag)
        link_cell = document_row.contents[1]
        assert isinstance(link_cell, Tag)
        print(link_cell)
        # find_all() skips text nodes and also reaches links nested inside
        # wrapper elements.
        for anchor in link_cell.find_all('a', href=True):
            url = anchor.attrs['href']
            name = anchor.string
            query.prepare("SELECT * FROM documents WHERE url=:url AND entry_id = :eid")
            query.bindValue(':url', url)
            query.bindValue(":eid", entry_id)
            if not query.exec():
                query_error(query)
            if not query.next():
                query.prepare("INSERT INTO documents (entry_id, name, url) "
                              "VALUES (:eid, :name, :url)")
                query.bindValue(":eid", entry_id)
                query.bindValue(":name", name)
                query.bindValue(":url", url)
                if not query.exec():
                    query_error(query)
def update_db(case_id) -> int:
    """Download one docket page and store the case and its proceedings.

    The case row is created if its docket number is not stored yet. When
    the page names a linked case that is not stored either, that case is
    fetched recursively; the link is then recorded on this case.

    Args:
        case_id: Docket number used to build the page URL, e.g. ``'24-203'``.

    Returns:
        ``cases.case_id`` of the stored case.

    Raises:
        SystemExit: With status 1 when the server does not answer 200.
        AssertionError: If the page does not have the expected structure.
        Exception: Via ``query_error`` when a statement fails.
    """
    url = 'https://www.supremecourt.gov/docket/docketfiles/html/public/{}.html'.format(case_id)
    # Without a timeout an unresponsive server would block forever.
    r = requests.get(url, timeout=30)
    if r.status_code != 200:
        print(r.status_code)
        sys.exit(1)
    bs = BeautifulSoup(r.text, 'lxml')
    #
    # docket_id, previous_docket, petitioners, respondents, date
    # all come from the docketinfo table
    #
    di = bs.find('table', id='docketinfo')
    assert di is not None and isinstance(di, Tag)
    #
    # docket_id is first row, first column
    docket_id = di.find('span')
    assert docket_id is not None and isinstance(docket_id, Tag)
    docket_id = docket_id.contents[0]
    assert isinstance(docket_id, str)
    docket_id = docket_id.strip()
    docket_id = docket_id.replace('No. ', '')
    #
    # Title is second row, first column
    tr = di.contents[1]
    assert isinstance(tr, Tag) and isinstance(tr.contents[0], Tag)
    assert tr.contents[0].string == 'Title:'
    td = tr.contents[1]
    assert isinstance(td, Tag)
    span = td.contents[0]
    assert isinstance(span, Tag) and isinstance(span.contents[0], str)
    petitioners = span.contents[0].strip()
    #
    # XXX - We need to deal with other titles. Change this to an RE
    # UPDATED: we are just handling the two we know about.
    #
    petitioners = petitioners.replace(', Petitioners', '')
    petitioners = petitioners.replace(', Applicants', '')
    assert isinstance(span.contents[4], str)
    respondent = span.contents[4].strip()
    #
    # Date on which the case was docketed
    tr = di.contents[2]
    assert isinstance(tr, Tag) and isinstance(tr.contents[1], Tag)
    td = tr.contents[1]
    assert isinstance(td, Tag) and td.string is not None
    docket_date = td.string.strip()
    date = dateparser.parse(docket_date)
    #
    # linked case is row 3, column 0
    tr = di.contents[3]
    assert isinstance(tr, Tag) and isinstance(tr.contents[0], Tag)
    linked = tr.contents[0].string
    print(docket_id, petitioners, respondent, date, linked)
    #
    # See if this case already exists.
    #
    query = QSqlQuery()
    query.prepare("SELECT * FROM cases WHERE docket_id = :did")
    query.bindValue(':did', docket_id)
    if not query.exec():
        query_error(query)
    #
    # If it does not exist, create it. This stops a recursion loop.
    #
    if not query.next():
        query.prepare("INSERT INTO cases (docket_id, petitioners, respondents, date, linked) "
                      "VALUES (:did, :pet, :resp, :date, NULL)")
        query.bindValue(':did', docket_id)
        query.bindValue(':pet', petitioners)
        query.bindValue(':resp', respondent)
        # .timestamp() exists on datetime.datetime only, not on plain dates.
        assert isinstance(date, datetime.datetime)
        query.bindValue(':date', date.timestamp())
        if not query.exec():
            query_error(query)
        row_id = query.lastInsertId()
        linked_id = None
    else:
        row_id = query.value(0)
        linked_id = query.value('linked')
    assert isinstance(row_id, int)
    #
    # If there is a linked case, we need to get the ID for that case.
    if linked is not None:
        linked = linked.replace('Linked with ', '')
        query.prepare("SELECT * FROM cases WHERE docket_id = :did")
        query.bindValue(':did', linked)
        if not query.exec():
            query_error(query)
        if not query.next():
            new_id = update_db(linked)
        else:
            new_id = query.value(0)
        if new_id != linked_id:
            query.prepare("UPDATE cases SET linked=:lid WHERE case_id = :cid")
            query.bindValue(':lid', new_id)
            query.bindValue(':cid', row_id)
            if not query.exec():
                query_error(query)
    #
    # XXX - Process lower courts
    #
    update_proceedings(row_id, bs)
    return row_id
def main() -> int:
    """Open ``scotus.db``, update its schema and run the GUI.

    Returns:
        The Qt event loop's exit code, or 1 if the database cannot be
        opened.
    """
    app = QApplication(sys.argv)
    db = QSqlDatabase.addDatabase("QSQLITE")
    #db.setConnectOptions("PRAGMA foreign_keys = ON")
    db.setDatabaseName("scotus.db")
    if not db.open():
        # Nothing below can work without a database; report and give up.
        print(
            translate("MainWindow", "Unable to open database: ")
            + db.lastError().text(),
            file=sys.stderr,
        )
        return 1
    schema_update(db)
    # Dockets are imported by calling update_db() by hand, e.g.:
    #update_db('24-203')
    #update_db('23A1058')
    # MainWindow shows itself from its constructor; the name keeps the
    # window referenced while the event loop runs.
    window = MainWindow()
    return app.exec()
if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the exit status.
    exit_status = main()
    sys.exit(exit_status)