mirror of
https://gh.wpcy.net/https://github.com/DecisiveDesignDE/wp-quickstart-installer.git
synced 2026-04-30 06:02:03 +08:00
1. **Installation Cancellation:**
- I've added a 'Cancel' button to Step05, which you can use during installation.
- If you click 'Cancel' or the window's 'X' button while Step05 is installing, you'll see a confirmation dialog with a sound effect.
- If you confirm, the installation will be signaled to stop.
- Any ongoing FTP/SFTP operations will check for this signal and stop promptly.
- If you cancel, I'll perform a local cleanup: `selections.json`, `connection_data.json` (obfuscated), and the temporary installation folder will be deleted.
- The UI will update to show the cancelled state.
2. **Cleanup on Successful Completion:**
- After a successful installation, `selections.json` and `connection_data.json` will now be automatically deleted.
- The temporary installation folder was already being deleted on success, and that will continue.
3. **Window Close Integration (`SetupApp.py`):
- The `on_closing` method in `SetupApp.py` now correctly interacts with Step05 to manage the cancellation confirmation when you press the 'X' button during an active installation.
These changes give you more control over the installation process and ensure better cleanup of sensitive and temporary data.
673 lines
No EOL
33 KiB
Python
673 lines
No EOL
33 KiB
Python
import tkinter as tk
|
|
from tkinter import messagebox
|
|
from tkinter import ttk
|
|
import subprocess
|
|
import threading
|
|
import winsound
|
|
import json
|
|
import sys
|
|
import tempfile
|
|
import os
|
|
import requests
|
|
import zipfile
|
|
import shutil
|
|
from ftplib import FTP
|
|
import paramiko
|
|
import base64
|
|
|
|
PLUGIN_SLUGS = {
|
|
"Elementor": "elementor",
|
|
"Contact Form 7": "contact-form-7",
|
|
"Yoast SEO": "wordpress-seo",
|
|
"WooCommerce": "woocommerce",
|
|
"LiteSpeed Cache": "litespeed-cache",
|
|
"Really Simple SSL": "really-simple-ssl",
|
|
"Yoast Duplicate Post": "duplicate-post",
|
|
"WP Mail SMTP": "wp-mail-smtp",
|
|
"Autoptimize": "autoptimize",
|
|
"Duplicator": "duplicator",
|
|
"WP Fastest Cache": "wp-fastest-cache"
|
|
}
|
|
|
|
THEME_SLUGS = {
|
|
"Hello Elementor": "hello-elementor",
|
|
"Astra": "astra",
|
|
"Kadence": "kadence",
|
|
"GeneratePress": "generatepress",
|
|
"Storefront": "storefront",
|
|
"Hello Biz": "hello-biz"
|
|
}
|
|
|
|
class Step05(tk.Frame):
|
|
def __init__(self, parent, controller, selected_plugins, selected_themes):
|
|
super().__init__(parent)
|
|
self.controller = controller
|
|
self.selected_plugins = selected_plugins
|
|
self.selected_themes = selected_themes
|
|
self.configure(bg="#f4f4f4")
|
|
|
|
self.installation_thread = None # Initialize the installation thread
|
|
self.cancel_event = threading.Event()
|
|
self.temp_folder_path = None # To store path of temp folder for cleanup
|
|
|
|
# Define status item keys
|
|
self.STATUS_DOWNLOADING_WP = "Downloading WordPress..."
|
|
self.STATUS_DOWNLOADING_THEMES = "Downloading Themes..."
|
|
self.STATUS_DOWNLOADING_PLUGINS = "Downloading Plugins..."
|
|
self.STATUS_CREATING_ZIP = "Creating .zip..."
|
|
self.STATUS_UPLOADING_FTP = "Uploading .zip to FTP..."
|
|
self.STATUS_FINISHING_UP = "Finishing up..."
|
|
|
|
# Headline
|
|
title_label = tk.Label(self, text="Installation", font=("Arial", 18, "bold"), bg="#f4f4f4")
|
|
title_label.pack(pady=(20, 10))
|
|
|
|
# Installation Notice
|
|
notice_text = (
|
|
"We will now install WordPress, the selected Plugin(s) and Theme(s) to your server. \n"
|
|
"Again, please make sure to backup your files if the folder is not empty."
|
|
)
|
|
notice_label = tk.Label(self, text=notice_text, wraplength=750, justify="left", bg="#f4f4f4", font=("Arial", 12))
|
|
notice_label.pack(pady=(10, 20), padx=20)
|
|
|
|
# Create the new checklist UI
|
|
self.checklist_frame = tk.Frame(self, bg="#f4f4f4")
|
|
self.checklist_frame.pack(pady=10, padx=20, fill=tk.X, anchor="w") # Fill horizontally, anchor west
|
|
|
|
self.status_vars = {}
|
|
self.status_labels = {}
|
|
|
|
status_items_text = [
|
|
self.STATUS_DOWNLOADING_WP,
|
|
self.STATUS_DOWNLOADING_THEMES,
|
|
self.STATUS_DOWNLOADING_PLUGINS,
|
|
self.STATUS_CREATING_ZIP,
|
|
self.STATUS_UPLOADING_FTP,
|
|
self.STATUS_FINISHING_UP
|
|
]
|
|
|
|
for item_text in status_items_text:
|
|
var = tk.StringVar()
|
|
var.set(f"❌ {item_text}") # Initial state
|
|
self.status_vars[item_text] = var
|
|
|
|
label = tk.Label(self.checklist_frame, textvariable=var, font=("Arial", 12), bg="#f4f4f4", anchor="w", justify="left")
|
|
label.pack(fill=tk.X) # Fill horizontally to use anchor
|
|
self.status_labels[item_text] = label
|
|
|
|
# Start Button
|
|
start_button = ttk.Button(self, text="START", command=self.start_installation, style="Bold.TButton")
|
|
start_button.pack(pady=10)
|
|
self.start_button = start_button # Store reference to the start button
|
|
|
|
# Button frame
|
|
button_frame = tk.Frame(self, bg="#f4f4f4")
|
|
button_frame.pack(pady=20)
|
|
|
|
prev_button = ttk.Button(button_frame, text="< Prev", command=lambda: controller.show_frame("Step04"))
|
|
prev_button.grid(row=0, column=0, padx=10)
|
|
self.prev_button = prev_button # Store reference to the prev button
|
|
|
|
# Cancel Button (new)
|
|
self.cancel_button = ttk.Button(button_frame, text="Cancel", command=self.request_cancel_confirmation, state=tk.DISABLED)
|
|
self.cancel_button.grid(row=0, column=1, padx=10)
|
|
|
|
self.next_button = ttk.Button(button_frame, text="> Next", command=lambda: controller.show_frame("Step06"), state=tk.DISABLED)
|
|
self.next_button.grid(row=0, column=2, padx=10) # Shifted to column 2
|
|
|
|
# Styling
|
|
style = ttk.Style()
|
|
style.configure("Bold.TButton", font=("Arial", 12, "bold"), padding=10)
|
|
|
|
def start_installation(self):
|
|
self.start_button.pack_forget() # Remove the start button
|
|
self.progress_label = tk.Label(self, text="Now installing your WordPress, Themes and Plugins...", font=("Arial", 12), bg="#f4f4f4")
|
|
self.progress_label.pack(pady=10)
|
|
self.progress_bar = ttk.Progressbar(self, orient="horizontal", length=400, mode="determinate")
|
|
self.progress_bar.pack(pady=10)
|
|
self.progress_bar["value"] = 0
|
|
self.progress_bar["maximum"] = 100
|
|
|
|
# self.status_list.config(state=tk.NORMAL)
|
|
# self.status_list.insert(tk.END, "Starting installation...\nPlease wait as this may take a few minutes.\n")
|
|
# self.status_list.config(state=tk.DISABLED)
|
|
self.next_button.config(state=tk.DISABLED)
|
|
self.prev_button.config(state=tk.DISABLED) # Disable the prev button
|
|
self.cancel_button.config(state=tk.NORMAL) # Enable Cancel button
|
|
self.cancel_event.clear() # Reset event for new installation
|
|
self.temp_folder_path = None # Reset temp folder path
|
|
|
|
# Update confirmation message
|
|
self.confirm_label = tk.Label(self, text="This may take several minutes depending on how many Themes and Plugins were chosen. \nPlease be patient and do not close the installation window.", font=("Arial", 12, "bold"), bg="#f4f4f4")
|
|
self.confirm_label.pack(pady=10)
|
|
|
|
def run_installation():
|
|
try:
|
|
temp_folder, _ = self.create_temp_folder()
|
|
self.temp_folder_path = temp_folder # Store for cleanup
|
|
if self.cancel_event.is_set(): return
|
|
|
|
if self.cancel_event.is_set(): return
|
|
wordpress_zip, _ = self.download_wordpress(temp_folder)
|
|
if self.cancel_event.is_set(): return
|
|
wordpress_folder, _ = self.unzip_wordpress(wordpress_zip, temp_folder)
|
|
self.controller.after(0, lambda: self.update_checklist_item(self.STATUS_DOWNLOADING_WP, success=wordpress_folder is not None))
|
|
if self.cancel_event.is_set(): return
|
|
|
|
all_plugins_downloaded = True
|
|
plugin_files = []
|
|
for plugin in self.selected_plugins:
|
|
if self.cancel_event.is_set(): return
|
|
file_path, _ = self.download_plugin(plugin, temp_folder)
|
|
if file_path is None:
|
|
all_plugins_downloaded = False
|
|
else:
|
|
plugin_files.append(file_path)
|
|
self.controller.after(0, lambda: self.update_checklist_item(self.STATUS_DOWNLOADING_PLUGINS, success=all_plugins_downloaded))
|
|
if self.cancel_event.is_set(): return
|
|
|
|
all_themes_downloaded = True
|
|
theme_files = []
|
|
for theme in self.selected_themes:
|
|
if self.cancel_event.is_set(): return
|
|
file_path, _ = self.download_theme(theme, temp_folder)
|
|
if file_path is None:
|
|
all_themes_downloaded = False
|
|
else:
|
|
theme_files.append(file_path)
|
|
self.controller.after(0, lambda: self.update_checklist_item(self.STATUS_DOWNLOADING_THEMES, success=all_themes_downloaded))
|
|
if self.cancel_event.is_set(): return
|
|
|
|
successfully_downloaded_plugin_zips = [pf for pf in plugin_files if pf is not None and os.path.exists(pf)]
|
|
successfully_downloaded_theme_zips = [tf for tf in theme_files if tf is not None and os.path.exists(tf)]
|
|
|
|
if self.cancel_event.is_set(): return
|
|
messages = self.install_plugins_and_themes(successfully_downloaded_plugin_zips, successfully_downloaded_theme_zips, wordpress_folder)
|
|
self.controller.after(0, lambda: self.update_checklist_item(self.STATUS_CREATING_ZIP, success=True))
|
|
if self.cancel_event.is_set(): return
|
|
|
|
with open("connection_data.json", "r") as f:
|
|
connection_data = json.load(f)
|
|
if self.cancel_event.is_set(): return
|
|
|
|
if "password" in connection_data:
|
|
encoded_password = connection_data["password"]
|
|
decoded_password_bytes = base64.b64decode(encoded_password.encode('utf-8'))
|
|
connection_data["password"] = decoded_password_bytes.decode('utf-8')
|
|
|
|
server = connection_data.get("server")
|
|
username = connection_data.get("username")
|
|
password = connection_data.get("password")
|
|
port = connection_data.get("port")
|
|
connection_type = connection_data.get("connection_type")
|
|
if self.cancel_event.is_set(): return
|
|
|
|
upload_successful = False
|
|
if connection_type == "FTP":
|
|
upload_successful = self.upload_via_ftp(server, username, password, port, wordpress_folder, self.cancel_event)
|
|
else:
|
|
upload_successful = self.upload_via_sftp(server, username, password, port, wordpress_folder, self.cancel_event)
|
|
|
|
if self.cancel_event.is_set(): return # Check immediately after upload methods which might return due to cancellation
|
|
self.controller.after(0, lambda: self.update_checklist_item(self.STATUS_UPLOADING_FTP, success=upload_successful))
|
|
if not upload_successful and not self.cancel_event.is_set(): # If upload failed for non-cancel reason
|
|
raise Exception("File upload failed.")
|
|
|
|
|
|
if self.cancel_event.is_set(): return
|
|
try:
|
|
if wordpress_zip and os.path.exists(wordpress_zip):
|
|
os.remove(wordpress_zip)
|
|
if temp_folder and os.path.exists(temp_folder): # temp_folder is self.temp_folder_path
|
|
shutil.rmtree(temp_folder)
|
|
self.temp_folder_path = None # Reset after successful deletion
|
|
except Exception as e:
|
|
print(f"Error during successful cleanup: {e}")
|
|
|
|
if self.cancel_event.is_set(): return
|
|
self.controller.after(0, lambda: self.update_checklist_item(self.STATUS_FINISHING_UP, success=True))
|
|
self.next_button.config(state=tk.NORMAL)
|
|
self.prev_button.config(state=tk.NORMAL)
|
|
self.cancel_button.config(state=tk.DISABLED) # Installation complete
|
|
self.show_success_message()
|
|
|
|
if self.cancel_event.is_set(): return # Final check before sensitive file cleanup
|
|
|
|
print("Performing cleanup of sensitive files after successful installation...")
|
|
try:
|
|
if os.path.exists("selections.json"):
|
|
os.remove("selections.json")
|
|
print("Deleted selections.json on success.")
|
|
except Exception as e_sel:
|
|
print(f"Error deleting selections.json on success: {e_sel}")
|
|
|
|
try:
|
|
if os.path.exists("connection_data.json"):
|
|
os.remove("connection_data.json")
|
|
print("Deleted connection_data.json on success.")
|
|
except Exception as e_conn:
|
|
print(f"Error deleting connection_data.json on success: {e_conn}")
|
|
|
|
except Exception as e:
|
|
if not self.cancel_event.is_set(): # Only process as error if not cancelled
|
|
print(f"An error occurred during installation: {e}")
|
|
for key, var in self.status_vars.items():
|
|
if not var.get().startswith("✅"):
|
|
self.controller.after(0, lambda k=key: self.update_checklist_item(k, success=False))
|
|
self.controller.after(0, lambda: self.update_checklist_item(self.STATUS_FINISHING_UP, success=False))
|
|
# UI cleanup for error state
|
|
self.prev_button.config(state=tk.NORMAL)
|
|
self.cancel_button.config(state=tk.DISABLED)
|
|
|
|
|
|
self.installation_thread = threading.Thread(target=run_installation)
|
|
self.installation_thread.start()
|
|
|
|
def update_checklist_item(self, item_key, success):
|
|
if item_key in self.status_vars:
|
|
current_text = item_key # Get the base text without the icon
|
|
if success:
|
|
self.status_vars[item_key].set(f"✅ {current_text}")
|
|
else:
|
|
self.status_vars[item_key].set(f"⚠️ {current_text} (Failed)")
|
|
|
|
def stop(self):
|
|
if self.installation_thread and self.installation_thread.is_alive():
|
|
self.installation_thread.join(timeout=1) # Attempt to stop the thread
|
|
|
|
def create_temp_folder(self):
|
|
temp_dir = tempfile.mkdtemp()
|
|
return temp_dir, f"Temporary folder created at: {temp_dir}"
|
|
|
|
def download_wordpress(self, temp_folder):
|
|
url = "https://wordpress.org/latest.zip"
|
|
file_path = os.path.join(temp_folder, "wordpress.zip")
|
|
|
|
response = requests.get(url, stream=True)
|
|
|
|
if response.status_code == 200:
|
|
with open(file_path, "wb") as file:
|
|
for chunk in response.iter_content(chunk_size=1024):
|
|
file.write(chunk)
|
|
return file_path, f"WordPress downloaded successfully to {file_path}"
|
|
else:
|
|
return None, "Failed to download WordPress."
|
|
|
|
def unzip_wordpress(self, zip_path, temp_folder):
|
|
if zip_path and os.path.exists(zip_path):
|
|
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
|
|
zip_ref.extractall(temp_folder)
|
|
extracted_path = os.path.join(temp_folder, "wordpress")
|
|
if os.path.exists(extracted_path):
|
|
return extracted_path, f"WordPress extracted to {extracted_path}"
|
|
|
|
return None, "WordPress zip file not found or extraction failed."
|
|
|
|
def extract_zip(self, file_path, destination_folder):
|
|
if file_path and os.path.exists(file_path):
|
|
with zipfile.ZipFile(file_path, 'r') as zip_ref:
|
|
zip_ref.extractall(destination_folder)
|
|
return f"Extracted {file_path} to {destination_folder}"
|
|
return f"Extraction failed for {file_path}"
|
|
|
|
def get_plugin_download_url(self, plugin_name):
|
|
plugin_slug = PLUGIN_SLUGS.get(plugin_name, plugin_name.lower().replace(" ", "-")) # Use correct slug
|
|
api_url = f"https://api.wordpress.org/plugins/info/1.2/?action=plugin_information&slug={plugin_slug}"
|
|
response = requests.get(api_url)
|
|
|
|
print(f"🔎 Plugin API URL: {api_url}") # Debugging
|
|
print(f"🔎 API Response Status: {response.status_code}")
|
|
print(f"🔎 API Response Text: {response.text}")
|
|
|
|
if response.status_code == 200:
|
|
try:
|
|
data = response.json()
|
|
if "download_link" in data:
|
|
return data["download_link"]
|
|
else:
|
|
print(f"❌ No download link found in API response for {plugin_name}: {data}")
|
|
return None
|
|
except json.JSONDecodeError:
|
|
print(f"❌ JSON decoding error for {plugin_name}. Response: {response.text}")
|
|
return None
|
|
else:
|
|
print(f"❌ API request failed for {plugin_name}. Status code: {response.status_code}")
|
|
return None
|
|
|
|
def get_theme_download_url(self, theme_name):
|
|
theme_slug = THEME_SLUGS.get(theme_name, theme_name.lower().replace(" ", "-")) # Use correct slug
|
|
api_url = f"https://api.wordpress.org/themes/info/1.2/?action=theme_information&slug={theme_slug}"
|
|
response = requests.get(api_url)
|
|
|
|
print(f"🔎 Theme API URL: {api_url}") # Debugging
|
|
print(f"🔎 API Response Status: {response.status_code}")
|
|
print(f"🔎 API Response Text: {response.text}")
|
|
|
|
if response.status_code == 200:
|
|
try:
|
|
data = response.json()
|
|
if "download_link" in data:
|
|
return data["download_link"]
|
|
else:
|
|
print(f"❌ No download link found in API response for {theme_name}: {data}")
|
|
return None
|
|
except json.JSONDecodeError:
|
|
print(f"❌ JSON decoding error for {theme_name}. Response: {response.text}")
|
|
return None
|
|
else:
|
|
print(f"❌ API request failed for {theme_name}. Status code: {response.status_code}")
|
|
return None
|
|
|
|
def download_plugin(self, plugin_slug, temp_folder):
|
|
plugin_url = self.get_plugin_download_url(plugin_slug)
|
|
if not plugin_url:
|
|
return None, f"Failed to retrieve download URL for {plugin_slug}"
|
|
|
|
file_path = os.path.join(temp_folder, f"{plugin_slug}.zip")
|
|
response = requests.get(plugin_url, stream=True)
|
|
|
|
if response.status_code == 200:
|
|
with open(file_path, "wb") as file:
|
|
for chunk in response.iter_content(chunk_size=1024):
|
|
file.write(chunk)
|
|
if os.path.exists(file_path) and os.path.getsize(file_path) > 0: # ✅ Ensure file exists before returning success
|
|
return file_path, f"{plugin_slug} downloaded successfully to {file_path}"
|
|
else:
|
|
return None, f"❌ Download failed: {plugin_slug}.zip is empty!"
|
|
else:
|
|
return None, f"❌ Failed to download {plugin_slug}. Status code: {response.status_code}"
|
|
|
|
def download_theme(self, theme_slug, temp_folder):
|
|
theme_url = self.get_theme_download_url(theme_slug)
|
|
if not theme_url:
|
|
return None, f"Failed to retrieve download URL for {theme_slug}"
|
|
|
|
file_path = os.path.join(temp_folder, f"{theme_slug}.zip")
|
|
response = requests.get(theme_url, stream=True)
|
|
|
|
if response.status_code == 200:
|
|
with open(file_path, "wb") as file:
|
|
for chunk in response.iter_content(chunk_size=1024):
|
|
file.write(chunk)
|
|
if os.path.exists(file_path) and os.path.getsize(file_path) > 0: # ✅ Ensure file exists before returning success
|
|
return file_path, f"{theme_slug} downloaded successfully to {file_path}"
|
|
else:
|
|
return None, f"❌ Download failed: {theme_slug}.zip is empty!"
|
|
else:
|
|
return None, f"❌ Failed to download {theme_slug}. Status code: {response.status_code}"
|
|
|
|
def install_plugins_and_themes(self, plugin_files, theme_files, wordpress_folder):
|
|
plugins_folder = os.path.join(wordpress_folder, "wp-content", "plugins")
|
|
themes_folder = os.path.join(wordpress_folder, "wp-content", "themes")
|
|
os.makedirs(plugins_folder, exist_ok=True)
|
|
os.makedirs(themes_folder, exist_ok=True)
|
|
|
|
messages = []
|
|
for plugin in plugin_files:
|
|
if plugin:
|
|
messages.append(self.extract_zip(plugin, plugins_folder))
|
|
|
|
for theme in theme_files:
|
|
if theme:
|
|
messages.append(self.extract_zip(theme, themes_folder))
|
|
|
|
messages.append("All plugins and themes installed successfully.")
|
|
return messages
|
|
|
|
def upload_via_ftp(self, server, username, password, port, wordpress_folder, cancel_event):
|
|
ftp = None
|
|
try:
|
|
ftp = FTP()
|
|
# Add cancel check before potentially long connection attempt
|
|
if cancel_event.is_set(): return False
|
|
ftp.connect(server, int(port)) # This can block, consider timeout for ftp.connect if possible
|
|
if cancel_event.is_set(): ftp.quit(); return False
|
|
ftp.login(username, password)
|
|
if cancel_event.is_set(): ftp.quit(); return False
|
|
|
|
def ftp_mkdirs(ftp_obj, path):
|
|
dirs = path.split("/")
|
|
current_path = ""
|
|
for dir_item in dirs:
|
|
if cancel_event.is_set(): return False # Check inside loop
|
|
if dir_item:
|
|
current_path += f"/{dir_item}"
|
|
try:
|
|
ftp_obj.cwd(current_path)
|
|
except Exception:
|
|
try:
|
|
ftp_obj.mkd(current_path)
|
|
ftp_obj.cwd(current_path)
|
|
except Exception as e:
|
|
print(f"FTP: Failed to create directory {current_path}: {e}")
|
|
if ftp_obj: ftp_obj.quit()
|
|
return False
|
|
return True
|
|
|
|
total_files = sum([len(files) for _, _, files in os.walk(wordpress_folder)])
|
|
uploaded_files = 0
|
|
|
|
for root, dirs, files in os.walk(wordpress_folder):
|
|
if cancel_event.is_set(): ftp.quit(); return False
|
|
relative_path = os.path.relpath(root, wordpress_folder).replace("\\", "/")
|
|
if relative_path != ".":
|
|
if not ftp_mkdirs(ftp, relative_path): # This now checks cancel_event
|
|
if cancel_event.is_set(): ftp.quit(); return False # ftp_mkdirs might return False due to cancellation
|
|
print(f"FTP: Skipping folder due to directory creation failure: {relative_path}")
|
|
continue
|
|
|
|
for file_item in files:
|
|
if cancel_event.is_set(): ftp.quit(); return False
|
|
file_path = os.path.join(root, file_item)
|
|
remote_path = f"{relative_path}/{file_item}".replace("\\", "/")
|
|
if remote_path.startswith("/."): remote_path = remote_path[1:]
|
|
|
|
if os.path.exists(file_path):
|
|
try:
|
|
print(f"FTP Uploading: {file_path} -> {remote_path}")
|
|
with open(file_path, "rb") as f:
|
|
ftp.storbinary(f"STOR {remote_path}", f) # This can block
|
|
if cancel_event.is_set(): ftp.quit(); return False # Check after blocking operation
|
|
uploaded_files += 1
|
|
self.controller.after(0, lambda v=(uploaded_files / total_files) * 100: self.progress_bar.config(value=v))
|
|
except Exception as e:
|
|
if cancel_event.is_set(): ftp.quit(); return False
|
|
print(f"FTP: Failed to upload {file_path} -> {remote_path}: {e}")
|
|
else:
|
|
print(f"FTP: Skipping missing file: {file_path}")
|
|
ftp.quit()
|
|
return True
|
|
except Exception as e:
|
|
if ftp: ftp.quit()
|
|
if cancel_event.is_set(): return False # Check if exception was due to cancellation signal
|
|
print(f"❌ An error occurred during FTP upload: {e}")
|
|
return False
|
|
|
|
def upload_via_sftp(self, server, username, password, port, wordpress_folder, cancel_event):
|
|
transport = None
|
|
sftp = None
|
|
try:
|
|
if cancel_event.is_set(): return False
|
|
transport = paramiko.Transport((server, int(port)))
|
|
# transport.connect can block, consider how to make it interruptible if too long.
|
|
# For now, check event before and after.
|
|
if cancel_event.is_set(): return False
|
|
transport.connect(username=username, password=password)
|
|
if cancel_event.is_set(): transport.close(); return False
|
|
sftp = paramiko.SFTPClient.from_transport(transport)
|
|
if cancel_event.is_set(): sftp.close(); transport.close(); return False
|
|
|
|
def sftp_mkdirs(sftp_obj, path):
|
|
dirs = path.split("/")
|
|
current_path = ""
|
|
for dir_item in dirs:
|
|
if cancel_event.is_set(): return False
|
|
if dir_item:
|
|
current_path += f"/{dir_item}"
|
|
try:
|
|
sftp_obj.stat(current_path)
|
|
except FileNotFoundError:
|
|
try:
|
|
sftp_obj.mkdir(current_path)
|
|
except Exception as e:
|
|
print(f"SFTP: Failed to create directory {current_path}: {e}")
|
|
if sftp_obj: sftp_obj.close()
|
|
if transport: transport.close()
|
|
return False
|
|
return True
|
|
|
|
total_files = sum([len(files) for _, _, files in os.walk(wordpress_folder)])
|
|
uploaded_files = 0
|
|
|
|
for root, dirs, files in os.walk(wordpress_folder):
|
|
if cancel_event.is_set(): sftp.close(); transport.close(); return False
|
|
relative_path = os.path.relpath(root, wordpress_folder).replace("\\", "/")
|
|
if relative_path != ".":
|
|
if not sftp_mkdirs(sftp, relative_path): # This now checks cancel_event
|
|
if cancel_event.is_set(): sftp.close(); transport.close(); return False
|
|
print(f"SFTP: Skipping folder due to directory creation failure: {relative_path}")
|
|
continue
|
|
|
|
for file_item in files:
|
|
if cancel_event.is_set(): sftp.close(); transport.close(); return False
|
|
file_path = os.path.join(root, file_item)
|
|
remote_path = os.path.join(relative_path, file_item).replace("\\", "/")
|
|
if remote_path.startswith("./"): remote_path = remote_path[2:]
|
|
if remote_path.startswith("/"): remote_path = remote_path[1:]
|
|
|
|
if os.path.exists(file_path):
|
|
try:
|
|
print(f"SFTP Uploading: {file_path} -> {remote_path}")
|
|
sftp.put(file_path, remote_path) # This can block
|
|
if cancel_event.is_set(): sftp.close(); transport.close(); return False
|
|
uploaded_files += 1
|
|
self.controller.after(0, lambda v=(uploaded_files / total_files) * 100: self.progress_bar.config(value=v))
|
|
except Exception as e:
|
|
if cancel_event.is_set(): sftp.close(); transport.close(); return False
|
|
print(f"SFTP: Failed to upload {file_path} -> {remote_path}: {e}")
|
|
else:
|
|
print(f"SFTP: Skipping missing file: {file_path}")
|
|
sftp.close()
|
|
transport.close()
|
|
return True
|
|
except Exception as e:
|
|
if sftp: sftp.close()
|
|
if transport: transport.close()
|
|
if cancel_event.is_set(): return False
|
|
print(f"❌ An error occurred during SFTP upload: {e}")
|
|
return False
|
|
|
|
def show_success_message(self):
|
|
success_window = tk.Toplevel(self)
|
|
success_window.title("Success")
|
|
success_window.geometry("300x150")
|
|
success_window.configure(bg="#f4f4f4")
|
|
|
|
success_label = tk.Label(success_window, text="Congratulations! Installation succeeded.", font=("Arial", 12), bg="#f4f4f4")
|
|
success_label.pack(pady=20)
|
|
|
|
ok_button = ttk.Button(success_window, text="OK", command=success_window.destroy, style="Bold.TButton")
|
|
ok_button.pack(pady=10)
|
|
|
|
winsound.PlaySound("SystemAsterisk", winsound.SND_ALIAS)
|
|
|
|
def update_selections(self, selected_plugins, selected_themes):
|
|
self.selected_plugins = selected_plugins
|
|
self.selected_themes = selected_themes
|
|
# self.populate_install_list()
|
|
|
|
def request_cancel_confirmation(self):
|
|
# Play sound effect (same as success for now, as per user request)
|
|
winsound.PlaySound("SystemAsterisk", winsound.SND_ALIAS | winsound.SND_ASYNC) # SND_ASYNC to avoid blocking
|
|
|
|
response = messagebox.askyesno(
|
|
title="Confirm Cancellation",
|
|
message="This will abort the installation process completely. If you wanna use the tool again, you will have to restart from scratch. Is that okay?",
|
|
parent=self # Ensure dialog is on top of the current window
|
|
)
|
|
|
|
if response: # True if "Yes" is clicked
|
|
self.initiate_cancellation_process()
|
|
else:
|
|
# User clicked "No", do nothing and let installation continue
|
|
print("User chose not to cancel.")
|
|
pass
|
|
|
|
def initiate_cancellation_process(self):
|
|
print("Initiating cancellation process...")
|
|
# This method will be fleshed out in the next plan step to:
|
|
print("Initiating cancellation process...")
|
|
self.cancel_event.set() # Signal the thread to stop
|
|
|
|
if self.installation_thread and self.installation_thread.is_alive():
|
|
print("Waiting for installation thread to acknowledge cancellation...")
|
|
self.installation_thread.join(timeout=10) # Wait for thread up to 10s. Adjust timeout as needed.
|
|
if self.installation_thread.is_alive():
|
|
print("Warning: Installation thread did not terminate gracefully after cancel signal.")
|
|
|
|
# Perform cleanup
|
|
print("Performing cleanup after cancellation...")
|
|
try:
|
|
if os.path.exists("selections.json"):
|
|
os.remove("selections.json")
|
|
print("Deleted selections.json")
|
|
except Exception as e:
|
|
print(f"Error deleting selections.json: {e}")
|
|
|
|
try:
|
|
if os.path.exists("connection_data.json"):
|
|
os.remove("connection_data.json")
|
|
print("Deleted connection_data.json")
|
|
except Exception as e:
|
|
print(f"Error deleting connection_data.json: {e}")
|
|
|
|
try:
|
|
if self.temp_folder_path and os.path.exists(self.temp_folder_path):
|
|
shutil.rmtree(self.temp_folder_path)
|
|
print(f"Deleted temporary folder: {self.temp_folder_path}")
|
|
self.temp_folder_path = None # Reset path
|
|
except Exception as e:
|
|
print(f"Error deleting temporary folder {self.temp_folder_path}: {e}")
|
|
|
|
# Update UI
|
|
if hasattr(self, 'cancel_button'):
|
|
self.cancel_button.config(state=tk.DISABLED)
|
|
if hasattr(self, 'prev_button'):
|
|
self.prev_button.config(state=tk.NORMAL) # Allow going back
|
|
if hasattr(self, 'next_button'):
|
|
self.next_button.config(state=tk.DISABLED) # Cannot proceed
|
|
|
|
# Update checklist: Mark current or 'Finishing up' as cancelled
|
|
self.controller.after(0, lambda: self.update_checklist_item(self.STATUS_FINISHING_UP, success=False))
|
|
if self.STATUS_FINISHING_UP in self.status_vars:
|
|
self.controller.after(0, lambda: self.status_vars[self.STATUS_FINISHING_UP].set(f"⚠️ {self.STATUS_FINISHING_UP} (Cancelled)"))
|
|
|
|
# Reset progress bar and labels that appear during installation
|
|
if hasattr(self, 'progress_label') and self.progress_label.winfo_exists():
|
|
self.progress_label.pack_forget()
|
|
if hasattr(self, 'progress_bar') and self.progress_bar.winfo_exists():
|
|
self.progress_bar.pack_forget()
|
|
if hasattr(self, 'confirm_label') and hasattr(self.confirm_label, 'winfo_exists') and self.confirm_label.winfo_exists():
|
|
self.confirm_label.pack_forget()
|
|
|
|
print("Cancellation process complete.")
|
|
|
|
def request_cancel_confirmation_from_x(self):
|
|
# This method is called when the main window's 'X' is clicked during installation.
|
|
winsound.PlaySound("SystemAsterisk", winsound.SND_ALIAS | winsound.SND_ASYNC)
|
|
|
|
response = messagebox.askyesno(
|
|
title="Confirm Cancellation",
|
|
message="This will abort the installation process completely. If you wanna use the tool again, you will have to restart from scratch. Is that okay?",
|
|
parent=self
|
|
)
|
|
|
|
if response: # True if "Yes" is clicked
|
|
self.initiate_cancellation_process() # This handles event setting, thread join, cleanup
|
|
# After cancellation process is done (including thread join), destroy the main app window.
|
|
self.controller.destroy()
|
|
return True # Indicates that "Yes, Cancel" was chosen and processed
|
|
else:
|
|
# User clicked "No", do nothing to the installation, window should not close.
|
|
print("User chose not to cancel via X button.")
|
|
return False # Indicates that "No, Continue" was chosen |