-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Automove boring files, modernise code and messages
Also update dependencies, and clean up a whole bunch of stuff.
- Loading branch information
Showing
9 changed files
with
467 additions
and
324 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
import shutil | ||
from pathlib import Path | ||
|
||
|
||
def delete(path: Path) -> None: | ||
""" | ||
Recursively deletes `path` without confirmation. | ||
:param path: the path to the file/directory to delete | ||
:return: `None` | ||
""" | ||
|
||
if not path.exists(): | ||
return | ||
|
||
if path.is_file(): | ||
path.unlink() | ||
else: | ||
shutil.rmtree(path) | ||
|
||
|
||
def move_into(path: Path, target: Path) -> None: | ||
""" | ||
Moves `path` into the directory `target`, retaining the name of `path`, and creating `target` and its parents if | ||
they do not exist yet. | ||
:param path: the file/directory to move into `path` | ||
:param target: the directory to move `path` into | ||
:return: `None` | ||
""" | ||
|
||
target.mkdir(exist_ok=True, parents=True) | ||
delete(target / path.name) | ||
shutil.move(path, target / path.name) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
import os | ||
import subprocess | ||
from pathlib import Path | ||
from typing import List | ||
|
||
from State import cfg | ||
|
||
|
||
def prompt_confirmation(message: str) -> bool: | ||
""" | ||
Prompts the user to confirm `message`, returning `True` if the user inputs `"y"`, returning `False` if the user | ||
inputs `"n"`, and repeating the prompt otherwise. | ||
:param message: the message to present to the user for confirmation | ||
:return: whether the user confirms the prompt | ||
""" | ||
|
||
while True: | ||
result = input(message).lower() | ||
if result == "y": | ||
return True | ||
elif result == "n": | ||
return False | ||
else: | ||
continue | ||
|
||
|
||
def run_executable(args: List[str], compatdata_path: str, cwd: Path = Path.cwd()) -> None: | ||
""" | ||
Runs the command `args` inside `cwd`; on Windows the command is executed directly on the command line, but on Linux | ||
the command is executed in a Proton instance using `compatdata_path`. | ||
:param args: the arguments of the command to execute | ||
:param compatdata_path: the path to the compatdata directory for Proton; may be `None` for Windows | ||
:param cwd: the directory to run the command in | ||
:return: `None` | ||
""" | ||
|
||
if cfg.windows: | ||
subprocess.Popen(args, cwd=cwd, stdout=subprocess.DEVNULL).wait() | ||
else: | ||
subprocess.Popen([cfg.proton_path, "run"] + args, cwd=cwd, stdout=subprocess.DEVNULL, | ||
env=dict(os.environ, | ||
STEAM_COMPAT_CLIENT_INSTALL_PATH=cfg.steam_path, | ||
STEAM_COMPAT_DATA_PATH=compatdata_path)).wait() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
from pathlib import Path | ||
from types import SimpleNamespace | ||
|
||
|
||
def load_config() -> SimpleNamespace: | ||
""" | ||
Loads configuration from config files and set globals. | ||
:return: the loaded configuration | ||
""" | ||
|
||
from config_default import config as cfg_default | ||
if Path("config.py").exists(): | ||
from config import config as cfg_user | ||
else: | ||
cfg_user = {} | ||
|
||
my_cfg = cfg_default | cfg_user | ||
if not my_cfg["windows"]: | ||
if ("linux_settings" in cfg_default.keys()) and ("linux_settings" in cfg_user.keys()): | ||
my_cfg["linux_settings"] = cfg_default["linux_settings"] | cfg_user["linux_settings"] | ||
my_cfg = my_cfg | my_cfg["linux_settings"] | ||
my_cfg = SimpleNamespace(**my_cfg) | ||
|
||
# Version of this script | ||
my_cfg.script_version = "3.1.0" | ||
# Path to scripts | ||
my_cfg.script_root = my_cfg.game_root / "Edit scripts/" | ||
# Path to exported dumps | ||
my_cfg.dump_root = my_cfg.script_root / "dumps/" | ||
# Path to store files that have been archived in | ||
my_cfg.dump_archived = my_cfg.dump_root / "_archived/" | ||
# Path to store dump parts in | ||
my_cfg.dump_parts = my_cfg.dump_root / "_parts/" | ||
# Path to store SQLite database at | ||
my_cfg.db_path = my_cfg.dump_root / f"fo76-dumps-v{my_cfg.script_version}-v{my_cfg.game_version}.db" | ||
# Path to `_done.txt` | ||
my_cfg.done_path = my_cfg.dump_root / "_done.txt" | ||
|
||
return my_cfg | ||
|
||
|
||
# Configuration | ||
cfg = load_config() | ||
# Unfinished subprocesses | ||
subprocesses = {} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,177 @@ | ||
import shutil | ||
import sqlite3 | ||
import subprocess | ||
from pathlib import Path | ||
from typing import List | ||
|
||
import pandas as pd | ||
|
||
import Files | ||
from IO import prompt_confirmation, run_executable | ||
from State import cfg, subprocesses | ||
|
||
|
||
def xedit() -> None: | ||
""" | ||
Runs xEdit and waits until it closes. | ||
:return: `None` | ||
""" | ||
|
||
print(f"> Running xEdit using '{cfg.xedit_path}'.\n" | ||
f"> Be sure to double-check version information in the xEdit window!") | ||
|
||
# Check for existing files | ||
if cfg.done_path.exists(): | ||
if not prompt_confirmation(f"> WARNING: " | ||
f"'{cfg.done_path.name}' already exists and must be deleted. " | ||
f"Do you want to DELETE '{cfg.done_path}' and continue? (y/n) "): | ||
exit() | ||
Files.delete(cfg.done_path) | ||
|
||
# Create ini if it does not exist | ||
config_dir = (Path.home() / "Documents/My Games/Fallout 76/" if cfg.windows | ||
else cfg.xedit_compatdata_path / "pfx/drive_c/users/steamuser/Documents/My Games/Fallout 76/") | ||
config_dir.mkdir(exist_ok=True, parents=True) | ||
(config_dir / "Fallout76.ini").touch(exist_ok=True) | ||
|
||
# Store initial `_done.txt` modification time | ||
done_time = cfg.done_path.stat().st_mtime if cfg.done_path.exists() else None | ||
|
||
# Actually run xEdit | ||
run_executable(args=[cfg.xedit_path, f"-D:{cfg.game_root / 'Data/'}", "ExportAll.fo76pas"], | ||
compatdata_path=cfg.xedit_compatdata_path if not cfg.windows else "", | ||
cwd=cfg.script_root) | ||
|
||
# Check if `_done.txt` changed | ||
new_done_time = cfg.done_path.stat().st_mtime if cfg.done_path.exists() else None | ||
if new_done_time is None or done_time == new_done_time: | ||
if not prompt_confirmation(f"> WARNING: " | ||
f"xEdit did not create or update '{cfg.done_path.name}', indicating that the dump " | ||
f"scripts may have failed. " | ||
f"Continue anyway? (y/n) "): | ||
exit() | ||
|
||
# Post-processing | ||
prefix_outputs() | ||
concat_parts() | ||
create_db() | ||
if cfg.enable_archive_xedit: | ||
archive_start() | ||
|
||
print("> Done running xEdit.\n") | ||
|
||
|
||
def concat_parts_of(input_paths: List[Path], target: Path) -> None: | ||
""" | ||
Concatenates the contents of all files in `input_paths` and writes the output to `target`. | ||
:return: `None` | ||
""" | ||
if len(input_paths) == 0: | ||
return | ||
|
||
input_paths.sort() | ||
|
||
with target.open("wb") as f_out: | ||
for input_path in input_paths: | ||
with input_path.open("rb") as f_in: | ||
shutil.copyfileobj(f_in, f_out) | ||
|
||
|
||
def prefix_outputs() -> None: | ||
""" | ||
Prefixes the exported files with `"tabular."` or `"wiki."` depending on the file type, ignoring files that already | ||
have the appropriate prefix. | ||
:return: `None` | ||
""" | ||
|
||
print(">> Prefixing files.") | ||
|
||
for file in list(cfg.dump_root.glob("*.csv")) + list(cfg.dump_root.glob("*.wiki")): | ||
prefix = "tabular." if file.suffix == ".csv" else "wiki." | ||
|
||
# Skip already-prefixed files | ||
if not file.stem.startswith(prefix): | ||
file.rename(file.parent / f"{prefix}{file.name}") | ||
|
||
print(">> Done prefixing files.") | ||
|
||
|
||
def concat_parts() -> None: | ||
""" | ||
Concatenates files that have been dumped in parts by the xEdit script, and moves parts to the `"_parts"` | ||
subdirectory. | ||
:return: `None` | ||
""" | ||
|
||
print(">> Combining dumped CSV parts.") | ||
|
||
print(">>> Combining 'tabular.IDs.csv'.") | ||
parts = list(cfg.dump_root.glob("IDs.csv.*")) | ||
concat_parts_of(parts, cfg.dump_root / "tabular.IDs.csv") | ||
[Files.move_into(part, cfg.dump_parts) for part in parts] | ||
|
||
print(">>> Combining 'wiki.TERM.wiki'.") | ||
parts = list(cfg.dump_root.glob("TERM.wiki.*")) | ||
concat_parts_of(parts, cfg.dump_root / "wiki.TERM.wiki") | ||
[Files.move_into(part, cfg.dump_parts) for part in parts] | ||
|
||
print(">> Done combining dumped CSV parts.") | ||
|
||
|
||
def create_db() -> None: | ||
""" | ||
Imports the dumped CSVs into an SQLite database. | ||
:return: `None` | ||
""" | ||
|
||
print(f">> Importing CSVs into SQLite database at '{cfg.db_path}'.") | ||
|
||
# Check for existing files | ||
if cfg.db_path.exists(): | ||
if not prompt_confirmation(f">> WARNING: " | ||
f"'{cfg.db_path.name}' already exists and must be deleted. " | ||
f"Do you want to DELETE '{cfg.db_path}' and continue? (y/n) "): | ||
exit() | ||
Files.delete(cfg.db_path) | ||
|
||
# Import into database | ||
with sqlite3.connect(cfg.db_path) as con: | ||
for csv in list(cfg.dump_root.glob("*.csv")): | ||
table_name = csv.stem.split('.', 1)[1] | ||
print(f">>> Importing '{csv.name}' into table '{table_name}'.") | ||
|
||
df = pd.read_csv(csv, quotechar='"', doublequote=True, skipinitialspace=True, | ||
dtype="string", encoding="iso-8859-1") | ||
df.columns = df.columns.str.replace(" ", "_") | ||
df.to_sql(table_name, con, index=False) | ||
|
||
print(f">> Done importing CSVs into SQLite database at '{cfg.db_path}'.") | ||
|
||
|
||
def archive_start() -> None: | ||
""" | ||
Archives all xEdit dumps that are larger than 10MB, moving the files that are archived into `_archived`. | ||
:return: `None` | ||
""" | ||
|
||
print(">> Archiving large xEdit dumps in the background.") | ||
|
||
for dump in (list(cfg.dump_root.glob("*.csv")) + | ||
list(cfg.dump_root.glob("*.wiki")) + | ||
list(cfg.dump_root.glob("*.db"))): | ||
if dump.stat().st_size < 10000000: | ||
continue | ||
|
||
print(f">>> Starting archiving of '{dump.name}'.") | ||
process = subprocess.Popen([cfg.archiver_path, "a", "-mx9", "-mmt4", f"{dump.name}.7z", dump.name], | ||
cwd=cfg.dump_root, | ||
stdout=subprocess.DEVNULL, | ||
stderr=subprocess.STDOUT) | ||
subprocesses[dump.name] = {"process": process, | ||
"post": lambda it=dump: Files.move_into(it, cfg.dump_archived)} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.