Utilização de subprocesso para execução, evitando o travamento da interface.

This commit is contained in:
Clovis Fabricio Costa 2022-10-04 13:33:13 -03:00
parent 2f89ab3d01
commit 18cd0ce064
1 changed files with 198 additions and 100 deletions

View File

@ -13,19 +13,21 @@ of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero Gener
You should have received a copy of the GNU Affero General Public License along with zipasta. If not, You should have received a copy of the GNU Affero General Public License along with zipasta. If not,
see https://www.gnu.org/licenses/ see https://www.gnu.org/licenses/
""" """
import argparse
import base64 import base64
import itertools import itertools
import random import random
import shutil import shutil
import string import string
import subprocess
import sys import sys
import zlib import zlib
import pathlib import pathlib
import tkinter as tk import tkinter as tk
from tkinter import filedialog as fdlg from tkinter import filedialog as fdlg, ttk
PROGRAM_TITLE = 'Zipasta 0.2 © Clóvis Fabrício Costa'
MAX_SIZE = 230 MAX_SIZE = 230
@ -36,6 +38,41 @@ def _random_chars(number=1):
return ''.join(random.choice(_order_digits) for _ign in range(number)) return ''.join(random.choice(_order_digits) for _ign in range(number))
class CommandDialog:
def __init__(self, parent, command, callback, timeout=200):
self.window = tk.Toplevel(parent)
self.window.geometry('600x100')
self.window.title('Aguarde...')
self.command = [sys.executable]
if not getattr(sys, 'frozen', False):
self.command.append(sys.argv[0])
self.command.extend(command)
self._callback = callback
self._timeout = timeout
self._p: subprocess.Popen | None = None
self._init_window()
self._devnull = None # open(os.devnull, 'rb+')
def _init_window(self):
self.progress = ttk.Progressbar(self.window, orient=tk.HORIZONTAL, length=100, mode='indeterminate')
self.progress.pack(expand=True)
self.progress.start()
self.window.grab_set()
def start(self):
self._p = subprocess.Popen(self.command, stdin=self._devnull, stdout=self._devnull,
stderr=subprocess.STDOUT)
self.window.after(self._timeout, self.wait)
def wait(self):
if self._p.poll() is not None:
self.progress.stop()
self.window.destroy()
self._callback(self)
else:
self.window.after(self._timeout, self.wait)
class App: class App:
def __init__(self, parent, default_path: pathlib.Path = None): def __init__(self, parent, default_path: pathlib.Path = None):
self.parent = parent self.parent = parent
@ -83,28 +120,28 @@ class App:
def _build_gui(self): def _build_gui(self):
main_frame = self._make_frame(self.parent, main_row=1) main_frame = self._make_frame(self.parent, main_row=1)
self._build_frame_caminho(main_frame).grid(row=0, column=0) self._build_frame_caminho(main_frame).grid(row=0, column=0, sticky='nswe')
self._build_frame_arquivos(main_frame).grid(row=1, column=0) self._build_frame_arquivos(main_frame).grid(row=1, column=0, sticky='nswe')
main_frame.grid(row=0, column=0) main_frame.grid(row=0, column=0, sticky='nswe')
return main_frame return main_frame
def _build_frame_caminho(self, parent): def _build_frame_caminho(self, parent):
_frame = self._make_frame(parent) _frame = self._make_frame(parent)
self._make_label(_frame, 'caminho').grid(row=0, column=0) self._make_label(_frame, 'caminho').grid(row=0, column=0, sticky='nswe')
if self.path is not None: if self.path is not None:
self['caminho'] = self.path self['caminho'] = self.path
self._make_button(_frame, 'caminho', '').grid(row=0, column=1) self._make_button(_frame, 'caminho', '').grid(row=0, column=1, sticky='nswe')
return _frame return _frame
def _build_frame_arquivos(self, parent): def _build_frame_arquivos(self, parent):
_frame = self._make_frame(parent, main_row=1) _frame = self._make_frame(parent, main_row=1)
self._make_label(_frame, 'arquivos', 'Arquivos:').grid(row=0, column=0, columnspan=3) self._make_label(_frame, 'arquivos', 'Arquivos:').grid(row=0, column=0, columnspan=3, sticky='nswe')
self._arquivos = tk.Listbox(_frame) self._arquivos = tk.Listbox(_frame)
self._arquivos.grid(row=1, column=0, columnspan=3) self._arquivos.grid(row=1, column=0, columnspan=3, sticky='nswe')
self._make_button(_frame, 'adicionar', '').grid(row=2, column=0) self._make_button(_frame, 'adicionar', '').grid(row=2, column=0, sticky='e')
self._make_button(_frame, 'extrair', '').grid(row=2, column=1) self._make_button(_frame, 'extrair', '').grid(row=2, column=1, sticky='e')
self._make_button(_frame, 'deletar', '🗑').grid(row=2, column=2) self._make_button(_frame, 'deletar', '🗑').grid(row=2, column=2, sticky='e')
return _frame return _frame
def btn_caminho(self): def btn_caminho(self):
@ -119,31 +156,110 @@ class App:
def _read_folder(self): def _read_folder(self):
self._arquivos.delete(0, tk.END) self._arquivos.delete(0, tk.END)
_mainpath = self.path / '_z' self.arquivos_prefixos = {}
if _mainpath.is_dir(): for filename, prefix in _read_folder(self.path):
for filename in _mainpath.iterdir(): self.arquivos_prefixos[filename.name] = (filename, prefix)
if filename.is_dir() and filename.name != '_':
self.arquivos_prefixos[filename.name] = (filename, next(filename.iterdir()).name)
self._arquivos.insert(tk.END, filename.name) self._arquivos.insert(tk.END, filename.name)
def btn_adicionar(self): def btn_adicionar(self):
_paths = fdlg.askopenfilenames(parent=self.parent, title='Selecione arquivos a codificar') _paths = fdlg.askopenfilenames(parent=self.parent, title='Selecione arquivos a codificar')
if not _paths:
return
_cmd = [str(self.path), 'add']
_cmd.extend(str(path) for path in _paths)
_dl = CommandDialog(self.parent, _cmd, self._cmd_ok)
_dl.start()
def _cmd_ok(self, _dl=None):
self._read_folder()
def btn_deletar(self):
_datapath = self.path / '_z' / '_'
arquivos_deletar = [self._arquivos.get(i) for i in self._arquivos.curselection()]
if not arquivos_deletar:
return
_cmd = [str(self.path), 'delete']
_cmd.extend(arquivos_deletar)
_dl = CommandDialog(self.parent, _cmd, self._cmd_ok)
_dl.start()
def btn_extrair(self):
# Traverse the tuple returned by
# curselection method and print
# corresponding value(s) in the listbox
_sel = self._arquivos.curselection()
for i in self._arquivos.curselection():
_filename = self._arquivos.get(_sel[0])
_new_filename = fdlg.asksaveasfilename(parent=self.parent, title='Salvar como...', initialfile=_filename)
if not _new_filename:
return
_cmd = [str(self.path), 'extract', _filename, _new_filename]
_dl = CommandDialog(self.parent, _cmd, self._cmd_ok)
_dl.start()
def _read_folder(origem):
origem = pathlib.Path(origem) / '_z'
if origem.is_dir():
for filename in origem.iterdir():
if filename.is_dir() and filename.name != '_':
yield filename, next(filename.iterdir()).name
def _cmd_extract(origem, arquivo, new_filename):
origem = pathlib.Path(origem)
new_filename = pathlib.Path(new_filename)
arquivos_prefixos = {
_path.name: (_path, _prefix) for _path, _prefix in _read_folder(origem)
}
_fullname, prefix = arquivos_prefixos[arquivo]
_datapath = origem / '_z' / '_' / prefix
data = bytearray()
for folder in sorted(_datapath.rglob('*.*'), key=str):
data += folder.name.rpartition('.')[-1].encode('ascii')
data = base64.urlsafe_b64decode(data)
data = zlib.decompress(data)
with new_filename.open('wb') as f:
f.write(data)
def _cmd_delete(destino, _arquivos):
destino = pathlib.Path(destino)
arquivos_prefixos = {
_path.name: (_path, _prefix) for _path, _prefix in _read_folder(destino)
}
for _filename in _arquivos:
_path, _prefix = arquivos_prefixos[_filename]
# pastas_deletar = []
# pastas_deletar.append(_path / _prefix)
# pastas_deletar.append(_path)
# for pasta in (destino / _prefix).rglob('*'):
# pastas_deletar.append(pasta)
# print(f'Deletando {len(pastas_deletar)} pastas...')
# pastas_deletar.sort(key=lambda x: len(str(x)), reverse=True)
# for pasta in pastas_deletar:
# pasta.rmdir()
shutil.rmtree(destino / '_z' / _prefix, ignore_errors=True)
shutil.rmtree(_path, ignore_errors=True)
def _cmd_add(destino, _arquivos):
destino = pathlib.Path(destino)
pastas_criar = [] pastas_criar = []
for _path in _paths: for _path in _arquivos:
_path = pathlib.Path(_path) _path = pathlib.Path(_path)
with _path.open('rb') as f: with _path.open('rb') as f:
data = f.read() data = f.read()
original_size = len(data) original_size = len(data)
data = zlib.compress(data, level=8) data = zlib.compress(data, level=8)
data = base64.urlsafe_b64encode(data) data = base64.urlsafe_b64encode(data)
_mainpath = self.path / '_z' _mainpath = destino / '_z'
while True: while True:
prefix = _random_chars(3) prefix = _random_chars(3)
_datapath = _mainpath / '_' / prefix _datapath = _mainpath / '_' / prefix
if not _datapath.exists(): if not _datapath.exists():
break break
# _datapath.mkdir(parents=True, exist_ok=False) # _datapath.mkdir(parents=True, exist_ok=False)
print(f'Salvando {_path.name}({original_size // 1024}k) como {prefix}')
pastas_criar.append(_datapath) pastas_criar.append(_datapath)
pastas_criar.append(_mainpath / str(_path.name) / prefix) pastas_criar.append(_mainpath / str(_path.name) / prefix)
@ -155,7 +271,7 @@ class App:
if num_pastas < len(_order_digits) ** digitos_usar: if num_pastas < len(_order_digits) ** digitos_usar:
break break
digitos_usar += 1 digitos_usar += 1
print(f'{digitos_usar} digitos para {num_pastas} pastas')
for pos, prefixos in enumerate(itertools.product(_order_digits, repeat=digitos_usar)): for pos, prefixos in enumerate(itertools.product(_order_digits, repeat=digitos_usar)):
pedaco = data[pos * bytes_por_pasta:(pos + 1) * bytes_por_pasta].decode("ascii") pedaco = data[pos * bytes_por_pasta:(pos + 1) * bytes_por_pasta].decode("ascii")
if not pedaco: if not pedaco:
@ -168,65 +284,47 @@ class App:
nome_pasta = f'{"".join(prefixos)}.{pedaco}' nome_pasta = f'{"".join(prefixos)}.{pedaco}'
pasta_criar = pasta_criar / nome_pasta pasta_criar = pasta_criar / nome_pasta
pastas_criar.append(pasta_criar) pastas_criar.append(pasta_criar)
for pasta in pastas_criar: for pasta in pastas_criar:
pasta.mkdir(parents=True) pasta.mkdir(parents=True)
self._read_folder()
def btn_deletar(self):
_datapath = self.path / '_z' / '_'
for i in self._arquivos.curselection():
pastas_deletar = []
_filename = self._arquivos.get(i)
_path, _prefix = self.arquivos_prefixos[_filename]
pastas_deletar.append(_path / _prefix)
pastas_deletar.append(_path)
for pasta in (_datapath / _prefix).rglob('*'):
pastas_deletar.append(pasta)
print(f'Deletando {len(pastas_deletar)} pastas...')
# pastas_deletar.sort(key=lambda x: len(str(x)), reverse=True)
# for pasta in pastas_deletar:
# pasta.rmdir()
shutil.rmtree(_datapath / _prefix)
shutil.rmtree(_path)
self._read_folder()
def btn_extrair(self): def parse_arguments(argv):
# Traverse the tuple returned by parser = argparse.ArgumentParser(description=PROGRAM_TITLE)
# curselection method and print parser.add_argument('drive', metavar='DRIVE', type=pathlib.Path, help='Caminho do drive')
# corresponding value(s) in the listbox subparsers = parser.add_subparsers(help='Comando', dest='cmd')
_sel = self._arquivos.curselection()
if len(_sel) == 1:
_filename = self._arquivos.get(_sel[0])
_new_filename = fdlg.asksaveasfilename(parent=self.parent, title='Salvar como...', initialfile=_filename)
if not _new_filename:
return
_new_filename = pathlib.Path(_new_filename).absolute()
self._extract_file(_filename, _new_filename)
else:
pasta_salvar = fdlg.askdirectory(parent=self.parent, mustexist=True, title='Pasta a salvar arquivos')
if not pasta_salvar:
return
pasta_salvar = pathlib.Path(pasta_salvar)
for i in self._arquivos.curselection():
_filename = self._arquivos.get(i)
_new_filename = pasta_salvar / _filename
self._extract_file(_filename, _new_filename)
def _extract_file(self, filename, new_filename): cmd_add = subparsers.add_parser('add')
_fullname, prefix = self.arquivos_prefixos[filename] cmd_add.add_argument('filenames', metavar='FILENAME', type=pathlib.Path, nargs='+', help='Files to add')
_datapath = self.path / '_z' / '_' / prefix
data = bytearray() cmd_extract = subparsers.add_parser('extract')
for folder in sorted(_datapath.rglob('*.*'), key=str): cmd_extract.add_argument('filename', metavar='FILENAME', type=str, help='File to extract', )
data += folder.name.rpartition('.')[-1].encode('ascii') cmd_extract.add_argument('destination', metavar='DESTINATION', type=pathlib.Path, help='Where to extract')
data = base64.urlsafe_b64decode(data)
data = zlib.decompress(data) cmd_delete = subparsers.add_parser('delete')
with new_filename.open('wb') as f: cmd_delete.add_argument('filenames', metavar='FILENAME', type=str, nargs='+', help='Files to add')
f.write(data)
opts = parser.parse_args(argv)
return opts
def main(argv=None): def main(argv=None):
if len(argv) > 1:
opts = parse_arguments(argv[1:])
if opts.cmd == 'add':
_cmd_add(opts.drive, opts.filenames)
elif opts.cmd == 'extract':
_cmd_extract(opts.drive, opts.filename, opts.destination)
elif opts.cmd == 'delete':
_cmd_delete(opts.drive, opts.filenames)
else:
root = tk.Tk() root = tk.Tk()
root.title('Zipasta 0.1 © Clóvis Fabrício Costa') root.title(PROGRAM_TITLE)
root.columnconfigure(0, weight=1)
root.rowconfigure(0, weight=1)
root.geometry('800x600')
app = App(root) app = App(root)
root.mainloop() root.mainloop()